// calloop/sources/futures.rs

//! A futures executor as an event source
//!
//! Only available with the `executor` cargo feature of `calloop`.
//!
//! This executor is intended for light futures, which will be polled as part of your
//! event loop. Such futures may be waiting for IO, or for some external computation on
//! another thread for example.
//!
//! You can create a new executor using the `executor` function, which creates a pair
//! `(Executor<T>, Scheduler<T>)` to handle futures that all evaluate to type `T`. The
//! executor should be inserted into your event loop, and will yield the return values of
//! the futures as they finish into your callback. The scheduler can be cloned and used
//! to send futures to be executed into the executor. A generic executor can be obtained
//! by choosing `T = ()` and letting futures handle the forwarding of their return values
//! (if any) by their own means.
//!
//! **Note:** The futures must have their own means of being woken up, as this executor is,
//! by itself, not I/O aware. See [`LoopHandle::adapt_io`](crate::LoopHandle#method.adapt_io)
//! for that, or you can use some other mechanism if you prefer.
20
21use async_task::{Builder, Runnable};
22use slab::Slab;
23use std::{
24    cell::RefCell,
25    fmt,
26    future::Future,
27    rc::Rc,
28    sync::{
29        atomic::{AtomicBool, Ordering},
30        mpsc, Arc, Mutex,
31    },
32    task::Waker,
33};
34
35use crate::{
36    sources::{
37        channel::ChannelError,
38        ping::{make_ping, Ping, PingError, PingSource},
39        EventSource,
40    },
41    Poll, PostAction, Readiness, Token, TokenFactory,
42};
43
/// A future executor as an event source
///
/// Insert it into an event loop to have the outputs of completed futures
/// delivered to the loop's callback.
#[derive(Debug)]
pub struct Executor<T> {
    /// Shared state between the executor and the scheduler.
    state: Rc<State<T>>,

    /// Notifies us when the executor is woken up.
    source: PingSource,

    /// Used for when we need to wake ourselves up.
    ///
    /// Pinged from `process_events` when a batch limit is hit before the
    /// runnable queue is drained, so the loop calls us again.
    ping: Ping,
}
56
/// A scheduler to send futures to an executor
///
/// Cloning is cheap (an `Rc` bump); all clones feed the same executor.
#[derive(Clone, Debug)]
pub struct Scheduler<T> {
    /// Shared state between the executor and the scheduler.
    state: Rc<State<T>>,
}
63
/// The inner state of the executor.
#[derive(Debug)]
struct State<T> {
    /// The incoming queue of runnables to be executed.
    incoming: mpsc::Receiver<Runnable<usize>>,

    /// The sender corresponding to `incoming`.
    ///
    /// Shared (via `Arc`) with each task's schedule closure, which may run
    /// from a waker on another thread.
    sender: Arc<Sender>,

    /// The list of currently active tasks.
    ///
    /// This is set to `None` when the executor is destroyed.
    active_tasks: RefCell<Option<Slab<Active<T>>>>,
}
78
/// Send a future to an executor.
///
/// This needs to be thread-safe, as it is called from a `Waker` that may be on a different thread.
#[derive(Debug)]
struct Sender {
    /// The sender used to send runnables to the executor.
    ///
    /// `mpsc::Sender` is `!Sync`, wrapping it in a `Mutex` makes it `Sync`.
    sender: Mutex<mpsc::Sender<Runnable<usize>>>,

    /// The ping source used to wake up the executor.
    wake_up: Ping,

    /// Whether the executor has already been woken.
    ///
    /// Used to skip redundant pings while the executor is already awake.
    notified: AtomicBool,
}
95
/// An active future or its result.
#[derive(Debug)]
enum Active<T> {
    /// The future is currently being polled.
    ///
    /// Waking this waker will insert the runnable into `incoming`.
    Future(Waker),

    /// The future has finished polling, and its result is stored here.
    Finished(T),
}

impl<T> Active<T> {
    /// Returns `true` when this entry holds a finished result rather than a
    /// still-pending future.
    fn is_finished(&self) -> bool {
        match self {
            Active::Finished(_) => true,
            Active::Future(_) => false,
        }
    }
}
113
impl<T> Scheduler<T> {
    /// Sends the given future to the executor associated to this scheduler
    ///
    /// Returns an error if the executor no longer exists.
    pub fn schedule<Fut>(&self, future: Fut) -> Result<(), ExecutorDestroyed>
    where
        Fut: Future<Output = T> + 'static,
        T: 'static,
    {
        /// Store this future's result in the executor.
        ///
        /// Dropped when the wrapping future completes or is cancelled; its
        /// `Drop` impl writes the outcome back into `State::active_tasks`.
        struct StoreOnDrop<'a, T> {
            index: usize,
            value: Option<T>,
            state: &'a State<T>,
        }

        impl<T> Drop for StoreOnDrop<'_, T> {
            fn drop(&mut self) {
                let mut active_tasks = self.state.active_tasks.borrow_mut();
                if let Some(active_tasks) = active_tasks.as_mut() {
                    if let Some(value) = self.value.take() {
                        active_tasks[self.index] = Active::Finished(value);
                    } else {
                        // The future was dropped before it finished.
                        // Remove it from the active list.
                        active_tasks.remove(self.index);
                    }
                }
            }
        }

        // Compile-time check that the schedule closure below is thread-safe,
        // since wakers may invoke it from another thread.
        fn assert_send_and_sync<T: Send + Sync>(_: &T) {}

        let mut active_guard = self.state.active_tasks.borrow_mut();
        let active_tasks = active_guard.as_mut().ok_or(ExecutorDestroyed)?;

        // Wrap the future in another future that polls it and stores the result.
        // `vacant_key` reserves the slot index that `insert` below will use.
        let index = active_tasks.vacant_key();
        let future = {
            let state = self.state.clone();
            async move {
                let mut guard = StoreOnDrop {
                    index,
                    value: None,
                    state: &state,
                };

                // Get the value of the future.
                let value = future.await;

                // Store it in the executor.
                guard.value = Some(value);
            }
        };

        // A schedule function that inserts the runnable into the incoming queue.
        let schedule = {
            let sender = self.state.sender.clone();
            move |runnable| sender.send(runnable)
        };

        assert_send_and_sync(&schedule);

        // Spawn the future. The reserved slot index becomes the task metadata,
        // letting `process_events` find the matching `Active` entry.
        let (runnable, task) = Builder::new()
            .metadata(index)
            .spawn_local(move |_| future, schedule);

        // Insert the runnable into the set of active tasks.
        active_tasks.insert(Active::Future(runnable.waker()));
        drop(active_guard);

        // Schedule the runnable and detach the task so it isn't cancellable.
        runnable.schedule();
        task.detach();

        Ok(())
    }
}
193
impl Sender {
    /// Send a runnable to the executor.
    ///
    /// Called from task wakers, possibly from a thread other than the
    /// executor's own.
    fn send(&self, runnable: Runnable<usize>) {
        // Send on the channel.
        //
        // All we do with the lock is call `send`, so there's no chance of any state being corrupted on
        // panic. Therefore it's safe to ignore the mutex poison.
        if let Err(e) = self
            .sender
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .send(runnable)
        {
            // The runnable must be dropped on its origin thread, since the original future might be
            // !Send. This channel immediately sends it back to the Executor, which is pinned to the
            // origin thread. The executor's Drop implementation will force all of the runnables to be
            // dropped, therefore the channel should always be available. If we can't send the runnable,
            // it indicates that the above behavior is broken and that unsoundness has occurred. The
            // only option at this stage is to forget the runnable and leak the future.

            std::mem::forget(e);
            unreachable!("Attempted to send runnable to a stopped executor");
        }

        // If the executor is already awake, don't bother waking it up again.
        // `swap` returns the previous value, so only the first sender after a
        // `process_events` pass actually pings.
        if self.notified.swap(true, Ordering::SeqCst) {
            return;
        }

        // Wake the executor.
        self.wake_up.ping();
    }
}
227
impl<T> Drop for Executor<T> {
    fn drop(&mut self) {
        // `active_tasks` is only ever set to `None` here, so the unwrap
        // cannot fail.
        let active_tasks = self.state.active_tasks.borrow_mut().take().unwrap();

        // Wake all of the active tasks in order to destroy their runnables.
        for (_, task) in active_tasks {
            if let Active::Future(waker) = task {
                // Don't let a panicking waker blow everything up.
                //
                // There is a chance that a future will panic and, during the unwinding process,
                // drop this executor. However, since the future panicked, there is a possibility
                // that the internal state of the waker will be invalid in such a way that the waker
                // panics as well. Since this would be a panic during a panic, Rust will upgrade it
                // into an abort.
                //
                // In the interest of not aborting without a good reason, we just drop the panic here.
                std::panic::catch_unwind(|| waker.wake()).ok();
            }
        }

        // Drain the queue in order to drop all of the runnables.
        while self.state.incoming.try_recv().is_ok() {}
    }
}
252
/// Error generated when trying to schedule a future after the
/// executor was destroyed.
#[derive(Debug)]
pub struct ExecutorDestroyed;

impl fmt::Display for ExecutorDestroyed {
    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(formatter, "the executor was destroyed")
    }
}

impl std::error::Error for ExecutorDestroyed {}
266
/// Create a new executor, and its associated scheduler
///
/// May fail due to OS errors preventing calloop from setting up its internal pipes (if your
/// process has reached its file descriptor limit for example).
pub fn executor<T>() -> crate::Result<(Executor<T>, Scheduler<T>)> {
    let (sender, incoming) = mpsc::channel();
    // `wake_up` is the write end shared with wakers via `Sender`; `ping` is
    // the event source the executor registers in the loop.
    let (wake_up, ping) = make_ping()?;

    let state = Rc::new(State {
        incoming,
        active_tasks: RefCell::new(Some(Slab::new())),
        sender: Arc::new(Sender {
            sender: Mutex::new(sender),
            wake_up: wake_up.clone(),
            notified: AtomicBool::new(false),
        }),
    });

    Ok((
        Executor {
            state: state.clone(),
            source: ping,
            ping: wake_up,
        },
        Scheduler { state },
    ))
}
294
impl<T> EventSource for Executor<T> {
    type Event = T;
    type Metadata = ();
    type Ret = ();
    type Error = ExecutorError;

    /// Drain and run queued runnables, invoking `callback` with the result of
    /// every future that finishes during this call.
    fn process_events<F>(
        &mut self,
        readiness: Readiness,
        token: Token,
        mut callback: F,
    ) -> Result<PostAction, Self::Error>
    where
        F: FnMut(T, &mut ()),
    {
        let state = &self.state;

        let (clear_readiness, action) = {
            let mut clear_readiness = false;

            let action = self
                .source
                .process_events(readiness, token, |(), &mut ()| {
                    // Set to the unnotified state.
                    state.sender.notified.store(false, Ordering::SeqCst);

                    // Process runnables, but not too many at a time; better to move onto the next event quickly!
                    for _ in 0..1024 {
                        let runnable = match state.incoming.try_recv() {
                            Ok(runnable) => runnable,
                            Err(_) => {
                                // Make sure to clear the readiness if there are no more runnables.
                                clear_readiness = true;
                                break;
                            }
                        };

                        // Run the runnable. Its metadata is the slot index
                        // reserved in `Scheduler::schedule`.
                        let index = *runnable.metadata();
                        runnable.run();

                        // If the runnable finished with a result, call the callback.
                        // `active_tasks` is only `None` after `Drop`, so the unwrap is safe here.
                        let mut active_guard = state.active_tasks.borrow_mut();
                        let active_tasks = active_guard.as_mut().unwrap();

                        if let Some(state) = active_tasks.get(index) {
                            if state.is_finished() {
                                // Take out the state and provide it to the caller.
                                let result = match active_tasks.remove(index) {
                                    Active::Finished(result) => result,
                                    _ => unreachable!(),
                                };

                                // Drop the guard since the callback may register another future to the scheduler.
                                drop(active_guard);

                                callback(result, &mut ());
                            }
                        }
                    }
                })
                .map_err(ExecutorError::WakeError)?;

            (clear_readiness, action)
        };

        // Re-ready the ping source if we need to re-run this handler.
        //
        // `clear_readiness` stays false when the 1024-runnable batch limit was
        // hit before the queue drained; ping ourselves so the loop calls us
        // again for the leftovers.
        if !clear_readiness {
            self.ping.ping();
            Ok(PostAction::Continue)
        } else {
            Ok(action)
        }
    }

    fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
        self.source.register(poll, token_factory)?;
        Ok(())
    }

    fn reregister(
        &mut self,
        poll: &mut Poll,
        token_factory: &mut TokenFactory,
    ) -> crate::Result<()> {
        self.source.reregister(poll, token_factory)?;
        Ok(())
    }

    fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
        self.source.unregister(poll)?;
        Ok(())
    }
}
389
/// An error arising from processing events in an async executor event source.
#[derive(Debug)]
pub enum ExecutorError {
    /// Error while reading new futures added via [`Scheduler::schedule()`].
    NewFutureError(ChannelError),

    /// Error while processing wake events from existing futures.
    WakeError(PingError),
}
399
400impl fmt::Display for ExecutorError {
401    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
402    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403        match self {
404            Self::NewFutureError(err) => write!(f, "error adding new futures: {}", err),
405            Self::WakeError(err) => write!(f, "error processing wake events: {}", err),
406        }
407    }
408}
409
410impl std::error::Error for ExecutorError {}
411
#[cfg(test)]
mod tests {
    use super::*;

    use std::cell::RefCell;
    use std::rc::Rc;

    #[test]
    fn ready() {
        // An already-ready future's value is delivered to the loop callback.
        let mut event_loop = crate::EventLoop::<u32>::try_new().unwrap();
        let handle = event_loop.handle();

        let (exec, sched) = executor::<u32>().unwrap();

        handle
            .insert_source(exec, move |value, &mut (), storage| {
                *storage = value;
            })
            .unwrap();

        let mut received = 0;
        let future = async { 42 };

        event_loop
            .dispatch(Some(std::time::Duration::ZERO), &mut received)
            .unwrap();

        // Nothing scheduled yet, so nothing has run.
        assert_eq!(received, 0);

        sched.schedule(future).unwrap();

        event_loop
            .dispatch(Some(std::time::Duration::ZERO), &mut received)
            .unwrap();

        // The future has now run to completion.
        assert_eq!(received, 42);
    }

    #[test]
    fn more_than_1024() {
        // The executor runs at most 1024 runnables per batch; the 1025th is
        // handled on the next dispatch because the source re-pings itself.
        let mut event_loop = crate::EventLoop::<()>::try_new().unwrap();
        let handle = event_loop.handle();

        let (exec, sched) = executor::<()>().unwrap();
        handle.insert_source(exec, move |_, _, _| ()).unwrap();

        let counter = Rc::new(RefCell::new(0));
        for _ in 0..1025 {
            let task_counter = Rc::clone(&counter);
            sched
                .schedule(async move {
                    *task_counter.borrow_mut() += 1;
                })
                .unwrap();
        }

        event_loop
            .dispatch(Some(std::time::Duration::ZERO), &mut ())
            .unwrap();
        assert_eq!(*counter.borrow(), 1024);

        event_loop
            .dispatch(Some(std::time::Duration::ZERO), &mut ())
            .unwrap();
        assert_eq!(*counter.borrow(), 1025);
    }
}