calloop/sources/
transient.rs

1//! Wrapper for a transient Calloop event source.
2//!
3//! If you have high level event source that you expect to remain in the event
4//! loop indefinitely, and another event source nested inside that one that you
5//! expect to require removal or disabling from time to time, this module can
6//! handle it for you.
7
8/// A [`TransientSource`] wraps a Calloop event source and manages its
9/// registration. A user of this type only needs to perform the usual Calloop
10/// calls (`process_events()` and `*register()`) and the return value of
11/// [`process_events()`](crate::EventSource::process_events).
12///
13/// Rather than needing to check for the full set of
14/// [`PostAction`](crate::PostAction) values returned from `process_events()`,
15/// you can just check for `Continue` or `Reregister` and pass that back out
16/// through your own `process_events()` implementation. In your registration
17/// functions, you then only need to call the same function on this type ie.
18/// `register()` inside `register()` etc.
19///
20/// For example, say you have a source that contains a channel along with some
21/// other logic. If the channel's sending end has been dropped, it needs to be
22/// removed from the loop. So to manage this, you use this in your struct:
23///
24/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
25/// struct CompositeSource {
26///    // Event source for channel.
27///    mpsc_receiver: TransientSource<calloop::channel::Channel<T>>,
28///
29///    // Any other fields go here...
30/// }
31/// ```
32///
33/// To create the transient source, you can simply use the `Into`
34/// implementation:
35///
36/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
37/// let (sender, source) = channel();
38/// let mpsc_receiver: TransientSource<Channel> = source.into();
39/// ```
40///
41/// (If you want to start off with an empty `TransientSource`, you can just use
42/// `Default::default()` instead.)
43///
44/// `TransientSource` implements [`EventSource`](crate::EventSource) and passes
45/// through `process_events()` calls, so in the parent's `process_events()`
46/// implementation you can just do this:
47///
48/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
49/// fn process_events<F>(
50///     &mut self,
51///     readiness: calloop::Readiness,
52///     token: calloop::Token,
53///     callback: F,
54/// ) -> Result<calloop::PostAction, Self::Error>
55/// where
56///     F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
57/// {
58///     let channel_return = self.mpsc_receiver.process_events(readiness, token, callback)?;
59///
60///     // Perform other logic here...
61///
62///     Ok(channel_return)
63/// }
64/// ```
65///
66/// Note that:
67///
68///   - You can call `process_events()` on the `TransientSource<Channel>` even
69///     if the channel has been unregistered and dropped. All that will happen
70///     is that you won't get any events from it.
71///
72///   - The [`PostAction`](crate::PostAction) returned from `process_events()`
73///     will only ever be `PostAction::Continue` or `PostAction::Reregister`.
74///     You will still need to combine this with the result of any other sources
75///     (transient or not).
76///
77/// Once you return `channel_return` from your `process_events()` method (and
78/// assuming it propagates all the way up to the event loop itself through any
79/// other event sources), the event loop might call `reregister()` on your
80/// source. All your source has to do is:
81///
82/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
83/// fn reregister(
84///     &mut self,
85///     poll: &mut calloop::Poll,
86///     token_factory: &mut calloop::TokenFactory,
87/// ) -> crate::Result<()> {
88///     self.mpsc_receiver.reregister(poll, token_factory)?;
89///
90///     // Other registration actions...
91///
92///     Ok(())
93/// }
94/// ```
95///
96/// The `TransientSource` will take care of updating the registration of the
97/// inner source, even if it actually needs to be unregistered or initially
98/// registered.
99///
100/// ## Replacing or removing `TransientSource`s
101///
102/// Not properly removing or replacing `TransientSource`s can cause spurious
103/// wakeups of the event loop, and in some cases can leak file descriptors or
104/// fail to free entries in Calloop's internal data structures. No unsoundness
105/// or undefined behaviour will result, but leaking file descriptors can result
106/// in errors or panics.
107///
108/// If you want to remove a source before it returns `PostAction::Remove`, use
109/// the [`TransientSource::remove()`] method. If you want to replace a source
110/// with another one, use the [`TransientSource::replace()`] method. Either of
111/// these may be called at any time during processing or from outside the event
112/// loop. Both require either returning `PostAction::Reregister` from the
113/// `process_event()` call that does this, or reregistering the event source
114/// some other way eg. via the top-level loop handle.
115///
116/// If, instead, you directly assign a new source to the variable holding the
117/// `TransientSource`, the inner source will be dropped before it can be
118/// unregistered. For example:
119///
120/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
121/// self.mpsc_receiver = Default::default();
122/// self.mpsc_receiver = new_channel.into();
123/// ```
124#[derive(Debug, Default)]
125pub struct TransientSource<T> {
126    state: TransientSourceState<T>,
127}
128
129/// This is the internal state of the [`TransientSource`], as a separate type so
130/// it's not exposed.
131#[derive(Debug)]
132enum TransientSourceState<T> {
133    /// The source should be kept in the loop.
134    Keep(T),
135    /// The source needs to be registered with the loop.
136    Register(T),
137    /// The source needs to be disabled but kept.
138    Disable(T),
139    /// The source needs to be removed from the loop.
140    Remove(T),
141    /// The source is being replaced by another. For most API purposes (eg.
142    /// `map()`), this will be treated as the `Register` state enclosing the new
143    /// source.
144    Replace {
145        /// The new source, which will be registered and used from now on.
146        new: T,
147        /// The old source, which will be unregistered and dropped.
148        old: T,
149    },
150    /// The source has been removed from the loop and dropped (this might also
151    /// be observed if there is a panic while changing states).
152    None,
153}
154
155impl<T> Default for TransientSourceState<T> {
156    fn default() -> Self {
157        Self::None
158    }
159}
160
161impl<T> TransientSourceState<T> {
162    /// If a caller needs to flag the contained source for removal or
163    /// registration, we need to replace the enum variant safely. This requires
164    /// having a `None` value in there temporarily while we do the swap.
165    ///
166    /// If the variant is `None` the value will not change and `replacer` will
167    /// not be called. If the variant is `Replace` then `replacer` will be
168    /// called **on the new source**, which may cause the old source to leak
169    /// registration in the event loop if it has not yet been unregistered.
170    ///
171    /// The `replacer` function here is expected to be one of the enum variant
172    /// constructors eg. `replace(TransientSource::Remove)`.
173    fn replace_state<F>(&mut self, replacer: F)
174    where
175        F: FnOnce(T) -> Self,
176    {
177        *self = match std::mem::take(self) {
178            Self::Keep(source)
179            | Self::Register(source)
180            | Self::Remove(source)
181            | Self::Disable(source)
182            | Self::Replace { new: source, .. } => replacer(source),
183            Self::None => return,
184        };
185    }
186}
187
188impl<T> TransientSource<T> {
189    /// Apply a function to the enclosed source, if it exists and is not about
190    /// to be removed.
191    pub fn map<F, U>(&mut self, f: F) -> Option<U>
192    where
193        F: FnOnce(&mut T) -> U,
194    {
195        match &mut self.state {
196            TransientSourceState::Keep(source)
197            | TransientSourceState::Register(source)
198            | TransientSourceState::Disable(source)
199            | TransientSourceState::Replace { new: source, .. } => Some(f(source)),
200            TransientSourceState::Remove(_) | TransientSourceState::None => None,
201        }
202    }
203
204    /// Returns `true` if there is no wrapped event source.
205    pub fn is_none(&self) -> bool {
206        matches!(self.state, TransientSourceState::None)
207    }
208
209    /// Removes the wrapped event source from the event loop and this wrapper.
210    ///
211    /// If this is called from outside of the event loop, you will need to wake
212    /// up the event loop for any changes to take place. If it is called from
213    /// within the event loop, you must return `PostAction::Reregister` from
214    /// your own event source's `process_events()`, and the source will be
215    /// unregistered as needed after it exits.
216    pub fn remove(&mut self) {
217        self.state.replace_state(TransientSourceState::Remove);
218    }
219
220    /// Replace the currently wrapped source with the given one.  No more events
221    /// will be generated from the old source after this point. The old source
222    /// will not be dropped immediately, it will be kept so that it can be
223    /// deregistered.
224    ///
225    /// If this is called from outside of the event loop, you will need to wake
226    /// up the event loop for any changes to take place. If it is called from
227    /// within the event loop, you must return `PostAction::Reregister` from
228    /// your own event source's `process_events()`, and the sources will be
229    /// registered and unregistered as needed after it exits.
230    pub fn replace(&mut self, new: T) {
231        self.state
232            .replace_state(|old| TransientSourceState::Replace { new, old });
233    }
234}
235
236impl<T: crate::EventSource> From<T> for TransientSource<T> {
237    fn from(source: T) -> Self {
238        Self {
239            state: TransientSourceState::Register(source),
240        }
241    }
242}
243
244impl<T: crate::EventSource> crate::EventSource for TransientSource<T> {
245    type Event = T::Event;
246    type Metadata = T::Metadata;
247    type Ret = T::Ret;
248    type Error = T::Error;
249
250    fn process_events<F>(
251        &mut self,
252        readiness: crate::Readiness,
253        token: crate::Token,
254        callback: F,
255    ) -> Result<crate::PostAction, Self::Error>
256    where
257        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
258    {
259        let reregister = if let TransientSourceState::Keep(source) = &mut self.state {
260            let child_post_action = source.process_events(readiness, token, callback)?;
261
262            match child_post_action {
263                // Nothing needs to change.
264                crate::PostAction::Continue => false,
265
266                // Our child source needs re-registration, therefore this
267                // wrapper needs re-registration.
268                crate::PostAction::Reregister => true,
269
270                // If our nested source needs to be removed or disabled, we need
271                // to swap it out for the "Remove" or "Disable" variant.
272                crate::PostAction::Disable => {
273                    self.state.replace_state(TransientSourceState::Disable);
274                    true
275                }
276
277                crate::PostAction::Remove => {
278                    self.state.replace_state(TransientSourceState::Remove);
279                    true
280                }
281            }
282        } else {
283            false
284        };
285
286        let post_action = if reregister {
287            crate::PostAction::Reregister
288        } else {
289            crate::PostAction::Continue
290        };
291
292        Ok(post_action)
293    }
294
295    fn register(
296        &mut self,
297        poll: &mut crate::Poll,
298        token_factory: &mut crate::TokenFactory,
299    ) -> crate::Result<()> {
300        match &mut self.state {
301            TransientSourceState::Keep(source) => {
302                source.register(poll, token_factory)?;
303            }
304            TransientSourceState::Register(source)
305            | TransientSourceState::Disable(source)
306            | TransientSourceState::Replace { new: source, .. } => {
307                source.register(poll, token_factory)?;
308                self.state.replace_state(TransientSourceState::Keep);
309                // Drops the disposed source in the Replace case.
310            }
311            TransientSourceState::Remove(_source) => {
312                self.state.replace_state(|_| TransientSourceState::None);
313            }
314            TransientSourceState::None => (),
315        }
316        Ok(())
317    }
318
319    fn reregister(
320        &mut self,
321        poll: &mut crate::Poll,
322        token_factory: &mut crate::TokenFactory,
323    ) -> crate::Result<()> {
324        match &mut self.state {
325            TransientSourceState::Keep(source) => source.reregister(poll, token_factory)?,
326            TransientSourceState::Register(source) => {
327                source.register(poll, token_factory)?;
328                self.state.replace_state(TransientSourceState::Keep);
329            }
330            TransientSourceState::Disable(source) => {
331                source.unregister(poll)?;
332            }
333            TransientSourceState::Remove(source) => {
334                source.unregister(poll)?;
335                self.state.replace_state(|_| TransientSourceState::None);
336            }
337            TransientSourceState::Replace { new, old } => {
338                old.unregister(poll)?;
339                new.register(poll, token_factory)?;
340                self.state.replace_state(TransientSourceState::Keep);
341                // Drops 'dispose'.
342            }
343            TransientSourceState::None => (),
344        }
345        Ok(())
346    }
347
348    fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
349        match &mut self.state {
350            TransientSourceState::Keep(source)
351            | TransientSourceState::Register(source)
352            | TransientSourceState::Disable(source) => source.unregister(poll)?,
353            TransientSourceState::Remove(source) => {
354                source.unregister(poll)?;
355                self.state.replace_state(|_| TransientSourceState::None);
356            }
357            TransientSourceState::Replace { new, old } => {
358                old.unregister(poll)?;
359                new.unregister(poll)?;
360                self.state.replace_state(TransientSourceState::Register);
361            }
362            TransientSourceState::None => (),
363        }
364        Ok(())
365    }
366}
367
368#[cfg(test)]
369mod tests {
370    use super::*;
371    use crate::{
372        channel::{channel, Channel, Event},
373        ping::{make_ping, PingSource},
374        Dispatcher, EventSource, PostAction,
375    };
376    use std::{
377        rc::Rc,
378        sync::atomic::{AtomicBool, Ordering},
379        time::Duration,
380    };
381
382    #[test]
383    fn test_transient_drop() {
384        // A test source that sets a flag when it's dropped.
385        struct TestSource<'a> {
386            dropped: &'a AtomicBool,
387            ping: PingSource,
388        }
389
390        impl<'a> Drop for TestSource<'a> {
391            fn drop(&mut self) {
392                self.dropped.store(true, Ordering::Relaxed)
393            }
394        }
395
396        impl<'a> crate::EventSource for TestSource<'a> {
397            type Event = ();
398            type Metadata = ();
399            type Ret = ();
400            type Error = Box<dyn std::error::Error + Sync + Send>;
401
402            fn process_events<F>(
403                &mut self,
404                readiness: crate::Readiness,
405                token: crate::Token,
406                callback: F,
407            ) -> Result<crate::PostAction, Self::Error>
408            where
409                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
410            {
411                self.ping.process_events(readiness, token, callback)?;
412                Ok(PostAction::Remove)
413            }
414
415            fn register(
416                &mut self,
417                poll: &mut crate::Poll,
418                token_factory: &mut crate::TokenFactory,
419            ) -> crate::Result<()> {
420                self.ping.register(poll, token_factory)
421            }
422
423            fn reregister(
424                &mut self,
425                poll: &mut crate::Poll,
426                token_factory: &mut crate::TokenFactory,
427            ) -> crate::Result<()> {
428                self.ping.reregister(poll, token_factory)
429            }
430
431            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
432                self.ping.unregister(poll)
433            }
434        }
435
436        // Test that the inner source is actually dropped when it asks to be
437        // removed from the loop, while the TransientSource remains. We use two
438        // flags for this:
439        // - fired: should be set only when the inner event source has an event
440        // - dropped: set by the drop handler for the inner source (it's an
441        //   AtomicBool becaues it requires a longer lifetime than the fired
442        //   flag)
443        let mut fired = false;
444        let dropped = false.into();
445
446        // The inner source that should be dropped after the first loop run.
447        let (pinger, ping) = make_ping().unwrap();
448        let inner = TestSource {
449            dropped: &dropped,
450            ping,
451        };
452
453        // The TransientSource wrapper.
454        let outer: TransientSource<_> = inner.into();
455
456        let mut event_loop = crate::EventLoop::try_new().unwrap();
457        let handle = event_loop.handle();
458
459        let _token = handle
460            .insert_source(outer, |_, _, fired| {
461                *fired = true;
462            })
463            .unwrap();
464
465        // First loop run: the ping generates an event for the inner source.
466        pinger.ping();
467
468        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
469
470        assert!(fired);
471        assert!(dropped.load(Ordering::Relaxed));
472
473        // Second loop run: the ping does nothing because the receiver has been
474        // dropped.
475        fired = false;
476
477        pinger.ping();
478
479        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
480        assert!(!fired);
481    }
482
483    #[test]
484    fn test_transient_passthrough() {
485        // Test that event processing works when a source is nested inside a
486        // TransientSource. In particular, we want to ensure that the final
487        // event is received even if it corresponds to that same event source
488        // returning `PostAction::Remove`.
489        let (sender, receiver) = channel();
490        let outer: TransientSource<_> = receiver.into();
491
492        let mut event_loop = crate::EventLoop::try_new().unwrap();
493        let handle = event_loop.handle();
494
495        // Our callback puts the receied events in here for us to check later.
496        let mut msg_queue = vec![];
497
498        let _token = handle
499            .insert_source(outer, |msg, _, queue: &mut Vec<_>| {
500                queue.push(msg);
501            })
502            .unwrap();
503
504        // Send some data and drop the sender. We specifically want to test that
505        // we get the "closed" message.
506        sender.send(0u32).unwrap();
507        sender.send(1u32).unwrap();
508        sender.send(2u32).unwrap();
509        sender.send(3u32).unwrap();
510        drop(sender);
511
512        // Run loop once to process events.
513        event_loop.dispatch(Duration::ZERO, &mut msg_queue).unwrap();
514
515        assert!(matches!(
516            msg_queue.as_slice(),
517            &[
518                Event::Msg(0u32),
519                Event::Msg(1u32),
520                Event::Msg(2u32),
521                Event::Msg(3u32),
522                Event::Closed
523            ]
524        ));
525    }
526
527    #[test]
528    fn test_transient_map() {
529        struct IdSource {
530            id: u32,
531            ping: PingSource,
532        }
533
534        impl EventSource for IdSource {
535            type Event = u32;
536            type Metadata = ();
537            type Ret = ();
538            type Error = Box<dyn std::error::Error + Sync + Send>;
539
540            fn process_events<F>(
541                &mut self,
542                readiness: crate::Readiness,
543                token: crate::Token,
544                mut callback: F,
545            ) -> Result<PostAction, Self::Error>
546            where
547                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
548            {
549                let id = self.id;
550                self.ping
551                    .process_events(readiness, token, |_, md| callback(id, md))?;
552
553                let action = if self.id > 2 {
554                    PostAction::Remove
555                } else {
556                    PostAction::Continue
557                };
558
559                Ok(action)
560            }
561
562            fn register(
563                &mut self,
564                poll: &mut crate::Poll,
565                token_factory: &mut crate::TokenFactory,
566            ) -> crate::Result<()> {
567                self.ping.register(poll, token_factory)
568            }
569
570            fn reregister(
571                &mut self,
572                poll: &mut crate::Poll,
573                token_factory: &mut crate::TokenFactory,
574            ) -> crate::Result<()> {
575                self.ping.reregister(poll, token_factory)
576            }
577
578            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
579                self.ping.unregister(poll)
580            }
581        }
582
583        struct WrapperSource(TransientSource<IdSource>);
584
585        impl EventSource for WrapperSource {
586            type Event = <IdSource as EventSource>::Event;
587            type Metadata = <IdSource as EventSource>::Metadata;
588            type Ret = <IdSource as EventSource>::Ret;
589            type Error = <IdSource as EventSource>::Error;
590
591            fn process_events<F>(
592                &mut self,
593                readiness: crate::Readiness,
594                token: crate::Token,
595                callback: F,
596            ) -> Result<PostAction, Self::Error>
597            where
598                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
599            {
600                let action = self.0.process_events(readiness, token, callback);
601                self.0.map(|inner| inner.id += 1);
602                action
603            }
604
605            fn register(
606                &mut self,
607                poll: &mut crate::Poll,
608                token_factory: &mut crate::TokenFactory,
609            ) -> crate::Result<()> {
610                self.0.map(|inner| inner.id += 1);
611                self.0.register(poll, token_factory)
612            }
613
614            fn reregister(
615                &mut self,
616                poll: &mut crate::Poll,
617                token_factory: &mut crate::TokenFactory,
618            ) -> crate::Result<()> {
619                self.0.map(|inner| inner.id += 1);
620                self.0.reregister(poll, token_factory)
621            }
622
623            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
624                self.0.map(|inner| inner.id += 1);
625                self.0.unregister(poll)
626            }
627        }
628
629        // To test the id later.
630        let mut id = 0;
631
632        // Create our source.
633        let (pinger, ping) = make_ping().unwrap();
634        let inner = IdSource { id, ping };
635
636        // The TransientSource wrapper.
637        let outer: TransientSource<_> = inner.into();
638
639        // The top level source.
640        let top = WrapperSource(outer);
641
642        // Create a dispatcher so we can check the source afterwards.
643        let dispatcher = Dispatcher::new(top, |got_id, _, test_id| {
644            *test_id = got_id;
645        });
646
647        let mut event_loop = crate::EventLoop::try_new().unwrap();
648        let handle = event_loop.handle();
649
650        let token = handle.register_dispatcher(dispatcher.clone()).unwrap();
651
652        // First loop run: the ping generates an event for the inner source.
653        // The ID should be 1 after the increment in register().
654        pinger.ping();
655        event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
656        assert_eq!(id, 1);
657
658        // Second loop run: the ID should be 2 after the previous
659        // process_events().
660        pinger.ping();
661        event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
662        assert_eq!(id, 2);
663
664        // Third loop run: the ID should be 3 after another process_events().
665        pinger.ping();
666        event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
667        assert_eq!(id, 3);
668
669        // Fourth loop run: the callback is no longer called by the inner
670        // source, so our local ID is not incremented.
671        pinger.ping();
672        event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
673        assert_eq!(id, 3);
674
675        // Remove the dispatcher so we can inspect the sources.
676        handle.remove(token);
677
678        let mut top_after = dispatcher.into_source_inner();
679
680        // I expect the inner source to be dropped, so the TransientSource
681        // variant is None (its version of None, not Option::None), so its map()
682        // won't call the passed-in function (hence the unreachable!()) and its
683        // return value should be Option::None.
684        assert!(top_after.0.map(|_| unreachable!()).is_none());
685    }
686
687    #[test]
688    fn test_transient_disable() {
689        // Test that disabling and enabling is handled properly.
690        struct DisablingSource(PingSource);
691
692        impl EventSource for DisablingSource {
693            type Event = ();
694            type Metadata = ();
695            type Ret = ();
696            type Error = Box<dyn std::error::Error + Sync + Send>;
697
698            fn process_events<F>(
699                &mut self,
700                readiness: crate::Readiness,
701                token: crate::Token,
702                callback: F,
703            ) -> Result<PostAction, Self::Error>
704            where
705                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
706            {
707                self.0.process_events(readiness, token, callback)?;
708                Ok(PostAction::Disable)
709            }
710
711            fn register(
712                &mut self,
713                poll: &mut crate::Poll,
714                token_factory: &mut crate::TokenFactory,
715            ) -> crate::Result<()> {
716                self.0.register(poll, token_factory)
717            }
718
719            fn reregister(
720                &mut self,
721                poll: &mut crate::Poll,
722                token_factory: &mut crate::TokenFactory,
723            ) -> crate::Result<()> {
724                self.0.reregister(poll, token_factory)
725            }
726
727            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
728                self.0.unregister(poll)
729            }
730        }
731
732        // Flag for checking when the source fires.
733        let mut fired = false;
734
735        // Create our source.
736        let (pinger, ping) = make_ping().unwrap();
737
738        let inner = DisablingSource(ping);
739
740        // The TransientSource wrapper.
741        let outer: TransientSource<_> = inner.into();
742
743        let mut event_loop = crate::EventLoop::try_new().unwrap();
744        let handle = event_loop.handle();
745        let token = handle
746            .insert_source(outer, |_, _, fired| {
747                *fired = true;
748            })
749            .unwrap();
750
751        // Ping here and not later, to check that disabling after an event is
752        // triggered but not processed does not discard the event.
753        pinger.ping();
754        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
755        assert!(fired);
756
757        // Source should now be disabled.
758        pinger.ping();
759        fired = false;
760        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
761        assert!(!fired);
762
763        // Re-enable the source.
764        handle.enable(&token).unwrap();
765
766        // Trigger another event.
767        pinger.ping();
768        fired = false;
769        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
770        assert!(fired);
771    }
772
773    #[test]
774    fn test_transient_replace_unregister() {
775        // This is a bit of a complex test, but it essentially boils down to:
776        // how can a "parent" event source containing a TransientSource replace
777        // the "child" source without leaking the source's registration?
778
779        // First, a source that finishes immediately. This is so we cover the
780        // edge case of replacing a source as soon as it wants to be removed.
781        struct FinishImmediatelySource {
782            source: PingSource,
783            data: Option<i32>,
784            registered: bool,
785            dropped: Rc<AtomicBool>,
786        }
787
788        impl FinishImmediatelySource {
789            // The constructor passes out the drop flag so we can check that
790            // this source was or wasn't dropped.
791            fn new(source: PingSource, data: i32) -> (Self, Rc<AtomicBool>) {
792                let dropped = Rc::new(false.into());
793
794                (
795                    Self {
796                        source,
797                        data: Some(data),
798                        registered: false,
799                        dropped: Rc::clone(&dropped),
800                    },
801                    dropped,
802                )
803            }
804        }
805
806        impl EventSource for FinishImmediatelySource {
807            type Event = i32;
808            type Metadata = ();
809            type Ret = ();
810            type Error = Box<dyn std::error::Error + Sync + Send>;
811
812            fn process_events<F>(
813                &mut self,
814                readiness: crate::Readiness,
815                token: crate::Token,
816                mut callback: F,
817            ) -> Result<PostAction, Self::Error>
818            where
819                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
820            {
821                let mut data = self.data.take();
822
823                self.source.process_events(readiness, token, |_, _| {
824                    if let Some(data) = data.take() {
825                        callback(data, &mut ())
826                    }
827                })?;
828
829                self.data = data;
830
831                Ok(if self.data.is_none() {
832                    PostAction::Remove
833                } else {
834                    PostAction::Continue
835                })
836            }
837
838            fn register(
839                &mut self,
840                poll: &mut crate::Poll,
841                token_factory: &mut crate::TokenFactory,
842            ) -> crate::Result<()> {
843                self.registered = true;
844                self.source.register(poll, token_factory)
845            }
846
847            fn reregister(
848                &mut self,
849                poll: &mut crate::Poll,
850                token_factory: &mut crate::TokenFactory,
851            ) -> crate::Result<()> {
852                self.source.reregister(poll, token_factory)
853            }
854
855            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
856                self.registered = false;
857                self.source.unregister(poll)
858            }
859        }
860
861        // The drop handler sets a flag we can check for debugging (we want to
862        // know that the source itself was dropped), and also checks that the
863        // source was unregistered. Ultimately neither the source nor its
864        // registration should be leaked.
865
866        impl Drop for FinishImmediatelySource {
867            fn drop(&mut self) {
868                assert!(!self.registered, "source dropped while still registered");
869                self.dropped.store(true, Ordering::Relaxed);
870            }
871        }
872
873        // Our wrapper source handles detecting when the child source finishes,
874        // and replacing that child source with another one that will generate
875        // more events. This is one intended use case of the TransientSource.
876
877        struct WrapperSource {
878            current: TransientSource<FinishImmediatelySource>,
879            replacement: Option<FinishImmediatelySource>,
880            dropped: Rc<AtomicBool>,
881        }
882
883        impl WrapperSource {
884            // The constructor passes out the drop flag so we can check that
885            // this source was or wasn't dropped.
886            fn new(
887                first: FinishImmediatelySource,
888                second: FinishImmediatelySource,
889            ) -> (Self, Rc<AtomicBool>) {
890                let dropped = Rc::new(false.into());
891
892                (
893                    Self {
894                        current: first.into(),
895                        replacement: second.into(),
896                        dropped: Rc::clone(&dropped),
897                    },
898                    dropped,
899                )
900            }
901        }
902
903        impl EventSource for WrapperSource {
904            type Event = i32;
905            type Metadata = ();
906            type Ret = ();
907            type Error = Box<dyn std::error::Error + Sync + Send>;
908
909            fn process_events<F>(
910                &mut self,
911                readiness: crate::Readiness,
912                token: crate::Token,
913                mut callback: F,
914            ) -> Result<PostAction, Self::Error>
915            where
916                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
917            {
918                // Did our event source generate an event?
919                let mut fired = false;
920
921                let post_action = self.current.process_events(readiness, token, |data, _| {
922                    callback(data, &mut ());
923                    fired = true;
924                })?;
925
926                if fired {
927                    // The event source will be unregistered after the current
928                    // process_events() iteration is finished. The replace()
929                    // method will handle doing that even while we've added a
930                    // new source.
931                    if let Some(replacement) = self.replacement.take() {
932                        self.current.replace(replacement);
933                    }
934
935                    // Parent source is responsible for flagging this, but it's
936                    // already set.
937                    assert_eq!(post_action, PostAction::Reregister);
938                }
939
940                Ok(post_action)
941            }
942
943            fn register(
944                &mut self,
945                poll: &mut crate::Poll,
946                token_factory: &mut crate::TokenFactory,
947            ) -> crate::Result<()> {
948                self.current.register(poll, token_factory)
949            }
950
951            fn reregister(
952                &mut self,
953                poll: &mut crate::Poll,
954                token_factory: &mut crate::TokenFactory,
955            ) -> crate::Result<()> {
956                self.current.reregister(poll, token_factory)
957            }
958
959            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
960                self.current.unregister(poll)
961            }
962        }
963
964        impl Drop for WrapperSource {
965            fn drop(&mut self) {
966                self.dropped.store(true, Ordering::Relaxed);
967            }
968        }
969
970        // Construct the various nested sources - FinishImmediatelySource inside
971        // TransientSource inside WrapperSource. The numbers let us verify which
972        // event source fires first.
973        let (ping0_tx, ping0_rx) = crate::ping::make_ping().unwrap();
974        let (ping1_tx, ping1_rx) = crate::ping::make_ping().unwrap();
975        let (inner0, inner0_dropped) = FinishImmediatelySource::new(ping0_rx, 0);
976        let (inner1, inner1_dropped) = FinishImmediatelySource::new(ping1_rx, 1);
977        let (outer, outer_dropped) = WrapperSource::new(inner0, inner1);
978
979        // Now the actual test starts.
980
981        let mut event_loop: crate::EventLoop<(Option<i32>, crate::LoopSignal)> =
982            crate::EventLoop::try_new().unwrap();
983        let handle = event_loop.handle();
984        let signal = event_loop.get_signal();
985
986        // This is how we communicate with the event sources.
987        let mut context = (None, signal);
988
989        let _token = handle
990            .insert_source(outer, |data, _, (evt, sig)| {
991                *evt = Some(data);
992                sig.stop();
993            })
994            .unwrap();
995
996        // Ensure our sources fire.
997        ping0_tx.ping();
998        ping1_tx.ping();
999
1000        // Use run() rather than dispatch() because it's not strictly part of
1001        // any API contract as to how many runs of the event loop it takes to
1002        // replace the nested source.
1003        event_loop.run(None, &mut context, |_| {}).unwrap();
1004
1005        // First, make sure the inner source actually did fire.
1006        assert_eq!(context.0.take(), Some(0), "first inner source did not fire");
1007
1008        // Make sure that the outer source is still alive.
1009        assert!(
1010            !outer_dropped.load(Ordering::Relaxed),
1011            "outer source already dropped"
1012        );
1013
1014        // Make sure that the inner child source IS dropped now.
1015        assert!(
1016            inner0_dropped.load(Ordering::Relaxed),
1017            "first inner source not dropped"
1018        );
1019
1020        // Make sure that, in between the first event and second event, the
1021        // replacement child source still exists.
1022        assert!(
1023            !inner1_dropped.load(Ordering::Relaxed),
1024            "replacement inner source dropped"
1025        );
1026
1027        // Run the event loop until we get a second event.
1028        event_loop.run(None, &mut context, |_| {}).unwrap();
1029
1030        // Ensure the replacement source fired (which checks that it was
1031        // registered and is being processed by the TransientSource).
1032        assert_eq!(context.0.take(), Some(1), "replacement source did not fire");
1033    }
1034
1035    #[test]
1036    fn test_transient_remove() {
1037        // This tests that calling remove(), even before an event source has
1038        // requested its own removal, results in the event source being removed.
1039
1040        const STOP_AT: i32 = 2;
1041
1042        // A wrapper source to automate the removal of the inner source.
1043        struct WrapperSource {
1044            inner: TransientSource<Channel<i32>>,
1045        }
1046
1047        impl EventSource for WrapperSource {
1048            type Event = i32;
1049            type Metadata = ();
1050            type Ret = ();
1051            type Error = Box<dyn std::error::Error + Sync + Send>;
1052
1053            fn process_events<F>(
1054                &mut self,
1055                readiness: crate::Readiness,
1056                token: crate::Token,
1057                mut callback: F,
1058            ) -> Result<PostAction, Self::Error>
1059            where
1060                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1061            {
1062                let mut remove = false;
1063
1064                let mut post_action = self.inner.process_events(readiness, token, |evt, _| {
1065                    if let Event::Msg(num) = evt {
1066                        callback(num, &mut ());
1067                        remove = num >= STOP_AT;
1068                    }
1069                })?;
1070
1071                if remove {
1072                    self.inner.remove();
1073                    post_action |= PostAction::Reregister;
1074                }
1075
1076                Ok(post_action)
1077            }
1078
1079            fn register(
1080                &mut self,
1081                poll: &mut crate::Poll,
1082                token_factory: &mut crate::TokenFactory,
1083            ) -> crate::Result<()> {
1084                self.inner.register(poll, token_factory)
1085            }
1086
1087            fn reregister(
1088                &mut self,
1089                poll: &mut crate::Poll,
1090                token_factory: &mut crate::TokenFactory,
1091            ) -> crate::Result<()> {
1092                self.inner.reregister(poll, token_factory)
1093            }
1094
1095            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
1096                self.inner.unregister(poll)
1097            }
1098        }
1099
1100        // Create our sources and loop.
1101
1102        let (sender, receiver) = channel();
1103        let wrapper = WrapperSource {
1104            inner: receiver.into(),
1105        };
1106
1107        let mut event_loop = crate::EventLoop::try_new().unwrap();
1108        let handle = event_loop.handle();
1109
1110        handle
1111            .insert_source(wrapper, |num, _, out: &mut Option<_>| {
1112                *out = Some(num);
1113            })
1114            .unwrap();
1115
1116        // Storage for callback data.
1117        let mut out = None;
1118
1119        // Send some data we expect to get callbacks for.
1120        for num in 0..=STOP_AT {
1121            sender.send(num).unwrap();
1122            event_loop.dispatch(Duration::ZERO, &mut out).unwrap();
1123            assert_eq!(out.take(), Some(num));
1124        }
1125
1126        // Now we expect the receiver to be gone.
1127        assert!(matches!(
1128            sender.send(STOP_AT + 1),
1129            Err(std::sync::mpsc::SendError { .. })
1130        ));
1131    }
1132}