calloop/sources/futures.rs
1//! A futures executor as an event source
2//!
3//! Only available with the `executor` cargo feature of `calloop`.
4//!
5//! This executor is intended for light futures, which will be polled as part of your
6//! event loop. Such futures may be waiting for IO, or for some external computation on an
7//! other thread for example.
8//!
9//! You can create a new executor using the `executor` function, which creates a pair
10//! `(Executor<T>, Scheduler<T>)` to handle futures that all evaluate to type `T`. The
11//! executor should be inserted into your event loop, and will yield the return values of
12//! the futures as they finish into your callback. The scheduler can be cloned and used
13//! to send futures to be executed into the executor. A generic executor can be obtained
14//! by choosing `T = ()` and letting futures handle the forwarding of their return values
15//! (if any) by their own means.
16//!
17//! **Note:** The futures must have their own means of being woken up, as this executor is,
18//! by itself, not I/O aware. See [`LoopHandle::adapt_io`](crate::LoopHandle#method.adapt_io)
19//! for that, or you can use some other mechanism if you prefer.
20
21use async_task::{Builder, Runnable};
22use slab::Slab;
23use std::{
24 cell::RefCell,
25 fmt,
26 future::Future,
27 rc::Rc,
28 sync::{
29 atomic::{AtomicBool, Ordering},
30 mpsc, Arc, Mutex,
31 },
32 task::Waker,
33};
34
35use crate::{
36 sources::{
37 channel::ChannelError,
38 ping::{make_ping, Ping, PingError, PingSource},
39 EventSource,
40 },
41 Poll, PostAction, Readiness, Token, TokenFactory,
42};
43
44/// A future executor as an event source
45#[derive(Debug)]
46pub struct Executor<T> {
47 /// Shared state between the executor and the scheduler.
48 state: Rc<State<T>>,
49
50 /// Notifies us when the executor is woken up.
51 source: PingSource,
52
53 /// Used for when we need to wake ourselves up.
54 ping: Ping,
55}
56
57/// A scheduler to send futures to an executor
58#[derive(Clone, Debug)]
59pub struct Scheduler<T> {
60 /// Shared state between the executor and the scheduler.
61 state: Rc<State<T>>,
62}
63
64/// The inner state of the executor.
65#[derive(Debug)]
66struct State<T> {
67 /// The incoming queue of runnables to be executed.
68 incoming: mpsc::Receiver<Runnable<usize>>,
69
70 /// The sender corresponding to `incoming`.
71 sender: Arc<Sender>,
72
73 /// The list of currently active tasks.
74 ///
75 /// This is set to `None` when the executor is destroyed.
76 active_tasks: RefCell<Option<Slab<Active<T>>>>,
77}
78
79/// Send a future to an executor.
80///
81/// This needs to be thread-safe, as it is called from a `Waker` that may be on a different thread.
82#[derive(Debug)]
83struct Sender {
84 /// The sender used to send runnables to the executor.
85 ///
86 /// `mpsc::Sender` is `!Sync`, wrapping it in a `Mutex` makes it `Sync`.
87 sender: Mutex<mpsc::Sender<Runnable<usize>>>,
88
89 /// The ping source used to wake up the executor.
90 wake_up: Ping,
91
92 /// Whether the executor has already been woken.
93 notified: AtomicBool,
94}
95
96/// An active future or its result.
97#[derive(Debug)]
98enum Active<T> {
99 /// The future is currently being polled.
100 ///
101 /// Waking this waker will insert the runnable into `incoming`.
102 Future(Waker),
103
104 /// The future has finished polling, and its result is stored here.
105 Finished(T),
106}
107
108impl<T> Active<T> {
109 fn is_finished(&self) -> bool {
110 matches!(self, Active::Finished(_))
111 }
112}
113
114impl<T> Scheduler<T> {
115 /// Sends the given future to the executor associated to this scheduler
116 ///
117 /// Returns an error if the the executor not longer exists.
118 pub fn schedule<Fut>(&self, future: Fut) -> Result<(), ExecutorDestroyed>
119 where
120 Fut: Future<Output = T> + 'static,
121 T: 'static,
122 {
123 /// Store this future's result in the executor.
124 struct StoreOnDrop<'a, T> {
125 index: usize,
126 value: Option<T>,
127 state: &'a State<T>,
128 }
129
130 impl<T> Drop for StoreOnDrop<'_, T> {
131 fn drop(&mut self) {
132 let mut active_tasks = self.state.active_tasks.borrow_mut();
133 if let Some(active_tasks) = active_tasks.as_mut() {
134 if let Some(value) = self.value.take() {
135 active_tasks[self.index] = Active::Finished(value);
136 } else {
137 // The future was dropped before it finished.
138 // Remove it from the active list.
139 active_tasks.remove(self.index);
140 }
141 }
142 }
143 }
144
145 fn assert_send_and_sync<T: Send + Sync>(_: &T) {}
146
147 let mut active_guard = self.state.active_tasks.borrow_mut();
148 let active_tasks = active_guard.as_mut().ok_or(ExecutorDestroyed)?;
149
150 // Wrap the future in another future that polls it and stores the result.
151 let index = active_tasks.vacant_key();
152 let future = {
153 let state = self.state.clone();
154 async move {
155 let mut guard = StoreOnDrop {
156 index,
157 value: None,
158 state: &state,
159 };
160
161 // Get the value of the future.
162 let value = future.await;
163
164 // Store it in the executor.
165 guard.value = Some(value);
166 }
167 };
168
169 // A schedule function that inserts the runnable into the incoming queue.
170 let schedule = {
171 let sender = self.state.sender.clone();
172 move |runnable| sender.send(runnable)
173 };
174
175 assert_send_and_sync(&schedule);
176
177 // Spawn the future.
178 let (runnable, task) = Builder::new()
179 .metadata(index)
180 .spawn_local(move |_| future, schedule);
181
182 // Insert the runnable into the set of active tasks.
183 active_tasks.insert(Active::Future(runnable.waker()));
184 drop(active_guard);
185
186 // Schedule the runnable and detach the task so it isn't cancellable.
187 runnable.schedule();
188 task.detach();
189
190 Ok(())
191 }
192}
193
194impl Sender {
195 /// Send a runnable to the executor.
196 fn send(&self, runnable: Runnable<usize>) {
197 // Send on the channel.
198 //
199 // All we do with the lock is call `send`, so there's no chance of any state being corrupted on
200 // panic. Therefore it's safe to ignore the mutex poison.
201 if let Err(e) = self
202 .sender
203 .lock()
204 .unwrap_or_else(|e| e.into_inner())
205 .send(runnable)
206 {
207 // The runnable must be dropped on its origin thread, since the original future might be
208 // !Send. This channel immediately sends it back to the Executor, which is pinned to the
209 // origin thread. The executor's Drop implementation will force all of the runnables to be
210 // dropped, therefore the channel should always be available. If we can't send the runnable,
211 // it indicates that the above behavior is broken and that unsoundness has occurred. The
212 // only option at this stage is to forget the runnable and leak the future.
213
214 std::mem::forget(e);
215 unreachable!("Attempted to send runnable to a stopped executor");
216 }
217
218 // If the executor is already awake, don't bother waking it up again.
219 if self.notified.swap(true, Ordering::SeqCst) {
220 return;
221 }
222
223 // Wake the executor.
224 self.wake_up.ping();
225 }
226}
227
228impl<T> Drop for Executor<T> {
229 fn drop(&mut self) {
230 let active_tasks = self.state.active_tasks.borrow_mut().take().unwrap();
231
232 // Wake all of the active tasks in order to destroy their runnables.
233 for (_, task) in active_tasks {
234 if let Active::Future(waker) = task {
235 // Don't let a panicking waker blow everything up.
236 //
237 // There is a chance that a future will panic and, during the unwinding process,
238 // drop this executor. However, since the future panicked, there is a possibility
239 // that the internal state of the waker will be invalid in such a way that the waker
240 // panics as well. Since this would be a panic during a panic, Rust will upgrade it
241 // into an abort.
242 //
243 // In the interest of not aborting without a good reason, we just drop the panic here.
244 std::panic::catch_unwind(|| waker.wake()).ok();
245 }
246 }
247
248 // Drain the queue in order to drop all of the runnables.
249 while self.state.incoming.try_recv().is_ok() {}
250 }
251}
252
253/// Error generated when trying to schedule a future after the
254/// executor was destroyed.
255#[derive(Debug)]
256pub struct ExecutorDestroyed;
257
258impl fmt::Display for ExecutorDestroyed {
259 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
260 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261 f.write_str("the executor was destroyed")
262 }
263}
264
265impl std::error::Error for ExecutorDestroyed {}
266
267/// Create a new executor, and its associated scheduler
268///
269/// May fail due to OS errors preventing calloop to setup its internal pipes (if your
270/// process has reatched its file descriptor limit for example).
271pub fn executor<T>() -> crate::Result<(Executor<T>, Scheduler<T>)> {
272 let (sender, incoming) = mpsc::channel();
273 let (wake_up, ping) = make_ping()?;
274
275 let state = Rc::new(State {
276 incoming,
277 active_tasks: RefCell::new(Some(Slab::new())),
278 sender: Arc::new(Sender {
279 sender: Mutex::new(sender),
280 wake_up: wake_up.clone(),
281 notified: AtomicBool::new(false),
282 }),
283 });
284
285 Ok((
286 Executor {
287 state: state.clone(),
288 source: ping,
289 ping: wake_up,
290 },
291 Scheduler { state },
292 ))
293}
294
295impl<T> EventSource for Executor<T> {
296 type Event = T;
297 type Metadata = ();
298 type Ret = ();
299 type Error = ExecutorError;
300
301 fn process_events<F>(
302 &mut self,
303 readiness: Readiness,
304 token: Token,
305 mut callback: F,
306 ) -> Result<PostAction, Self::Error>
307 where
308 F: FnMut(T, &mut ()),
309 {
310 let state = &self.state;
311
312 let (clear_readiness, action) = {
313 let mut clear_readiness = false;
314
315 let action = self
316 .source
317 .process_events(readiness, token, |(), &mut ()| {
318 // Set to the unnotified state.
319 state.sender.notified.store(false, Ordering::SeqCst);
320
321 // Process runnables, but not too many at a time; better to move onto the next event quickly!
322 for _ in 0..1024 {
323 let runnable = match state.incoming.try_recv() {
324 Ok(runnable) => runnable,
325 Err(_) => {
326 // Make sure to clear the readiness if there are no more runnables.
327 clear_readiness = true;
328 break;
329 }
330 };
331
332 // Run the runnable.
333 let index = *runnable.metadata();
334 runnable.run();
335
336 // If the runnable finished with a result, call the callback.
337 let mut active_guard = state.active_tasks.borrow_mut();
338 let active_tasks = active_guard.as_mut().unwrap();
339
340 if let Some(state) = active_tasks.get(index) {
341 if state.is_finished() {
342 // Take out the state and provide it to the caller.
343 let result = match active_tasks.remove(index) {
344 Active::Finished(result) => result,
345 _ => unreachable!(),
346 };
347
348 // Drop the guard since the callback may register another future to the scheduler.
349 drop(active_guard);
350
351 callback(result, &mut ());
352 }
353 }
354 }
355 })
356 .map_err(ExecutorError::WakeError)?;
357
358 (clear_readiness, action)
359 };
360
361 // Re-ready the ping source if we need to re-run this handler.
362 if !clear_readiness {
363 self.ping.ping();
364 Ok(PostAction::Continue)
365 } else {
366 Ok(action)
367 }
368 }
369
370 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
371 self.source.register(poll, token_factory)?;
372 Ok(())
373 }
374
375 fn reregister(
376 &mut self,
377 poll: &mut Poll,
378 token_factory: &mut TokenFactory,
379 ) -> crate::Result<()> {
380 self.source.reregister(poll, token_factory)?;
381 Ok(())
382 }
383
384 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
385 self.source.unregister(poll)?;
386 Ok(())
387 }
388}
389
390/// An error arising from processing events in an async executor event source.
391#[derive(Debug)]
392pub enum ExecutorError {
393 /// Error while reading new futures added via [`Scheduler::schedule()`].
394 NewFutureError(ChannelError),
395
396 /// Error while processing wake events from existing futures.
397 WakeError(PingError),
398}
399
400impl fmt::Display for ExecutorError {
401 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
402 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403 match self {
404 Self::NewFutureError(err) => write!(f, "error adding new futures: {}", err),
405 Self::WakeError(err) => write!(f, "error processing wake events: {}", err),
406 }
407 }
408}
409
410impl std::error::Error for ExecutorError {}
411
412#[cfg(test)]
413mod tests {
414 use super::*;
415
416 use std::cell::RefCell;
417 use std::rc::Rc;
418
419 #[test]
420 fn ready() {
421 let mut event_loop = crate::EventLoop::<u32>::try_new().unwrap();
422
423 let handle = event_loop.handle();
424
425 let (exec, sched) = executor::<u32>().unwrap();
426
427 handle
428 .insert_source(exec, move |ret, &mut (), got| {
429 *got = ret;
430 })
431 .unwrap();
432
433 let mut got = 0;
434
435 let fut = async { 42 };
436
437 event_loop
438 .dispatch(Some(::std::time::Duration::ZERO), &mut got)
439 .unwrap();
440
441 // the future is not yet inserted, and thus has not yet run
442 assert_eq!(got, 0);
443
444 sched.schedule(fut).unwrap();
445
446 event_loop
447 .dispatch(Some(::std::time::Duration::ZERO), &mut got)
448 .unwrap();
449
450 // the future has run
451 assert_eq!(got, 42);
452 }
453
454 #[test]
455 fn more_than_1024() {
456 let mut event_loop = crate::EventLoop::<()>::try_new().unwrap();
457 let handle = event_loop.handle();
458
459 let (exec, sched) = executor::<()>().unwrap();
460 handle.insert_source(exec, move |_, _, _| ()).unwrap();
461
462 let counter = Rc::new(RefCell::new(0));
463 for _ in 0..1025 {
464 let counter = counter.clone();
465 sched
466 .schedule(async move {
467 *counter.borrow_mut() += 1;
468 })
469 .unwrap();
470 }
471
472 event_loop
473 .dispatch(Some(::std::time::Duration::ZERO), &mut ())
474 .unwrap();
475
476 assert_eq!(*counter.borrow(), 1024);
477
478 event_loop
479 .dispatch(Some(::std::time::Duration::ZERO), &mut ())
480 .unwrap();
481
482 assert_eq!(*counter.borrow(), 1025);
483 }
484}