calloop/
loop_logic.rs

1use std::cell::{Cell, RefCell};
2use std::fmt::Debug;
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use std::{io, slice};
8
9#[cfg(feature = "block_on")]
10use std::future::Future;
11
12#[cfg(unix)]
13use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
14#[cfg(windows)]
15use std::os::windows::io::{AsHandle, AsRawHandle, AsSocket as AsFd, BorrowedHandle, RawHandle};
16
17use polling::Poller;
18use tracing::{trace, warn};
19
20use crate::list::{SourceEntry, SourceList};
21use crate::sources::{Dispatcher, EventSource, Idle, IdleDispatcher};
22use crate::sys::{Notifier, PollEvent};
23use crate::token::TokenInner;
24use crate::{
25    AdditionalLifecycleEventsSet, InsertError, Poll, PostAction, Readiness, Token, TokenFactory,
26};
27
28type IdleCallback<'i, Data> = Rc<RefCell<dyn IdleDispatcher<Data> + 'i>>;
29
/// A token representing a registration in the [`EventLoop`].
///
/// This token is given to you by the [`EventLoop`] when an [`EventSource`] is inserted or
/// a [`Dispatcher`] is registered. You can use it to [disable](LoopHandle#method.disable),
/// [enable](LoopHandle#method.enable), [update](LoopHandle#method.update),
/// [remove](LoopHandle#method.remove) or [kill](LoopHandle#method.kill) it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RegistrationToken {
    // Crate-internal token this public handle wraps; it is what the source list is
    // looked up with.
    inner: TokenInner,
}
40
41impl RegistrationToken {
42    /// Create the RegistrationToken corresponding to the given raw key
43    /// This is needed because some methods use `RegistrationToken`s as
44    /// raw usizes within this crate
45    pub(crate) fn new(inner: TokenInner) -> Self {
46        Self { inner }
47    }
48}
49
/// State shared between an event loop and all of its [`LoopHandle`]s (each handle holds
/// it behind an `Rc`).
pub(crate) struct LoopInner<'l, Data> {
    // Polling backend the sources are registered with.
    pub(crate) poll: RefCell<Poll>,
    // The `Option` is used to keep slots of the slab occupied, to prevent id reuse
    // while in-flight events might still refer to a recently destroyed event source.
    // NOTE(review): the `Option` is presumably the `source` field of `SourceEntry`
    // (it is matched as `Some(..)`/`None` throughout this file) -- confirm in `list.rs`.
    pub(crate) sources: RefCell<SourceList<'l, Data>>,
    // Tokens of the sources whose `before_sleep` / `before_handle_events` hooks are
    // invoked around each poll by `dispatch_events`.
    pub(crate) sources_with_additional_lifecycle_events: RefCell<AdditionalLifecycleEventsSet>,
    // Idle callbacks queued by `insert_idle`; taken and run by `dispatch_idles`.
    idles: RefCell<Vec<IdleCallback<'l, Data>>>,
    // Action recorded by `update`/`disable` when the source reported it could not be
    // changed right away; consumed by `dispatch_events` after the source's
    // `process_events` call returns.
    pending_action: Cell<PostAction>,
}
59
/// A handle to an event loop
///
/// This handle allows you to insert new sources and idles in this event loop,
/// it can be cloned, and it is possible to insert new sources from within a source
/// callback.
pub struct LoopHandle<'l, Data> {
    // Reference-counted loop state; cloning the handle clones this `Rc`.
    inner: Rc<LoopInner<'l, Data>>,
}
68
69impl<'l, Data> std::fmt::Debug for LoopHandle<'l, Data> {
70    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.write_str("LoopHandle { ... }")
73    }
74}
75
76impl<'l, Data> Clone for LoopHandle<'l, Data> {
77    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
78    fn clone(&self) -> Self {
79        LoopHandle {
80            inner: self.inner.clone(),
81        }
82    }
83}
84
85impl<'l, Data> LoopHandle<'l, Data> {
86    /// Inserts a new event source in the loop.
87    ///
88    /// The provided callback will be called during the dispatching cycles whenever the
89    /// associated source generates events, see `EventLoop::dispatch(..)` for details.
90    ///
91    /// This function takes ownership of the event source. Use `register_dispatcher`
92    /// if you need access to the event source after this call.
93    pub fn insert_source<S, F>(
94        &self,
95        source: S,
96        callback: F,
97    ) -> Result<RegistrationToken, InsertError<S>>
98    where
99        S: EventSource + 'l,
100        F: FnMut(S::Event, &mut S::Metadata, &mut Data) -> S::Ret + 'l,
101    {
102        let dispatcher = Dispatcher::new(source, callback);
103        self.register_dispatcher(dispatcher.clone())
104            .map_err(|error| InsertError {
105                error,
106                inserted: dispatcher.into_source_inner(),
107            })
108    }
109
110    /// Registers a `Dispatcher` in the loop.
111    ///
112    /// Use this function if you need access to the event source after its insertion in the loop.
113    ///
114    /// See also `insert_source`.
115    #[cfg_attr(feature = "nightly_coverage", coverage(off))] // Contains a branch we can't hit w/o OOM
116    pub fn register_dispatcher<S>(
117        &self,
118        dispatcher: Dispatcher<'l, S, Data>,
119    ) -> crate::Result<RegistrationToken>
120    where
121        S: EventSource + 'l,
122    {
123        let mut sources = self.inner.sources.borrow_mut();
124        let mut poll = self.inner.poll.borrow_mut();
125
126        // Find an empty slot if any
127        let slot = sources.vacant_entry();
128
129        slot.source = Some(dispatcher.clone_as_event_dispatcher());
130        trace!(source = slot.token.get_id(), "Inserting new source");
131        let ret = slot.source.as_ref().unwrap().register(
132            &mut poll,
133            &mut self
134                .inner
135                .sources_with_additional_lifecycle_events
136                .borrow_mut(),
137            &mut TokenFactory::new(slot.token),
138        );
139
140        if let Err(error) = ret {
141            slot.source = None;
142            return Err(error);
143        }
144
145        Ok(RegistrationToken { inner: slot.token })
146    }
147
148    /// Inserts an idle callback.
149    ///
150    /// This callback will be called during a dispatching cycle when the event loop has
151    /// finished processing all pending events from the sources and becomes idle.
152    pub fn insert_idle<'i, F: FnOnce(&mut Data) + 'l + 'i>(&self, callback: F) -> Idle<'i> {
153        let mut opt_cb = Some(callback);
154        let callback = Rc::new(RefCell::new(Some(move |data: &mut Data| {
155            if let Some(cb) = opt_cb.take() {
156                cb(data);
157            }
158        })));
159        self.inner.idles.borrow_mut().push(callback.clone());
160        Idle { callback }
161    }
162
163    /// Enables this previously disabled event source.
164    ///
165    /// This previously disabled source will start generating events again.
166    ///
167    /// **Note:** this cannot be done from within the source callback.
168    pub fn enable(&self, token: &RegistrationToken) -> crate::Result<()> {
169        if let &SourceEntry {
170            token: entry_token,
171            source: Some(ref source),
172        } = self.inner.sources.borrow().get(token.inner)?
173        {
174            trace!(source = entry_token.get_id(), "Registering source");
175            source.register(
176                &mut self.inner.poll.borrow_mut(),
177                &mut self
178                    .inner
179                    .sources_with_additional_lifecycle_events
180                    .borrow_mut(),
181                &mut TokenFactory::new(entry_token),
182            )
183        } else {
184            Err(crate::Error::InvalidToken)
185        }
186    }
187
188    /// Makes this source update its registration.
189    ///
190    /// If after accessing the source you changed its parameters in a way that requires
191    /// updating its registration.
192    pub fn update(&self, token: &RegistrationToken) -> crate::Result<()> {
193        if let &SourceEntry {
194            token: entry_token,
195            source: Some(ref source),
196        } = self.inner.sources.borrow().get(token.inner)?
197        {
198            trace!(
199                source = entry_token.get_id(),
200                "Updating registration of source"
201            );
202            if !source.reregister(
203                &mut self.inner.poll.borrow_mut(),
204                &mut self
205                    .inner
206                    .sources_with_additional_lifecycle_events
207                    .borrow_mut(),
208                &mut TokenFactory::new(entry_token),
209            )? {
210                trace!(
211                    source = entry_token.get_id(),
212                    "Can't update registration withing a callback, storing for later."
213                );
214                // we are in a callback, store for later processing
215                self.inner.pending_action.set(PostAction::Reregister);
216            }
217            Ok(())
218        } else {
219            Err(crate::Error::InvalidToken)
220        }
221    }
222
223    /// Disables this event source.
224    ///
225    /// The source remains in the event loop, but it'll no longer generate events
226    pub fn disable(&self, token: &RegistrationToken) -> crate::Result<()> {
227        if let &SourceEntry {
228            token: entry_token,
229            source: Some(ref source),
230        } = self.inner.sources.borrow().get(token.inner)?
231        {
232            if !token.inner.same_source_as(entry_token) {
233                // The token provided by the user is no longer valid
234                return Err(crate::Error::InvalidToken);
235            }
236            trace!(source = entry_token.get_id(), "Unregistering source");
237            if !source.unregister(
238                &mut self.inner.poll.borrow_mut(),
239                &mut self
240                    .inner
241                    .sources_with_additional_lifecycle_events
242                    .borrow_mut(),
243                *token,
244            )? {
245                trace!(
246                    source = entry_token.get_id(),
247                    "Cannot unregister source in callback, storing for later."
248                );
249                // we are in a callback, store for later processing
250                self.inner.pending_action.set(PostAction::Disable);
251            }
252            Ok(())
253        } else {
254            Err(crate::Error::InvalidToken)
255        }
256    }
257
258    /// Removes this source from the event loop.
259    pub fn remove(&self, token: RegistrationToken) {
260        if let Ok(&mut SourceEntry {
261            token: entry_token,
262            ref mut source,
263        }) = self.inner.sources.borrow_mut().get_mut(token.inner)
264        {
265            if let Some(source) = source.take() {
266                trace!(source = entry_token.get_id(), "Removing source");
267                if let Err(e) = source.unregister(
268                    &mut self.inner.poll.borrow_mut(),
269                    &mut self
270                        .inner
271                        .sources_with_additional_lifecycle_events
272                        .borrow_mut(),
273                    token,
274                ) {
275                    warn!("Failed to unregister source from the polling system: {e:?}");
276                }
277            }
278        }
279    }
280
281    /// Wrap an IO object into an async adapter
282    ///
283    /// This adapter turns the IO object into an async-aware one that can be used in futures.
284    /// The readiness of these futures will be driven by the event loop.
285    ///
286    /// The produced futures can be polled in any executor, and notably the one provided by
287    /// calloop.
288    pub fn adapt_io<F: AsFd>(&self, fd: F) -> crate::Result<crate::io::Async<'l, F>> {
289        crate::io::Async::new(self.inner.clone(), fd)
290    }
291}
292
/// An event loop
///
/// This loop can host several event sources, that can be dynamically added or removed.
pub struct EventLoop<'l, Data> {
    // Shared handle to the poller, cloned from the `Poll` at construction; it backs
    // the raw fd / handle accessor impls of this type.
    #[allow(dead_code)]
    poller: Arc<Poller>,
    // Handle owning the shared loop state; `handle()` hands out clones of it.
    handle: LoopHandle<'l, Data>,
    // Flags shared with every `LoopSignal` created by `get_signal()`.
    signals: Arc<Signals>,
    // A caching vector for synthetic poll events
    // (filled from the sources' `before_sleep` results and drained on each dispatch).
    synthetic_events: Vec<PollEvent>,
}
304
305impl<'l, Data> std::fmt::Debug for EventLoop<'l, Data> {
306    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
307    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308        f.write_str("EventLoop { ... }")
309    }
310}
311
/// Signals related to the event loop.
///
/// Shared (behind an `Arc`) between the [`EventLoop`] and its [`LoopSignal`]s.
struct Signals {
    /// Signal to stop the event loop.
    ///
    /// Set by `LoopSignal::stop()`, cleared when `run()` starts, and checked before
    /// each dispatch iteration.
    stop: AtomicBool,

    /// Signal that the future is ready.
    ///
    /// Set by the waker used in `block_on()` and cleared right before the future is
    /// polled.
    #[cfg(feature = "block_on")]
    future_ready: AtomicBool,
}
321
322impl<'l, Data> EventLoop<'l, Data> {
323    /// Create a new event loop
324    ///
325    /// Fails if the initialization of the polling system failed.
326    pub fn try_new() -> crate::Result<Self> {
327        let poll = Poll::new()?;
328        let poller = poll.poller.clone();
329        let handle = LoopHandle {
330            inner: Rc::new(LoopInner {
331                poll: RefCell::new(poll),
332                sources: RefCell::new(SourceList::new()),
333                idles: RefCell::new(Vec::new()),
334                pending_action: Cell::new(PostAction::Continue),
335                sources_with_additional_lifecycle_events: Default::default(),
336            }),
337        };
338
339        Ok(EventLoop {
340            handle,
341            signals: Arc::new(Signals {
342                stop: AtomicBool::new(false),
343                #[cfg(feature = "block_on")]
344                future_ready: AtomicBool::new(false),
345            }),
346            poller,
347            synthetic_events: vec![],
348        })
349    }
350
351    /// Retrieve a loop handle
352    pub fn handle(&self) -> LoopHandle<'l, Data> {
353        self.handle.clone()
354    }
355
356    fn dispatch_events(
357        &mut self,
358        mut timeout: Option<Duration>,
359        data: &mut Data,
360    ) -> crate::Result<()> {
361        let now = Instant::now();
362        {
363            let mut extra_lifecycle_sources = self
364                .handle
365                .inner
366                .sources_with_additional_lifecycle_events
367                .borrow_mut();
368            let sources = &self.handle.inner.sources.borrow();
369            for source in &mut *extra_lifecycle_sources.values {
370                if let Ok(SourceEntry {
371                    source: Some(disp), ..
372                }) = sources.get(source.inner)
373                {
374                    if let Some((readiness, token)) = disp.before_sleep()? {
375                        // Wake up instantly after polling if we recieved an event
376                        timeout = Some(Duration::ZERO);
377                        self.synthetic_events.push(PollEvent { readiness, token });
378                    }
379                } else {
380                    unreachable!()
381                }
382            }
383        }
384        let events = {
385            let poll = self.handle.inner.poll.borrow();
386            loop {
387                let result = poll.poll(timeout);
388
389                match result {
390                    Ok(events) => break events,
391                    Err(crate::Error::IoError(err)) if err.kind() == io::ErrorKind::Interrupted => {
392                        // Interrupted by a signal. Update timeout and retry.
393                        if let Some(to) = timeout {
394                            let elapsed = now.elapsed();
395                            if elapsed >= to {
396                                return Ok(());
397                            } else {
398                                timeout = Some(to - elapsed);
399                            }
400                        }
401                    }
402                    Err(err) => return Err(err),
403                };
404            }
405        };
406        {
407            let mut extra_lifecycle_sources = self
408                .handle
409                .inner
410                .sources_with_additional_lifecycle_events
411                .borrow_mut();
412            if !extra_lifecycle_sources.values.is_empty() {
413                for source in &mut *extra_lifecycle_sources.values {
414                    if let Ok(SourceEntry {
415                        source: Some(disp), ..
416                    }) = self.handle.inner.sources.borrow().get(source.inner)
417                    {
418                        let iter = EventIterator {
419                            inner: events.iter(),
420                            registration_token: *source,
421                        };
422                        disp.before_handle_events(iter);
423                    } else {
424                        unreachable!()
425                    }
426                }
427            }
428        }
429
430        for event in self.synthetic_events.drain(..).chain(events) {
431            // Get the registration token associated with the event.
432            let reg_token = event.token.inner.forget_sub_id();
433
434            let opt_disp = self
435                .handle
436                .inner
437                .sources
438                .borrow()
439                .get(reg_token)
440                .ok()
441                .and_then(|entry| entry.source.clone());
442
443            if let Some(disp) = opt_disp {
444                trace!(source = reg_token.get_id(), "Dispatching events for source");
445                let mut ret = disp.process_events(event.readiness, event.token, data)?;
446
447                // if the returned PostAction is Continue, it may be overwritten by an user-specified pending action
448                let pending_action = self
449                    .handle
450                    .inner
451                    .pending_action
452                    .replace(PostAction::Continue);
453                if let PostAction::Continue = ret {
454                    ret = pending_action;
455                }
456
457                match ret {
458                    PostAction::Reregister => {
459                        trace!(
460                            source = reg_token.get_id(),
461                            "Postaction reregister for source"
462                        );
463                        disp.reregister(
464                            &mut self.handle.inner.poll.borrow_mut(),
465                            &mut self
466                                .handle
467                                .inner
468                                .sources_with_additional_lifecycle_events
469                                .borrow_mut(),
470                            &mut TokenFactory::new(reg_token),
471                        )?;
472                    }
473                    PostAction::Disable => {
474                        trace!(
475                            source = reg_token.get_id(),
476                            "Postaction unregister for source"
477                        );
478                        disp.unregister(
479                            &mut self.handle.inner.poll.borrow_mut(),
480                            &mut self
481                                .handle
482                                .inner
483                                .sources_with_additional_lifecycle_events
484                                .borrow_mut(),
485                            RegistrationToken::new(reg_token),
486                        )?;
487                    }
488                    PostAction::Remove => {
489                        trace!(source = reg_token.get_id(), "Postaction remove for source");
490                        if let Ok(entry) = self.handle.inner.sources.borrow_mut().get_mut(reg_token)
491                        {
492                            entry.source = None;
493                        }
494                    }
495                    PostAction::Continue => {}
496                }
497
498                if self
499                    .handle
500                    .inner
501                    .sources
502                    .borrow()
503                    .get(reg_token)
504                    .ok()
505                    .map(|entry| entry.source.is_none())
506                    .unwrap_or(true)
507                {
508                    // the source has been removed from within its callback, unregister it
509                    let mut poll = self.handle.inner.poll.borrow_mut();
510                    if let Err(e) = disp.unregister(
511                        &mut poll,
512                        &mut self
513                            .handle
514                            .inner
515                            .sources_with_additional_lifecycle_events
516                            .borrow_mut(),
517                        RegistrationToken::new(reg_token),
518                    ) {
519                        warn!("Failed to unregister source from the polling system: {e:?}",);
520                    }
521                }
522            } else {
523                warn!(?reg_token, "Received an event for non-existence source");
524            }
525        }
526
527        Ok(())
528    }
529
530    fn dispatch_idles(&mut self, data: &mut Data) {
531        let idles = std::mem::take(&mut *self.handle.inner.idles.borrow_mut());
532        for idle in idles {
533            idle.borrow_mut().dispatch(data);
534        }
535    }
536
537    /// Dispatch pending events to their callbacks
538    ///
539    /// If some sources have events available, their callbacks will be immediatly called.
540    /// Otherwise this will wait until an event is receive or the provided `timeout`
541    /// is reached. If `timeout` is `None`, it will wait without a duration limit.
542    ///
543    /// Once pending events have been processed or the timeout is reached, all pending
544    /// idle callbacks will be fired before this method returns.
545    pub fn dispatch<D: Into<Option<Duration>>>(
546        &mut self,
547        timeout: D,
548        data: &mut Data,
549    ) -> crate::Result<()> {
550        self.dispatch_events(timeout.into(), data)?;
551        self.dispatch_idles(data);
552
553        Ok(())
554    }
555
556    /// Get a signal to stop this event loop from running
557    ///
558    /// To be used in conjunction with the `run()` method.
559    pub fn get_signal(&self) -> LoopSignal {
560        LoopSignal {
561            signal: self.signals.clone(),
562            notifier: self.handle.inner.poll.borrow().notifier(),
563        }
564    }
565
566    /// Run this event loop
567    ///
568    /// This will repeatedly try to dispatch events (see the `dispatch()` method) on
569    /// this event loop, waiting at most `timeout` every time.
570    ///
571    /// Between each dispatch wait, your provided callback will be called.
572    ///
573    /// You can use the `get_signal()` method to retrieve a way to stop or wakeup
574    /// the event loop from anywhere.
575    pub fn run<F, D: Into<Option<Duration>>>(
576        &mut self,
577        timeout: D,
578        data: &mut Data,
579        mut cb: F,
580    ) -> crate::Result<()>
581    where
582        F: FnMut(&mut Data),
583    {
584        let timeout = timeout.into();
585        self.signals.stop.store(false, Ordering::Release);
586        while !self.signals.stop.load(Ordering::Acquire) {
587            self.dispatch(timeout, data)?;
588            cb(data);
589        }
590        Ok(())
591    }
592
593    /// Block a future on this event loop.
594    ///
595    /// This will run the provided future on this event loop, blocking until it is
596    /// resolved.
597    ///
598    /// If [`LoopSignal::stop()`] is called before the future is resolved, this function returns
599    /// `None`.
600    #[cfg(feature = "block_on")]
601    pub fn block_on<R>(
602        &mut self,
603        future: impl Future<Output = R>,
604        data: &mut Data,
605        mut cb: impl FnMut(&mut Data),
606    ) -> crate::Result<Option<R>> {
607        use std::task::{Context, Poll, Wake, Waker};
608
609        /// A waker that will wake up the event loop when it is ready to make progress.
610        struct EventLoopWaker(LoopSignal);
611
612        impl Wake for EventLoopWaker {
613            fn wake(self: Arc<Self>) {
614                // Set the waker.
615                self.0.signal.future_ready.store(true, Ordering::Release);
616                self.0.notifier.notify().ok();
617            }
618
619            fn wake_by_ref(self: &Arc<Self>) {
620                // Set the waker.
621                self.0.signal.future_ready.store(true, Ordering::Release);
622                self.0.notifier.notify().ok();
623            }
624        }
625
626        // Pin the future to the stack.
627        pin_utils::pin_mut!(future);
628
629        // Create a waker that will wake up the event loop when it is ready to make progress.
630        let waker = {
631            let handle = EventLoopWaker(self.get_signal());
632
633            Waker::from(Arc::new(handle))
634        };
635        let mut context = Context::from_waker(&waker);
636
637        // Begin running the loop.
638        let mut output = None;
639
640        self.signals.stop.store(false, Ordering::Release);
641        self.signals.future_ready.store(true, Ordering::Release);
642
643        while !self.signals.stop.load(Ordering::Acquire) {
644            // If the future is ready to be polled, poll it.
645            if self.signals.future_ready.swap(false, Ordering::AcqRel) {
646                // Poll the future and break the loop if it's ready.
647                if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
648                    output = Some(result);
649                    break;
650                }
651            }
652
653            // Otherwise, block on the event loop.
654            self.dispatch_events(None, data)?;
655            self.dispatch_idles(data);
656            cb(data);
657        }
658
659        Ok(output)
660    }
661}
662
663#[cfg(unix)]
664impl<'l, Data> AsRawFd for EventLoop<'l, Data> {
665    /// Get the underlying raw-fd of the poller.
666    ///
667    /// This could be used to create [`Generic`] source out of the current loop
668    /// and inserting into some other [`EventLoop`]. It's recommended to clone `fd`
669    /// before doing so.
670    ///
671    /// [`Generic`]: crate::generic::Generic
672    fn as_raw_fd(&self) -> RawFd {
673        self.poller.as_raw_fd()
674    }
675}
676
677#[cfg(unix)]
678impl<'l, Data> AsFd for EventLoop<'l, Data> {
679    /// Get the underlying fd of the poller.
680    ///
681    /// This could be used to create [`Generic`] source out of the current loop
682    /// and inserting into some other [`EventLoop`].
683    ///
684    /// [`Generic`]: crate::generic::Generic
685    fn as_fd(&self) -> BorrowedFd<'_> {
686        self.poller.as_fd()
687    }
688}
689
#[cfg(windows)]
impl<Data> AsRawHandle for EventLoop<'_, Data> {
    /// Get the underlying raw handle of the poller.
    fn as_raw_handle(&self) -> RawHandle {
        let poller: &Poller = &self.poller;
        poller.as_raw_handle()
    }
}
696
#[cfg(windows)]
impl<Data> AsHandle for EventLoop<'_, Data> {
    /// Get the underlying handle of the poller.
    fn as_handle(&self) -> BorrowedHandle<'_> {
        let poller: &Poller = &self.poller;
        poller.as_handle()
    }
}
703
#[derive(Clone, Debug)]
/// The EventIterator is an `Iterator` over the events relevant to a particular source.
///
/// This type is used in the [`EventSource::before_handle_events`] methods for
/// two main reasons:
///
/// - To avoid dynamic dispatch overhead
/// - Secondly, it is to allow this type to be `Clone`, which is not
///   possible with dynamic dispatch
pub struct EventIterator<'a> {
    // Iterator over all the events of one poll; filtered down to one source in `next`.
    inner: slice::Iter<'a, PollEvent>,
    // Token of the source whose events are yielded.
    registration_token: RegistrationToken,
}
716
717impl<'a> Iterator for EventIterator<'a> {
718    type Item = (Readiness, Token);
719
720    fn next(&mut self) -> Option<Self::Item> {
721        for next in self.inner.by_ref() {
722            if next
723                .token
724                .inner
725                .same_source_as(self.registration_token.inner)
726            {
727                return Some((next.readiness, next.token));
728            }
729        }
730        None
731    }
732}
733
/// A signal that can be shared between threads to stop or wake up a running
/// event loop
#[derive(Clone)]
pub struct LoopSignal {
    // Flags shared with the event loop this signal was created from.
    signal: Arc<Signals>,
    // Notifier obtained from the loop's polling system; used to make a wait return early.
    notifier: Notifier,
}
741
742impl std::fmt::Debug for LoopSignal {
743    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
744    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
745        f.write_str("LoopSignal { ... }")
746    }
747}
748
749impl LoopSignal {
750    /// Stop the event loop
751    ///
752    /// Once this method is called, the next time the event loop has finished
753    /// waiting for events, it will return rather than starting to wait again.
754    ///
755    /// This is only useful if you are using the `EventLoop::run()` method.
756    pub fn stop(&self) {
757        self.signal.stop.store(true, Ordering::Release);
758    }
759
760    /// Wake up the event loop
761    ///
762    /// This sends a dummy event to the event loop to simulate the reception
763    /// of an event, making the wait return early. Called after `stop()`, this
764    /// ensures the event loop will terminate quickly if you specified a long
765    /// timeout (or no timeout at all) to the `dispatch` or `run` method.
766    pub fn wakeup(&self) {
767        self.notifier.notify().ok();
768    }
769}
770
771#[cfg(test)]
772mod tests {
773    use std::{cell::Cell, rc::Rc, time::Duration};
774
775    use crate::{
776        channel::{channel, Channel},
777        ping::*,
778        EventIterator, EventSource, Poll, PostAction, Readiness, RegistrationToken, Token,
779        TokenFactory,
780    };
781
782    #[cfg(unix)]
783    use crate::{generic::Generic, Dispatcher, Interest, Mode};
784
785    use super::EventLoop;
786
787    #[test]
788    fn dispatch_idle() {
789        let mut event_loop = EventLoop::try_new().unwrap();
790
791        let mut dispatched = false;
792
793        event_loop.handle().insert_idle(|d| {
794            *d = true;
795        });
796
797        event_loop
798            .dispatch(Some(Duration::ZERO), &mut dispatched)
799            .unwrap();
800
801        assert!(dispatched);
802    }
803
804    #[test]
805    fn cancel_idle() {
806        let mut event_loop = EventLoop::try_new().unwrap();
807
808        let mut dispatched = false;
809
810        let handle = event_loop.handle();
811        let idle = handle.insert_idle(move |d| {
812            *d = true;
813        });
814
815        idle.cancel();
816
817        event_loop
818            .dispatch(Duration::ZERO, &mut dispatched)
819            .unwrap();
820
821        assert!(!dispatched);
822    }
823
824    #[test]
825    fn wakeup() {
826        let mut event_loop = EventLoop::try_new().unwrap();
827
828        let signal = event_loop.get_signal();
829
830        ::std::thread::spawn(move || {
831            ::std::thread::sleep(Duration::from_millis(500));
832            signal.wakeup();
833        });
834
835        // the test should return
836        event_loop.dispatch(None, &mut ()).unwrap();
837    }
838
839    #[test]
840    fn wakeup_stop() {
841        let mut event_loop = EventLoop::try_new().unwrap();
842
843        let signal = event_loop.get_signal();
844
845        ::std::thread::spawn(move || {
846            ::std::thread::sleep(Duration::from_millis(500));
847            signal.stop();
848            signal.wakeup();
849        });
850
851        // the test should return
852        event_loop.run(None, &mut (), |_| {}).unwrap();
853    }
854
855    #[test]
856    fn additional_events() {
857        let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
858        let mut lock = Lock {
859            lock: Rc::new((
860                // Whether the lock is locked
861                Cell::new(false),
862                // The total number of events processed in process_events
863                Cell::new(0),
864                // The total number of events processed in before_handle_events
865                // This is used to ensure that the count seen in before_handle_events is expected
866                Cell::new(0),
867            )),
868        };
869        let (sender, channel) = channel();
870        let token = event_loop
871            .handle()
872            .insert_source(
873                LockingSource {
874                    channel,
875                    lock: lock.clone(),
876                },
877                |_, _, lock| {
878                    lock.lock();
879                    lock.unlock();
880                },
881            )
882            .unwrap();
883        sender.send(()).unwrap();
884
885        event_loop.dispatch(None, &mut lock).unwrap();
886        // We should have been locked twice so far
887        assert_eq!(lock.lock.1.get(), 2);
888        // And we should have received one event
889        assert_eq!(lock.lock.2.get(), 1);
890        event_loop.handle().disable(&token).unwrap();
891        event_loop
892            .dispatch(Some(Duration::ZERO), &mut lock)
893            .unwrap();
894        assert_eq!(lock.lock.1.get(), 2);
895
896        event_loop.handle().enable(&token).unwrap();
897        event_loop
898            .dispatch(Some(Duration::ZERO), &mut lock)
899            .unwrap();
900        assert_eq!(lock.lock.1.get(), 3);
901        event_loop.handle().remove(token);
902        event_loop
903            .dispatch(Some(Duration::ZERO), &mut lock)
904            .unwrap();
905        assert_eq!(lock.lock.1.get(), 3);
906        assert_eq!(lock.lock.2.get(), 1);
907
908        #[derive(Clone)]
909        struct Lock {
910            lock: Rc<(Cell<bool>, Cell<u32>, Cell<u32>)>,
911        }
912        impl Lock {
913            fn lock(&self) {
914                if self.lock.0.get() {
915                    panic!();
916                }
917                // Increase the count
918                self.lock.1.set(self.lock.1.get() + 1);
919                self.lock.0.set(true)
920            }
921            fn unlock(&self) {
922                if !self.lock.0.get() {
923                    panic!();
924                }
925                self.lock.0.set(false);
926            }
927        }
928        struct LockingSource {
929            channel: Channel<()>,
930            lock: Lock,
931        }
932        impl EventSource for LockingSource {
933            type Event = <Channel<()> as EventSource>::Event;
934
935            type Metadata = <Channel<()> as EventSource>::Metadata;
936
937            type Ret = <Channel<()> as EventSource>::Ret;
938
939            type Error = <Channel<()> as EventSource>::Error;
940
941            fn process_events<F>(
942                &mut self,
943                readiness: Readiness,
944                token: Token,
945                callback: F,
946            ) -> Result<PostAction, Self::Error>
947            where
948                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
949            {
950                self.channel.process_events(readiness, token, callback)
951            }
952
953            fn register(
954                &mut self,
955                poll: &mut Poll,
956                token_factory: &mut TokenFactory,
957            ) -> crate::Result<()> {
958                self.channel.register(poll, token_factory)
959            }
960
961            fn reregister(
962                &mut self,
963                poll: &mut Poll,
964                token_factory: &mut TokenFactory,
965            ) -> crate::Result<()> {
966                self.channel.reregister(poll, token_factory)
967            }
968
969            fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
970                self.channel.unregister(poll)
971            }
972
973            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
974
975            fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
976                self.lock.lock();
977                Ok(None)
978            }
979
980            fn before_handle_events(&mut self, events: EventIterator) {
981                let events_count = events.count();
982                let lock = &self.lock.lock;
983                lock.2.set(lock.2.get() + events_count as u32);
984                self.lock.unlock();
985            }
986        }
987    }
988    #[test]
989    fn default_additional_events() {
990        let (sender, channel) = channel();
991        let mut test_source = NoopWithDefaultHandlers { channel };
992        let mut event_loop = EventLoop::try_new().unwrap();
993        event_loop
994            .handle()
995            .insert_source(Box::new(&mut test_source), |_, _, _| {})
996            .unwrap();
997        sender.send(()).unwrap();
998
999        event_loop.dispatch(None, &mut ()).unwrap();
1000        struct NoopWithDefaultHandlers {
1001            channel: Channel<()>,
1002        }
1003        impl EventSource for NoopWithDefaultHandlers {
1004            type Event = <Channel<()> as EventSource>::Event;
1005
1006            type Metadata = <Channel<()> as EventSource>::Metadata;
1007
1008            type Ret = <Channel<()> as EventSource>::Ret;
1009
1010            type Error = <Channel<()> as EventSource>::Error;
1011
1012            fn process_events<F>(
1013                &mut self,
1014                readiness: Readiness,
1015                token: Token,
1016                callback: F,
1017            ) -> Result<PostAction, Self::Error>
1018            where
1019                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1020            {
1021                self.channel.process_events(readiness, token, callback)
1022            }
1023
1024            fn register(
1025                &mut self,
1026                poll: &mut Poll,
1027                token_factory: &mut TokenFactory,
1028            ) -> crate::Result<()> {
1029                self.channel.register(poll, token_factory)
1030            }
1031
1032            fn reregister(
1033                &mut self,
1034                poll: &mut Poll,
1035                token_factory: &mut TokenFactory,
1036            ) -> crate::Result<()> {
1037                self.channel.reregister(poll, token_factory)
1038            }
1039
1040            fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
1041                self.channel.unregister(poll)
1042            }
1043
1044            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1045        }
1046    }
1047
1048    #[test]
1049    fn additional_events_synthetic() {
1050        let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
1051        let mut lock = Lock {
1052            lock: Rc::new(Cell::new(false)),
1053        };
1054        event_loop
1055            .handle()
1056            .insert_source(
1057                InstantWakeupLockingSource {
1058                    lock: lock.clone(),
1059                    token: None,
1060                },
1061                |_, _, lock| {
1062                    lock.lock();
1063                    lock.unlock();
1064                },
1065            )
1066            .unwrap();
1067
1068        // Loop should finish, as
1069        event_loop.dispatch(None, &mut lock).unwrap();
1070        #[derive(Clone)]
1071        struct Lock {
1072            lock: Rc<Cell<bool>>,
1073        }
1074        impl Lock {
1075            fn lock(&self) {
1076                if self.lock.get() {
1077                    panic!();
1078                }
1079                self.lock.set(true)
1080            }
1081            fn unlock(&self) {
1082                if !self.lock.get() {
1083                    panic!();
1084                }
1085                self.lock.set(false);
1086            }
1087        }
1088        struct InstantWakeupLockingSource {
1089            lock: Lock,
1090            token: Option<Token>,
1091        }
1092        impl EventSource for InstantWakeupLockingSource {
1093            type Event = ();
1094
1095            type Metadata = ();
1096
1097            type Ret = ();
1098
1099            type Error = <Channel<()> as EventSource>::Error;
1100
1101            fn process_events<F>(
1102                &mut self,
1103                _: Readiness,
1104                token: Token,
1105                mut callback: F,
1106            ) -> Result<PostAction, Self::Error>
1107            where
1108                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1109            {
1110                assert_eq!(token, self.token.unwrap());
1111                callback((), &mut ());
1112                Ok(PostAction::Continue)
1113            }
1114
1115            fn register(
1116                &mut self,
1117                _: &mut Poll,
1118                token_factory: &mut TokenFactory,
1119            ) -> crate::Result<()> {
1120                self.token = Some(token_factory.token());
1121                Ok(())
1122            }
1123
1124            fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1125                unreachable!()
1126            }
1127
1128            fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1129                unreachable!()
1130            }
1131
1132            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1133
1134            fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
1135                self.lock.lock();
1136                Ok(Some((Readiness::EMPTY, self.token.unwrap())))
1137            }
1138
1139            fn before_handle_events(&mut self, _: EventIterator) {
1140                self.lock.unlock();
1141            }
1142        }
1143    }
1144
1145    #[cfg(unix)]
1146    #[test]
1147    fn insert_bad_source() {
1148        use std::mem::ManuallyDrop;
1149        use std::os::unix::io::{AsFd, FromRawFd, OwnedFd};
1150
1151        struct LeakedFd(ManuallyDrop<OwnedFd>);
1152
1153        impl AsFd for LeakedFd {
1154            fn as_fd(&self) -> std::os::unix::prelude::BorrowedFd<'_> {
1155                self.0.as_fd()
1156            }
1157        }
1158
1159        let event_loop = EventLoop::<()>::try_new().unwrap();
1160        let fd = LeakedFd(ManuallyDrop::new(unsafe {
1161            std::os::unix::io::OwnedFd::from_raw_fd(420)
1162        }));
1163        let ret = event_loop.handle().insert_source(
1164            crate::sources::generic::Generic::new(fd, Interest::READ, Mode::Level),
1165            |_, _, _| Ok(PostAction::Continue),
1166        );
1167        assert!(ret.is_err());
1168    }
1169
1170    #[test]
1171    fn invalid_token() {
1172        let (_ping, source) = crate::sources::ping::make_ping().unwrap();
1173
1174        let event_loop = EventLoop::<()>::try_new().unwrap();
1175        let handle = event_loop.handle();
1176        let reg_token = handle.insert_source(source, |_, _, _| {}).unwrap();
1177        handle.remove(reg_token);
1178
1179        let ret = handle.enable(&reg_token);
1180        assert!(ret.is_err());
1181    }
1182
1183    #[cfg(unix)]
1184    #[test]
1185    fn insert_source_no_interest() {
1186        use rustix::pipe::pipe;
1187
1188        // Create a pipe to get an arbitrary fd.
1189        let (read, _write) = pipe().unwrap();
1190
1191        let source = crate::sources::generic::Generic::new(read, Interest::EMPTY, Mode::Level);
1192        let dispatcher = Dispatcher::new(source, |_, _, _| Ok(PostAction::Continue));
1193
1194        let event_loop = EventLoop::<()>::try_new().unwrap();
1195        let handle = event_loop.handle();
1196        let ret = handle.register_dispatcher(dispatcher.clone());
1197
1198        if let Ok(token) = ret {
1199            // Unwrap the dispatcher+source and close the read end.
1200            handle.remove(token);
1201        } else {
1202            // Fail the test.
1203            panic!();
1204        }
1205    }
1206
1207    #[test]
1208    fn disarm_rearm() {
1209        let mut event_loop = EventLoop::<bool>::try_new().unwrap();
1210        let (ping, ping_source) = make_ping().unwrap();
1211
1212        let ping_token = event_loop
1213            .handle()
1214            .insert_source(ping_source, |(), &mut (), dispatched| {
1215                *dispatched = true;
1216            })
1217            .unwrap();
1218
1219        ping.ping();
1220        let mut dispatched = false;
1221        event_loop
1222            .dispatch(Duration::ZERO, &mut dispatched)
1223            .unwrap();
1224        assert!(dispatched);
1225
1226        // disable the source
1227        ping.ping();
1228        event_loop.handle().disable(&ping_token).unwrap();
1229        let mut dispatched = false;
1230        event_loop
1231            .dispatch(Duration::ZERO, &mut dispatched)
1232            .unwrap();
1233        assert!(!dispatched);
1234
1235        // reenable it, the previous ping now gets dispatched
1236        event_loop.handle().enable(&ping_token).unwrap();
1237        let mut dispatched = false;
1238        event_loop
1239            .dispatch(Duration::ZERO, &mut dispatched)
1240            .unwrap();
1241        assert!(dispatched);
1242    }
1243
1244    #[test]
1245    fn multiple_tokens() {
1246        struct DoubleSource {
1247            ping1: PingSource,
1248            ping2: PingSource,
1249        }
1250
1251        impl crate::EventSource for DoubleSource {
1252            type Event = u32;
1253            type Metadata = ();
1254            type Ret = ();
1255            type Error = PingError;
1256
1257            fn process_events<F>(
1258                &mut self,
1259                readiness: Readiness,
1260                token: Token,
1261                mut callback: F,
1262            ) -> Result<PostAction, Self::Error>
1263            where
1264                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1265            {
1266                self.ping1
1267                    .process_events(readiness, token, |(), &mut ()| callback(1, &mut ()))?;
1268                self.ping2
1269                    .process_events(readiness, token, |(), &mut ()| callback(2, &mut ()))?;
1270                Ok(PostAction::Continue)
1271            }
1272
1273            fn register(
1274                &mut self,
1275                poll: &mut Poll,
1276                token_factory: &mut TokenFactory,
1277            ) -> crate::Result<()> {
1278                self.ping1.register(poll, token_factory)?;
1279                self.ping2.register(poll, token_factory)?;
1280                Ok(())
1281            }
1282
1283            fn reregister(
1284                &mut self,
1285                poll: &mut Poll,
1286                token_factory: &mut TokenFactory,
1287            ) -> crate::Result<()> {
1288                self.ping1.reregister(poll, token_factory)?;
1289                self.ping2.reregister(poll, token_factory)?;
1290                Ok(())
1291            }
1292
1293            fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
1294                self.ping1.unregister(poll)?;
1295                self.ping2.unregister(poll)?;
1296                Ok(())
1297            }
1298        }
1299
1300        let mut event_loop = EventLoop::<u32>::try_new().unwrap();
1301
1302        let (ping1, source1) = make_ping().unwrap();
1303        let (ping2, source2) = make_ping().unwrap();
1304
1305        let source = DoubleSource {
1306            ping1: source1,
1307            ping2: source2,
1308        };
1309
1310        event_loop
1311            .handle()
1312            .insert_source(source, |i, _, d| {
1313                eprintln!("Dispatching {}", i);
1314                *d += i
1315            })
1316            .unwrap();
1317
1318        let mut dispatched = 0;
1319        ping1.ping();
1320        event_loop
1321            .dispatch(Duration::ZERO, &mut dispatched)
1322            .unwrap();
1323        assert_eq!(dispatched, 1);
1324
1325        dispatched = 0;
1326        ping2.ping();
1327        event_loop
1328            .dispatch(Duration::ZERO, &mut dispatched)
1329            .unwrap();
1330        assert_eq!(dispatched, 2);
1331
1332        dispatched = 0;
1333        ping1.ping();
1334        ping2.ping();
1335        event_loop
1336            .dispatch(Duration::ZERO, &mut dispatched)
1337            .unwrap();
1338        assert_eq!(dispatched, 3);
1339    }
1340
1341    #[cfg(unix)]
1342    #[test]
1343    fn change_interests() {
1344        use rustix::io::write;
1345        use rustix::net::{recv, socketpair, AddressFamily, RecvFlags, SocketFlags, SocketType};
1346        let mut event_loop = EventLoop::<bool>::try_new().unwrap();
1347
1348        let (sock1, sock2) = socketpair(
1349            AddressFamily::UNIX,
1350            SocketType::STREAM,
1351            SocketFlags::empty(),
1352            None, // recv with DONTWAIT will suffice for platforms without SockFlag::SOCK_NONBLOCKING such as macOS
1353        )
1354        .unwrap();
1355
1356        let source = Generic::new(sock1, Interest::READ, Mode::Level);
1357        let dispatcher = Dispatcher::new(source, |_, fd, dispatched| {
1358            *dispatched = true;
1359            // read all contents available to drain the socket
1360            let mut buf = [0u8; 32];
1361            loop {
1362                match recv(&*fd, &mut buf, RecvFlags::DONTWAIT) {
1363                    Ok(0) => break, // closed pipe, we are now inert
1364                    Ok(_) => {}
1365                    Err(e) => {
1366                        let e: std::io::Error = e.into();
1367                        if e.kind() == std::io::ErrorKind::WouldBlock {
1368                            break;
1369                        // nothing more to read
1370                        } else {
1371                            // propagate error
1372                            return Err(e);
1373                        }
1374                    }
1375                }
1376            }
1377            Ok(PostAction::Continue)
1378        });
1379
1380        let sock_token_1 = event_loop
1381            .handle()
1382            .register_dispatcher(dispatcher.clone())
1383            .unwrap();
1384
1385        // first dispatch, nothing is readable
1386        let mut dispatched = false;
1387        event_loop
1388            .dispatch(Duration::ZERO, &mut dispatched)
1389            .unwrap();
1390        assert!(!dispatched);
1391
1392        // write something, the socket becomes readable
1393        write(&sock2, &[1, 2, 3]).unwrap();
1394        dispatched = false;
1395        event_loop
1396            .dispatch(Duration::ZERO, &mut dispatched)
1397            .unwrap();
1398        assert!(dispatched);
1399
1400        // All has been read, no longer readable
1401        dispatched = false;
1402        event_loop
1403            .dispatch(Duration::ZERO, &mut dispatched)
1404            .unwrap();
1405        assert!(!dispatched);
1406
1407        // change the interests for writability instead
1408        dispatcher.as_source_mut().interest = Interest::WRITE;
1409        event_loop.handle().update(&sock_token_1).unwrap();
1410
1411        // the socket is writable
1412        dispatched = false;
1413        event_loop
1414            .dispatch(Duration::ZERO, &mut dispatched)
1415            .unwrap();
1416        assert!(dispatched);
1417
1418        // change back to readable
1419        dispatcher.as_source_mut().interest = Interest::READ;
1420        event_loop.handle().update(&sock_token_1).unwrap();
1421
1422        // the socket is not readable
1423        dispatched = false;
1424        event_loop
1425            .dispatch(Duration::ZERO, &mut dispatched)
1426            .unwrap();
1427        assert!(!dispatched);
1428    }
1429
1430    #[test]
1431    fn kill_source() {
1432        let mut event_loop = EventLoop::<Option<RegistrationToken>>::try_new().unwrap();
1433
1434        let handle = event_loop.handle();
1435        let (ping, ping_source) = make_ping().unwrap();
1436        let ping_token = event_loop
1437            .handle()
1438            .insert_source(ping_source, move |(), &mut (), opt_src| {
1439                if let Some(src) = opt_src.take() {
1440                    handle.remove(src);
1441                }
1442            })
1443            .unwrap();
1444
1445        ping.ping();
1446
1447        let mut opt_src = Some(ping_token);
1448
1449        event_loop.dispatch(Duration::ZERO, &mut opt_src).unwrap();
1450
1451        assert!(opt_src.is_none());
1452    }
1453
1454    #[test]
1455    fn non_static_data() {
1456        use std::sync::mpsc;
1457
1458        let (sender, receiver) = mpsc::channel();
1459
1460        {
1461            struct RefSender<'a>(&'a mpsc::Sender<()>);
1462            let mut ref_sender = RefSender(&sender);
1463
1464            let mut event_loop = EventLoop::<RefSender<'_>>::try_new().unwrap();
1465            let (ping, ping_source) = make_ping().unwrap();
1466            let _ping_token = event_loop
1467                .handle()
1468                .insert_source(ping_source, |_, _, ref_sender| {
1469                    ref_sender.0.send(()).unwrap();
1470                })
1471                .unwrap();
1472
1473            ping.ping();
1474
1475            event_loop
1476                .dispatch(Duration::ZERO, &mut ref_sender)
1477                .unwrap();
1478        }
1479
1480        receiver.recv().unwrap();
1481        // sender still usable (e.g. for another EventLoop)
1482        drop(sender);
1483    }
1484
1485    #[cfg(feature = "block_on")]
1486    #[test]
1487    fn block_on_test() {
1488        use crate::sources::timer::TimeoutFuture;
1489        use std::time::Duration;
1490
1491        let mut evl = EventLoop::<()>::try_new().unwrap();
1492
1493        let mut data = 22;
1494        let timeout = {
1495            let data = &mut data;
1496            let evl_handle = evl.handle();
1497
1498            async move {
1499                TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
1500                *data = 32;
1501                11
1502            }
1503        };
1504
1505        let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();
1506        assert_eq!(result, Some(11));
1507        assert_eq!(data, 32);
1508    }
1509
1510    #[cfg(feature = "block_on")]
1511    #[test]
1512    fn block_on_early_cancel() {
1513        use crate::sources::timer;
1514        use std::time::Duration;
1515
1516        let mut evl = EventLoop::<()>::try_new().unwrap();
1517
1518        let mut data = 22;
1519        let timeout = {
1520            let data = &mut data;
1521            let evl_handle = evl.handle();
1522
1523            async move {
1524                timer::TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
1525                *data = 32;
1526                11
1527            }
1528        };
1529
1530        let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
1531        let handle = evl.get_signal();
1532        let _timer_token = evl
1533            .handle()
1534            .insert_source(timer_source, move |_, _, _| {
1535                handle.stop();
1536                timer::TimeoutAction::Drop
1537            })
1538            .unwrap();
1539
1540        let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();
1541        assert_eq!(result, None);
1542        assert_eq!(data, 22);
1543    }
1544
1545    #[test]
1546    fn reuse() {
1547        use crate::sources::timer;
1548        use std::sync::{Arc, Mutex};
1549        use std::time::{Duration, Instant};
1550
1551        let mut evl = EventLoop::<RegistrationToken>::try_new().unwrap();
1552        let handle = evl.handle();
1553
1554        let data = Arc::new(Mutex::new(1));
1555        let data_cloned = data.clone();
1556
1557        let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
1558        let mut first_timer_token = evl
1559            .handle()
1560            .insert_source(timer_source, move |_, _, own_token| {
1561                handle.remove(*own_token);
1562                let data_cloned = data_cloned.clone();
1563                let _ = handle.insert_source(timer::Timer::immediate(), move |_, _, _| {
1564                    *data_cloned.lock().unwrap() = 2;
1565                    timer::TimeoutAction::Drop
1566                });
1567                timer::TimeoutAction::Drop
1568            })
1569            .unwrap();
1570
1571        let now = Instant::now();
1572        loop {
1573            evl.dispatch(Some(Duration::from_secs(3)), &mut first_timer_token)
1574                .unwrap();
1575            if Instant::now().duration_since(now) > Duration::from_secs(3) {
1576                break;
1577            }
1578        }
1579
1580        assert_eq!(*data.lock().unwrap(), 2);
1581    }
1582
1583    #[test]
1584    fn drop_of_subsource() {
1585        struct WithSubSource {
1586            token: Option<Token>,
1587        }
1588
1589        impl crate::EventSource for WithSubSource {
1590            type Event = ();
1591            type Metadata = ();
1592            type Ret = ();
1593            type Error = crate::Error;
1594            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1595
1596            fn process_events<F>(
1597                &mut self,
1598                _: Readiness,
1599                _: Token,
1600                mut callback: F,
1601            ) -> Result<PostAction, Self::Error>
1602            where
1603                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1604            {
1605                callback((), &mut ());
1606                // Drop the source
1607                Ok(PostAction::Remove)
1608            }
1609
1610            fn register(&mut self, _: &mut Poll, fact: &mut TokenFactory) -> crate::Result<()> {
1611                // produce a few tokens to emulate a subsource
1612                fact.token();
1613                fact.token();
1614                self.token = Some(fact.token());
1615                Ok(())
1616            }
1617
1618            fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1619                Ok(())
1620            }
1621
1622            fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1623                Ok(())
1624            }
1625
1626            // emulate a readiness
1627            fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
1628                Ok(self.token.map(|token| {
1629                    (
1630                        Readiness {
1631                            readable: true,
1632                            writable: false,
1633                            error: false,
1634                        },
1635                        token,
1636                    )
1637                }))
1638            }
1639        }
1640
1641        // Now the actual test
1642        let mut evl = EventLoop::<bool>::try_new().unwrap();
1643        evl.handle()
1644            .insert_source(WithSubSource { token: None }, |_, _, ran| {
1645                *ran = true;
1646            })
1647            .unwrap();
1648
1649        let mut ran = false;
1650
1651        evl.dispatch(Some(Duration::ZERO), &mut ran).unwrap();
1652
1653        assert!(ran);
1654    }
1655
1656    // A dummy EventSource to test insertion and removal of sources
1657    struct DummySource;
1658
1659    impl crate::EventSource for DummySource {
1660        type Event = ();
1661        type Metadata = ();
1662        type Ret = ();
1663        type Error = crate::Error;
1664
1665        fn process_events<F>(
1666            &mut self,
1667            _: Readiness,
1668            _: Token,
1669            mut callback: F,
1670        ) -> Result<PostAction, Self::Error>
1671        where
1672            F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1673        {
1674            callback((), &mut ());
1675            Ok(PostAction::Continue)
1676        }
1677
1678        fn register(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1679            Ok(())
1680        }
1681
1682        fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1683            Ok(())
1684        }
1685
1686        fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1687            Ok(())
1688        }
1689    }
1690}