1use std::{
2 cell::{Ref, RefCell, RefMut},
3 ops::{BitOr, BitOrAssign},
4 rc::Rc,
5};
6
7use tracing::trace;
8
9pub use crate::loop_logic::EventIterator;
10use crate::{sys::TokenFactory, Poll, Readiness, RegistrationToken, Token};
11
12pub mod channel;
13#[cfg(feature = "executor")]
14#[cfg_attr(docsrs, doc(cfg(feature = "executor")))]
15pub mod futures;
16pub mod generic;
17pub mod ping;
18#[cfg(all(target_os = "linux", feature = "signals"))]
19#[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
20pub mod signals;
21#[cfg(feature = "stream")]
22pub mod stream;
23pub mod timer;
24pub mod transient;
25
26#[derive(Copy, Clone, Debug, PartialEq, Eq)]
40pub enum PostAction {
41 Continue,
43 Reregister,
45 Disable,
49 Remove,
53}
54
55impl BitOr for PostAction {
57 type Output = Self;
58
59 fn bitor(self, rhs: Self) -> Self::Output {
60 if matches!(self, x if x == rhs) {
61 self
62 } else {
63 Self::Reregister
64 }
65 }
66}
67
68impl BitOrAssign for PostAction {
70 fn bitor_assign(&mut self, rhs: Self) {
71 if *self != rhs {
72 *self = Self::Reregister;
73 }
74 }
75}
76
77pub trait EventSource {
105 type Event;
107 type Metadata;
115 type Ret;
123 type Error: Into<Box<dyn std::error::Error + Sync + Send>>;
126
127 fn process_events<F>(
138 &mut self,
139 readiness: Readiness,
140 token: Token,
141 callback: F,
142 ) -> Result<PostAction, Self::Error>
143 where
144 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret;
145
146 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()>;
154
155 fn reregister(
161 &mut self,
162 poll: &mut Poll,
163 token_factory: &mut TokenFactory,
164 ) -> crate::Result<()>;
165
166 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()>;
171
172 const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = false;
176 fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
190 Ok(None)
191 }
192 #[allow(unused_variables)]
203 fn before_handle_events(&mut self, events: EventIterator<'_>) {}
204}
205
206impl<T: EventSource> EventSource for Box<T> {
209 type Event = T::Event;
210 type Metadata = T::Metadata;
211 type Ret = T::Ret;
212 type Error = T::Error;
213
214 fn process_events<F>(
215 &mut self,
216 readiness: Readiness,
217 token: Token,
218 callback: F,
219 ) -> Result<PostAction, Self::Error>
220 where
221 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
222 {
223 T::process_events(&mut **self, readiness, token, callback)
224 }
225
226 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
227 T::register(&mut **self, poll, token_factory)
228 }
229
230 fn reregister(
231 &mut self,
232 poll: &mut Poll,
233 token_factory: &mut TokenFactory,
234 ) -> crate::Result<()> {
235 T::reregister(&mut **self, poll, token_factory)
236 }
237
238 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
239 T::unregister(&mut **self, poll)
240 }
241
242 const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = T::NEEDS_EXTRA_LIFECYCLE_EVENTS;
243
244 fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
245 T::before_sleep(&mut **self)
246 }
247
248 fn before_handle_events(&mut self, events: EventIterator) {
249 T::before_handle_events(&mut **self, events)
250 }
251}
252
253impl<T: EventSource> EventSource for &mut T {
257 type Event = T::Event;
258 type Metadata = T::Metadata;
259 type Ret = T::Ret;
260 type Error = T::Error;
261
262 fn process_events<F>(
263 &mut self,
264 readiness: Readiness,
265 token: Token,
266 callback: F,
267 ) -> Result<PostAction, Self::Error>
268 where
269 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
270 {
271 T::process_events(&mut **self, readiness, token, callback)
272 }
273
274 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
275 T::register(&mut **self, poll, token_factory)
276 }
277
278 fn reregister(
279 &mut self,
280 poll: &mut Poll,
281 token_factory: &mut TokenFactory,
282 ) -> crate::Result<()> {
283 T::reregister(&mut **self, poll, token_factory)
284 }
285
286 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
287 T::unregister(&mut **self, poll)
288 }
289
290 const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = T::NEEDS_EXTRA_LIFECYCLE_EVENTS;
291
292 fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
293 T::before_sleep(&mut **self)
294 }
295
296 fn before_handle_events(&mut self, events: EventIterator) {
297 T::before_handle_events(&mut **self, events)
298 }
299}
300
301pub(crate) struct DispatcherInner<S, F> {
302 source: S,
303 callback: F,
304 needs_additional_lifecycle_events: bool,
305}
306
307impl<Data, S, F> EventDispatcher<Data> for RefCell<DispatcherInner<S, F>>
308where
309 S: EventSource,
310 F: FnMut(S::Event, &mut S::Metadata, &mut Data) -> S::Ret,
311{
312 fn process_events(
313 &self,
314 readiness: Readiness,
315 token: Token,
316 data: &mut Data,
317 ) -> crate::Result<PostAction> {
318 let mut disp = self.borrow_mut();
319 let DispatcherInner {
320 ref mut source,
321 ref mut callback,
322 ..
323 } = *disp;
324 trace!(
325 "Processing events for source type {}",
326 std::any::type_name::<S>()
327 );
328 source
329 .process_events(readiness, token, |event, meta| callback(event, meta, data))
330 .map_err(|e| crate::Error::OtherError(e.into()))
331 }
332
333 fn register(
334 &self,
335 poll: &mut Poll,
336 additional_lifecycle_register: &mut AdditionalLifecycleEventsSet,
337 token_factory: &mut TokenFactory,
338 ) -> crate::Result<()> {
339 let mut this = self.borrow_mut();
340
341 if this.needs_additional_lifecycle_events {
342 additional_lifecycle_register.register(token_factory.registration_token());
343 }
344 this.source.register(poll, token_factory)
345 }
346
347 fn reregister(
348 &self,
349 poll: &mut Poll,
350 additional_lifecycle_register: &mut AdditionalLifecycleEventsSet,
351 token_factory: &mut TokenFactory,
352 ) -> crate::Result<bool> {
353 if let Ok(mut me) = self.try_borrow_mut() {
354 me.source.reregister(poll, token_factory)?;
355 if me.needs_additional_lifecycle_events {
356 additional_lifecycle_register.register(token_factory.registration_token());
357 }
358 Ok(true)
359 } else {
360 Ok(false)
361 }
362 }
363
364 fn unregister(
365 &self,
366 poll: &mut Poll,
367 additional_lifecycle_register: &mut AdditionalLifecycleEventsSet,
368 registration_token: RegistrationToken,
369 ) -> crate::Result<bool> {
370 if let Ok(mut me) = self.try_borrow_mut() {
371 me.source.unregister(poll)?;
372 if me.needs_additional_lifecycle_events {
373 additional_lifecycle_register.unregister(registration_token);
374 }
375 Ok(true)
376 } else {
377 Ok(false)
378 }
379 }
380
381 fn before_sleep(&self) -> crate::Result<Option<(Readiness, Token)>> {
382 let mut disp = self.borrow_mut();
383 let DispatcherInner { ref mut source, .. } = *disp;
384 source.before_sleep()
385 }
386
387 fn before_handle_events(&self, events: EventIterator<'_>) {
388 let mut disp = self.borrow_mut();
389 let DispatcherInner { ref mut source, .. } = *disp;
390 source.before_handle_events(events);
391 }
392}
393
394pub(crate) trait EventDispatcher<Data> {
395 fn process_events(
396 &self,
397 readiness: Readiness,
398 token: Token,
399 data: &mut Data,
400 ) -> crate::Result<PostAction>;
401
402 fn register(
403 &self,
404 poll: &mut Poll,
405 additional_lifecycle_register: &mut AdditionalLifecycleEventsSet,
406 token_factory: &mut TokenFactory,
407 ) -> crate::Result<()>;
408
409 fn reregister(
410 &self,
411 poll: &mut Poll,
412 additional_lifecycle_register: &mut AdditionalLifecycleEventsSet,
413 token_factory: &mut TokenFactory,
414 ) -> crate::Result<bool>;
415
416 fn unregister(
417 &self,
418 poll: &mut Poll,
419 additional_lifecycle_register: &mut AdditionalLifecycleEventsSet,
420 registration_token: RegistrationToken,
421 ) -> crate::Result<bool>;
422
423 fn before_sleep(&self) -> crate::Result<Option<(Readiness, Token)>>;
424 fn before_handle_events(&self, events: EventIterator<'_>);
425}
426
427#[derive(Default)]
428pub(crate) struct AdditionalLifecycleEventsSet {
430 pub(crate) values: Vec<RegistrationToken>,
432}
433
434impl AdditionalLifecycleEventsSet {
435 fn register(&mut self, token: RegistrationToken) {
436 self.values.push(token)
437 }
438
439 fn unregister(&mut self, token: RegistrationToken) {
440 self.values.retain(|it| it != &token)
441 }
442}
443
444trait ErasedDispatcher<'a, S, Data> {
446 fn as_source_ref(&self) -> Ref<S>;
447 fn as_source_mut(&self) -> RefMut<S>;
448 fn into_source_inner(self: Rc<Self>) -> S;
449 fn into_event_dispatcher(self: Rc<Self>) -> Rc<dyn EventDispatcher<Data> + 'a>;
450}
451
452impl<'a, S, Data, F> ErasedDispatcher<'a, S, Data> for RefCell<DispatcherInner<S, F>>
453where
454 S: EventSource + 'a,
455 F: FnMut(S::Event, &mut S::Metadata, &mut Data) -> S::Ret + 'a,
456{
457 fn as_source_ref(&self) -> Ref<S> {
458 Ref::map(self.borrow(), |inner| &inner.source)
459 }
460
461 fn as_source_mut(&self) -> RefMut<S> {
462 RefMut::map(self.borrow_mut(), |inner| &mut inner.source)
463 }
464
465 fn into_source_inner(self: Rc<Self>) -> S {
466 if let Ok(ref_cell) = Rc::try_unwrap(self) {
467 ref_cell.into_inner().source
468 } else {
469 panic!("Dispatcher is still registered");
470 }
471 }
472
473 fn into_event_dispatcher(self: Rc<Self>) -> Rc<dyn EventDispatcher<Data> + 'a>
474 where
475 S: 'a,
476 {
477 self as Rc<dyn EventDispatcher<Data> + 'a>
478 }
479}
480
481pub struct Dispatcher<'a, S, Data>(Rc<dyn ErasedDispatcher<'a, S, Data> + 'a>);
487
488impl<S, Data> std::fmt::Debug for Dispatcher<'_, S, Data> {
489 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
490 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
491 f.write_str("Dispatcher { ... }")
492 }
493}
494
495impl<'a, S, Data> Dispatcher<'a, S, Data>
496where
497 S: EventSource + 'a,
498{
499 pub fn new<F>(source: S, callback: F) -> Self
503 where
504 F: FnMut(S::Event, &mut S::Metadata, &mut Data) -> S::Ret + 'a,
505 {
506 Dispatcher(Rc::new(RefCell::new(DispatcherInner {
507 source,
508 callback,
509 needs_additional_lifecycle_events: S::NEEDS_EXTRA_LIFECYCLE_EVENTS,
510 })))
511 }
512
513 pub fn as_source_ref(&self) -> Ref<S> {
522 self.0.as_source_ref()
523 }
524
525 pub fn as_source_mut(&self) -> RefMut<S> {
534 self.0.as_source_mut()
535 }
536
537 pub fn into_source_inner(self) -> S {
543 self.0.into_source_inner()
544 }
545
546 pub(crate) fn clone_as_event_dispatcher(&self) -> Rc<dyn EventDispatcher<Data> + 'a> {
547 Rc::clone(&self.0).into_event_dispatcher()
548 }
549}
550
551impl<'a, S, Data> Clone for Dispatcher<'a, S, Data> {
552 fn clone(&self) -> Dispatcher<'a, S, Data> {
553 Dispatcher(Rc::clone(&self.0))
554 }
555}
556
557pub struct Idle<'i> {
562 pub(crate) callback: Rc<RefCell<dyn CancellableIdle + 'i>>,
563}
564
565impl std::fmt::Debug for Idle<'_> {
566 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
567 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
568 f.write_str("Idle { ... }")
569 }
570}
571
572impl Idle<'_> {
573 pub fn cancel(self) {
575 self.callback.borrow_mut().cancel();
576 }
577}
578
579pub(crate) trait CancellableIdle {
580 fn cancel(&mut self);
581}
582
583impl<F> CancellableIdle for Option<F> {
584 fn cancel(&mut self) {
585 self.take();
586 }
587}
588
589pub(crate) trait IdleDispatcher<Data> {
590 fn dispatch(&mut self, data: &mut Data);
591}
592
593impl<Data, F> IdleDispatcher<Data> for Option<F>
594where
595 F: FnMut(&mut Data),
596{
597 fn dispatch(&mut self, data: &mut Data) {
598 if let Some(callabck) = self.as_mut() {
599 callabck(data);
600 }
601 }
602}
603
604#[cfg(test)]
605mod tests {
606 use std::time::Duration;
607
608 use crate::{ping::make_ping, EventLoop};
609
610 #[test]
612 fn test_boxed_source() {
613 let mut fired = false;
614
615 let (pinger, source) = make_ping().unwrap();
616 let boxed = Box::new(source);
617
618 let mut event_loop = EventLoop::try_new().unwrap();
619 let handle = event_loop.handle();
620
621 let token = handle
622 .insert_source(boxed, |_, _, fired| *fired = true)
623 .unwrap();
624
625 pinger.ping();
626
627 event_loop
628 .dispatch(Duration::new(0, 0), &mut fired)
629 .unwrap();
630
631 assert!(fired);
632 fired = false;
633
634 handle.update(&token).unwrap();
635
636 pinger.ping();
637
638 event_loop
639 .dispatch(Duration::new(0, 0), &mut fired)
640 .unwrap();
641
642 assert!(fired);
643 fired = false;
644
645 handle.remove(token);
646
647 event_loop
648 .dispatch(Duration::new(0, 0), &mut fired)
649 .unwrap();
650
651 assert!(!fired);
652 }
653
654 #[test]
656 fn test_mut_ref_source() {
657 let mut fired = false;
658
659 let (pinger, mut source) = make_ping().unwrap();
660 let source_ref = &mut source;
661
662 let mut event_loop = EventLoop::try_new().unwrap();
663 let handle = event_loop.handle();
664
665 let token = handle
666 .insert_source(source_ref, |_, _, fired| *fired = true)
667 .unwrap();
668
669 pinger.ping();
670
671 event_loop
672 .dispatch(Duration::new(0, 0), &mut fired)
673 .unwrap();
674
675 assert!(fired);
676 fired = false;
677
678 handle.update(&token).unwrap();
679
680 pinger.ping();
681
682 event_loop
683 .dispatch(Duration::new(0, 0), &mut fired)
684 .unwrap();
685
686 assert!(fired);
687 fired = false;
688
689 handle.remove(token);
690
691 event_loop
692 .dispatch(Duration::new(0, 0), &mut fired)
693 .unwrap();
694
695 assert!(!fired);
696 }
697
698 #[test]
700 fn post_action_combine() {
701 use super::PostAction::*;
702 assert_eq!(Continue | Continue, Continue);
703 assert_eq!(Continue | Reregister, Reregister);
704 assert_eq!(Continue | Disable, Reregister);
705 assert_eq!(Continue | Remove, Reregister);
706
707 assert_eq!(Reregister | Continue, Reregister);
708 assert_eq!(Reregister | Reregister, Reregister);
709 assert_eq!(Reregister | Disable, Reregister);
710 assert_eq!(Reregister | Remove, Reregister);
711
712 assert_eq!(Disable | Continue, Reregister);
713 assert_eq!(Disable | Reregister, Reregister);
714 assert_eq!(Disable | Disable, Disable);
715 assert_eq!(Disable | Remove, Reregister);
716
717 assert_eq!(Remove | Continue, Reregister);
718 assert_eq!(Remove | Reregister, Reregister);
719 assert_eq!(Remove | Disable, Reregister);
720 assert_eq!(Remove | Remove, Remove);
721 }
722
723 #[test]
725 fn post_action_combine_assign() {
726 use super::PostAction::*;
727
728 let mut action = Continue;
729 action |= Continue;
730 assert_eq!(action, Continue);
731
732 let mut action = Continue;
733 action |= Reregister;
734 assert_eq!(action, Reregister);
735
736 let mut action = Continue;
737 action |= Disable;
738 assert_eq!(action, Reregister);
739
740 let mut action = Continue;
741 action |= Remove;
742 assert_eq!(action, Reregister);
743
744 let mut action = Reregister;
745 action |= Continue;
746 assert_eq!(action, Reregister);
747
748 let mut action = Reregister;
749 action |= Reregister;
750 assert_eq!(action, Reregister);
751
752 let mut action = Reregister;
753 action |= Disable;
754 assert_eq!(action, Reregister);
755
756 let mut action = Reregister;
757 action |= Remove;
758 assert_eq!(action, Reregister);
759
760 let mut action = Disable;
761 action |= Continue;
762 assert_eq!(action, Reregister);
763
764 let mut action = Disable;
765 action |= Reregister;
766 assert_eq!(action, Reregister);
767
768 let mut action = Disable;
769 action |= Disable;
770 assert_eq!(action, Disable);
771
772 let mut action = Disable;
773 action |= Remove;
774 assert_eq!(action, Reregister);
775
776 let mut action = Remove;
777 action |= Continue;
778 assert_eq!(action, Reregister);
779
780 let mut action = Remove;
781 action |= Reregister;
782 assert_eq!(action, Reregister);
783
784 let mut action = Remove;
785 action |= Disable;
786 assert_eq!(action, Reregister);
787
788 let mut action = Remove;
789 action |= Remove;
790 assert_eq!(action, Remove);
791 }
792}