calloop/
loop_logic.rs

1use std::cell::{Cell, RefCell};
2use std::fmt::Debug;
3use std::rc::{Rc, Weak};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use std::{io, slice};
8
9#[cfg(feature = "block_on")]
10use std::future::Future;
11
12#[cfg(unix)]
13use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
14#[cfg(windows)]
15use std::os::windows::io::{AsHandle, AsRawHandle, AsSocket as AsFd, BorrowedHandle, RawHandle};
16
17use polling::Poller;
18use tracing::{trace, warn};
19
20use crate::list::{SourceEntry, SourceList};
21use crate::sources::{Dispatcher, EventSource, Idle, IdleDispatcher};
22use crate::sys::{Notifier, PollEvent};
23use crate::token::TokenInner;
24use crate::{
25    AdditionalLifecycleEventsSet, InsertError, Poll, PostAction, Readiness, Token, TokenFactory,
26};
27
28type IdleCallback<'i, Data> = Rc<RefCell<dyn IdleDispatcher<Data> + 'i>>;
29
30/// A token representing a registration in the [`EventLoop`].
31///
32/// This token is given to you by the [`EventLoop`] when an [`EventSource`] is inserted or
33/// a [`Dispatcher`] is registered. You can use it to [disable](LoopHandle#method.disable),
34/// [enable](LoopHandle#method.enable), [update`](LoopHandle#method.update),
35/// [remove](LoopHandle#method.remove) or [kill](LoopHandle#method.kill) it.
36#[derive(Clone, Copy, Debug, PartialEq, Eq)]
37pub struct RegistrationToken {
38    inner: TokenInner,
39}
40
41impl RegistrationToken {
42    /// Create the RegistrationToken corresponding to the given raw key
43    /// This is needed because some methods use `RegistrationToken`s as
44    /// raw usizes within this crate
45    pub(crate) fn new(inner: TokenInner) -> Self {
46        Self { inner }
47    }
48}
49
50pub(crate) struct LoopInner<'l, Data> {
51    pub(crate) poll: RefCell<Poll>,
52    // The `Option` is used to keep slots of the slab occupied, to prevent id reuse
53    // while in-flight events might still refer to a recently destroyed event source.
54    pub(crate) sources: RefCell<SourceList<'l, Data>>,
55    pub(crate) sources_with_additional_lifecycle_events: RefCell<AdditionalLifecycleEventsSet>,
56    idles: RefCell<Vec<IdleCallback<'l, Data>>>,
57    pending_action: Cell<PostAction>,
58}
59
60/// A handle to an event loop
61///
62/// This handle allows you to insert new sources and idles in this event loop,
63/// it can be cloned, and it is possible to insert new sources from within a source
64/// callback.
65pub struct LoopHandle<'l, Data> {
66    inner: Rc<LoopInner<'l, Data>>,
67}
68
69/// Weak variant of a [`LoopHandle`]
70pub struct WeakLoopHandle<'l, Data> {
71    inner: Weak<LoopInner<'l, Data>>,
72}
73
74impl<Data> Debug for LoopHandle<'_, Data> {
75    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.write_str("LoopHandle { ... }")
78    }
79}
80
81/// Manually implemented `Clone`.
82///
83/// The derived implementation adds a `Clone` bound on the generic parameter `Data`.
84impl<Data> Clone for LoopHandle<'_, Data> {
85    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
86    fn clone(&self) -> Self {
87        LoopHandle {
88            inner: self.inner.clone(),
89        }
90    }
91}
92
93impl<'l, Data> LoopHandle<'l, Data> {
94    /// Inserts a new event source in the loop.
95    ///
96    /// The provided callback will be called during the dispatching cycles whenever the
97    /// associated source generates events, see `EventLoop::dispatch(..)` for details.
98    ///
99    /// This function takes ownership of the event source. Use `register_dispatcher`
100    /// if you need access to the event source after this call.
101    pub fn insert_source<S, F>(
102        &self,
103        source: S,
104        callback: F,
105    ) -> Result<RegistrationToken, InsertError<S>>
106    where
107        S: EventSource + 'l,
108        F: FnMut(S::Event, &mut S::Metadata, &mut Data) -> S::Ret + 'l,
109    {
110        let dispatcher = Dispatcher::new(source, callback);
111        self.register_dispatcher(dispatcher.clone())
112            .map_err(|error| InsertError {
113                error,
114                inserted: dispatcher.into_source_inner(),
115            })
116    }
117
118    /// Registers a `Dispatcher` in the loop.
119    ///
120    /// Use this function if you need access to the event source after its insertion in the loop.
121    ///
122    /// See also `insert_source`.
123    #[cfg_attr(feature = "nightly_coverage", coverage(off))] // Contains a branch we can't hit w/o OOM
124    pub fn register_dispatcher<S>(
125        &self,
126        dispatcher: Dispatcher<'l, S, Data>,
127    ) -> crate::Result<RegistrationToken>
128    where
129        S: EventSource + 'l,
130    {
131        let mut sources = self.inner.sources.borrow_mut();
132        let mut poll = self.inner.poll.borrow_mut();
133
134        // Find an empty slot if any
135        let slot = sources.vacant_entry();
136
137        slot.source = Some(dispatcher.clone_as_event_dispatcher());
138        trace!(source = slot.token.get_id(), "Inserting new source");
139        let ret = slot.source.as_ref().unwrap().register(
140            &mut poll,
141            &mut self
142                .inner
143                .sources_with_additional_lifecycle_events
144                .borrow_mut(),
145            &mut TokenFactory::new(slot.token),
146        );
147
148        if let Err(error) = ret {
149            slot.source = None;
150            return Err(error);
151        }
152
153        Ok(RegistrationToken { inner: slot.token })
154    }
155
156    /// Inserts an idle callback.
157    ///
158    /// This callback will be called during a dispatching cycle when the event loop has
159    /// finished processing all pending events from the sources and becomes idle.
160    pub fn insert_idle<'i, F: FnOnce(&mut Data) + 'l + 'i>(&self, callback: F) -> Idle<'i> {
161        let mut opt_cb = Some(callback);
162        let callback = Rc::new(RefCell::new(Some(move |data: &mut Data| {
163            if let Some(cb) = opt_cb.take() {
164                cb(data);
165            }
166        })));
167        self.inner.idles.borrow_mut().push(callback.clone());
168        Idle { callback }
169    }
170
171    /// Enables this previously disabled event source.
172    ///
173    /// This previously disabled source will start generating events again.
174    ///
175    /// **Note:** this cannot be done from within the source callback.
176    pub fn enable(&self, token: &RegistrationToken) -> crate::Result<()> {
177        if let &SourceEntry {
178            token: entry_token,
179            source: Some(ref source),
180        } = self.inner.sources.borrow().get(token.inner)?
181        {
182            trace!(source = entry_token.get_id(), "Registering source");
183            source.register(
184                &mut self.inner.poll.borrow_mut(),
185                &mut self
186                    .inner
187                    .sources_with_additional_lifecycle_events
188                    .borrow_mut(),
189                &mut TokenFactory::new(entry_token),
190            )
191        } else {
192            Err(crate::Error::InvalidToken)
193        }
194    }
195
196    /// Makes this source update its registration.
197    ///
198    /// If after accessing the source you changed its parameters in a way that requires
199    /// updating its registration.
200    pub fn update(&self, token: &RegistrationToken) -> crate::Result<()> {
201        if let &SourceEntry {
202            token: entry_token,
203            source: Some(ref source),
204        } = self.inner.sources.borrow().get(token.inner)?
205        {
206            trace!(
207                source = entry_token.get_id(),
208                "Updating registration of source"
209            );
210            if !source.reregister(
211                &mut self.inner.poll.borrow_mut(),
212                &mut self
213                    .inner
214                    .sources_with_additional_lifecycle_events
215                    .borrow_mut(),
216                &mut TokenFactory::new(entry_token),
217            )? {
218                trace!(
219                    source = entry_token.get_id(),
220                    "Can't update registration withing a callback, storing for later."
221                );
222                // we are in a callback, store for later processing
223                self.inner.pending_action.set(PostAction::Reregister);
224            }
225            Ok(())
226        } else {
227            Err(crate::Error::InvalidToken)
228        }
229    }
230
231    /// Disables this event source.
232    ///
233    /// The source remains in the event loop, but it'll no longer generate events
234    pub fn disable(&self, token: &RegistrationToken) -> crate::Result<()> {
235        if let &SourceEntry {
236            token: entry_token,
237            source: Some(ref source),
238        } = self.inner.sources.borrow().get(token.inner)?
239        {
240            if !token.inner.same_source_as(entry_token) {
241                // The token provided by the user is no longer valid
242                return Err(crate::Error::InvalidToken);
243            }
244            trace!(source = entry_token.get_id(), "Unregistering source");
245            if !source.unregister(
246                &mut self.inner.poll.borrow_mut(),
247                &mut self
248                    .inner
249                    .sources_with_additional_lifecycle_events
250                    .borrow_mut(),
251                *token,
252            )? {
253                trace!(
254                    source = entry_token.get_id(),
255                    "Cannot unregister source in callback, storing for later."
256                );
257                // we are in a callback, store for later processing
258                self.inner.pending_action.set(PostAction::Disable);
259            }
260            Ok(())
261        } else {
262            Err(crate::Error::InvalidToken)
263        }
264    }
265
266    /// Removes this source from the event loop.
267    pub fn remove(&self, token: RegistrationToken) {
268        if let Ok(&mut SourceEntry {
269            token: entry_token,
270            ref mut source,
271        }) = self.inner.sources.borrow_mut().get_mut(token.inner)
272        {
273            if let Some(source) = source.take() {
274                trace!(source = entry_token.get_id(), "Removing source");
275                if let Err(e) = source.unregister(
276                    &mut self.inner.poll.borrow_mut(),
277                    &mut self
278                        .inner
279                        .sources_with_additional_lifecycle_events
280                        .borrow_mut(),
281                    token,
282                ) {
283                    warn!("Failed to unregister source from the polling system: {e:?}");
284                }
285            }
286        }
287    }
288
289    /// Wrap an IO object into an async adapter
290    ///
291    /// This adapter turns the IO object into an async-aware one that can be used in futures.
292    /// The readiness of these futures will be driven by the event loop.
293    ///
294    /// The produced futures can be polled in any executor, and notably the one provided by
295    /// calloop.
296    pub fn adapt_io<F: AsFd>(&self, fd: F) -> crate::Result<crate::io::Async<'l, F>> {
297        crate::io::Async::new(self.inner.clone(), fd)
298    }
299
300    /// Create a weak reference to this loop data.
301    ///
302    /// # Examples
303    ///
304    /// ```
305    /// use calloop::timer::{TimeoutAction, Timer};
306    /// use calloop::EventLoop;
307    ///
308    /// let event_loop: EventLoop<()> = EventLoop::try_new().unwrap();
309    /// let weak_handle = event_loop.handle().downgrade();
310    ///
311    /// event_loop
312    ///    .handle()
313    ///    .insert_source(Timer::immediate(), move |_, _, _| {
314    ///        // Hold its weak handle in the event loop's callback to break the reference cycle.
315    ///        let handle = weak_handle.upgrade().unwrap();
316    ///
317    ///        // Use the upgraded handle later...
318    ///
319    ///        TimeoutAction::Drop
320    ///    })
321    ///    .unwrap();
322    /// ```
323    pub fn downgrade(&self) -> WeakLoopHandle<'l, Data> {
324        WeakLoopHandle {
325            inner: Rc::downgrade(&self.inner),
326        }
327    }
328}
329
330impl<Data> Debug for WeakLoopHandle<'_, Data> {
331    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
332    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
333        f.write_str("WeakLoopHandle { ... }")
334    }
335}
336
337/// Manually implemented `Clone`.
338///
339/// The derived implementation adds a `Clone` bound on the generic parameter `Data`.
340impl<Data> Clone for WeakLoopHandle<'_, Data> {
341    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
342    fn clone(&self) -> Self {
343        WeakLoopHandle {
344            inner: self.inner.clone(),
345        }
346    }
347}
348
349/// Manually implemented `Default`.
350///
351/// The derived implementation adds a `Default` bound on the generic parameter `Data`.
352impl<Data> Default for WeakLoopHandle<'_, Data> {
353    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
354    fn default() -> Self {
355        WeakLoopHandle {
356            inner: Weak::default(),
357        }
358    }
359}
360
361impl<'l, Data> WeakLoopHandle<'l, Data> {
362    /// Try to get a [`LoopHandle`] from this weak reference.
363    ///
364    /// Returns [`None`] if the loop data has been dropped.
365    pub fn upgrade(&self) -> Option<LoopHandle<'l, Data>> {
366        self.inner.upgrade().map(|inner| LoopHandle { inner })
367    }
368
369    /// Check if the loop data has been dropped.
370    pub fn expired(&self) -> bool {
371        self.inner.strong_count() == 0
372    }
373}
374
375/// An event loop
376///
377/// This loop can host several event sources, that can be dynamically added or removed.
378pub struct EventLoop<'l, Data> {
379    #[allow(dead_code)]
380    poller: Arc<Poller>,
381    handle: LoopHandle<'l, Data>,
382    signals: Arc<Signals>,
383    // A caching vector for synthetic poll events
384    synthetic_events: Vec<PollEvent>,
385}
386
387impl<Data> Debug for EventLoop<'_, Data> {
388    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
389    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390        f.write_str("EventLoop { ... }")
391    }
392}
393
394/// Signals related to the event loop.
395struct Signals {
396    /// Signal to stop the event loop.
397    stop: AtomicBool,
398
399    /// Signal that the future is ready.
400    #[cfg(feature = "block_on")]
401    future_ready: AtomicBool,
402}
403
404impl<'l, Data> EventLoop<'l, Data> {
405    /// Create a new event loop
406    ///
407    /// Fails if the initialization of the polling system failed.
408    pub fn try_new() -> crate::Result<Self> {
409        let poll = Poll::new()?;
410        let poller = poll.poller.clone();
411        let handle = LoopHandle {
412            inner: Rc::new(LoopInner {
413                poll: RefCell::new(poll),
414                sources: RefCell::new(SourceList::new()),
415                idles: RefCell::new(Vec::new()),
416                pending_action: Cell::new(PostAction::Continue),
417                sources_with_additional_lifecycle_events: Default::default(),
418            }),
419        };
420
421        Ok(EventLoop {
422            handle,
423            signals: Arc::new(Signals {
424                stop: AtomicBool::new(false),
425                #[cfg(feature = "block_on")]
426                future_ready: AtomicBool::new(false),
427            }),
428            poller,
429            synthetic_events: vec![],
430        })
431    }
432
433    /// Retrieve a loop handle
434    pub fn handle(&self) -> LoopHandle<'l, Data> {
435        self.handle.clone()
436    }
437
438    fn dispatch_events(
439        &mut self,
440        mut timeout: Option<Duration>,
441        data: &mut Data,
442    ) -> crate::Result<()> {
443        let now = Instant::now();
444        {
445            let mut extra_lifecycle_sources = self
446                .handle
447                .inner
448                .sources_with_additional_lifecycle_events
449                .borrow_mut();
450            let sources = &self.handle.inner.sources.borrow();
451            for source in &mut *extra_lifecycle_sources.values {
452                if let Ok(SourceEntry {
453                    source: Some(disp), ..
454                }) = sources.get(source.inner)
455                {
456                    if let Some((readiness, token)) = disp.before_sleep()? {
457                        // Wake up instantly after polling if we recieved an event
458                        timeout = Some(Duration::ZERO);
459                        self.synthetic_events.push(PollEvent { readiness, token });
460                    }
461                } else {
462                    unreachable!()
463                }
464            }
465        }
466        let events = {
467            let poll = self.handle.inner.poll.borrow();
468            loop {
469                let result = poll.poll(timeout);
470
471                match result {
472                    Ok(events) => break events,
473                    Err(crate::Error::IoError(err)) if err.kind() == io::ErrorKind::Interrupted => {
474                        // Interrupted by a signal. Update timeout and retry.
475                        if let Some(to) = timeout {
476                            let elapsed = now.elapsed();
477                            if elapsed >= to {
478                                return Ok(());
479                            } else {
480                                timeout = Some(to - elapsed);
481                            }
482                        }
483                    }
484                    Err(err) => return Err(err),
485                };
486            }
487        };
488        {
489            let mut extra_lifecycle_sources = self
490                .handle
491                .inner
492                .sources_with_additional_lifecycle_events
493                .borrow_mut();
494            if !extra_lifecycle_sources.values.is_empty() {
495                for source in &mut *extra_lifecycle_sources.values {
496                    if let Ok(SourceEntry {
497                        source: Some(disp), ..
498                    }) = self.handle.inner.sources.borrow().get(source.inner)
499                    {
500                        let iter = EventIterator {
501                            inner: events.iter(),
502                            registration_token: *source,
503                        };
504                        disp.before_handle_events(iter);
505                    } else {
506                        unreachable!()
507                    }
508                }
509            }
510        }
511
512        for event in self.synthetic_events.drain(..).chain(events) {
513            // Get the registration token associated with the event.
514            let reg_token = event.token.inner.forget_sub_id();
515
516            let opt_disp = self
517                .handle
518                .inner
519                .sources
520                .borrow()
521                .get(reg_token)
522                .ok()
523                .and_then(|entry| entry.source.clone());
524
525            if let Some(disp) = opt_disp {
526                trace!(source = reg_token.get_id(), "Dispatching events for source");
527                let mut ret = disp.process_events(event.readiness, event.token, data)?;
528
529                // if the returned PostAction is Continue, it may be overwritten by a user-specified pending action
530                let pending_action = self
531                    .handle
532                    .inner
533                    .pending_action
534                    .replace(PostAction::Continue);
535                if let PostAction::Continue = ret {
536                    ret = pending_action;
537                }
538
539                match ret {
540                    PostAction::Reregister => {
541                        trace!(
542                            source = reg_token.get_id(),
543                            "Postaction reregister for source"
544                        );
545                        disp.reregister(
546                            &mut self.handle.inner.poll.borrow_mut(),
547                            &mut self
548                                .handle
549                                .inner
550                                .sources_with_additional_lifecycle_events
551                                .borrow_mut(),
552                            &mut TokenFactory::new(reg_token),
553                        )?;
554                    }
555                    PostAction::Disable => {
556                        trace!(
557                            source = reg_token.get_id(),
558                            "Postaction unregister for source"
559                        );
560                        disp.unregister(
561                            &mut self.handle.inner.poll.borrow_mut(),
562                            &mut self
563                                .handle
564                                .inner
565                                .sources_with_additional_lifecycle_events
566                                .borrow_mut(),
567                            RegistrationToken::new(reg_token),
568                        )?;
569                    }
570                    PostAction::Remove => {
571                        trace!(source = reg_token.get_id(), "Postaction remove for source");
572                        if let Ok(entry) = self.handle.inner.sources.borrow_mut().get_mut(reg_token)
573                        {
574                            entry.source = None;
575                        }
576                    }
577                    PostAction::Continue => {}
578                }
579
580                if self
581                    .handle
582                    .inner
583                    .sources
584                    .borrow()
585                    .get(reg_token)
586                    .ok()
587                    .map(|entry| entry.source.is_none())
588                    .unwrap_or(true)
589                {
590                    // the source has been removed from within its callback, unregister it
591                    let mut poll = self.handle.inner.poll.borrow_mut();
592                    if let Err(e) = disp.unregister(
593                        &mut poll,
594                        &mut self
595                            .handle
596                            .inner
597                            .sources_with_additional_lifecycle_events
598                            .borrow_mut(),
599                        RegistrationToken::new(reg_token),
600                    ) {
601                        warn!("Failed to unregister source from the polling system: {e:?}",);
602                    }
603                }
604            } else {
605                warn!(?reg_token, "Received an event for non-existent source");
606            }
607        }
608
609        Ok(())
610    }
611
612    fn dispatch_idles(&mut self, data: &mut Data) {
613        let idles = std::mem::take(&mut *self.handle.inner.idles.borrow_mut());
614        for idle in idles {
615            idle.borrow_mut().dispatch(data);
616        }
617    }
618
619    /// Dispatch pending events to their callbacks
620    ///
621    /// If some sources have events available, their callbacks will be immediately called.
622    /// Otherwise, this will wait until an event is received or the provided `timeout`
623    /// is reached. If `timeout` is `None`, it will wait without a duration limit.
624    ///
625    /// Once pending events have been processed or the timeout is reached, all pending
626    /// idle callbacks will be fired before this method returns.
627    pub fn dispatch<D: Into<Option<Duration>>>(
628        &mut self,
629        timeout: D,
630        data: &mut Data,
631    ) -> crate::Result<()> {
632        self.dispatch_events(timeout.into(), data)?;
633        self.dispatch_idles(data);
634
635        Ok(())
636    }
637
638    /// Get a signal to stop this event loop from running
639    ///
640    /// To be used in conjunction with the `run()` method.
641    pub fn get_signal(&self) -> LoopSignal {
642        LoopSignal {
643            signal: self.signals.clone(),
644            notifier: self.handle.inner.poll.borrow().notifier(),
645        }
646    }
647
648    /// Run this event loop
649    ///
650    /// This will repeatedly try to dispatch events (see the `dispatch()` method) on
651    /// this event loop, waiting at most `timeout` every time.
652    ///
653    /// Between each dispatch wait, your provided callback will be called.
654    ///
655    /// You can use the `get_signal()` method to retrieve a way to stop or wakeup
656    /// the event loop from anywhere.
657    pub fn run<F, D: Into<Option<Duration>>>(
658        &mut self,
659        timeout: D,
660        data: &mut Data,
661        mut cb: F,
662    ) -> crate::Result<()>
663    where
664        F: FnMut(&mut Data),
665    {
666        let timeout = timeout.into();
667        self.signals.stop.store(false, Ordering::Release);
668        while !self.signals.stop.load(Ordering::Acquire) {
669            self.dispatch(timeout, data)?;
670            cb(data);
671        }
672        Ok(())
673    }
674
675    /// Block a future on this event loop.
676    ///
677    /// This will run the provided future on this event loop, blocking until it is
678    /// resolved.
679    ///
680    /// If [`LoopSignal::stop()`] is called before the future is resolved, this function returns
681    /// `None`.
682    #[cfg(feature = "block_on")]
683    pub fn block_on<R>(
684        &mut self,
685        future: impl Future<Output = R>,
686        data: &mut Data,
687        mut cb: impl FnMut(&mut Data),
688    ) -> crate::Result<Option<R>> {
689        use std::task::{Context, Poll, Wake, Waker};
690
691        /// A waker that will wake up the event loop when it is ready to make progress.
692        struct EventLoopWaker(LoopSignal);
693
694        impl Wake for EventLoopWaker {
695            fn wake(self: Arc<Self>) {
696                // Set the waker.
697                self.0.signal.future_ready.store(true, Ordering::Release);
698                self.0.notifier.notify().ok();
699            }
700
701            fn wake_by_ref(self: &Arc<Self>) {
702                // Set the waker.
703                self.0.signal.future_ready.store(true, Ordering::Release);
704                self.0.notifier.notify().ok();
705            }
706        }
707
708        // Pin the future to the stack.
709        pin_utils::pin_mut!(future);
710
711        // Create a waker that will wake up the event loop when it is ready to make progress.
712        let waker = {
713            let handle = EventLoopWaker(self.get_signal());
714
715            Waker::from(Arc::new(handle))
716        };
717        let mut context = Context::from_waker(&waker);
718
719        // Begin running the loop.
720        let mut output = None;
721
722        self.signals.stop.store(false, Ordering::Release);
723        self.signals.future_ready.store(true, Ordering::Release);
724
725        while !self.signals.stop.load(Ordering::Acquire) {
726            // If the future is ready to be polled, poll it.
727            if self.signals.future_ready.swap(false, Ordering::AcqRel) {
728                // Poll the future and break the loop if it's ready.
729                if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
730                    output = Some(result);
731                    break;
732                }
733            }
734
735            // Otherwise, block on the event loop.
736            self.dispatch_events(None, data)?;
737            self.dispatch_idles(data);
738            cb(data);
739        }
740
741        Ok(output)
742    }
743}
744
745#[cfg(unix)]
746impl<Data> AsRawFd for EventLoop<'_, Data> {
747    /// Get the underlying raw-fd of the poller.
748    ///
749    /// This could be used to create [`Generic`] source out of the current loop
750    /// and inserting into some other [`EventLoop`]. It's recommended to clone `fd`
751    /// before doing so.
752    ///
753    /// [`Generic`]: crate::generic::Generic
754    fn as_raw_fd(&self) -> RawFd {
755        self.poller.as_raw_fd()
756    }
757}
758
759#[cfg(unix)]
760impl<Data> AsFd for EventLoop<'_, Data> {
761    /// Get the underlying fd of the poller.
762    ///
763    /// This could be used to create [`Generic`] source out of the current loop
764    /// and inserting into some other [`EventLoop`].
765    ///
766    /// [`Generic`]: crate::generic::Generic
767    fn as_fd(&self) -> BorrowedFd<'_> {
768        self.poller.as_fd()
769    }
770}
771
772#[cfg(windows)]
773impl<Data> AsRawHandle for EventLoop<'_, Data> {
774    fn as_raw_handle(&self) -> RawHandle {
775        self.poller.as_raw_handle()
776    }
777}
778
779#[cfg(windows)]
780impl<Data> AsHandle for EventLoop<'_, Data> {
781    fn as_handle(&self) -> BorrowedHandle<'_> {
782        self.poller.as_handle()
783    }
784}
785
786#[derive(Clone, Debug)]
787/// The EventIterator is an `Iterator` over the events relevant to a particular source
788/// This type is used in the [`EventSource::before_handle_events`] methods for
789/// two main reasons:
790///
791/// - To avoid dynamic dispatch overhead
792/// - Secondly, it is to allow this type to be `Clone`, which is not
793///   possible with dynamic dispatch
794pub struct EventIterator<'a> {
795    inner: slice::Iter<'a, PollEvent>,
796    registration_token: RegistrationToken,
797}
798
799impl Iterator for EventIterator<'_> {
800    type Item = (Readiness, Token);
801
802    fn next(&mut self) -> Option<Self::Item> {
803        for next in self.inner.by_ref() {
804            if next
805                .token
806                .inner
807                .same_source_as(self.registration_token.inner)
808            {
809                return Some((next.readiness, next.token));
810            }
811        }
812        None
813    }
814}
815
816/// A signal that can be shared between thread to stop or wakeup a running
817/// event loop
818#[derive(Clone)]
819pub struct LoopSignal {
820    signal: Arc<Signals>,
821    notifier: Notifier,
822}
823
824impl Debug for LoopSignal {
825    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
826    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
827        f.write_str("LoopSignal { ... }")
828    }
829}
830
831impl LoopSignal {
832    /// Stop the event loop
833    ///
834    /// Once this method is called, the next time the event loop has finished
835    /// waiting for events, it will return rather than starting to wait again.
836    ///
837    /// This is only useful if you are using the `EventLoop::run()` method.
838    pub fn stop(&self) {
839        self.signal.stop.store(true, Ordering::Release);
840    }
841
842    /// Wake up the event loop
843    ///
844    /// This sends a dummy event to the event loop to simulate the reception
845    /// of an event, making the wait return early. Called after `stop()`, this
846    /// ensures the event loop will terminate quickly if you specified a long
847    /// timeout (or no timeout at all) to the `dispatch` or `run` method.
848    pub fn wakeup(&self) {
849        self.notifier.notify().ok();
850    }
851}
852
853#[cfg(test)]
854mod tests {
855    use std::{cell::Cell, rc::Rc, time::Duration};
856
857    use crate::{
858        channel::{channel, Channel},
859        ping::*,
860        timer, EventIterator, EventSource, Poll, PostAction, Readiness, RegistrationToken, Token,
861        TokenFactory,
862    };
863
864    #[cfg(unix)]
865    use crate::{generic::Generic, Dispatcher, Interest, Mode};
866
867    use super::EventLoop;
868
869    #[test]
870    fn dispatch_idle() {
871        let mut event_loop = EventLoop::try_new().unwrap();
872
873        let mut dispatched = false;
874
875        event_loop.handle().insert_idle(|d| {
876            *d = true;
877        });
878
879        event_loop
880            .dispatch(Some(Duration::ZERO), &mut dispatched)
881            .unwrap();
882
883        assert!(dispatched);
884    }
885
886    #[test]
887    fn cancel_idle() {
888        let mut event_loop = EventLoop::try_new().unwrap();
889
890        let mut dispatched = false;
891
892        let handle = event_loop.handle();
893        let idle = handle.insert_idle(move |d| {
894            *d = true;
895        });
896
897        idle.cancel();
898
899        event_loop
900            .dispatch(Duration::ZERO, &mut dispatched)
901            .unwrap();
902
903        assert!(!dispatched);
904    }
905
906    #[test]
907    fn wakeup() {
908        let mut event_loop = EventLoop::try_new().unwrap();
909
910        let signal = event_loop.get_signal();
911
912        ::std::thread::spawn(move || {
913            ::std::thread::sleep(Duration::from_millis(500));
914            signal.wakeup();
915        });
916
917        // the test should return
918        event_loop.dispatch(None, &mut ()).unwrap();
919    }
920
921    #[test]
922    fn wakeup_stop() {
923        let mut event_loop = EventLoop::try_new().unwrap();
924
925        let signal = event_loop.get_signal();
926
927        ::std::thread::spawn(move || {
928            ::std::thread::sleep(Duration::from_millis(500));
929            signal.stop();
930            signal.wakeup();
931        });
932
933        // the test should return
934        event_loop.run(None, &mut (), |_| {}).unwrap();
935    }
936
937    #[test]
938    fn additional_events() {
939        let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
940        let mut lock = Lock {
941            lock: Rc::new((
942                // Whether the lock is locked
943                Cell::new(false),
944                // The total number of events processed in process_events
945                Cell::new(0),
946                // The total number of events processed in before_handle_events
947                // This is used to ensure that the count seen in before_handle_events is expected
948                Cell::new(0),
949            )),
950        };
951        let (sender, channel) = channel();
952        let token = event_loop
953            .handle()
954            .insert_source(
955                LockingSource {
956                    channel,
957                    lock: lock.clone(),
958                },
959                |_, _, lock| {
960                    lock.lock();
961                    lock.unlock();
962                },
963            )
964            .unwrap();
965        sender.send(()).unwrap();
966
967        event_loop.dispatch(None, &mut lock).unwrap();
968        // We should have been locked twice so far
969        assert_eq!(lock.lock.1.get(), 2);
970        // And we should have received one event
971        assert_eq!(lock.lock.2.get(), 1);
972        event_loop.handle().disable(&token).unwrap();
973        event_loop
974            .dispatch(Some(Duration::ZERO), &mut lock)
975            .unwrap();
976        assert_eq!(lock.lock.1.get(), 2);
977
978        event_loop.handle().enable(&token).unwrap();
979        event_loop
980            .dispatch(Some(Duration::ZERO), &mut lock)
981            .unwrap();
982        assert_eq!(lock.lock.1.get(), 3);
983        event_loop.handle().remove(token);
984        event_loop
985            .dispatch(Some(Duration::ZERO), &mut lock)
986            .unwrap();
987        assert_eq!(lock.lock.1.get(), 3);
988        assert_eq!(lock.lock.2.get(), 1);
989
990        #[derive(Clone)]
991        struct Lock {
992            lock: Rc<(Cell<bool>, Cell<u32>, Cell<u32>)>,
993        }
994        impl Lock {
995            fn lock(&self) {
996                if self.lock.0.get() {
997                    panic!();
998                }
999                // Increase the count
1000                self.lock.1.set(self.lock.1.get() + 1);
1001                self.lock.0.set(true)
1002            }
1003            fn unlock(&self) {
1004                if !self.lock.0.get() {
1005                    panic!();
1006                }
1007                self.lock.0.set(false);
1008            }
1009        }
1010        struct LockingSource {
1011            channel: Channel<()>,
1012            lock: Lock,
1013        }
1014        impl EventSource for LockingSource {
1015            type Event = <Channel<()> as EventSource>::Event;
1016
1017            type Metadata = <Channel<()> as EventSource>::Metadata;
1018
1019            type Ret = <Channel<()> as EventSource>::Ret;
1020
1021            type Error = <Channel<()> as EventSource>::Error;
1022
1023            fn process_events<F>(
1024                &mut self,
1025                readiness: Readiness,
1026                token: Token,
1027                callback: F,
1028            ) -> Result<PostAction, Self::Error>
1029            where
1030                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1031            {
1032                self.channel.process_events(readiness, token, callback)
1033            }
1034
1035            fn register(
1036                &mut self,
1037                poll: &mut Poll,
1038                token_factory: &mut TokenFactory,
1039            ) -> crate::Result<()> {
1040                self.channel.register(poll, token_factory)
1041            }
1042
1043            fn reregister(
1044                &mut self,
1045                poll: &mut Poll,
1046                token_factory: &mut TokenFactory,
1047            ) -> crate::Result<()> {
1048                self.channel.reregister(poll, token_factory)
1049            }
1050
1051            fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
1052                self.channel.unregister(poll)
1053            }
1054
1055            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1056
1057            fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
1058                self.lock.lock();
1059                Ok(None)
1060            }
1061
1062            fn before_handle_events(&mut self, events: EventIterator) {
1063                let events_count = events.count();
1064                let lock = &self.lock.lock;
1065                lock.2.set(lock.2.get() + events_count as u32);
1066                self.lock.unlock();
1067            }
1068        }
1069    }
1070    #[test]
1071    fn default_additional_events() {
1072        let (sender, channel) = channel();
1073        let mut test_source = NoopWithDefaultHandlers { channel };
1074        let mut event_loop = EventLoop::try_new().unwrap();
1075        event_loop
1076            .handle()
1077            .insert_source(Box::new(&mut test_source), |_, _, _| {})
1078            .unwrap();
1079        sender.send(()).unwrap();
1080
1081        event_loop.dispatch(None, &mut ()).unwrap();
1082        struct NoopWithDefaultHandlers {
1083            channel: Channel<()>,
1084        }
1085        impl EventSource for NoopWithDefaultHandlers {
1086            type Event = <Channel<()> as EventSource>::Event;
1087
1088            type Metadata = <Channel<()> as EventSource>::Metadata;
1089
1090            type Ret = <Channel<()> as EventSource>::Ret;
1091
1092            type Error = <Channel<()> as EventSource>::Error;
1093
1094            fn process_events<F>(
1095                &mut self,
1096                readiness: Readiness,
1097                token: Token,
1098                callback: F,
1099            ) -> Result<PostAction, Self::Error>
1100            where
1101                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1102            {
1103                self.channel.process_events(readiness, token, callback)
1104            }
1105
1106            fn register(
1107                &mut self,
1108                poll: &mut Poll,
1109                token_factory: &mut TokenFactory,
1110            ) -> crate::Result<()> {
1111                self.channel.register(poll, token_factory)
1112            }
1113
1114            fn reregister(
1115                &mut self,
1116                poll: &mut Poll,
1117                token_factory: &mut TokenFactory,
1118            ) -> crate::Result<()> {
1119                self.channel.reregister(poll, token_factory)
1120            }
1121
1122            fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
1123                self.channel.unregister(poll)
1124            }
1125
1126            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1127        }
1128    }
1129
1130    #[test]
1131    fn additional_events_synthetic() {
1132        let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
1133        let mut lock = Lock {
1134            lock: Rc::new(Cell::new(false)),
1135        };
1136        event_loop
1137            .handle()
1138            .insert_source(
1139                InstantWakeupLockingSource {
1140                    lock: lock.clone(),
1141                    token: None,
1142                },
1143                |_, _, lock| {
1144                    lock.lock();
1145                    lock.unlock();
1146                },
1147            )
1148            .unwrap();
1149
1150        // Loop should finish, as
1151        event_loop.dispatch(None, &mut lock).unwrap();
1152        #[derive(Clone)]
1153        struct Lock {
1154            lock: Rc<Cell<bool>>,
1155        }
1156        impl Lock {
1157            fn lock(&self) {
1158                if self.lock.get() {
1159                    panic!();
1160                }
1161                self.lock.set(true)
1162            }
1163            fn unlock(&self) {
1164                if !self.lock.get() {
1165                    panic!();
1166                }
1167                self.lock.set(false);
1168            }
1169        }
1170        struct InstantWakeupLockingSource {
1171            lock: Lock,
1172            token: Option<Token>,
1173        }
1174        impl EventSource for InstantWakeupLockingSource {
1175            type Event = ();
1176
1177            type Metadata = ();
1178
1179            type Ret = ();
1180
1181            type Error = <Channel<()> as EventSource>::Error;
1182
1183            fn process_events<F>(
1184                &mut self,
1185                _: Readiness,
1186                token: Token,
1187                mut callback: F,
1188            ) -> Result<PostAction, Self::Error>
1189            where
1190                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1191            {
1192                assert_eq!(token, self.token.unwrap());
1193                callback((), &mut ());
1194                Ok(PostAction::Continue)
1195            }
1196
1197            fn register(
1198                &mut self,
1199                _: &mut Poll,
1200                token_factory: &mut TokenFactory,
1201            ) -> crate::Result<()> {
1202                self.token = Some(token_factory.token());
1203                Ok(())
1204            }
1205
1206            fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1207                unreachable!()
1208            }
1209
1210            fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1211                unreachable!()
1212            }
1213
1214            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1215
1216            fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
1217                self.lock.lock();
1218                Ok(Some((Readiness::EMPTY, self.token.unwrap())))
1219            }
1220
1221            fn before_handle_events(&mut self, _: EventIterator) {
1222                self.lock.unlock();
1223            }
1224        }
1225    }
1226
1227    #[cfg(unix)]
1228    #[test]
1229    fn insert_bad_source() {
1230        use std::mem::ManuallyDrop;
1231        use std::os::unix::io::{AsFd, FromRawFd, OwnedFd};
1232
1233        struct LeakedFd(ManuallyDrop<OwnedFd>);
1234
1235        impl AsFd for LeakedFd {
1236            fn as_fd(&self) -> std::os::unix::prelude::BorrowedFd<'_> {
1237                self.0.as_fd()
1238            }
1239        }
1240
1241        let event_loop = EventLoop::<()>::try_new().unwrap();
1242        let fd = LeakedFd(ManuallyDrop::new(unsafe {
1243            std::os::unix::io::OwnedFd::from_raw_fd(420)
1244        }));
1245        let ret = event_loop.handle().insert_source(
1246            crate::sources::generic::Generic::new(fd, Interest::READ, Mode::Level),
1247            |_, _, _| Ok(PostAction::Continue),
1248        );
1249        assert!(ret.is_err());
1250    }
1251
1252    #[test]
1253    fn invalid_token() {
1254        let (_ping, source) = crate::sources::ping::make_ping().unwrap();
1255
1256        let event_loop = EventLoop::<()>::try_new().unwrap();
1257        let handle = event_loop.handle();
1258        let reg_token = handle.insert_source(source, |_, _, _| {}).unwrap();
1259        handle.remove(reg_token);
1260
1261        let ret = handle.enable(&reg_token);
1262        assert!(ret.is_err());
1263    }
1264
1265    #[cfg(unix)]
1266    #[test]
1267    fn insert_source_no_interest() {
1268        use rustix::pipe::pipe;
1269
1270        // Create a pipe to get an arbitrary fd.
1271        let (read, _write) = pipe().unwrap();
1272
1273        let source = crate::sources::generic::Generic::new(read, Interest::EMPTY, Mode::Level);
1274        let dispatcher = Dispatcher::new(source, |_, _, _| Ok(PostAction::Continue));
1275
1276        let event_loop = EventLoop::<()>::try_new().unwrap();
1277        let handle = event_loop.handle();
1278        let ret = handle.register_dispatcher(dispatcher.clone());
1279
1280        if let Ok(token) = ret {
1281            // Unwrap the dispatcher+source and close the read end.
1282            handle.remove(token);
1283        } else {
1284            // Fail the test.
1285            panic!();
1286        }
1287    }
1288
1289    #[test]
1290    fn disarm_rearm() {
1291        let mut event_loop = EventLoop::<bool>::try_new().unwrap();
1292        let (ping, ping_source) = make_ping().unwrap();
1293
1294        let ping_token = event_loop
1295            .handle()
1296            .insert_source(ping_source, |(), &mut (), dispatched| {
1297                *dispatched = true;
1298            })
1299            .unwrap();
1300
1301        ping.ping();
1302        let mut dispatched = false;
1303        event_loop
1304            .dispatch(Duration::ZERO, &mut dispatched)
1305            .unwrap();
1306        assert!(dispatched);
1307
1308        // disable the source
1309        ping.ping();
1310        event_loop.handle().disable(&ping_token).unwrap();
1311        let mut dispatched = false;
1312        event_loop
1313            .dispatch(Duration::ZERO, &mut dispatched)
1314            .unwrap();
1315        assert!(!dispatched);
1316
1317        // reenable it, the previous ping now gets dispatched
1318        event_loop.handle().enable(&ping_token).unwrap();
1319        let mut dispatched = false;
1320        event_loop
1321            .dispatch(Duration::ZERO, &mut dispatched)
1322            .unwrap();
1323        assert!(dispatched);
1324    }
1325
1326    #[test]
1327    fn multiple_tokens() {
1328        struct DoubleSource {
1329            ping1: PingSource,
1330            ping2: PingSource,
1331        }
1332
1333        impl crate::EventSource for DoubleSource {
1334            type Event = u32;
1335            type Metadata = ();
1336            type Ret = ();
1337            type Error = PingError;
1338
1339            fn process_events<F>(
1340                &mut self,
1341                readiness: Readiness,
1342                token: Token,
1343                mut callback: F,
1344            ) -> Result<PostAction, Self::Error>
1345            where
1346                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1347            {
1348                self.ping1
1349                    .process_events(readiness, token, |(), &mut ()| callback(1, &mut ()))?;
1350                self.ping2
1351                    .process_events(readiness, token, |(), &mut ()| callback(2, &mut ()))?;
1352                Ok(PostAction::Continue)
1353            }
1354
1355            fn register(
1356                &mut self,
1357                poll: &mut Poll,
1358                token_factory: &mut TokenFactory,
1359            ) -> crate::Result<()> {
1360                self.ping1.register(poll, token_factory)?;
1361                self.ping2.register(poll, token_factory)?;
1362                Ok(())
1363            }
1364
1365            fn reregister(
1366                &mut self,
1367                poll: &mut Poll,
1368                token_factory: &mut TokenFactory,
1369            ) -> crate::Result<()> {
1370                self.ping1.reregister(poll, token_factory)?;
1371                self.ping2.reregister(poll, token_factory)?;
1372                Ok(())
1373            }
1374
1375            fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
1376                self.ping1.unregister(poll)?;
1377                self.ping2.unregister(poll)?;
1378                Ok(())
1379            }
1380        }
1381
1382        let mut event_loop = EventLoop::<u32>::try_new().unwrap();
1383
1384        let (ping1, source1) = make_ping().unwrap();
1385        let (ping2, source2) = make_ping().unwrap();
1386
1387        let source = DoubleSource {
1388            ping1: source1,
1389            ping2: source2,
1390        };
1391
1392        event_loop
1393            .handle()
1394            .insert_source(source, |i, _, d| {
1395                eprintln!("Dispatching {}", i);
1396                *d += i
1397            })
1398            .unwrap();
1399
1400        let mut dispatched = 0;
1401        ping1.ping();
1402        event_loop
1403            .dispatch(Duration::ZERO, &mut dispatched)
1404            .unwrap();
1405        assert_eq!(dispatched, 1);
1406
1407        dispatched = 0;
1408        ping2.ping();
1409        event_loop
1410            .dispatch(Duration::ZERO, &mut dispatched)
1411            .unwrap();
1412        assert_eq!(dispatched, 2);
1413
1414        dispatched = 0;
1415        ping1.ping();
1416        ping2.ping();
1417        event_loop
1418            .dispatch(Duration::ZERO, &mut dispatched)
1419            .unwrap();
1420        assert_eq!(dispatched, 3);
1421    }
1422
1423    #[cfg(unix)]
1424    #[test]
1425    fn change_interests() {
1426        use rustix::io::write;
1427        use rustix::net::{recv, socketpair, AddressFamily, RecvFlags, SocketFlags, SocketType};
1428        let mut event_loop = EventLoop::<bool>::try_new().unwrap();
1429
1430        let (sock1, sock2) = socketpair(
1431            AddressFamily::UNIX,
1432            SocketType::STREAM,
1433            SocketFlags::empty(),
1434            None, // recv with DONTWAIT will suffice for platforms without SockFlag::SOCK_NONBLOCKING such as macOS
1435        )
1436        .unwrap();
1437
1438        let source = Generic::new(sock1, Interest::READ, Mode::Level);
1439        let dispatcher = Dispatcher::new(source, |_, fd, dispatched| {
1440            *dispatched = true;
1441            // read all contents available to drain the socket
1442            let mut buf = [0u8; 32];
1443            loop {
1444                match recv(&*fd, &mut buf, RecvFlags::DONTWAIT) {
1445                    Ok((0, _)) => break, // closed pipe, we are now inert
1446                    Ok(_) => {}
1447                    Err(e) => {
1448                        let e: std::io::Error = e.into();
1449                        if e.kind() == std::io::ErrorKind::WouldBlock {
1450                            break;
1451                        // nothing more to read
1452                        } else {
1453                            // propagate error
1454                            return Err(e);
1455                        }
1456                    }
1457                }
1458            }
1459            Ok(PostAction::Continue)
1460        });
1461
1462        let sock_token_1 = event_loop
1463            .handle()
1464            .register_dispatcher(dispatcher.clone())
1465            .unwrap();
1466
1467        // first dispatch, nothing is readable
1468        let mut dispatched = false;
1469        event_loop
1470            .dispatch(Duration::ZERO, &mut dispatched)
1471            .unwrap();
1472        assert!(!dispatched);
1473
1474        // write something, the socket becomes readable
1475        write(&sock2, &[1, 2, 3]).unwrap();
1476        dispatched = false;
1477        event_loop
1478            .dispatch(Duration::ZERO, &mut dispatched)
1479            .unwrap();
1480        assert!(dispatched);
1481
1482        // All has been read, no longer readable
1483        dispatched = false;
1484        event_loop
1485            .dispatch(Duration::ZERO, &mut dispatched)
1486            .unwrap();
1487        assert!(!dispatched);
1488
1489        // change the interests for writability instead
1490        dispatcher.as_source_mut().interest = Interest::WRITE;
1491        event_loop.handle().update(&sock_token_1).unwrap();
1492
1493        // the socket is writable
1494        dispatched = false;
1495        event_loop
1496            .dispatch(Duration::ZERO, &mut dispatched)
1497            .unwrap();
1498        assert!(dispatched);
1499
1500        // change back to readable
1501        dispatcher.as_source_mut().interest = Interest::READ;
1502        event_loop.handle().update(&sock_token_1).unwrap();
1503
1504        // the socket is not readable
1505        dispatched = false;
1506        event_loop
1507            .dispatch(Duration::ZERO, &mut dispatched)
1508            .unwrap();
1509        assert!(!dispatched);
1510    }
1511
1512    #[test]
1513    fn kill_source() {
1514        let mut event_loop = EventLoop::<Option<RegistrationToken>>::try_new().unwrap();
1515
1516        let handle = event_loop.handle();
1517        let (ping, ping_source) = make_ping().unwrap();
1518        let ping_token = event_loop
1519            .handle()
1520            .insert_source(ping_source, move |(), &mut (), opt_src| {
1521                if let Some(src) = opt_src.take() {
1522                    handle.remove(src);
1523                }
1524            })
1525            .unwrap();
1526
1527        ping.ping();
1528
1529        let mut opt_src = Some(ping_token);
1530
1531        event_loop.dispatch(Duration::ZERO, &mut opt_src).unwrap();
1532
1533        assert!(opt_src.is_none());
1534    }
1535
1536    #[test]
1537    fn non_static_data() {
1538        use std::sync::mpsc;
1539
1540        let (sender, receiver) = mpsc::channel();
1541
1542        {
1543            struct RefSender<'a>(&'a mpsc::Sender<()>);
1544            let mut ref_sender = RefSender(&sender);
1545
1546            let mut event_loop = EventLoop::<RefSender<'_>>::try_new().unwrap();
1547            let (ping, ping_source) = make_ping().unwrap();
1548            let _ping_token = event_loop
1549                .handle()
1550                .insert_source(ping_source, |_, _, ref_sender| {
1551                    ref_sender.0.send(()).unwrap();
1552                })
1553                .unwrap();
1554
1555            ping.ping();
1556
1557            event_loop
1558                .dispatch(Duration::ZERO, &mut ref_sender)
1559                .unwrap();
1560        }
1561
1562        receiver.recv().unwrap();
1563        // sender still usable (e.g. for another EventLoop)
1564        drop(sender);
1565    }
1566
1567    #[cfg(feature = "block_on")]
1568    #[test]
1569    fn block_on_test() {
1570        use crate::sources::timer::TimeoutFuture;
1571        use std::time::Duration;
1572
1573        let mut evl = EventLoop::<()>::try_new().unwrap();
1574
1575        let mut data = 22;
1576        let timeout = {
1577            let data = &mut data;
1578            let evl_handle = evl.handle();
1579
1580            async move {
1581                TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
1582                *data = 32;
1583                11
1584            }
1585        };
1586
1587        let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();
1588        assert_eq!(result, Some(11));
1589        assert_eq!(data, 32);
1590    }
1591
1592    #[cfg(feature = "block_on")]
1593    #[test]
1594    fn block_on_early_cancel() {
1595        use crate::sources::timer;
1596        use std::time::Duration;
1597
1598        let mut evl = EventLoop::<()>::try_new().unwrap();
1599
1600        let mut data = 22;
1601        let timeout = {
1602            let data = &mut data;
1603            let evl_handle = evl.handle();
1604
1605            async move {
1606                timer::TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
1607                *data = 32;
1608                11
1609            }
1610        };
1611
1612        let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
1613        let handle = evl.get_signal();
1614        let _timer_token = evl
1615            .handle()
1616            .insert_source(timer_source, move |_, _, _| {
1617                handle.stop();
1618                timer::TimeoutAction::Drop
1619            })
1620            .unwrap();
1621
1622        let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();
1623        assert_eq!(result, None);
1624        assert_eq!(data, 22);
1625    }
1626
1627    #[test]
1628    fn reuse() {
1629        use crate::sources::timer;
1630        use std::sync::{Arc, Mutex};
1631        use std::time::{Duration, Instant};
1632
1633        let mut evl = EventLoop::<RegistrationToken>::try_new().unwrap();
1634        let handle = evl.handle();
1635
1636        let data = Arc::new(Mutex::new(1));
1637        let data_cloned = data.clone();
1638
1639        let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
1640        let mut first_timer_token = evl
1641            .handle()
1642            .insert_source(timer_source, move |_, _, own_token| {
1643                handle.remove(*own_token);
1644                let data_cloned = data_cloned.clone();
1645                let _ = handle.insert_source(timer::Timer::immediate(), move |_, _, _| {
1646                    *data_cloned.lock().unwrap() = 2;
1647                    timer::TimeoutAction::Drop
1648                });
1649                timer::TimeoutAction::Drop
1650            })
1651            .unwrap();
1652
1653        let now = Instant::now();
1654        loop {
1655            evl.dispatch(Some(Duration::from_secs(3)), &mut first_timer_token)
1656                .unwrap();
1657            if Instant::now().duration_since(now) > Duration::from_secs(3) {
1658                break;
1659            }
1660        }
1661
1662        assert_eq!(*data.lock().unwrap(), 2);
1663    }
1664
1665    #[test]
1666    fn drop_of_subsource() {
1667        struct WithSubSource {
1668            token: Option<Token>,
1669        }
1670
1671        impl crate::EventSource for WithSubSource {
1672            type Event = ();
1673            type Metadata = ();
1674            type Ret = ();
1675            type Error = crate::Error;
1676            const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1677
1678            fn process_events<F>(
1679                &mut self,
1680                _: Readiness,
1681                _: Token,
1682                mut callback: F,
1683            ) -> Result<PostAction, Self::Error>
1684            where
1685                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1686            {
1687                callback((), &mut ());
1688                // Drop the source
1689                Ok(PostAction::Remove)
1690            }
1691
1692            fn register(&mut self, _: &mut Poll, fact: &mut TokenFactory) -> crate::Result<()> {
1693                // produce a few tokens to emulate a subsource
1694                fact.token();
1695                fact.token();
1696                self.token = Some(fact.token());
1697                Ok(())
1698            }
1699
1700            fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1701                Ok(())
1702            }
1703
1704            fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1705                Ok(())
1706            }
1707
1708            // emulate a readiness
1709            fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
1710                Ok(self.token.map(|token| {
1711                    (
1712                        Readiness {
1713                            readable: true,
1714                            writable: false,
1715                            error: false,
1716                        },
1717                        token,
1718                    )
1719                }))
1720            }
1721        }
1722
1723        // Now the actual test
1724        let mut evl = EventLoop::<bool>::try_new().unwrap();
1725        evl.handle()
1726            .insert_source(WithSubSource { token: None }, |_, _, ran| {
1727                *ran = true;
1728            })
1729            .unwrap();
1730
1731        let mut ran = false;
1732
1733        evl.dispatch(Some(Duration::ZERO), &mut ran).unwrap();
1734
1735        assert!(ran);
1736    }
1737
1738    // A dummy EventSource to test insertion and removal of sources
1739    struct DummySource;
1740
1741    impl crate::EventSource for DummySource {
1742        type Event = ();
1743        type Metadata = ();
1744        type Ret = ();
1745        type Error = crate::Error;
1746
1747        fn process_events<F>(
1748            &mut self,
1749            _: Readiness,
1750            _: Token,
1751            mut callback: F,
1752        ) -> Result<PostAction, Self::Error>
1753        where
1754            F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1755        {
1756            callback((), &mut ());
1757            Ok(PostAction::Continue)
1758        }
1759
1760        fn register(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1761            Ok(())
1762        }
1763
1764        fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1765            Ok(())
1766        }
1767
1768        fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1769            Ok(())
1770        }
1771    }
1772
1773    #[test]
1774    fn weak_loop_handle() {
1775        let mut event_loop: EventLoop<()> = EventLoop::try_new().unwrap();
1776        let weak_handle1 = event_loop.handle().downgrade();
1777        let weak_handle2 = weak_handle1.clone();
1778        let weak_handle3 = weak_handle1.clone();
1779
1780        event_loop
1781            .handle()
1782            .insert_source(timer::Timer::immediate(), move |_, _, _| {
1783                // Hold and use its weak handle in the event loop's callback
1784                assert!(weak_handle1.upgrade().is_some());
1785                timer::TimeoutAction::Drop
1786            })
1787            .unwrap();
1788
1789        event_loop.handle().insert_idle(move |_| {
1790            // Hold and use its weak handle in the event loop's idle callback
1791            assert!(weak_handle2.upgrade().is_some());
1792        });
1793
1794        event_loop.dispatch(None, &mut ()).unwrap();
1795
1796        drop(event_loop);
1797
1798        // Test if memory is freed
1799        assert!(weak_handle3.expired());
1800    }
1801}