1use std::cell::{Cell, RefCell};
2use std::fmt::Debug;
3use std::rc::{Rc, Weak};
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use std::{io, slice};
8
9#[cfg(feature = "block_on")]
10use std::future::Future;
11
12#[cfg(unix)]
13use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
14#[cfg(windows)]
15use std::os::windows::io::{AsHandle, AsRawHandle, AsSocket as AsFd, BorrowedHandle, RawHandle};
16
17use polling::Poller;
18use tracing::{trace, warn};
19
20use crate::list::{SourceEntry, SourceList};
21use crate::sources::{Dispatcher, EventSource, Idle, IdleDispatcher};
22use crate::sys::{Notifier, PollEvent};
23use crate::token::TokenInner;
24use crate::{
25 AdditionalLifecycleEventsSet, InsertError, Poll, PostAction, Readiness, Token, TokenFactory,
26};
27
28type IdleCallback<'i, Data> = Rc<RefCell<dyn IdleDispatcher<Data> + 'i>>;
29
30#[derive(Clone, Copy, Debug, PartialEq, Eq)]
37pub struct RegistrationToken {
38 inner: TokenInner,
39}
40
41impl RegistrationToken {
42 pub(crate) fn new(inner: TokenInner) -> Self {
46 Self { inner }
47 }
48}
49
50pub(crate) struct LoopInner<'l, Data> {
51 pub(crate) poll: RefCell<Poll>,
52 pub(crate) sources: RefCell<SourceList<'l, Data>>,
55 pub(crate) sources_with_additional_lifecycle_events: RefCell<AdditionalLifecycleEventsSet>,
56 idles: RefCell<Vec<IdleCallback<'l, Data>>>,
57 pending_action: Cell<PostAction>,
58}
59
60pub struct LoopHandle<'l, Data> {
66 inner: Rc<LoopInner<'l, Data>>,
67}
68
69pub struct WeakLoopHandle<'l, Data> {
71 inner: Weak<LoopInner<'l, Data>>,
72}
73
74impl<Data> Debug for LoopHandle<'_, Data> {
75 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.write_str("LoopHandle { ... }")
78 }
79}
80
81impl<Data> Clone for LoopHandle<'_, Data> {
85 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
86 fn clone(&self) -> Self {
87 LoopHandle {
88 inner: self.inner.clone(),
89 }
90 }
91}
92
93impl<'l, Data> LoopHandle<'l, Data> {
94 pub fn insert_source<S, F>(
102 &self,
103 source: S,
104 callback: F,
105 ) -> Result<RegistrationToken, InsertError<S>>
106 where
107 S: EventSource + 'l,
108 F: FnMut(S::Event, &mut S::Metadata, &mut Data) -> S::Ret + 'l,
109 {
110 let dispatcher = Dispatcher::new(source, callback);
111 self.register_dispatcher(dispatcher.clone())
112 .map_err(|error| InsertError {
113 error,
114 inserted: dispatcher.into_source_inner(),
115 })
116 }
117
118 #[cfg_attr(feature = "nightly_coverage", coverage(off))] pub fn register_dispatcher<S>(
125 &self,
126 dispatcher: Dispatcher<'l, S, Data>,
127 ) -> crate::Result<RegistrationToken>
128 where
129 S: EventSource + 'l,
130 {
131 let mut sources = self.inner.sources.borrow_mut();
132 let mut poll = self.inner.poll.borrow_mut();
133
134 let slot = sources.vacant_entry();
136
137 slot.source = Some(dispatcher.clone_as_event_dispatcher());
138 trace!(source = slot.token.get_id(), "Inserting new source");
139 let ret = slot.source.as_ref().unwrap().register(
140 &mut poll,
141 &mut self
142 .inner
143 .sources_with_additional_lifecycle_events
144 .borrow_mut(),
145 &mut TokenFactory::new(slot.token),
146 );
147
148 if let Err(error) = ret {
149 slot.source = None;
150 return Err(error);
151 }
152
153 Ok(RegistrationToken { inner: slot.token })
154 }
155
156 pub fn insert_idle<'i, F: FnOnce(&mut Data) + 'l + 'i>(&self, callback: F) -> Idle<'i> {
161 let mut opt_cb = Some(callback);
162 let callback = Rc::new(RefCell::new(Some(move |data: &mut Data| {
163 if let Some(cb) = opt_cb.take() {
164 cb(data);
165 }
166 })));
167 self.inner.idles.borrow_mut().push(callback.clone());
168 Idle { callback }
169 }
170
171 pub fn enable(&self, token: &RegistrationToken) -> crate::Result<()> {
177 if let &SourceEntry {
178 token: entry_token,
179 source: Some(ref source),
180 } = self.inner.sources.borrow().get(token.inner)?
181 {
182 trace!(source = entry_token.get_id(), "Registering source");
183 source.register(
184 &mut self.inner.poll.borrow_mut(),
185 &mut self
186 .inner
187 .sources_with_additional_lifecycle_events
188 .borrow_mut(),
189 &mut TokenFactory::new(entry_token),
190 )
191 } else {
192 Err(crate::Error::InvalidToken)
193 }
194 }
195
196 pub fn update(&self, token: &RegistrationToken) -> crate::Result<()> {
201 if let &SourceEntry {
202 token: entry_token,
203 source: Some(ref source),
204 } = self.inner.sources.borrow().get(token.inner)?
205 {
206 trace!(
207 source = entry_token.get_id(),
208 "Updating registration of source"
209 );
210 if !source.reregister(
211 &mut self.inner.poll.borrow_mut(),
212 &mut self
213 .inner
214 .sources_with_additional_lifecycle_events
215 .borrow_mut(),
216 &mut TokenFactory::new(entry_token),
217 )? {
218 trace!(
219 source = entry_token.get_id(),
220 "Can't update registration withing a callback, storing for later."
221 );
222 self.inner.pending_action.set(PostAction::Reregister);
224 }
225 Ok(())
226 } else {
227 Err(crate::Error::InvalidToken)
228 }
229 }
230
231 pub fn disable(&self, token: &RegistrationToken) -> crate::Result<()> {
235 if let &SourceEntry {
236 token: entry_token,
237 source: Some(ref source),
238 } = self.inner.sources.borrow().get(token.inner)?
239 {
240 if !token.inner.same_source_as(entry_token) {
241 return Err(crate::Error::InvalidToken);
243 }
244 trace!(source = entry_token.get_id(), "Unregistering source");
245 if !source.unregister(
246 &mut self.inner.poll.borrow_mut(),
247 &mut self
248 .inner
249 .sources_with_additional_lifecycle_events
250 .borrow_mut(),
251 *token,
252 )? {
253 trace!(
254 source = entry_token.get_id(),
255 "Cannot unregister source in callback, storing for later."
256 );
257 self.inner.pending_action.set(PostAction::Disable);
259 }
260 Ok(())
261 } else {
262 Err(crate::Error::InvalidToken)
263 }
264 }
265
266 pub fn remove(&self, token: RegistrationToken) {
268 if let Ok(&mut SourceEntry {
269 token: entry_token,
270 ref mut source,
271 }) = self.inner.sources.borrow_mut().get_mut(token.inner)
272 {
273 if let Some(source) = source.take() {
274 trace!(source = entry_token.get_id(), "Removing source");
275 if let Err(e) = source.unregister(
276 &mut self.inner.poll.borrow_mut(),
277 &mut self
278 .inner
279 .sources_with_additional_lifecycle_events
280 .borrow_mut(),
281 token,
282 ) {
283 warn!("Failed to unregister source from the polling system: {e:?}");
284 }
285 }
286 }
287 }
288
289 pub fn adapt_io<F: AsFd>(&self, fd: F) -> crate::Result<crate::io::Async<'l, F>> {
297 crate::io::Async::new(self.inner.clone(), fd)
298 }
299
300 pub fn downgrade(&self) -> WeakLoopHandle<'l, Data> {
324 WeakLoopHandle {
325 inner: Rc::downgrade(&self.inner),
326 }
327 }
328}
329
330impl<Data> Debug for WeakLoopHandle<'_, Data> {
331 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
332 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
333 f.write_str("WeakLoopHandle { ... }")
334 }
335}
336
337impl<Data> Clone for WeakLoopHandle<'_, Data> {
341 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
342 fn clone(&self) -> Self {
343 WeakLoopHandle {
344 inner: self.inner.clone(),
345 }
346 }
347}
348
349impl<Data> Default for WeakLoopHandle<'_, Data> {
353 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
354 fn default() -> Self {
355 WeakLoopHandle {
356 inner: Weak::default(),
357 }
358 }
359}
360
361impl<'l, Data> WeakLoopHandle<'l, Data> {
362 pub fn upgrade(&self) -> Option<LoopHandle<'l, Data>> {
366 self.inner.upgrade().map(|inner| LoopHandle { inner })
367 }
368
369 pub fn expired(&self) -> bool {
371 self.inner.strong_count() == 0
372 }
373}
374
375pub struct EventLoop<'l, Data> {
379 #[allow(dead_code)]
380 poller: Arc<Poller>,
381 handle: LoopHandle<'l, Data>,
382 signals: Arc<Signals>,
383 synthetic_events: Vec<PollEvent>,
385}
386
387impl<Data> Debug for EventLoop<'_, Data> {
388 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
389 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390 f.write_str("EventLoop { ... }")
391 }
392}
393
394struct Signals {
396 stop: AtomicBool,
398
399 #[cfg(feature = "block_on")]
401 future_ready: AtomicBool,
402}
403
404impl<'l, Data> EventLoop<'l, Data> {
405 pub fn try_new() -> crate::Result<Self> {
409 let poll = Poll::new()?;
410 let poller = poll.poller.clone();
411 let handle = LoopHandle {
412 inner: Rc::new(LoopInner {
413 poll: RefCell::new(poll),
414 sources: RefCell::new(SourceList::new()),
415 idles: RefCell::new(Vec::new()),
416 pending_action: Cell::new(PostAction::Continue),
417 sources_with_additional_lifecycle_events: Default::default(),
418 }),
419 };
420
421 Ok(EventLoop {
422 handle,
423 signals: Arc::new(Signals {
424 stop: AtomicBool::new(false),
425 #[cfg(feature = "block_on")]
426 future_ready: AtomicBool::new(false),
427 }),
428 poller,
429 synthetic_events: vec![],
430 })
431 }
432
433 pub fn handle(&self) -> LoopHandle<'l, Data> {
435 self.handle.clone()
436 }
437
438 fn dispatch_events(
439 &mut self,
440 mut timeout: Option<Duration>,
441 data: &mut Data,
442 ) -> crate::Result<()> {
443 let now = Instant::now();
444 {
445 let mut extra_lifecycle_sources = self
446 .handle
447 .inner
448 .sources_with_additional_lifecycle_events
449 .borrow_mut();
450 let sources = &self.handle.inner.sources.borrow();
451 for source in &mut *extra_lifecycle_sources.values {
452 if let Ok(SourceEntry {
453 source: Some(disp), ..
454 }) = sources.get(source.inner)
455 {
456 if let Some((readiness, token)) = disp.before_sleep()? {
457 timeout = Some(Duration::ZERO);
459 self.synthetic_events.push(PollEvent { readiness, token });
460 }
461 } else {
462 unreachable!()
463 }
464 }
465 }
466 let events = {
467 let poll = self.handle.inner.poll.borrow();
468 loop {
469 let result = poll.poll(timeout);
470
471 match result {
472 Ok(events) => break events,
473 Err(crate::Error::IoError(err)) if err.kind() == io::ErrorKind::Interrupted => {
474 if let Some(to) = timeout {
476 let elapsed = now.elapsed();
477 if elapsed >= to {
478 return Ok(());
479 } else {
480 timeout = Some(to - elapsed);
481 }
482 }
483 }
484 Err(err) => return Err(err),
485 };
486 }
487 };
488 {
489 let mut extra_lifecycle_sources = self
490 .handle
491 .inner
492 .sources_with_additional_lifecycle_events
493 .borrow_mut();
494 if !extra_lifecycle_sources.values.is_empty() {
495 for source in &mut *extra_lifecycle_sources.values {
496 if let Ok(SourceEntry {
497 source: Some(disp), ..
498 }) = self.handle.inner.sources.borrow().get(source.inner)
499 {
500 let iter = EventIterator {
501 inner: events.iter(),
502 registration_token: *source,
503 };
504 disp.before_handle_events(iter);
505 } else {
506 unreachable!()
507 }
508 }
509 }
510 }
511
512 for event in self.synthetic_events.drain(..).chain(events) {
513 let reg_token = event.token.inner.forget_sub_id();
515
516 let opt_disp = self
517 .handle
518 .inner
519 .sources
520 .borrow()
521 .get(reg_token)
522 .ok()
523 .and_then(|entry| entry.source.clone());
524
525 if let Some(disp) = opt_disp {
526 trace!(source = reg_token.get_id(), "Dispatching events for source");
527 let mut ret = disp.process_events(event.readiness, event.token, data)?;
528
529 let pending_action = self
531 .handle
532 .inner
533 .pending_action
534 .replace(PostAction::Continue);
535 if let PostAction::Continue = ret {
536 ret = pending_action;
537 }
538
539 match ret {
540 PostAction::Reregister => {
541 trace!(
542 source = reg_token.get_id(),
543 "Postaction reregister for source"
544 );
545 disp.reregister(
546 &mut self.handle.inner.poll.borrow_mut(),
547 &mut self
548 .handle
549 .inner
550 .sources_with_additional_lifecycle_events
551 .borrow_mut(),
552 &mut TokenFactory::new(reg_token),
553 )?;
554 }
555 PostAction::Disable => {
556 trace!(
557 source = reg_token.get_id(),
558 "Postaction unregister for source"
559 );
560 disp.unregister(
561 &mut self.handle.inner.poll.borrow_mut(),
562 &mut self
563 .handle
564 .inner
565 .sources_with_additional_lifecycle_events
566 .borrow_mut(),
567 RegistrationToken::new(reg_token),
568 )?;
569 }
570 PostAction::Remove => {
571 trace!(source = reg_token.get_id(), "Postaction remove for source");
572 if let Ok(entry) = self.handle.inner.sources.borrow_mut().get_mut(reg_token)
573 {
574 entry.source = None;
575 }
576 }
577 PostAction::Continue => {}
578 }
579
580 if self
581 .handle
582 .inner
583 .sources
584 .borrow()
585 .get(reg_token)
586 .ok()
587 .map(|entry| entry.source.is_none())
588 .unwrap_or(true)
589 {
590 let mut poll = self.handle.inner.poll.borrow_mut();
592 if let Err(e) = disp.unregister(
593 &mut poll,
594 &mut self
595 .handle
596 .inner
597 .sources_with_additional_lifecycle_events
598 .borrow_mut(),
599 RegistrationToken::new(reg_token),
600 ) {
601 warn!("Failed to unregister source from the polling system: {e:?}",);
602 }
603 }
604 } else {
605 warn!(?reg_token, "Received an event for non-existent source");
606 }
607 }
608
609 Ok(())
610 }
611
612 fn dispatch_idles(&mut self, data: &mut Data) {
613 let idles = std::mem::take(&mut *self.handle.inner.idles.borrow_mut());
614 for idle in idles {
615 idle.borrow_mut().dispatch(data);
616 }
617 }
618
619 pub fn dispatch<D: Into<Option<Duration>>>(
628 &mut self,
629 timeout: D,
630 data: &mut Data,
631 ) -> crate::Result<()> {
632 self.dispatch_events(timeout.into(), data)?;
633 self.dispatch_idles(data);
634
635 Ok(())
636 }
637
638 pub fn get_signal(&self) -> LoopSignal {
642 LoopSignal {
643 signal: self.signals.clone(),
644 notifier: self.handle.inner.poll.borrow().notifier(),
645 }
646 }
647
648 pub fn run<F, D: Into<Option<Duration>>>(
658 &mut self,
659 timeout: D,
660 data: &mut Data,
661 mut cb: F,
662 ) -> crate::Result<()>
663 where
664 F: FnMut(&mut Data),
665 {
666 let timeout = timeout.into();
667 self.signals.stop.store(false, Ordering::Release);
668 while !self.signals.stop.load(Ordering::Acquire) {
669 self.dispatch(timeout, data)?;
670 cb(data);
671 }
672 Ok(())
673 }
674
675 #[cfg(feature = "block_on")]
683 pub fn block_on<R>(
684 &mut self,
685 future: impl Future<Output = R>,
686 data: &mut Data,
687 mut cb: impl FnMut(&mut Data),
688 ) -> crate::Result<Option<R>> {
689 use std::task::{Context, Poll, Wake, Waker};
690
691 struct EventLoopWaker(LoopSignal);
693
694 impl Wake for EventLoopWaker {
695 fn wake(self: Arc<Self>) {
696 self.0.signal.future_ready.store(true, Ordering::Release);
698 self.0.notifier.notify().ok();
699 }
700
701 fn wake_by_ref(self: &Arc<Self>) {
702 self.0.signal.future_ready.store(true, Ordering::Release);
704 self.0.notifier.notify().ok();
705 }
706 }
707
708 pin_utils::pin_mut!(future);
710
711 let waker = {
713 let handle = EventLoopWaker(self.get_signal());
714
715 Waker::from(Arc::new(handle))
716 };
717 let mut context = Context::from_waker(&waker);
718
719 let mut output = None;
721
722 self.signals.stop.store(false, Ordering::Release);
723 self.signals.future_ready.store(true, Ordering::Release);
724
725 while !self.signals.stop.load(Ordering::Acquire) {
726 if self.signals.future_ready.swap(false, Ordering::AcqRel) {
728 if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
730 output = Some(result);
731 break;
732 }
733 }
734
735 self.dispatch_events(None, data)?;
737 self.dispatch_idles(data);
738 cb(data);
739 }
740
741 Ok(output)
742 }
743}
744
745#[cfg(unix)]
746impl<Data> AsRawFd for EventLoop<'_, Data> {
747 fn as_raw_fd(&self) -> RawFd {
755 self.poller.as_raw_fd()
756 }
757}
758
759#[cfg(unix)]
760impl<Data> AsFd for EventLoop<'_, Data> {
761 fn as_fd(&self) -> BorrowedFd<'_> {
768 self.poller.as_fd()
769 }
770}
771
772#[cfg(windows)]
773impl<Data> AsRawHandle for EventLoop<'_, Data> {
774 fn as_raw_handle(&self) -> RawHandle {
775 self.poller.as_raw_handle()
776 }
777}
778
779#[cfg(windows)]
780impl<Data> AsHandle for EventLoop<'_, Data> {
781 fn as_handle(&self) -> BorrowedHandle<'_> {
782 self.poller.as_handle()
783 }
784}
785
786#[derive(Clone, Debug)]
787pub struct EventIterator<'a> {
795 inner: slice::Iter<'a, PollEvent>,
796 registration_token: RegistrationToken,
797}
798
799impl Iterator for EventIterator<'_> {
800 type Item = (Readiness, Token);
801
802 fn next(&mut self) -> Option<Self::Item> {
803 for next in self.inner.by_ref() {
804 if next
805 .token
806 .inner
807 .same_source_as(self.registration_token.inner)
808 {
809 return Some((next.readiness, next.token));
810 }
811 }
812 None
813 }
814}
815
816#[derive(Clone)]
819pub struct LoopSignal {
820 signal: Arc<Signals>,
821 notifier: Notifier,
822}
823
824impl Debug for LoopSignal {
825 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
826 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
827 f.write_str("LoopSignal { ... }")
828 }
829}
830
831impl LoopSignal {
832 pub fn stop(&self) {
839 self.signal.stop.store(true, Ordering::Release);
840 }
841
842 pub fn wakeup(&self) {
849 self.notifier.notify().ok();
850 }
851}
852
853#[cfg(test)]
854mod tests {
855 use std::{cell::Cell, rc::Rc, time::Duration};
856
857 use crate::{
858 channel::{channel, Channel},
859 ping::*,
860 timer, EventIterator, EventSource, Poll, PostAction, Readiness, RegistrationToken, Token,
861 TokenFactory,
862 };
863
864 #[cfg(unix)]
865 use crate::{generic::Generic, Dispatcher, Interest, Mode};
866
867 use super::EventLoop;
868
869 #[test]
870 fn dispatch_idle() {
871 let mut event_loop = EventLoop::try_new().unwrap();
872
873 let mut dispatched = false;
874
875 event_loop.handle().insert_idle(|d| {
876 *d = true;
877 });
878
879 event_loop
880 .dispatch(Some(Duration::ZERO), &mut dispatched)
881 .unwrap();
882
883 assert!(dispatched);
884 }
885
886 #[test]
887 fn cancel_idle() {
888 let mut event_loop = EventLoop::try_new().unwrap();
889
890 let mut dispatched = false;
891
892 let handle = event_loop.handle();
893 let idle = handle.insert_idle(move |d| {
894 *d = true;
895 });
896
897 idle.cancel();
898
899 event_loop
900 .dispatch(Duration::ZERO, &mut dispatched)
901 .unwrap();
902
903 assert!(!dispatched);
904 }
905
906 #[test]
907 fn wakeup() {
908 let mut event_loop = EventLoop::try_new().unwrap();
909
910 let signal = event_loop.get_signal();
911
912 ::std::thread::spawn(move || {
913 ::std::thread::sleep(Duration::from_millis(500));
914 signal.wakeup();
915 });
916
917 event_loop.dispatch(None, &mut ()).unwrap();
919 }
920
921 #[test]
922 fn wakeup_stop() {
923 let mut event_loop = EventLoop::try_new().unwrap();
924
925 let signal = event_loop.get_signal();
926
927 ::std::thread::spawn(move || {
928 ::std::thread::sleep(Duration::from_millis(500));
929 signal.stop();
930 signal.wakeup();
931 });
932
933 event_loop.run(None, &mut (), |_| {}).unwrap();
935 }
936
937 #[test]
938 fn additional_events() {
939 let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
940 let mut lock = Lock {
941 lock: Rc::new((
942 Cell::new(false),
944 Cell::new(0),
946 Cell::new(0),
949 )),
950 };
951 let (sender, channel) = channel();
952 let token = event_loop
953 .handle()
954 .insert_source(
955 LockingSource {
956 channel,
957 lock: lock.clone(),
958 },
959 |_, _, lock| {
960 lock.lock();
961 lock.unlock();
962 },
963 )
964 .unwrap();
965 sender.send(()).unwrap();
966
967 event_loop.dispatch(None, &mut lock).unwrap();
968 assert_eq!(lock.lock.1.get(), 2);
970 assert_eq!(lock.lock.2.get(), 1);
972 event_loop.handle().disable(&token).unwrap();
973 event_loop
974 .dispatch(Some(Duration::ZERO), &mut lock)
975 .unwrap();
976 assert_eq!(lock.lock.1.get(), 2);
977
978 event_loop.handle().enable(&token).unwrap();
979 event_loop
980 .dispatch(Some(Duration::ZERO), &mut lock)
981 .unwrap();
982 assert_eq!(lock.lock.1.get(), 3);
983 event_loop.handle().remove(token);
984 event_loop
985 .dispatch(Some(Duration::ZERO), &mut lock)
986 .unwrap();
987 assert_eq!(lock.lock.1.get(), 3);
988 assert_eq!(lock.lock.2.get(), 1);
989
990 #[derive(Clone)]
991 struct Lock {
992 lock: Rc<(Cell<bool>, Cell<u32>, Cell<u32>)>,
993 }
994 impl Lock {
995 fn lock(&self) {
996 if self.lock.0.get() {
997 panic!();
998 }
999 self.lock.1.set(self.lock.1.get() + 1);
1001 self.lock.0.set(true)
1002 }
1003 fn unlock(&self) {
1004 if !self.lock.0.get() {
1005 panic!();
1006 }
1007 self.lock.0.set(false);
1008 }
1009 }
1010 struct LockingSource {
1011 channel: Channel<()>,
1012 lock: Lock,
1013 }
1014 impl EventSource for LockingSource {
1015 type Event = <Channel<()> as EventSource>::Event;
1016
1017 type Metadata = <Channel<()> as EventSource>::Metadata;
1018
1019 type Ret = <Channel<()> as EventSource>::Ret;
1020
1021 type Error = <Channel<()> as EventSource>::Error;
1022
1023 fn process_events<F>(
1024 &mut self,
1025 readiness: Readiness,
1026 token: Token,
1027 callback: F,
1028 ) -> Result<PostAction, Self::Error>
1029 where
1030 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1031 {
1032 self.channel.process_events(readiness, token, callback)
1033 }
1034
1035 fn register(
1036 &mut self,
1037 poll: &mut Poll,
1038 token_factory: &mut TokenFactory,
1039 ) -> crate::Result<()> {
1040 self.channel.register(poll, token_factory)
1041 }
1042
1043 fn reregister(
1044 &mut self,
1045 poll: &mut Poll,
1046 token_factory: &mut TokenFactory,
1047 ) -> crate::Result<()> {
1048 self.channel.reregister(poll, token_factory)
1049 }
1050
1051 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
1052 self.channel.unregister(poll)
1053 }
1054
1055 const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1056
1057 fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
1058 self.lock.lock();
1059 Ok(None)
1060 }
1061
1062 fn before_handle_events(&mut self, events: EventIterator) {
1063 let events_count = events.count();
1064 let lock = &self.lock.lock;
1065 lock.2.set(lock.2.get() + events_count as u32);
1066 self.lock.unlock();
1067 }
1068 }
1069 }
1070 #[test]
1071 fn default_additional_events() {
1072 let (sender, channel) = channel();
1073 let mut test_source = NoopWithDefaultHandlers { channel };
1074 let mut event_loop = EventLoop::try_new().unwrap();
1075 event_loop
1076 .handle()
1077 .insert_source(Box::new(&mut test_source), |_, _, _| {})
1078 .unwrap();
1079 sender.send(()).unwrap();
1080
1081 event_loop.dispatch(None, &mut ()).unwrap();
1082 struct NoopWithDefaultHandlers {
1083 channel: Channel<()>,
1084 }
1085 impl EventSource for NoopWithDefaultHandlers {
1086 type Event = <Channel<()> as EventSource>::Event;
1087
1088 type Metadata = <Channel<()> as EventSource>::Metadata;
1089
1090 type Ret = <Channel<()> as EventSource>::Ret;
1091
1092 type Error = <Channel<()> as EventSource>::Error;
1093
1094 fn process_events<F>(
1095 &mut self,
1096 readiness: Readiness,
1097 token: Token,
1098 callback: F,
1099 ) -> Result<PostAction, Self::Error>
1100 where
1101 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1102 {
1103 self.channel.process_events(readiness, token, callback)
1104 }
1105
1106 fn register(
1107 &mut self,
1108 poll: &mut Poll,
1109 token_factory: &mut TokenFactory,
1110 ) -> crate::Result<()> {
1111 self.channel.register(poll, token_factory)
1112 }
1113
1114 fn reregister(
1115 &mut self,
1116 poll: &mut Poll,
1117 token_factory: &mut TokenFactory,
1118 ) -> crate::Result<()> {
1119 self.channel.reregister(poll, token_factory)
1120 }
1121
1122 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
1123 self.channel.unregister(poll)
1124 }
1125
1126 const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1127 }
1128 }
1129
1130 #[test]
1131 fn additional_events_synthetic() {
1132 let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
1133 let mut lock = Lock {
1134 lock: Rc::new(Cell::new(false)),
1135 };
1136 event_loop
1137 .handle()
1138 .insert_source(
1139 InstantWakeupLockingSource {
1140 lock: lock.clone(),
1141 token: None,
1142 },
1143 |_, _, lock| {
1144 lock.lock();
1145 lock.unlock();
1146 },
1147 )
1148 .unwrap();
1149
1150 event_loop.dispatch(None, &mut lock).unwrap();
1152 #[derive(Clone)]
1153 struct Lock {
1154 lock: Rc<Cell<bool>>,
1155 }
1156 impl Lock {
1157 fn lock(&self) {
1158 if self.lock.get() {
1159 panic!();
1160 }
1161 self.lock.set(true)
1162 }
1163 fn unlock(&self) {
1164 if !self.lock.get() {
1165 panic!();
1166 }
1167 self.lock.set(false);
1168 }
1169 }
1170 struct InstantWakeupLockingSource {
1171 lock: Lock,
1172 token: Option<Token>,
1173 }
1174 impl EventSource for InstantWakeupLockingSource {
1175 type Event = ();
1176
1177 type Metadata = ();
1178
1179 type Ret = ();
1180
1181 type Error = <Channel<()> as EventSource>::Error;
1182
1183 fn process_events<F>(
1184 &mut self,
1185 _: Readiness,
1186 token: Token,
1187 mut callback: F,
1188 ) -> Result<PostAction, Self::Error>
1189 where
1190 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1191 {
1192 assert_eq!(token, self.token.unwrap());
1193 callback((), &mut ());
1194 Ok(PostAction::Continue)
1195 }
1196
1197 fn register(
1198 &mut self,
1199 _: &mut Poll,
1200 token_factory: &mut TokenFactory,
1201 ) -> crate::Result<()> {
1202 self.token = Some(token_factory.token());
1203 Ok(())
1204 }
1205
1206 fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1207 unreachable!()
1208 }
1209
1210 fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1211 unreachable!()
1212 }
1213
1214 const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1215
1216 fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
1217 self.lock.lock();
1218 Ok(Some((Readiness::EMPTY, self.token.unwrap())))
1219 }
1220
1221 fn before_handle_events(&mut self, _: EventIterator) {
1222 self.lock.unlock();
1223 }
1224 }
1225 }
1226
1227 #[cfg(unix)]
1228 #[test]
1229 fn insert_bad_source() {
1230 use std::mem::ManuallyDrop;
1231 use std::os::unix::io::{AsFd, FromRawFd, OwnedFd};
1232
1233 struct LeakedFd(ManuallyDrop<OwnedFd>);
1234
1235 impl AsFd for LeakedFd {
1236 fn as_fd(&self) -> std::os::unix::prelude::BorrowedFd<'_> {
1237 self.0.as_fd()
1238 }
1239 }
1240
1241 let event_loop = EventLoop::<()>::try_new().unwrap();
1242 let fd = LeakedFd(ManuallyDrop::new(unsafe {
1243 std::os::unix::io::OwnedFd::from_raw_fd(420)
1244 }));
1245 let ret = event_loop.handle().insert_source(
1246 crate::sources::generic::Generic::new(fd, Interest::READ, Mode::Level),
1247 |_, _, _| Ok(PostAction::Continue),
1248 );
1249 assert!(ret.is_err());
1250 }
1251
1252 #[test]
1253 fn invalid_token() {
1254 let (_ping, source) = crate::sources::ping::make_ping().unwrap();
1255
1256 let event_loop = EventLoop::<()>::try_new().unwrap();
1257 let handle = event_loop.handle();
1258 let reg_token = handle.insert_source(source, |_, _, _| {}).unwrap();
1259 handle.remove(reg_token);
1260
1261 let ret = handle.enable(®_token);
1262 assert!(ret.is_err());
1263 }
1264
1265 #[cfg(unix)]
1266 #[test]
1267 fn insert_source_no_interest() {
1268 use rustix::pipe::pipe;
1269
1270 let (read, _write) = pipe().unwrap();
1272
1273 let source = crate::sources::generic::Generic::new(read, Interest::EMPTY, Mode::Level);
1274 let dispatcher = Dispatcher::new(source, |_, _, _| Ok(PostAction::Continue));
1275
1276 let event_loop = EventLoop::<()>::try_new().unwrap();
1277 let handle = event_loop.handle();
1278 let ret = handle.register_dispatcher(dispatcher.clone());
1279
1280 if let Ok(token) = ret {
1281 handle.remove(token);
1283 } else {
1284 panic!();
1286 }
1287 }
1288
1289 #[test]
1290 fn disarm_rearm() {
1291 let mut event_loop = EventLoop::<bool>::try_new().unwrap();
1292 let (ping, ping_source) = make_ping().unwrap();
1293
1294 let ping_token = event_loop
1295 .handle()
1296 .insert_source(ping_source, |(), &mut (), dispatched| {
1297 *dispatched = true;
1298 })
1299 .unwrap();
1300
1301 ping.ping();
1302 let mut dispatched = false;
1303 event_loop
1304 .dispatch(Duration::ZERO, &mut dispatched)
1305 .unwrap();
1306 assert!(dispatched);
1307
1308 ping.ping();
1310 event_loop.handle().disable(&ping_token).unwrap();
1311 let mut dispatched = false;
1312 event_loop
1313 .dispatch(Duration::ZERO, &mut dispatched)
1314 .unwrap();
1315 assert!(!dispatched);
1316
1317 event_loop.handle().enable(&ping_token).unwrap();
1319 let mut dispatched = false;
1320 event_loop
1321 .dispatch(Duration::ZERO, &mut dispatched)
1322 .unwrap();
1323 assert!(dispatched);
1324 }
1325
1326 #[test]
1327 fn multiple_tokens() {
1328 struct DoubleSource {
1329 ping1: PingSource,
1330 ping2: PingSource,
1331 }
1332
1333 impl crate::EventSource for DoubleSource {
1334 type Event = u32;
1335 type Metadata = ();
1336 type Ret = ();
1337 type Error = PingError;
1338
1339 fn process_events<F>(
1340 &mut self,
1341 readiness: Readiness,
1342 token: Token,
1343 mut callback: F,
1344 ) -> Result<PostAction, Self::Error>
1345 where
1346 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1347 {
1348 self.ping1
1349 .process_events(readiness, token, |(), &mut ()| callback(1, &mut ()))?;
1350 self.ping2
1351 .process_events(readiness, token, |(), &mut ()| callback(2, &mut ()))?;
1352 Ok(PostAction::Continue)
1353 }
1354
1355 fn register(
1356 &mut self,
1357 poll: &mut Poll,
1358 token_factory: &mut TokenFactory,
1359 ) -> crate::Result<()> {
1360 self.ping1.register(poll, token_factory)?;
1361 self.ping2.register(poll, token_factory)?;
1362 Ok(())
1363 }
1364
1365 fn reregister(
1366 &mut self,
1367 poll: &mut Poll,
1368 token_factory: &mut TokenFactory,
1369 ) -> crate::Result<()> {
1370 self.ping1.reregister(poll, token_factory)?;
1371 self.ping2.reregister(poll, token_factory)?;
1372 Ok(())
1373 }
1374
1375 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
1376 self.ping1.unregister(poll)?;
1377 self.ping2.unregister(poll)?;
1378 Ok(())
1379 }
1380 }
1381
1382 let mut event_loop = EventLoop::<u32>::try_new().unwrap();
1383
1384 let (ping1, source1) = make_ping().unwrap();
1385 let (ping2, source2) = make_ping().unwrap();
1386
1387 let source = DoubleSource {
1388 ping1: source1,
1389 ping2: source2,
1390 };
1391
1392 event_loop
1393 .handle()
1394 .insert_source(source, |i, _, d| {
1395 eprintln!("Dispatching {}", i);
1396 *d += i
1397 })
1398 .unwrap();
1399
1400 let mut dispatched = 0;
1401 ping1.ping();
1402 event_loop
1403 .dispatch(Duration::ZERO, &mut dispatched)
1404 .unwrap();
1405 assert_eq!(dispatched, 1);
1406
1407 dispatched = 0;
1408 ping2.ping();
1409 event_loop
1410 .dispatch(Duration::ZERO, &mut dispatched)
1411 .unwrap();
1412 assert_eq!(dispatched, 2);
1413
1414 dispatched = 0;
1415 ping1.ping();
1416 ping2.ping();
1417 event_loop
1418 .dispatch(Duration::ZERO, &mut dispatched)
1419 .unwrap();
1420 assert_eq!(dispatched, 3);
1421 }
1422
1423 #[cfg(unix)]
1424 #[test]
1425 fn change_interests() {
1426 use rustix::io::write;
1427 use rustix::net::{recv, socketpair, AddressFamily, RecvFlags, SocketFlags, SocketType};
1428 let mut event_loop = EventLoop::<bool>::try_new().unwrap();
1429
1430 let (sock1, sock2) = socketpair(
1431 AddressFamily::UNIX,
1432 SocketType::STREAM,
1433 SocketFlags::empty(),
1434 None, )
1436 .unwrap();
1437
1438 let source = Generic::new(sock1, Interest::READ, Mode::Level);
1439 let dispatcher = Dispatcher::new(source, |_, fd, dispatched| {
1440 *dispatched = true;
1441 let mut buf = [0u8; 32];
1443 loop {
1444 match recv(&*fd, &mut buf, RecvFlags::DONTWAIT) {
1445 Ok((0, _)) => break, Ok(_) => {}
1447 Err(e) => {
1448 let e: std::io::Error = e.into();
1449 if e.kind() == std::io::ErrorKind::WouldBlock {
1450 break;
1451 } else {
1453 return Err(e);
1455 }
1456 }
1457 }
1458 }
1459 Ok(PostAction::Continue)
1460 });
1461
1462 let sock_token_1 = event_loop
1463 .handle()
1464 .register_dispatcher(dispatcher.clone())
1465 .unwrap();
1466
1467 let mut dispatched = false;
1469 event_loop
1470 .dispatch(Duration::ZERO, &mut dispatched)
1471 .unwrap();
1472 assert!(!dispatched);
1473
1474 write(&sock2, &[1, 2, 3]).unwrap();
1476 dispatched = false;
1477 event_loop
1478 .dispatch(Duration::ZERO, &mut dispatched)
1479 .unwrap();
1480 assert!(dispatched);
1481
1482 dispatched = false;
1484 event_loop
1485 .dispatch(Duration::ZERO, &mut dispatched)
1486 .unwrap();
1487 assert!(!dispatched);
1488
1489 dispatcher.as_source_mut().interest = Interest::WRITE;
1491 event_loop.handle().update(&sock_token_1).unwrap();
1492
1493 dispatched = false;
1495 event_loop
1496 .dispatch(Duration::ZERO, &mut dispatched)
1497 .unwrap();
1498 assert!(dispatched);
1499
1500 dispatcher.as_source_mut().interest = Interest::READ;
1502 event_loop.handle().update(&sock_token_1).unwrap();
1503
1504 dispatched = false;
1506 event_loop
1507 .dispatch(Duration::ZERO, &mut dispatched)
1508 .unwrap();
1509 assert!(!dispatched);
1510 }
1511
1512 #[test]
1513 fn kill_source() {
1514 let mut event_loop = EventLoop::<Option<RegistrationToken>>::try_new().unwrap();
1515
1516 let handle = event_loop.handle();
1517 let (ping, ping_source) = make_ping().unwrap();
1518 let ping_token = event_loop
1519 .handle()
1520 .insert_source(ping_source, move |(), &mut (), opt_src| {
1521 if let Some(src) = opt_src.take() {
1522 handle.remove(src);
1523 }
1524 })
1525 .unwrap();
1526
1527 ping.ping();
1528
1529 let mut opt_src = Some(ping_token);
1530
1531 event_loop.dispatch(Duration::ZERO, &mut opt_src).unwrap();
1532
1533 assert!(opt_src.is_none());
1534 }
1535
1536 #[test]
1537 fn non_static_data() {
1538 use std::sync::mpsc;
1539
1540 let (sender, receiver) = mpsc::channel();
1541
1542 {
1543 struct RefSender<'a>(&'a mpsc::Sender<()>);
1544 let mut ref_sender = RefSender(&sender);
1545
1546 let mut event_loop = EventLoop::<RefSender<'_>>::try_new().unwrap();
1547 let (ping, ping_source) = make_ping().unwrap();
1548 let _ping_token = event_loop
1549 .handle()
1550 .insert_source(ping_source, |_, _, ref_sender| {
1551 ref_sender.0.send(()).unwrap();
1552 })
1553 .unwrap();
1554
1555 ping.ping();
1556
1557 event_loop
1558 .dispatch(Duration::ZERO, &mut ref_sender)
1559 .unwrap();
1560 }
1561
1562 receiver.recv().unwrap();
1563 drop(sender);
1565 }
1566
1567 #[cfg(feature = "block_on")]
1568 #[test]
1569 fn block_on_test() {
1570 use crate::sources::timer::TimeoutFuture;
1571 use std::time::Duration;
1572
1573 let mut evl = EventLoop::<()>::try_new().unwrap();
1574
1575 let mut data = 22;
1576 let timeout = {
1577 let data = &mut data;
1578 let evl_handle = evl.handle();
1579
1580 async move {
1581 TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
1582 *data = 32;
1583 11
1584 }
1585 };
1586
1587 let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();
1588 assert_eq!(result, Some(11));
1589 assert_eq!(data, 32);
1590 }
1591
1592 #[cfg(feature = "block_on")]
1593 #[test]
1594 fn block_on_early_cancel() {
1595 use crate::sources::timer;
1596 use std::time::Duration;
1597
1598 let mut evl = EventLoop::<()>::try_new().unwrap();
1599
1600 let mut data = 22;
1601 let timeout = {
1602 let data = &mut data;
1603 let evl_handle = evl.handle();
1604
1605 async move {
1606 timer::TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
1607 *data = 32;
1608 11
1609 }
1610 };
1611
1612 let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
1613 let handle = evl.get_signal();
1614 let _timer_token = evl
1615 .handle()
1616 .insert_source(timer_source, move |_, _, _| {
1617 handle.stop();
1618 timer::TimeoutAction::Drop
1619 })
1620 .unwrap();
1621
1622 let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();
1623 assert_eq!(result, None);
1624 assert_eq!(data, 22);
1625 }
1626
1627 #[test]
1628 fn reuse() {
1629 use crate::sources::timer;
1630 use std::sync::{Arc, Mutex};
1631 use std::time::{Duration, Instant};
1632
1633 let mut evl = EventLoop::<RegistrationToken>::try_new().unwrap();
1634 let handle = evl.handle();
1635
1636 let data = Arc::new(Mutex::new(1));
1637 let data_cloned = data.clone();
1638
1639 let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
1640 let mut first_timer_token = evl
1641 .handle()
1642 .insert_source(timer_source, move |_, _, own_token| {
1643 handle.remove(*own_token);
1644 let data_cloned = data_cloned.clone();
1645 let _ = handle.insert_source(timer::Timer::immediate(), move |_, _, _| {
1646 *data_cloned.lock().unwrap() = 2;
1647 timer::TimeoutAction::Drop
1648 });
1649 timer::TimeoutAction::Drop
1650 })
1651 .unwrap();
1652
1653 let now = Instant::now();
1654 loop {
1655 evl.dispatch(Some(Duration::from_secs(3)), &mut first_timer_token)
1656 .unwrap();
1657 if Instant::now().duration_since(now) > Duration::from_secs(3) {
1658 break;
1659 }
1660 }
1661
1662 assert_eq!(*data.lock().unwrap(), 2);
1663 }
1664
1665 #[test]
1666 fn drop_of_subsource() {
1667 struct WithSubSource {
1668 token: Option<Token>,
1669 }
1670
1671 impl crate::EventSource for WithSubSource {
1672 type Event = ();
1673 type Metadata = ();
1674 type Ret = ();
1675 type Error = crate::Error;
1676 const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1677
1678 fn process_events<F>(
1679 &mut self,
1680 _: Readiness,
1681 _: Token,
1682 mut callback: F,
1683 ) -> Result<PostAction, Self::Error>
1684 where
1685 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1686 {
1687 callback((), &mut ());
1688 Ok(PostAction::Remove)
1690 }
1691
1692 fn register(&mut self, _: &mut Poll, fact: &mut TokenFactory) -> crate::Result<()> {
1693 fact.token();
1695 fact.token();
1696 self.token = Some(fact.token());
1697 Ok(())
1698 }
1699
1700 fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1701 Ok(())
1702 }
1703
1704 fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1705 Ok(())
1706 }
1707
1708 fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
1710 Ok(self.token.map(|token| {
1711 (
1712 Readiness {
1713 readable: true,
1714 writable: false,
1715 error: false,
1716 },
1717 token,
1718 )
1719 }))
1720 }
1721 }
1722
1723 let mut evl = EventLoop::<bool>::try_new().unwrap();
1725 evl.handle()
1726 .insert_source(WithSubSource { token: None }, |_, _, ran| {
1727 *ran = true;
1728 })
1729 .unwrap();
1730
1731 let mut ran = false;
1732
1733 evl.dispatch(Some(Duration::ZERO), &mut ran).unwrap();
1734
1735 assert!(ran);
1736 }
1737
1738 struct DummySource;
1740
1741 impl crate::EventSource for DummySource {
1742 type Event = ();
1743 type Metadata = ();
1744 type Ret = ();
1745 type Error = crate::Error;
1746
1747 fn process_events<F>(
1748 &mut self,
1749 _: Readiness,
1750 _: Token,
1751 mut callback: F,
1752 ) -> Result<PostAction, Self::Error>
1753 where
1754 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1755 {
1756 callback((), &mut ());
1757 Ok(PostAction::Continue)
1758 }
1759
1760 fn register(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1761 Ok(())
1762 }
1763
1764 fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1765 Ok(())
1766 }
1767
1768 fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1769 Ok(())
1770 }
1771 }
1772
1773 #[test]
1774 fn weak_loop_handle() {
1775 let mut event_loop: EventLoop<()> = EventLoop::try_new().unwrap();
1776 let weak_handle1 = event_loop.handle().downgrade();
1777 let weak_handle2 = weak_handle1.clone();
1778 let weak_handle3 = weak_handle1.clone();
1779
1780 event_loop
1781 .handle()
1782 .insert_source(timer::Timer::immediate(), move |_, _, _| {
1783 assert!(weak_handle1.upgrade().is_some());
1785 timer::TimeoutAction::Drop
1786 })
1787 .unwrap();
1788
1789 event_loop.handle().insert_idle(move |_| {
1790 assert!(weak_handle2.upgrade().is_some());
1792 });
1793
1794 event_loop.dispatch(None, &mut ()).unwrap();
1795
1796 drop(event_loop);
1797
1798 assert!(weak_handle3.expired());
1800 }
1801}