calloop/
sys.rs

1use std::{
2    cell::RefCell,
3    collections::HashMap,
4    rc::Rc,
5    sync::Arc,
6    time::{Duration, Instant},
7};
8
9#[cfg(unix)]
10use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd as Borrowed, RawFd as Raw};
11
12#[cfg(windows)]
13use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket as Borrowed, RawSocket as Raw};
14
15use polling::{Event, Events, PollMode, Poller};
16
17use crate::sources::timer::TimerWheel;
18use crate::token::TokenInner;
19use crate::RegistrationToken;
20
/// Possible modes for registering a file descriptor
///
/// Modes are plain values: they can be copied and compared for equality.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Mode {
    /// Single event generation
    ///
    /// This FD will be disabled as soon as it has generated one event.
    ///
    /// The user will need to use `LoopHandle::update()` to re-enable it if
    /// desired.
    OneShot,

    /// Level-triggering
    ///
    /// This FD will report events on every poll as long as the requested interests
    /// are available.
    Level,

    /// Edge-triggering
    ///
    /// This FD will report events only when it *gains* one of the requested interests.
    /// It must thus be fully processed before it'll generate events again.
    ///
    /// This mode is not supported on certain platforms, and an error will be returned
    /// if it is used.
    ///
    /// ## Supported Platforms
    ///
    /// As of the time of writing, the platforms that support edge triggered polling are
    /// as follows:
    ///
    /// - Linux/Android
    /// - macOS/iOS/tvOS/watchOS
    /// - FreeBSD/OpenBSD/NetBSD/DragonflyBSD
    Edge,
}
56
/// Interest to register regarding the file descriptor
///
/// Two interests compare equal when both of their flags match.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Interest {
    /// Wait for the FD to be readable
    pub readable: bool,

    /// Wait for the FD to be writable
    pub writable: bool,
}

impl Interest {
    /// Shorthand for empty interest
    pub const EMPTY: Interest = Interest {
        readable: false,
        writable: false,
    };

    /// Shorthand for read interest
    pub const READ: Interest = Interest {
        readable: true,
        writable: false,
    };

    /// Shorthand for write interest
    pub const WRITE: Interest = Interest {
        readable: false,
        writable: true,
    };

    /// Shorthand for read and write interest
    pub const BOTH: Interest = Interest {
        readable: true,
        writable: true,
    };
}
92
/// Readiness for a file descriptor notification
///
/// Two readiness values compare equal when all three flags match.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Readiness {
    /// Is the FD readable
    pub readable: bool,

    /// Is the FD writable
    pub writable: bool,

    /// Is the FD in an error state
    pub error: bool,
}

impl Readiness {
    /// Shorthand for empty readiness
    pub const EMPTY: Readiness = Readiness {
        readable: false,
        writable: false,
        error: false,
    };
}
114
115#[derive(Debug)]
116pub(crate) struct PollEvent {
117    pub(crate) readiness: Readiness,
118    pub(crate) token: Token,
119}
120
121/// Factory for creating tokens in your registrations
122///
123/// When composing event sources, each sub-source needs to
124/// have its own token to identify itself. This factory is
125/// provided to produce such unique tokens.
126
127#[derive(Debug)]
128pub struct TokenFactory {
129    next_token: TokenInner,
130}
131
132impl TokenFactory {
133    pub(crate) fn new(token: TokenInner) -> TokenFactory {
134        TokenFactory {
135            next_token: token.forget_sub_id(),
136        }
137    }
138
139    /// Get the "raw" registration token of this TokenFactory
140    pub(crate) fn registration_token(&self) -> RegistrationToken {
141        RegistrationToken::new(self.next_token.forget_sub_id())
142    }
143
144    /// Produce a new unique token
145    pub fn token(&mut self) -> Token {
146        let token = self.next_token;
147        self.next_token = token.increment_sub_id();
148        Token { inner: token }
149    }
150}
151
152/// A token (for implementation of the [`EventSource`](crate::EventSource) trait)
153///
154/// This token is produced by the [`TokenFactory`] and is used when calling the
155/// [`EventSource`](crate::EventSource) implementations to process event, in order
156/// to identify which sub-source produced them.
157///
158/// You should forward it to the [`Poll`] when registering your file descriptors.
159#[derive(Clone, Copy, Debug, PartialEq, Eq)]
160pub struct Token {
161    pub(crate) inner: TokenInner,
162}
163
164/// The polling system
165///
166/// This type represents the polling system of calloop, on which you
167/// can register your file descriptors. This interface is only accessible in
168/// implementations of the [`EventSource`](crate::EventSource) trait.
169///
170/// You only need to interact with this type if you are implementing your
171/// own event sources, while implementing the [`EventSource`](crate::EventSource) trait.
172/// And even in this case, you can often just use the [`Generic`](crate::generic::Generic) event
173/// source and delegate the implementations to it.
174pub struct Poll {
175    /// The handle to wepoll/epoll/kqueue/... used to poll for events.
176    pub(crate) poller: Arc<Poller>,
177
178    /// The buffer of events returned by the poller.
179    events: RefCell<Events>,
180
181    /// The sources registered as level triggered.
182    ///
183    /// Some platforms that `polling` supports do not support level-triggered events. As of the time
184    /// of writing, this only includes Solaris and illumos. To work around this, we emulate level
185    /// triggered events by keeping this map of file descriptors.
186    ///
187    /// One can emulate level triggered events on top of oneshot events by just re-registering the
188    /// file descriptor every time it is polled. However, this is not ideal, as it requires a
189    /// system call every time. It's better to use the intergrated system, if available.
190    level_triggered: Option<RefCell<HashMap<usize, (Raw, polling::Event)>>>,
191
192    pub(crate) timers: Rc<RefCell<TimerWheel>>,
193}
194
195impl std::fmt::Debug for Poll {
196    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
197    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198        f.write_str("Poll { ... }")
199    }
200}
201
202impl Poll {
203    pub(crate) fn new() -> crate::Result<Poll> {
204        Self::new_inner(false)
205    }
206
207    fn new_inner(force_fallback_lt: bool) -> crate::Result<Poll> {
208        let poller = Poller::new()?;
209        let level_triggered = if poller.supports_level() && !force_fallback_lt {
210            None
211        } else {
212            Some(RefCell::new(HashMap::new()))
213        };
214
215        Ok(Poll {
216            poller: Arc::new(poller),
217            events: RefCell::new(Events::new()),
218            timers: Rc::new(RefCell::new(TimerWheel::new())),
219            level_triggered,
220        })
221    }
222
223    pub(crate) fn poll(&self, mut timeout: Option<Duration>) -> crate::Result<Vec<PollEvent>> {
224        // Adjust the timeout for the timers.
225        let next_timeout = self
226            .timers
227            .borrow()
228            .next_deadline()
229            .map(|deadline| deadline.saturating_duration_since(Instant::now()));
230        timeout = match (timeout, next_timeout) {
231            (Some(timeout), Some(next_timeout)) => Some(timeout.min(next_timeout)),
232            _ => timeout.or(next_timeout),
233        };
234
235        let mut events = self.events.borrow_mut();
236        events.clear();
237        self.poller.wait(&mut events, timeout)?;
238
239        // Convert `polling` events to `calloop` events.
240        let level_triggered = self.level_triggered.as_ref().map(RefCell::borrow);
241        let mut poll_events = events
242            .iter()
243            .map(|ev| {
244                // If we need to emulate level-triggered events...
245                if let Some(level_triggered) = level_triggered.as_ref() {
246                    // ...and this event is from a level-triggered source...
247                    if let Some((source, interest)) = level_triggered.get(&ev.key) {
248                        // ...then we need to re-register the source.
249                        // SAFETY: The source is valid.
250                        self.poller
251                            .modify(unsafe { Borrowed::borrow_raw(*source) }, *interest)?;
252                    }
253                }
254
255                Ok(PollEvent {
256                    readiness: Readiness {
257                        readable: ev.readable,
258                        writable: ev.writable,
259                        error: false,
260                    },
261                    token: Token {
262                        inner: TokenInner::from(ev.key),
263                    },
264                })
265            })
266            .collect::<std::io::Result<Vec<_>>>()?;
267
268        drop(events);
269
270        let now = Instant::now();
271        let mut timers = self.timers.borrow_mut();
272        while let Some((_, token)) = timers.next_expired(now) {
273            poll_events.push(PollEvent {
274                readiness: Readiness {
275                    readable: true,
276                    writable: false,
277                    error: false,
278                },
279                token,
280            });
281        }
282
283        Ok(poll_events)
284    }
285
286    /// Register a new file descriptor for polling
287    ///
288    /// The file descriptor will be registered with given interest,
289    /// mode and token. This function will fail if given a
290    /// bad file descriptor or if the provided file descriptor is already
291    /// registered.
292    ///
293    /// # Safety
294    ///
295    /// The registered source must not be dropped before it is unregistered.
296    ///
297    /// # Leaking tokens
298    ///
299    /// If your event source is dropped without being unregistered, the token
300    /// passed in here will remain on the heap and continue to be used by the
301    /// polling system even though no event source will match it.
302    pub unsafe fn register(
303        &self,
304        #[cfg(unix)] fd: impl AsFd,
305        #[cfg(windows)] fd: impl AsSocket,
306        interest: Interest,
307        mode: Mode,
308        token: Token,
309    ) -> crate::Result<()> {
310        let raw = {
311            #[cfg(unix)]
312            {
313                fd.as_fd().as_raw_fd()
314            }
315
316            #[cfg(windows)]
317            {
318                fd.as_socket().as_raw_socket()
319            }
320        };
321
322        let ev = cvt_interest(interest, token);
323
324        // SAFETY: See invariant on function.
325        unsafe {
326            self.poller
327                .add_with_mode(raw, ev, cvt_mode(mode, self.poller.supports_level()))?;
328        }
329
330        // If this is level triggered and we're emulating level triggered mode...
331        if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
332            // ...then we need to keep track of the source.
333            let mut level_triggered = level_triggered.borrow_mut();
334            level_triggered.insert(ev.key, (raw, ev));
335        }
336
337        Ok(())
338    }
339
340    /// Update the registration for a file descriptor
341    ///
342    /// This allows you to change the interest, mode or token of a file
343    /// descriptor. Fails if the provided fd is not currently registered.
344    ///
345    /// See note on [`register()`](Self::register()) regarding leaking.
346    pub fn reregister(
347        &self,
348        #[cfg(unix)] fd: impl AsFd,
349        #[cfg(windows)] fd: impl AsSocket,
350        interest: Interest,
351        mode: Mode,
352        token: Token,
353    ) -> crate::Result<()> {
354        let (borrowed, raw) = {
355            #[cfg(unix)]
356            {
357                (fd.as_fd(), fd.as_fd().as_raw_fd())
358            }
359
360            #[cfg(windows)]
361            {
362                (fd.as_socket(), fd.as_socket().as_raw_socket())
363            }
364        };
365
366        let ev = cvt_interest(interest, token);
367        self.poller
368            .modify_with_mode(borrowed, ev, cvt_mode(mode, self.poller.supports_level()))?;
369
370        // If this is level triggered and we're emulating level triggered mode...
371        if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
372            // ...then we need to keep track of the source.
373            let mut level_triggered = level_triggered.borrow_mut();
374            level_triggered.insert(ev.key, (raw, ev));
375        }
376
377        Ok(())
378    }
379
380    /// Unregister a file descriptor
381    ///
382    /// This file descriptor will no longer generate events. Fails if the
383    /// provided file descriptor is not currently registered.
384    pub fn unregister(
385        &self,
386        #[cfg(unix)] fd: impl AsFd,
387        #[cfg(windows)] fd: impl AsSocket,
388    ) -> crate::Result<()> {
389        let (borrowed, raw) = {
390            #[cfg(unix)]
391            {
392                (fd.as_fd(), fd.as_fd().as_raw_fd())
393            }
394
395            #[cfg(windows)]
396            {
397                (fd.as_socket(), fd.as_socket().as_raw_socket())
398            }
399        };
400        self.poller.delete(borrowed)?;
401
402        if let Some(level_triggered) = self.level_triggered.as_ref() {
403            let mut level_triggered = level_triggered.borrow_mut();
404            level_triggered.retain(|_, (source, _)| *source != raw);
405        }
406
407        Ok(())
408    }
409
410    /// Get a thread-safe handle which can be used to wake up the `Poll`.
411    pub(crate) fn notifier(&self) -> Notifier {
412        Notifier(self.poller.clone())
413    }
414
415    /// Get a reference to the poller.
416    pub(crate) fn poller(&self) -> &Arc<Poller> {
417        &self.poller
418    }
419}
420
421/// Thread-safe handle which can be used to wake up the `Poll`.
422#[derive(Clone)]
423pub(crate) struct Notifier(Arc<Poller>);
424
425impl Notifier {
426    pub(crate) fn notify(&self) -> crate::Result<()> {
427        self.0.notify()?;
428
429        Ok(())
430    }
431}
432
433fn cvt_interest(interest: Interest, tok: Token) -> Event {
434    let mut event = Event::none(tok.inner.into());
435    event.readable = interest.readable;
436    event.writable = interest.writable;
437    event
438}
439
440fn cvt_mode(mode: Mode, supports_other_modes: bool) -> PollMode {
441    if !supports_other_modes {
442        return PollMode::Oneshot;
443    }
444
445    match mode {
446        Mode::Edge => PollMode::Edge,
447        Mode::Level => PollMode::Level,
448        Mode::OneShot => PollMode::Oneshot,
449    }
450}