calloop/
io.rs

1//! Adapters for async IO objects
2//!
3//! This module mainly hosts the [`Async`] adapter for making IO objects async with readiness
4//! monitoring backed by an [`EventLoop`](crate::EventLoop). See [`LoopHandle::adapt_io`] for
5//! how to create them.
6//!
7//! [`LoopHandle::adapt_io`]: crate::LoopHandle#method.adapt_io
8
9use std::cell::RefCell;
10use std::pin::Pin;
11use std::rc::Rc;
12use std::task::{Context, Poll as TaskPoll, Waker};
13
14#[cfg(unix)]
15use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
16#[cfg(windows)]
17use std::os::windows::io::{
18    AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd, RawSocket as RawFd,
19};
20
21#[cfg(feature = "futures-io")]
22use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut};
23
24use crate::loop_logic::EventIterator;
25use crate::{
26    loop_logic::LoopInner, sources::EventDispatcher, Interest, Mode, Poll, PostAction, Readiness,
27    Token, TokenFactory,
28};
29use crate::{AdditionalLifecycleEventsSet, RegistrationToken};
30
31/// Adapter for async IO manipulations
32///
33/// This type wraps an IO object, providing methods to create futures waiting for its
34/// readiness.
35///
36/// If the `futures-io` cargo feature is enabled, it also implements `AsyncRead` and/or
37/// `AsyncWrite` if the underlying type implements `Read` and/or `Write`.
38///
39/// Note that this adapter and the futures procuded from it and *not* threadsafe.
40///
41/// ## Platform-Specific
42///
43/// - **Windows:** Usually, on drop, the file descriptor is set back to its previous status.
44///   For example, if the file was previously nonblocking it will be set to nonblocking, and
45///   if the file was blocking it will be set to blocking. However, on Windows, it is impossible
46///   to tell what its status was before. Therefore it will always be set to blocking.
47pub struct Async<'l, F: AsFd> {
48    fd: Option<F>,
49    dispatcher: Rc<RefCell<IoDispatcher>>,
50    inner: Rc<dyn IoLoopInner + 'l>,
51    was_nonblocking: bool,
52}
53
54impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> {
55    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct("Async").field("fd", &self.fd).finish()
58    }
59}
60
61impl<'l, F: AsFd> Async<'l, F> {
62    pub(crate) fn new<Data>(inner: Rc<LoopInner<'l, Data>>, fd: F) -> crate::Result<Async<'l, F>> {
63        // set non-blocking
64        let was_nonblocking = set_nonblocking(
65            #[cfg(unix)]
66            fd.as_fd(),
67            #[cfg(windows)]
68            fd.as_socket(),
69            true,
70        )?;
71        // register in the loop
72        let dispatcher = Rc::new(RefCell::new(IoDispatcher {
73            #[cfg(unix)]
74            fd: fd.as_fd().as_raw_fd(),
75            #[cfg(windows)]
76            fd: fd.as_socket().as_raw_socket(),
77            token: None,
78            waker: None,
79            is_registered: false,
80            interest: Interest::EMPTY,
81            last_readiness: Readiness::EMPTY,
82        }));
83
84        {
85            let mut sources = inner.sources.borrow_mut();
86            let slot = sources.vacant_entry();
87            slot.source = Some(dispatcher.clone());
88            dispatcher.borrow_mut().token = Some(Token { inner: slot.token });
89        }
90
91        // SAFETY: We are sure to deregister on drop.
92        unsafe {
93            inner.register(&dispatcher)?;
94        }
95
96        // Straightforward casting would require us to add the bound `Data: 'l` but we don't actually need it
97        // as this module never accesses the dispatch data, so we use transmute to erase it
98        let inner: Rc<dyn IoLoopInner + 'l> =
99            unsafe { std::mem::transmute(inner as Rc<dyn IoLoopInner>) };
100
101        Ok(Async {
102            fd: Some(fd),
103            dispatcher,
104            inner,
105            was_nonblocking,
106        })
107    }
108
109    /// Mutably access the underlying IO object
110    pub fn get_mut(&mut self) -> &mut F {
111        self.fd.as_mut().unwrap()
112    }
113
114    /// A future that resolves once the object becomes ready for reading
115    pub fn readable<'s>(&'s mut self) -> Readable<'s, 'l, F> {
116        Readable { io: self }
117    }
118
119    /// A future that resolves once the object becomes ready for writing
120    pub fn writable<'s>(&'s mut self) -> Writable<'s, 'l, F> {
121        Writable { io: self }
122    }
123
124    /// Remove the async adapter and retrieve the underlying object
125    pub fn into_inner(mut self) -> F {
126        self.fd.take().unwrap()
127    }
128
129    fn readiness(&self) -> Readiness {
130        self.dispatcher.borrow_mut().readiness()
131    }
132
133    fn register_waker(&self, interest: Interest, waker: Waker) -> crate::Result<()> {
134        {
135            let mut disp = self.dispatcher.borrow_mut();
136            disp.interest = interest;
137            disp.waker = Some(waker);
138        }
139        self.inner.reregister(&self.dispatcher)
140    }
141}
142
143/// A future that resolves once the associated object becomes ready for reading
144#[derive(Debug)]
145pub struct Readable<'s, 'l, F: AsFd> {
146    io: &'s mut Async<'l, F>,
147}
148
149impl<'s, 'l, F: AsFd> std::future::Future for Readable<'s, 'l, F> {
150    type Output = ();
151    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> TaskPoll<()> {
152        let io = &mut self.as_mut().io;
153        let readiness = io.readiness();
154        if readiness.readable || readiness.error {
155            TaskPoll::Ready(())
156        } else {
157            let _ = io.register_waker(Interest::READ, cx.waker().clone());
158            TaskPoll::Pending
159        }
160    }
161}
162
163/// A future that resolves once the associated object becomes ready for writing
164#[derive(Debug)]
165pub struct Writable<'s, 'l, F: AsFd> {
166    io: &'s mut Async<'l, F>,
167}
168
169impl<'s, 'l, F: AsFd> std::future::Future for Writable<'s, 'l, F> {
170    type Output = ();
171    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> TaskPoll<()> {
172        let io = &mut self.as_mut().io;
173        let readiness = io.readiness();
174        if readiness.writable || readiness.error {
175            TaskPoll::Ready(())
176        } else {
177            let _ = io.register_waker(Interest::WRITE, cx.waker().clone());
178            TaskPoll::Pending
179        }
180    }
181}
182
183impl<'l, F: AsFd> Drop for Async<'l, F> {
184    fn drop(&mut self) {
185        self.inner.kill(&self.dispatcher);
186        // restore flags
187        let _ = set_nonblocking(
188            unsafe { BorrowedFd::borrow_raw(self.dispatcher.borrow().fd) },
189            self.was_nonblocking,
190        );
191    }
192}
193
194impl<'l, F: AsFd> Unpin for Async<'l, F> {}
195
196trait IoLoopInner {
197    unsafe fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()>;
198    fn reregister(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()>;
199    fn kill(&self, dispatcher: &RefCell<IoDispatcher>);
200}
201
202impl<'l, Data> IoLoopInner for LoopInner<'l, Data> {
203    unsafe fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()> {
204        let disp = dispatcher.borrow();
205        self.poll.borrow_mut().register(
206            unsafe { BorrowedFd::borrow_raw(disp.fd) },
207            Interest::EMPTY,
208            Mode::OneShot,
209            disp.token.expect("No token for IO dispatcher"),
210        )
211    }
212
213    fn reregister(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()> {
214        let disp = dispatcher.borrow();
215        self.poll.borrow_mut().reregister(
216            unsafe { BorrowedFd::borrow_raw(disp.fd) },
217            disp.interest,
218            Mode::OneShot,
219            disp.token.expect("No token for IO dispatcher"),
220        )
221    }
222
223    fn kill(&self, dispatcher: &RefCell<IoDispatcher>) {
224        let token = dispatcher
225            .borrow()
226            .token
227            .expect("No token for IO dispatcher");
228        if let Ok(slot) = self.sources.borrow_mut().get_mut(token.inner) {
229            slot.source = None;
230        }
231    }
232}
233
234struct IoDispatcher {
235    fd: RawFd, // FIXME: `BorrowedFd`? How to statically verify it doesn't outlive file?
236    token: Option<Token>,
237    waker: Option<Waker>,
238    is_registered: bool,
239    interest: Interest,
240    last_readiness: Readiness,
241}
242
243impl IoDispatcher {
244    fn readiness(&mut self) -> Readiness {
245        std::mem::replace(&mut self.last_readiness, Readiness::EMPTY)
246    }
247}
248
249impl<Data> EventDispatcher<Data> for RefCell<IoDispatcher> {
250    fn process_events(
251        &self,
252        readiness: Readiness,
253        _token: Token,
254        _data: &mut Data,
255    ) -> crate::Result<PostAction> {
256        let mut disp = self.borrow_mut();
257        disp.last_readiness = readiness;
258        if let Some(waker) = disp.waker.take() {
259            waker.wake();
260        }
261        Ok(PostAction::Continue)
262    }
263
264    fn register(
265        &self,
266        _: &mut Poll,
267        _: &mut AdditionalLifecycleEventsSet,
268        _: &mut TokenFactory,
269    ) -> crate::Result<()> {
270        // registration is handled by IoLoopInner
271        unreachable!()
272    }
273
274    fn reregister(
275        &self,
276        _: &mut Poll,
277        _: &mut AdditionalLifecycleEventsSet,
278        _: &mut TokenFactory,
279    ) -> crate::Result<bool> {
280        // registration is handled by IoLoopInner
281        unreachable!()
282    }
283
284    fn unregister(
285        &self,
286        poll: &mut Poll,
287        _: &mut AdditionalLifecycleEventsSet,
288        _: RegistrationToken,
289    ) -> crate::Result<bool> {
290        let disp = self.borrow();
291        if disp.is_registered {
292            poll.unregister(unsafe { BorrowedFd::borrow_raw(disp.fd) })?;
293        }
294        Ok(true)
295    }
296
297    fn before_sleep(&self) -> crate::Result<Option<(Readiness, Token)>> {
298        Ok(None)
299    }
300    fn before_handle_events(&self, _: EventIterator<'_>) {}
301}
302
303/*
304 * Async IO trait implementations
305 */
306
307#[cfg(feature = "futures-io")]
308#[cfg_attr(docsrs, doc(cfg(feature = "futures-io")))]
309impl<'l, F: AsFd + std::io::Read> AsyncRead for Async<'l, F> {
310    fn poll_read(
311        mut self: Pin<&mut Self>,
312        cx: &mut Context<'_>,
313        buf: &mut [u8],
314    ) -> TaskPoll<std::io::Result<usize>> {
315        match (*self).get_mut().read(buf) {
316            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
317            res => return TaskPoll::Ready(res),
318        }
319        self.register_waker(Interest::READ, cx.waker().clone())?;
320        TaskPoll::Pending
321    }
322
323    fn poll_read_vectored(
324        mut self: Pin<&mut Self>,
325        cx: &mut Context<'_>,
326        bufs: &mut [IoSliceMut<'_>],
327    ) -> TaskPoll<std::io::Result<usize>> {
328        match (*self).get_mut().read_vectored(bufs) {
329            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
330            res => return TaskPoll::Ready(res),
331        }
332        self.register_waker(Interest::READ, cx.waker().clone())?;
333        TaskPoll::Pending
334    }
335}
336
337#[cfg(feature = "futures-io")]
338#[cfg_attr(docsrs, doc(cfg(feature = "futures-io")))]
339impl<'l, F: AsFd + std::io::Write> AsyncWrite for Async<'l, F> {
340    fn poll_write(
341        mut self: Pin<&mut Self>,
342        cx: &mut Context<'_>,
343        buf: &[u8],
344    ) -> TaskPoll<std::io::Result<usize>> {
345        match (*self).get_mut().write(buf) {
346            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
347            res => return TaskPoll::Ready(res),
348        }
349        self.register_waker(Interest::WRITE, cx.waker().clone())?;
350        TaskPoll::Pending
351    }
352
353    fn poll_write_vectored(
354        mut self: Pin<&mut Self>,
355        cx: &mut Context<'_>,
356        bufs: &[IoSlice<'_>],
357    ) -> TaskPoll<std::io::Result<usize>> {
358        match (*self).get_mut().write_vectored(bufs) {
359            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
360            res => return TaskPoll::Ready(res),
361        }
362        self.register_waker(Interest::WRITE, cx.waker().clone())?;
363        TaskPoll::Pending
364    }
365
366    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> TaskPoll<std::io::Result<()>> {
367        match (*self).get_mut().flush() {
368            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
369            res => return TaskPoll::Ready(res),
370        }
371        self.register_waker(Interest::WRITE, cx.waker().clone())?;
372        TaskPoll::Pending
373    }
374
375    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> TaskPoll<std::io::Result<()>> {
376        self.poll_flush(cx)
377    }
378}
379
380// https://github.com/smol-rs/async-io/blob/6499077421495f2200d5b86918399f3a84bbe8e4/src/lib.rs#L2171-L2195
381/// Set the nonblocking status of an FD and return whether it was nonblocking before.
382#[allow(clippy::needless_return)]
383#[inline]
384fn set_nonblocking(fd: BorrowedFd<'_>, is_nonblocking: bool) -> std::io::Result<bool> {
385    #[cfg(windows)]
386    {
387        rustix::io::ioctl_fionbio(fd, is_nonblocking)?;
388
389        // Unfortunately it is impossible to tell if a socket was nonblocking on Windows.
390        // Just say it wasn't for now.
391        return Ok(false);
392    }
393
394    #[cfg(not(windows))]
395    {
396        let previous = rustix::fs::fcntl_getfl(fd)?;
397        let new = if is_nonblocking {
398            previous | rustix::fs::OFlags::NONBLOCK
399        } else {
400            previous & !(rustix::fs::OFlags::NONBLOCK)
401        };
402        if new != previous {
403            rustix::fs::fcntl_setfl(fd, new)?;
404        }
405
406        return Ok(previous.contains(rustix::fs::OFlags::NONBLOCK));
407    }
408}
409
410#[cfg(all(test, unix, feature = "executor", feature = "futures-io"))]
411mod tests {
412    use futures::io::{AsyncReadExt, AsyncWriteExt};
413
414    use crate::sources::futures::executor;
415
416    #[test]
417    fn read_write() {
418        let mut event_loop = crate::EventLoop::try_new().unwrap();
419        let handle = event_loop.handle();
420        let (exec, sched) = executor().unwrap();
421        handle
422            .insert_source(exec, move |ret, &mut (), got| {
423                *got = ret;
424            })
425            .unwrap();
426
427        let (tx, rx) = std::os::unix::net::UnixStream::pair().unwrap();
428        let mut tx = handle.adapt_io(tx).unwrap();
429        let mut rx = handle.adapt_io(rx).unwrap();
430        let received = std::rc::Rc::new(std::cell::Cell::new(false));
431        let fut_received = received.clone();
432
433        sched
434            .schedule(async move {
435                let mut buf = [0; 12];
436                rx.read_exact(&mut buf).await.unwrap();
437                assert_eq!(&buf, b"Hello World!");
438                fut_received.set(true);
439            })
440            .unwrap();
441
442        // The receiving future alone cannot advance
443        event_loop
444            .dispatch(Some(std::time::Duration::from_millis(10)), &mut ())
445            .unwrap();
446        assert!(!received.get());
447
448        // schedule the writing future as well and wait until finish
449        sched
450            .schedule(async move {
451                tx.write_all(b"Hello World!").await.unwrap();
452                tx.flush().await.unwrap();
453            })
454            .unwrap();
455
456        while !received.get() {
457            event_loop.dispatch(None, &mut ()).unwrap();
458        }
459    }
460
461    #[test]
462    fn read_write_vectored() {
463        let mut event_loop = crate::EventLoop::try_new().unwrap();
464        let handle = event_loop.handle();
465        let (exec, sched) = executor().unwrap();
466        handle
467            .insert_source(exec, move |ret, &mut (), got| {
468                *got = ret;
469            })
470            .unwrap();
471
472        let (tx, rx) = std::os::unix::net::UnixStream::pair().unwrap();
473        let mut tx = handle.adapt_io(tx).unwrap();
474        let mut rx = handle.adapt_io(rx).unwrap();
475        let received = std::rc::Rc::new(std::cell::Cell::new(false));
476        let fut_received = received.clone();
477
478        sched
479            .schedule(async move {
480                let mut buf = [0; 12];
481                let mut ioslices = buf
482                    .chunks_mut(2)
483                    .map(std::io::IoSliceMut::new)
484                    .collect::<Vec<_>>();
485                let count = rx.read_vectored(&mut ioslices).await.unwrap();
486                assert_eq!(count, 12);
487                assert_eq!(&buf, b"Hello World!");
488                fut_received.set(true);
489            })
490            .unwrap();
491
492        // The receiving future alone cannot advance
493        event_loop
494            .dispatch(Some(std::time::Duration::from_millis(10)), &mut ())
495            .unwrap();
496        assert!(!received.get());
497
498        // schedule the writing future as well and wait until finish
499        sched
500            .schedule(async move {
501                let buf = b"Hello World!";
502                let ioslices = buf.chunks(2).map(std::io::IoSlice::new).collect::<Vec<_>>();
503                let count = tx.write_vectored(&ioslices).await.unwrap();
504                assert_eq!(count, 12);
505                tx.flush().await.unwrap();
506            })
507            .unwrap();
508
509        while !received.get() {
510            event_loop.dispatch(None, &mut ()).unwrap();
511        }
512    }
513
514    #[test]
515    fn readable() {
516        use std::io::Write;
517
518        let mut event_loop = crate::EventLoop::try_new().unwrap();
519        let handle = event_loop.handle();
520        let (exec, sched) = executor().unwrap();
521        handle
522            .insert_source(exec, move |(), &mut (), got| {
523                *got = true;
524            })
525            .unwrap();
526
527        let (mut tx, rx) = std::os::unix::net::UnixStream::pair().unwrap();
528
529        let mut rx = handle.adapt_io(rx).unwrap();
530        sched
531            .schedule(async move {
532                rx.readable().await;
533            })
534            .unwrap();
535
536        let mut dispatched = false;
537
538        event_loop
539            .dispatch(Some(std::time::Duration::from_millis(100)), &mut dispatched)
540            .unwrap();
541        // The socket is not yet readable, so the readable() future has not completed
542        assert!(!dispatched);
543
544        tx.write_all(&[42]).unwrap();
545        tx.flush().unwrap();
546
547        // Now we should become readable
548        while !dispatched {
549            event_loop.dispatch(None, &mut dispatched).unwrap();
550        }
551    }
552
553    #[test]
554    fn writable() {
555        use std::io::{BufReader, BufWriter, Read, Write};
556
557        let mut event_loop = crate::EventLoop::try_new().unwrap();
558        let handle = event_loop.handle();
559        let (exec, sched) = executor().unwrap();
560        handle
561            .insert_source(exec, move |(), &mut (), got| {
562                *got = true;
563            })
564            .unwrap();
565
566        let (mut tx, mut rx) = std::os::unix::net::UnixStream::pair().unwrap();
567        tx.set_nonblocking(true).unwrap();
568        rx.set_nonblocking(true).unwrap();
569
570        // First, fill the socket buffers
571        {
572            let mut writer = BufWriter::new(&mut tx);
573            let data = vec![42u8; 1024];
574            loop {
575                if writer.write(&data).is_err() {
576                    break;
577                }
578            }
579        }
580
581        // Now, wait for it to be readable
582        let mut tx = handle.adapt_io(tx).unwrap();
583        sched
584            .schedule(async move {
585                tx.writable().await;
586            })
587            .unwrap();
588
589        let mut dispatched = false;
590
591        event_loop
592            .dispatch(Some(std::time::Duration::from_millis(100)), &mut dispatched)
593            .unwrap();
594        // The socket is not yet writable, so the readable() future has not completed
595        assert!(!dispatched);
596
597        // now read everything
598        {
599            let mut reader = BufReader::new(&mut rx);
600            let mut buffer = vec![0u8; 1024];
601            loop {
602                if reader.read(&mut buffer).is_err() {
603                    break;
604                }
605            }
606        }
607
608        // Now we should become writable
609        while !dispatched {
610            event_loop.dispatch(None, &mut dispatched).unwrap();
611        }
612    }
613}