calloop/sources/ping/
eventfd.rs

1//! Eventfd based implementation of the ping event source.
2//!
3//! # Implementation notes
4//!
5//! The eventfd is a much lighter signalling mechanism provided by the Linux
6//! kernel. Rather than write an arbitrary sequence of bytes, it only has a
7//! 64-bit counter.
8//!
9//! To avoid closing the eventfd early, we wrap it in a RAII-style closer
10//! `CloseOnDrop` in `make_ping()`. When all the senders are dropped, another
11//! wrapper `FlagOnDrop` handles signalling this to the event source, which is
12//! the sole owner of the eventfd itself. The senders have weak references to
13//! the eventfd, and if the source is dropped before the senders, they will
14//! simply not do anything (except log a message).
15//!
16//! To differentiate between regular ping events and close ping events, we add 2
17//! to the counter for regular events and 1 for close events. In the source we
18//! can then check the LSB and if it's set, we know it was a close event. This
19//! only works if a close event never fires more than once.
20
21use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
22use std::sync::Arc;
23
24use rustix::event::{eventfd, EventfdFlags};
25use rustix::io::{read, write, Errno};
26use tracing::warn;
27
28use super::PingError;
29use crate::{
30    generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory,
31};
32
33// These are not bitfields! They are increments to add to the eventfd counter.
34// Since the fd can only be closed once, we can effectively use the
35// INCREMENT_CLOSE value as a bitmask when checking.
36const INCREMENT_PING: u64 = 0x2;
37const INCREMENT_CLOSE: u64 = 0x1;
38
39#[inline]
40pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
41    let read = eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK)?;
42
43    // We only have one fd for the eventfd. If the sending end closes it when
44    // all copies are dropped, the receiving end will be closed as well. We need
45    // to make sure the fd is not closed until all holders of it have dropped
46    // it.
47
48    let fd = Arc::new(read);
49
50    let ping = Ping {
51        event: Arc::new(FlagOnDrop(Arc::clone(&fd))),
52    };
53
54    let source = PingSource {
55        event: Generic::new(ArcAsFd(fd), Interest::READ, Mode::Level),
56    };
57
58    Ok((ping, source))
59}
60
61// Helper functions for the event source IO.
62
63#[inline]
64fn send_ping(fd: BorrowedFd<'_>, count: u64) -> std::io::Result<()> {
65    assert!(count > 0);
66    match write(fd, &count.to_ne_bytes()) {
67        // The write succeeded, the ping will wake up the loop.
68        Ok(_) => Ok(()),
69
70        // The counter hit its cap, which means previous calls to write() will
71        // wake up the loop.
72        Err(Errno::AGAIN) => Ok(()),
73
74        // Anything else is a real error.
75        Err(e) => Err(e.into()),
76    }
77}
78
79#[inline]
80fn drain_ping(fd: BorrowedFd<'_>) -> std::io::Result<u64> {
81    // The eventfd counter is effectively a u64.
82    const NBYTES: usize = 8;
83    let mut buf = [0u8; NBYTES];
84
85    match read(fd, &mut buf) {
86        // Reading from an eventfd should only ever produce 8 bytes. No looping
87        // is required.
88        Ok(NBYTES) => Ok(u64::from_ne_bytes(buf)),
89
90        Ok(_) => unreachable!(),
91
92        // Any other error can be propagated.
93        Err(e) => Err(e.into()),
94    }
95}
96
97// Rust 1.64.0 adds an `AsFd` implementation for `Arc`, so this won't be needed
98#[derive(Debug)]
99struct ArcAsFd(Arc<OwnedFd>);
100
101impl AsFd for ArcAsFd {
102    fn as_fd(&self) -> BorrowedFd {
103        self.0.as_fd()
104    }
105}
106
107// The event source is simply a generic source with one of the eventfds.
108#[derive(Debug)]
109pub struct PingSource {
110    event: Generic<ArcAsFd>,
111}
112
113impl EventSource for PingSource {
114    type Event = ();
115    type Metadata = ();
116    type Ret = ();
117    type Error = PingError;
118
119    fn process_events<C>(
120        &mut self,
121        readiness: Readiness,
122        token: Token,
123        mut callback: C,
124    ) -> Result<PostAction, Self::Error>
125    where
126        C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
127    {
128        self.event
129            .process_events(readiness, token, |_, fd| {
130                let counter = drain_ping(fd.as_fd())?;
131
132                // If the LSB is set, it means we were closed. If anything else
133                // is also set, it means we were pinged. The two are not
134                // mutually exclusive.
135                let close = (counter & INCREMENT_CLOSE) != 0;
136                let ping = (counter & (u64::MAX - 1)) != 0;
137
138                if ping {
139                    callback((), &mut ());
140                }
141
142                if close {
143                    Ok(PostAction::Remove)
144                } else {
145                    Ok(PostAction::Continue)
146                }
147            })
148            .map_err(|e| PingError(e.into()))
149    }
150
151    fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
152        self.event.register(poll, token_factory)
153    }
154
155    fn reregister(
156        &mut self,
157        poll: &mut Poll,
158        token_factory: &mut TokenFactory,
159    ) -> crate::Result<()> {
160        self.event.reregister(poll, token_factory)
161    }
162
163    fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
164        self.event.unregister(poll)
165    }
166}
167
168#[derive(Clone, Debug)]
169pub struct Ping {
170    // This is an Arc because it's potentially shared with clones. The last one
171    // dropped needs to signal to the event source via the eventfd.
172    event: Arc<FlagOnDrop>,
173}
174
175impl Ping {
176    /// Send a ping to the `PingSource`.
177    pub fn ping(&self) {
178        if let Err(e) = send_ping(self.event.0.as_fd(), INCREMENT_PING) {
179            warn!("Failed to write a ping: {e:?}");
180        }
181    }
182}
183
184/// This manages signalling to the PingSource when it's dropped. There should
185/// only ever be one of these per PingSource.
186#[derive(Debug)]
187struct FlagOnDrop(Arc<OwnedFd>);
188
189impl Drop for FlagOnDrop {
190    fn drop(&mut self) {
191        if let Err(e) = send_ping(self.0.as_fd(), INCREMENT_CLOSE) {
192            warn!("Failed to send close ping: {e:?}");
193        }
194    }
195}