calloop/sources/
generic.rs

1//! A generic event source wrapping an IO objects or file descriptor
2//!
3//! You can use this general purpose adapter around file-descriptor backed objects to
4//! insert into an [`EventLoop`](crate::EventLoop).
5//!
6//! The event generated by this [`Generic`] event source are the [`Readiness`](crate::Readiness)
7//! notification itself, and the monitored object is provided to your callback as the second
8//! argument.
9//!
10#![cfg_attr(unix, doc = "```")]
11#![cfg_attr(not(unix), doc = "```no_run")]
12//! # extern crate calloop;
13//! use calloop::{generic::Generic, Interest, Mode, PostAction};
14//!
15//! # fn main() {
16//! # let mut event_loop = calloop::EventLoop::<()>::try_new()
17//! #                .expect("Failed to initialize the event loop!");
18//! # let handle = event_loop.handle();
19//! # #[cfg(unix)]
20//! # let io_object = std::io::stdin();
21//! # #[cfg(windows)]
22//! # let io_object: std::net::TcpStream = panic!();
23//! handle.insert_source(
24//!     // wrap your IO object in a Generic, here we register for read readiness
25//!     // in level-triggering mode
26//!     Generic::new(io_object, Interest::READ, Mode::Level),
27//!     |readiness, io_object, shared_data| {
28//!         // The first argument of the callback is a Readiness
29//!         // The second is a &mut reference to your object
30//!
31//!         // your callback needs to return a Result<PostAction, std::io::Error>
32//!         // if it returns an error, the event loop will consider this event
33//!         // event source as erroring and report it to the user.
34//!         Ok(PostAction::Continue)
35//!     }
36//! );
37//! # }
38//! ```
39//!
40//! It can also help you implementing your own event sources: just have
41//! these `Generic<_>` as fields of your event source, and delegate the
42//! [`EventSource`](crate::EventSource) implementation to them.
43
44use polling::Poller;
45use std::{borrow, marker::PhantomData, ops, panic::AssertUnwindSafe, sync::Arc};
46
47#[cfg(unix)]
48use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd};
49#[cfg(windows)]
50use std::os::windows::io::{
51    AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd,
52};
53
54use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};
55
56/// Wrapper to use a type implementing `AsRawFd` but not `AsFd` with `Generic`
57#[derive(Debug)]
58pub struct FdWrapper<T: AsRawFd>(T);
59
60impl<T: AsRawFd> FdWrapper<T> {
61    /// Wrap `inner` with an `AsFd` implementation.
62    ///
63    /// # Safety
64    /// This is safe if the `AsRawFd` implementation of `inner` always returns
65    /// a valid fd. This should usually be true for types implementing
66    /// `AsRawFd`. But this isn't guaranteed with `FdWrapper<RawFd>`.
67    pub unsafe fn new(inner: T) -> Self {
68        Self(inner)
69    }
70}
71
72impl<T: AsRawFd> ops::Deref for FdWrapper<T> {
73    type Target = T;
74
75    fn deref(&self) -> &Self::Target {
76        &self.0
77    }
78}
79
80impl<T: AsRawFd> ops::DerefMut for FdWrapper<T> {
81    fn deref_mut(&mut self) -> &mut Self::Target {
82        &mut self.0
83    }
84}
85
86impl<T: AsRawFd> AsFd for FdWrapper<T> {
87    #[cfg(unix)]
88    fn as_fd(&self) -> BorrowedFd {
89        unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) }
90    }
91
92    #[cfg(windows)]
93    fn as_socket(&self) -> BorrowedFd {
94        unsafe { BorrowedFd::borrow_raw(self.0.as_raw_socket()) }
95    }
96}
97
98/// A wrapper around a type that doesn't expose it mutably safely.
99///
100/// The [`EventSource`] trait's `Metadata` type demands mutable access to the inner I/O source.
101/// However, the inner polling source used by `calloop` keeps the handle-based equivalent of an
102/// immutable pointer to the underlying object's I/O handle. Therefore, if the inner source is
103/// dropped, this leaves behind a dangling pointer which immediately invokes undefined behavior
104/// on the next poll of the event loop.
105///
106/// In order to prevent this from happening, the [`Generic`] I/O source must not directly expose
107/// a mutable reference to the underlying handle. This type wraps around the underlying handle and
108/// easily allows users to take immutable (`&`) references to the type, but makes mutable (`&mut`)
109/// references unsafe to get. Therefore, it prevents the source from being moved out and dropped
110/// while it is still registered in the event loop.
111///
112/// [`EventSource`]: crate::EventSource
113#[derive(Debug)]
114pub struct NoIoDrop<T>(T);
115
116impl<T> NoIoDrop<T> {
117    /// Get a mutable reference.
118    ///
119    /// # Safety
120    ///
121    /// The inner type's I/O source must not be dropped.
122    pub unsafe fn get_mut(&mut self) -> &mut T {
123        &mut self.0
124    }
125}
126
127impl<T> AsRef<T> for NoIoDrop<T> {
128    fn as_ref(&self) -> &T {
129        &self.0
130    }
131}
132
133impl<T> borrow::Borrow<T> for NoIoDrop<T> {
134    fn borrow(&self) -> &T {
135        &self.0
136    }
137}
138
139impl<T> ops::Deref for NoIoDrop<T> {
140    type Target = T;
141
142    fn deref(&self) -> &Self::Target {
143        &self.0
144    }
145}
146
147impl<T: AsFd> AsFd for NoIoDrop<T> {
148    #[cfg(unix)]
149    fn as_fd(&self) -> BorrowedFd<'_> {
150        // SAFETY: The innter type is not mutated.
151        self.0.as_fd()
152    }
153
154    #[cfg(windows)]
155    fn as_socket(&self) -> BorrowedFd<'_> {
156        // SAFETY: The innter type is not mutated.
157        self.0.as_socket()
158    }
159}
160
161/// A generic event source wrapping a FD-backed type
162#[derive(Debug)]
163pub struct Generic<F: AsFd, E = std::io::Error> {
164    /// The wrapped FD-backed type.
165    ///
166    /// This must be deregistered before it is dropped.
167    file: Option<NoIoDrop<F>>,
168    /// The programmed interest
169    pub interest: Interest,
170    /// The programmed mode
171    pub mode: Mode,
172
173    /// Back-reference to the poller.
174    ///
175    /// This is needed to drop the original file.
176    poller: Option<Arc<Poller>>,
177
178    // This token is used by the event loop logic to look up this source when an
179    // event occurs.
180    token: Option<Token>,
181
182    // This allows us to make the associated error and return types generic.
183    _error_type: PhantomData<AssertUnwindSafe<E>>,
184}
185
186impl<F: AsFd> Generic<F, std::io::Error> {
187    /// Wrap a FD-backed type into a `Generic` event source that uses
188    /// [`std::io::Error`] as its error type.
189    pub fn new(file: F, interest: Interest, mode: Mode) -> Generic<F, std::io::Error> {
190        Generic {
191            file: Some(NoIoDrop(file)),
192            interest,
193            mode,
194            token: None,
195            poller: None,
196            _error_type: PhantomData,
197        }
198    }
199
200    /// Wrap a FD-backed type into a `Generic` event source using an arbitrary error type.
201    pub fn new_with_error<E>(file: F, interest: Interest, mode: Mode) -> Generic<F, E> {
202        Generic {
203            file: Some(NoIoDrop(file)),
204            interest,
205            mode,
206            token: None,
207            poller: None,
208            _error_type: PhantomData,
209        }
210    }
211}
212
213impl<F: AsFd, E> Generic<F, E> {
214    /// Unwrap the `Generic` source to retrieve the underlying type
215    pub fn unwrap(mut self) -> F {
216        let NoIoDrop(file) = self.file.take().unwrap();
217
218        // Remove it from the poller.
219        if let Some(poller) = self.poller.take() {
220            poller
221                .delete(
222                    #[cfg(unix)]
223                    file.as_fd(),
224                    #[cfg(windows)]
225                    file.as_socket(),
226                )
227                .ok();
228        }
229
230        file
231    }
232
233    /// Get a reference to the underlying type.
234    pub fn get_ref(&self) -> &F {
235        &self.file.as_ref().unwrap().0
236    }
237
238    /// Get a mutable reference to the underlying type.
239    ///
240    /// # Safety
241    ///
242    /// This is unsafe because it allows you to modify the underlying type, which
243    /// allows you to drop the underlying event source. Dropping the underlying source
244    /// leads to a dangling reference.
245    pub unsafe fn get_mut(&mut self) -> &mut F {
246        self.file.as_mut().unwrap().get_mut()
247    }
248}
249
250impl<F: AsFd, E> Drop for Generic<F, E> {
251    fn drop(&mut self) {
252        // Remove it from the poller.
253        if let (Some(file), Some(poller)) = (self.file.take(), self.poller.take()) {
254            poller
255                .delete(
256                    #[cfg(unix)]
257                    file.as_fd(),
258                    #[cfg(windows)]
259                    file.as_socket(),
260                )
261                .ok();
262        }
263    }
264}
265
266impl<F, E> EventSource for Generic<F, E>
267where
268    F: AsFd,
269    E: Into<Box<dyn std::error::Error + Send + Sync>>,
270{
271    type Event = Readiness;
272    type Metadata = NoIoDrop<F>;
273    type Ret = Result<PostAction, E>;
274    type Error = E;
275
276    fn process_events<C>(
277        &mut self,
278        readiness: Readiness,
279        token: Token,
280        mut callback: C,
281    ) -> Result<PostAction, Self::Error>
282    where
283        C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
284    {
285        // If the token is invalid or not ours, skip processing.
286        if self.token != Some(token) {
287            return Ok(PostAction::Continue);
288        }
289
290        callback(readiness, self.file.as_mut().unwrap())
291    }
292
293    fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
294        let token = token_factory.token();
295
296        // SAFETY: We ensure that we have a poller to deregister with (see below).
297        unsafe {
298            poll.register(
299                &self.file.as_ref().unwrap().0,
300                self.interest,
301                self.mode,
302                token,
303            )?;
304        }
305
306        // Make sure we can use the poller to deregister if need be.
307        // But only if registration actually succeeded
308        // So that we don't try to unregister the FD on drop if it wasn't registered
309        // in the first place (for example if registration failed because of a duplicate insertion)
310        self.poller = Some(poll.poller().clone());
311        self.token = Some(token);
312
313        Ok(())
314    }
315
316    fn reregister(
317        &mut self,
318        poll: &mut Poll,
319        token_factory: &mut TokenFactory,
320    ) -> crate::Result<()> {
321        let token = token_factory.token();
322
323        poll.reregister(
324            &self.file.as_ref().unwrap().0,
325            self.interest,
326            self.mode,
327            token,
328        )?;
329
330        self.token = Some(token);
331        Ok(())
332    }
333
334    fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
335        poll.unregister(&self.file.as_ref().unwrap().0)?;
336        self.poller = None;
337        self.token = None;
338        Ok(())
339    }
340}
341
342#[cfg(all(unix, test))]
343mod tests {
344    use std::io::{Read, Write};
345
346    use super::Generic;
347    use crate::{Dispatcher, Interest, Mode, PostAction};
348    #[cfg(unix)]
349    #[test]
350    fn dispatch_unix() {
351        use std::os::unix::net::UnixStream;
352
353        let mut event_loop = crate::EventLoop::try_new().unwrap();
354
355        let handle = event_loop.handle();
356
357        let (mut tx, rx) = UnixStream::pair().unwrap();
358
359        let generic = Generic::new(rx, Interest::READ, Mode::Level);
360
361        let mut dispached = false;
362
363        let _generic_token = handle
364            .insert_source(generic, move |readiness, file, d| {
365                assert!(readiness.readable);
366                // we have not registered for writability
367                assert!(!readiness.writable);
368                let mut buffer = vec![0; 10];
369                let ret = (&**file).read(&mut buffer).unwrap();
370                assert_eq!(ret, 6);
371                assert_eq!(&buffer[..6], &[1, 2, 3, 4, 5, 6]);
372
373                *d = true;
374                Ok(PostAction::Continue)
375            })
376            .unwrap();
377
378        event_loop
379            .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
380            .unwrap();
381
382        assert!(!dispached);
383
384        let ret = tx.write(&[1, 2, 3, 4, 5, 6]).unwrap();
385        assert_eq!(ret, 6);
386        tx.flush().unwrap();
387
388        event_loop
389            .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
390            .unwrap();
391
392        assert!(dispached);
393    }
394
395    #[test]
396    fn register_deregister_unix() {
397        use std::os::unix::net::UnixStream;
398
399        let mut event_loop = crate::EventLoop::try_new().unwrap();
400
401        let handle = event_loop.handle();
402
403        let (mut tx, rx) = UnixStream::pair().unwrap();
404
405        let generic = Generic::new(rx, Interest::READ, Mode::Level);
406        let dispatcher = Dispatcher::new(generic, move |_, _, d| {
407            *d = true;
408            Ok(PostAction::Continue)
409        });
410
411        let mut dispached = false;
412
413        let generic_token = handle.register_dispatcher(dispatcher.clone()).unwrap();
414
415        event_loop
416            .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
417            .unwrap();
418
419        assert!(!dispached);
420
421        // remove the source, and then write something
422
423        event_loop.handle().remove(generic_token);
424
425        let ret = tx.write(&[1, 2, 3, 4, 5, 6]).unwrap();
426        assert_eq!(ret, 6);
427        tx.flush().unwrap();
428
429        event_loop
430            .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
431            .unwrap();
432
433        // the source has not been dispatched, as the source is no longer here
434        assert!(!dispached);
435
436        // insert it again
437        let generic = dispatcher.into_source_inner();
438        let _generic_token = handle
439            .insert_source(generic, move |readiness, file, d| {
440                assert!(readiness.readable);
441                // we have not registered for writability
442                assert!(!readiness.writable);
443                let mut buffer = vec![0; 10];
444                let ret = (&**file).read(&mut buffer).unwrap();
445                assert_eq!(ret, 6);
446                assert_eq!(&buffer[..6], &[1, 2, 3, 4, 5, 6]);
447
448                *d = true;
449                Ok(PostAction::Continue)
450            })
451            .unwrap();
452
453        event_loop
454            .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
455            .unwrap();
456
457        // the has now been properly dispatched
458        assert!(dispached);
459    }
460
461    // Duplicate insertion does not fail on all platforms, but does on Linux
462    #[cfg(target_os = "linux")]
463    #[test]
464    fn duplicate_insert() {
465        use std::os::unix::{
466            io::{AsFd, BorrowedFd},
467            net::UnixStream,
468        };
469        let event_loop = crate::EventLoop::<()>::try_new().unwrap();
470
471        let handle = event_loop.handle();
472
473        let (_, rx) = UnixStream::pair().unwrap();
474
475        // Rc only implements AsFd since 1.69...
476        struct RcFd<T> {
477            rc: std::rc::Rc<T>,
478        }
479
480        impl<T: AsFd> AsFd for RcFd<T> {
481            fn as_fd(&self) -> BorrowedFd<'_> {
482                self.rc.as_fd()
483            }
484        }
485
486        let rx = std::rc::Rc::new(rx);
487
488        let token = handle
489            .insert_source(
490                Generic::new(RcFd { rc: rx.clone() }, Interest::READ, Mode::Level),
491                |_, _, _| Ok(PostAction::Continue),
492            )
493            .unwrap();
494
495        // inserting the same FD a second time should fail
496        let ret = handle.insert_source(
497            Generic::new(RcFd { rc: rx.clone() }, Interest::READ, Mode::Level),
498            |_, _, _| Ok(PostAction::Continue),
499        );
500        assert!(ret.is_err());
501        std::mem::drop(ret);
502
503        // but the original token is still registered
504        handle.update(&token).unwrap();
505    }
506}