calloop/sys.rs
1use std::{
2 cell::RefCell,
3 collections::HashMap,
4 rc::Rc,
5 sync::Arc,
6 time::{Duration, Instant},
7};
8
9#[cfg(unix)]
10use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd as Borrowed, RawFd as Raw};
11
12#[cfg(windows)]
13use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket as Borrowed, RawSocket as Raw};
14
15use polling::{Event, Events, PollMode, Poller};
16
17use crate::sources::timer::TimerWheel;
18use crate::token::TokenInner;
19use crate::RegistrationToken;
20
/// Possible modes for registering a file descriptor
///
/// The mode decides how often a registered source reports readiness once
/// one of its requested interests becomes available.
///
/// Modes are plain values: they can be copied and compared with `==`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Mode {
    /// Single event generation
    ///
    /// This FD will be disabled as soon as it has generated one event.
    ///
    /// The user will need to use `LoopHandle::update()` to re-enable it if
    /// desired.
    OneShot,

    /// Level-triggering
    ///
    /// This FD will report events on every poll as long as the requested interests
    /// are available.
    Level,

    /// Edge-triggering
    ///
    /// This FD will report events only when it *gains* one of the requested interests.
    /// It must thus be fully processed before it will generate events again.
    ///
    /// This mode is not supported on certain platforms, and an error will be returned
    /// if it is used.
    ///
    /// ## Supported Platforms
    ///
    /// As of the time of writing, the platforms that support edge triggered polling are
    /// as follows:
    ///
    /// - Linux/Android
    /// - macOS/iOS/tvOS/watchOS
    /// - FreeBSD/OpenBSD/NetBSD/DragonflyBSD
    Edge,
}
56
/// Interest to register regarding the file descriptor
///
/// Each flag selects one kind of readiness to wait for. Interests are plain
/// values: they can be copied and compared with `==`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Interest {
    /// Wait for the FD to be readable
    pub readable: bool,

    /// Wait for the FD to be writable
    pub writable: bool,
}
66
67impl Interest {
68 /// Shorthand for empty interest
69 pub const EMPTY: Interest = Interest {
70 readable: false,
71 writable: false,
72 };
73
74 /// Shorthand for read interest
75 pub const READ: Interest = Interest {
76 readable: true,
77 writable: false,
78 };
79
80 /// Shorthand for write interest
81 pub const WRITE: Interest = Interest {
82 readable: false,
83 writable: true,
84 };
85
86 /// Shorthand for read and write interest
87 pub const BOTH: Interest = Interest {
88 readable: true,
89 writable: true,
90 };
91}
92
/// Readiness for a file descriptor notification
///
/// Describes the state reported for a source when an event is delivered.
/// Readiness values are plain values: they can be copied and compared
/// with `==`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Readiness {
    /// Is the FD readable
    pub readable: bool,

    /// Is the FD writable
    pub writable: bool,

    /// Is the FD in an error state
    pub error: bool,
}
105
106impl Readiness {
107 /// Shorthand for empty readiness
108 pub const EMPTY: Readiness = Readiness {
109 readable: false,
110 writable: false,
111 error: false,
112 };
113}
114
115#[derive(Debug)]
116pub(crate) struct PollEvent {
117 pub(crate) readiness: Readiness,
118 pub(crate) token: Token,
119}
120
121/// Factory for creating tokens in your registrations
122///
123/// When composing event sources, each sub-source needs to
124/// have its own token to identify itself. This factory is
125/// provided to produce such unique tokens.
126
127#[derive(Debug)]
128pub struct TokenFactory {
129 next_token: TokenInner,
130}
131
132impl TokenFactory {
133 pub(crate) fn new(token: TokenInner) -> TokenFactory {
134 TokenFactory {
135 next_token: token.forget_sub_id(),
136 }
137 }
138
139 /// Get the "raw" registration token of this TokenFactory
140 pub(crate) fn registration_token(&self) -> RegistrationToken {
141 RegistrationToken::new(self.next_token.forget_sub_id())
142 }
143
144 /// Produce a new unique token
145 pub fn token(&mut self) -> Token {
146 let token = self.next_token;
147 self.next_token = token.increment_sub_id();
148 Token { inner: token }
149 }
150}
151
152/// A token (for implementation of the [`EventSource`](crate::EventSource) trait)
153///
154/// This token is produced by the [`TokenFactory`] and is used when calling the
155/// [`EventSource`](crate::EventSource) implementations to process event, in order
156/// to identify which sub-source produced them.
157///
158/// You should forward it to the [`Poll`] when registering your file descriptors.
159#[derive(Clone, Copy, Debug, PartialEq, Eq)]
160pub struct Token {
161 pub(crate) inner: TokenInner,
162}
163
164/// The polling system
165///
166/// This type represents the polling system of calloop, on which you
167/// can register your file descriptors. This interface is only accessible in
168/// implementations of the [`EventSource`](crate::EventSource) trait.
169///
170/// You only need to interact with this type if you are implementing your
171/// own event sources, while implementing the [`EventSource`](crate::EventSource) trait.
172/// And even in this case, you can often just use the [`Generic`](crate::generic::Generic) event
173/// source and delegate the implementations to it.
174pub struct Poll {
175 /// The handle to wepoll/epoll/kqueue/... used to poll for events.
176 pub(crate) poller: Arc<Poller>,
177
178 /// The buffer of events returned by the poller.
179 events: RefCell<Events>,
180
181 /// The sources registered as level triggered.
182 ///
183 /// Some platforms that `polling` supports do not support level-triggered events. As of the time
184 /// of writing, this only includes Solaris and illumos. To work around this, we emulate level
185 /// triggered events by keeping this map of file descriptors.
186 ///
187 /// One can emulate level triggered events on top of oneshot events by just re-registering the
188 /// file descriptor every time it is polled. However, this is not ideal, as it requires a
189 /// system call every time. It's better to use the intergrated system, if available.
190 level_triggered: Option<RefCell<HashMap<usize, (Raw, polling::Event)>>>,
191
192 pub(crate) timers: Rc<RefCell<TimerWheel>>,
193}
194
195impl std::fmt::Debug for Poll {
196 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
197 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 f.write_str("Poll { ... }")
199 }
200}
201
202impl Poll {
203 pub(crate) fn new() -> crate::Result<Poll> {
204 Self::new_inner(false)
205 }
206
207 fn new_inner(force_fallback_lt: bool) -> crate::Result<Poll> {
208 let poller = Poller::new()?;
209 let level_triggered = if poller.supports_level() && !force_fallback_lt {
210 None
211 } else {
212 Some(RefCell::new(HashMap::new()))
213 };
214
215 Ok(Poll {
216 poller: Arc::new(poller),
217 events: RefCell::new(Events::new()),
218 timers: Rc::new(RefCell::new(TimerWheel::new())),
219 level_triggered,
220 })
221 }
222
223 pub(crate) fn poll(&self, mut timeout: Option<Duration>) -> crate::Result<Vec<PollEvent>> {
224 // Adjust the timeout for the timers.
225 let next_timeout = self
226 .timers
227 .borrow()
228 .next_deadline()
229 .map(|deadline| deadline.saturating_duration_since(Instant::now()));
230 timeout = match (timeout, next_timeout) {
231 (Some(timeout), Some(next_timeout)) => Some(timeout.min(next_timeout)),
232 _ => timeout.or(next_timeout),
233 };
234
235 let mut events = self.events.borrow_mut();
236 events.clear();
237 self.poller.wait(&mut events, timeout)?;
238
239 // Convert `polling` events to `calloop` events.
240 let level_triggered = self.level_triggered.as_ref().map(RefCell::borrow);
241 let mut poll_events = events
242 .iter()
243 .map(|ev| {
244 // If we need to emulate level-triggered events...
245 if let Some(level_triggered) = level_triggered.as_ref() {
246 // ...and this event is from a level-triggered source...
247 if let Some((source, interest)) = level_triggered.get(&ev.key) {
248 // ...then we need to re-register the source.
249 // SAFETY: The source is valid.
250 self.poller
251 .modify(unsafe { Borrowed::borrow_raw(*source) }, *interest)?;
252 }
253 }
254
255 Ok(PollEvent {
256 readiness: Readiness {
257 readable: ev.readable,
258 writable: ev.writable,
259 error: false,
260 },
261 token: Token {
262 inner: TokenInner::from(ev.key),
263 },
264 })
265 })
266 .collect::<std::io::Result<Vec<_>>>()?;
267
268 drop(events);
269
270 let now = Instant::now();
271 let mut timers = self.timers.borrow_mut();
272 while let Some((_, token)) = timers.next_expired(now) {
273 poll_events.push(PollEvent {
274 readiness: Readiness {
275 readable: true,
276 writable: false,
277 error: false,
278 },
279 token,
280 });
281 }
282
283 Ok(poll_events)
284 }
285
286 /// Register a new file descriptor for polling
287 ///
288 /// The file descriptor will be registered with given interest,
289 /// mode and token. This function will fail if given a
290 /// bad file descriptor or if the provided file descriptor is already
291 /// registered.
292 ///
293 /// # Safety
294 ///
295 /// The registered source must not be dropped before it is unregistered.
296 ///
297 /// # Leaking tokens
298 ///
299 /// If your event source is dropped without being unregistered, the token
300 /// passed in here will remain on the heap and continue to be used by the
301 /// polling system even though no event source will match it.
302 pub unsafe fn register(
303 &self,
304 #[cfg(unix)] fd: impl AsFd,
305 #[cfg(windows)] fd: impl AsSocket,
306 interest: Interest,
307 mode: Mode,
308 token: Token,
309 ) -> crate::Result<()> {
310 let raw = {
311 #[cfg(unix)]
312 {
313 fd.as_fd().as_raw_fd()
314 }
315
316 #[cfg(windows)]
317 {
318 fd.as_socket().as_raw_socket()
319 }
320 };
321
322 let ev = cvt_interest(interest, token);
323
324 // SAFETY: See invariant on function.
325 unsafe {
326 self.poller
327 .add_with_mode(raw, ev, cvt_mode(mode, self.poller.supports_level()))?;
328 }
329
330 // If this is level triggered and we're emulating level triggered mode...
331 if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
332 // ...then we need to keep track of the source.
333 let mut level_triggered = level_triggered.borrow_mut();
334 level_triggered.insert(ev.key, (raw, ev));
335 }
336
337 Ok(())
338 }
339
340 /// Update the registration for a file descriptor
341 ///
342 /// This allows you to change the interest, mode or token of a file
343 /// descriptor. Fails if the provided fd is not currently registered.
344 ///
345 /// See note on [`register()`](Self::register()) regarding leaking.
346 pub fn reregister(
347 &self,
348 #[cfg(unix)] fd: impl AsFd,
349 #[cfg(windows)] fd: impl AsSocket,
350 interest: Interest,
351 mode: Mode,
352 token: Token,
353 ) -> crate::Result<()> {
354 let (borrowed, raw) = {
355 #[cfg(unix)]
356 {
357 (fd.as_fd(), fd.as_fd().as_raw_fd())
358 }
359
360 #[cfg(windows)]
361 {
362 (fd.as_socket(), fd.as_socket().as_raw_socket())
363 }
364 };
365
366 let ev = cvt_interest(interest, token);
367 self.poller
368 .modify_with_mode(borrowed, ev, cvt_mode(mode, self.poller.supports_level()))?;
369
370 // If this is level triggered and we're emulating level triggered mode...
371 if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
372 // ...then we need to keep track of the source.
373 let mut level_triggered = level_triggered.borrow_mut();
374 level_triggered.insert(ev.key, (raw, ev));
375 }
376
377 Ok(())
378 }
379
380 /// Unregister a file descriptor
381 ///
382 /// This file descriptor will no longer generate events. Fails if the
383 /// provided file descriptor is not currently registered.
384 pub fn unregister(
385 &self,
386 #[cfg(unix)] fd: impl AsFd,
387 #[cfg(windows)] fd: impl AsSocket,
388 ) -> crate::Result<()> {
389 let (borrowed, raw) = {
390 #[cfg(unix)]
391 {
392 (fd.as_fd(), fd.as_fd().as_raw_fd())
393 }
394
395 #[cfg(windows)]
396 {
397 (fd.as_socket(), fd.as_socket().as_raw_socket())
398 }
399 };
400 self.poller.delete(borrowed)?;
401
402 if let Some(level_triggered) = self.level_triggered.as_ref() {
403 let mut level_triggered = level_triggered.borrow_mut();
404 level_triggered.retain(|_, (source, _)| *source != raw);
405 }
406
407 Ok(())
408 }
409
410 /// Get a thread-safe handle which can be used to wake up the `Poll`.
411 pub(crate) fn notifier(&self) -> Notifier {
412 Notifier(self.poller.clone())
413 }
414
415 /// Get a reference to the poller.
416 pub(crate) fn poller(&self) -> &Arc<Poller> {
417 &self.poller
418 }
419}
420
421/// Thread-safe handle which can be used to wake up the `Poll`.
422#[derive(Clone)]
423pub(crate) struct Notifier(Arc<Poller>);
424
425impl Notifier {
426 pub(crate) fn notify(&self) -> crate::Result<()> {
427 self.0.notify()?;
428
429 Ok(())
430 }
431}
432
433fn cvt_interest(interest: Interest, tok: Token) -> Event {
434 let mut event = Event::none(tok.inner.into());
435 event.readable = interest.readable;
436 event.writable = interest.writable;
437 event
438}
439
440fn cvt_mode(mode: Mode, supports_other_modes: bool) -> PollMode {
441 if !supports_other_modes {
442 return PollMode::Oneshot;
443 }
444
445 match mode {
446 Mode::Edge => PollMode::Edge,
447 Mode::Level => PollMode::Level,
448 Mode::OneShot => PollMode::Oneshot,
449 }
450}