1//! Eventfd based implementation of the ping event source.
2//!
3//! # Implementation notes
4//!
5//! The eventfd is a much lighter signalling mechanism provided by the Linux
6//! kernel. Rather than write an arbitrary sequence of bytes, it only has a
7//! 64-bit counter.
8//!
//! To avoid closing the eventfd early, `make_ping()` places it in an `Arc`
//! that is shared between the event source and the senders, so the fd is only
//! closed once every holder has dropped its reference. When all the senders
//! are dropped, a wrapper `FlagOnDrop` handles signalling this to the event
//! source by writing a close event to the eventfd. If a write to the eventfd
//! fails, the senders simply log a message and carry on.
15//!
16//! To differentiate between regular ping events and close ping events, we add 2
17//! to the counter for regular events and 1 for close events. In the source we
18//! can then check the LSB and if it's set, we know it was a close event. This
19//! only works if a close event never fires more than once.
2021use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
22use std::sync::Arc;
2324use rustix::event::{eventfd, EventfdFlags};
25use rustix::io::{read, write, Errno};
26use tracing::warn;
2728use super::PingError;
29use crate::{
30 generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory,
31};
// Amounts added to the eventfd counter. These are plain increments, not
// bitfields. Because the close increment is only ever written once, its value
// can nevertheless be used like a bitmask when inspecting the counter.

/// Added to the counter for every regular ping.
const INCREMENT_PING: u64 = 2;
/// Added to the counter once, when the last sender goes away.
const INCREMENT_CLOSE: u64 = 1;
3839#[inline]
40pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
41let read = eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?;
4243// We only have one fd for the eventfd. If the sending end closes it when
44 // all copies are dropped, the receiving end will be closed as well. We need
45 // to make sure the fd is not closed until all holders of it have dropped
46 // it.
4748let fd = Arc::new(read);
4950let ping = Ping {
51 event: Arc::new(FlagOnDrop(Arc::clone(&fd))),
52 };
5354let source = PingSource {
55 event: Generic::new(ArcAsFd(fd), Interest::READ, Mode::Level),
56 };
5758Ok((ping, source))
59}
6061// Helper functions for the event source IO.
6263#[inline]
64fn send_ping(fd: BorrowedFd<'_>, count: u64) -> std::io::Result<()> {
65assert!(count > 0);
66match write(fd, &count.to_ne_bytes()) {
67// The write succeeded, the ping will wake up the loop.
68Ok(_) => Ok(()),
6970// The counter hit its cap, which means previous calls to write() will
71 // wake up the loop.
72Err(Errno::AGAIN) => Ok(()),
7374// Anything else is a real error.
75Err(e) => Err(e.into()),
76 }
77}
7879#[inline]
80fn drain_ping(fd: BorrowedFd<'_>) -> std::io::Result<u64> {
81// The eventfd counter is effectively a u64.
82const NBYTES: usize = 8;
83let mut buf = [0u8; NBYTES];
8485match read(fd, &mut buf) {
86// Reading from an eventfd should only ever produce 8 bytes. No looping
87 // is required.
88Ok(NBYTES) => Ok(u64::from_ne_bytes(buf)),
8990Ok(_) => unreachable!(),
9192// Any other error can be propagated.
93Err(e) => Err(e.into()),
94 }
95}
// Thin newtype that lets a shared `Arc<OwnedFd>` be used where `AsFd` is
// required. Rust 1.64.0 adds an `AsFd` implementation for `Arc`, so this
// won't be needed once that is the minimum supported version.
#[derive(Debug)]
struct ArcAsFd(Arc<OwnedFd>);

impl AsFd for ArcAsFd {
    fn as_fd(&self) -> BorrowedFd<'_> {
        let Self(shared) = self;
        // Go through the `Arc` explicitly and borrow from the `OwnedFd`.
        OwnedFd::as_fd(shared)
    }
}
/// The receiving end of a ping, created by `make_ping()`.
///
/// Its `EventSource` implementation invokes the user callback whenever the
/// eventfd counter shows that at least one ping was sent, and asks to be
/// removed from the loop once the close increment has been seen.
// The event source is simply a generic source with one of the eventfds.
#[derive(Debug)]
pub struct PingSource {
    // Generic source over the shared eventfd; `make_ping()` sets it up for
    // read interest in level mode.
    event: Generic<ArcAsFd>,
}
112113impl EventSource for PingSource {
114type Event = ();
115type Metadata = ();
116type Ret = ();
117type Error = PingError;
118119fn process_events<C>(
120&mut self,
121 readiness: Readiness,
122 token: Token,
123mut callback: C,
124 ) -> Result<PostAction, Self::Error>
125where
126C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
127 {
128self.event
129 .process_events(readiness, token, |_, fd| {
130let counter = drain_ping(fd.as_fd())?;
131132// If the LSB is set, it means we were closed. If anything else
133 // is also set, it means we were pinged. The two are not
134 // mutually exclusive.
135let close = (counter & INCREMENT_CLOSE) != 0;
136let ping = (counter & (u64::MAX - 1)) != 0;
137138if ping {
139 callback((), &mut ());
140 }
141142if close {
143Ok(PostAction::Remove)
144 } else {
145Ok(PostAction::Continue)
146 }
147 })
148 .map_err(|e| PingError(e.into()))
149 }
150151fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
152self.event.register(poll, token_factory)
153 }
154155fn reregister(
156&mut self,
157 poll: &mut Poll,
158 token_factory: &mut TokenFactory,
159 ) -> crate::Result<()> {
160self.event.reregister(poll, token_factory)
161 }
162163fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
164self.event.unregister(poll)
165 }
166}
167168#[derive(Clone, Debug)]
169pub struct Ping {
170// This is an Arc because it's potentially shared with clones. The last one
171 // dropped needs to signal to the event source via the eventfd.
172event: Arc<FlagOnDrop>,
173}
174175impl Ping {
176/// Send a ping to the `PingSource`.
177pub fn ping(&self) {
178if let Err(e) = send_ping(self.event.0.as_fd(), INCREMENT_PING) {
179warn!("Failed to write a ping: {e:?}");
180 }
181 }
182}
183184/// This manages signalling to the PingSource when it's dropped. There should
185/// only ever be one of these per PingSource.
186#[derive(Debug)]
187struct FlagOnDrop(Arc<OwnedFd>);
188189impl Drop for FlagOnDrop {
190fn drop(&mut self) {
191if let Err(e) = send_ping(self.0.as_fd(), INCREMENT_CLOSE) {
192warn!("Failed to send close ping: {e:?}");
193 }
194 }
195}