1use std::cell::{Cell, RefCell};
2use std::fmt::Debug;
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use std::{io, slice};
8
9#[cfg(feature = "block_on")]
10use std::future::Future;
11
12#[cfg(unix)]
13use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
14#[cfg(windows)]
15use std::os::windows::io::{AsHandle, AsRawHandle, AsSocket as AsFd, BorrowedHandle, RawHandle};
16
17use polling::Poller;
18use tracing::{trace, warn};
19
20use crate::list::{SourceEntry, SourceList};
21use crate::sources::{Dispatcher, EventSource, Idle, IdleDispatcher};
22use crate::sys::{Notifier, PollEvent};
23use crate::token::TokenInner;
24use crate::{
25 AdditionalLifecycleEventsSet, InsertError, Poll, PostAction, Readiness, Token, TokenFactory,
26};
27
28type IdleCallback<'i, Data> = Rc<RefCell<dyn IdleDispatcher<Data> + 'i>>;
29
30#[derive(Clone, Copy, Debug, PartialEq, Eq)]
37pub struct RegistrationToken {
38 inner: TokenInner,
39}
40
41impl RegistrationToken {
42 pub(crate) fn new(inner: TokenInner) -> Self {
46 Self { inner }
47 }
48}
49
50pub(crate) struct LoopInner<'l, Data> {
51 pub(crate) poll: RefCell<Poll>,
52 pub(crate) sources: RefCell<SourceList<'l, Data>>,
55 pub(crate) sources_with_additional_lifecycle_events: RefCell<AdditionalLifecycleEventsSet>,
56 idles: RefCell<Vec<IdleCallback<'l, Data>>>,
57 pending_action: Cell<PostAction>,
58}
59
60pub struct LoopHandle<'l, Data> {
66 inner: Rc<LoopInner<'l, Data>>,
67}
68
69impl<'l, Data> std::fmt::Debug for LoopHandle<'l, Data> {
70 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.write_str("LoopHandle { ... }")
73 }
74}
75
76impl<'l, Data> Clone for LoopHandle<'l, Data> {
77 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
78 fn clone(&self) -> Self {
79 LoopHandle {
80 inner: self.inner.clone(),
81 }
82 }
83}
84
85impl<'l, Data> LoopHandle<'l, Data> {
86 pub fn insert_source<S, F>(
94 &self,
95 source: S,
96 callback: F,
97 ) -> Result<RegistrationToken, InsertError<S>>
98 where
99 S: EventSource + 'l,
100 F: FnMut(S::Event, &mut S::Metadata, &mut Data) -> S::Ret + 'l,
101 {
102 let dispatcher = Dispatcher::new(source, callback);
103 self.register_dispatcher(dispatcher.clone())
104 .map_err(|error| InsertError {
105 error,
106 inserted: dispatcher.into_source_inner(),
107 })
108 }
109
110 #[cfg_attr(feature = "nightly_coverage", coverage(off))] pub fn register_dispatcher<S>(
117 &self,
118 dispatcher: Dispatcher<'l, S, Data>,
119 ) -> crate::Result<RegistrationToken>
120 where
121 S: EventSource + 'l,
122 {
123 let mut sources = self.inner.sources.borrow_mut();
124 let mut poll = self.inner.poll.borrow_mut();
125
126 let slot = sources.vacant_entry();
128
129 slot.source = Some(dispatcher.clone_as_event_dispatcher());
130 trace!(source = slot.token.get_id(), "Inserting new source");
131 let ret = slot.source.as_ref().unwrap().register(
132 &mut poll,
133 &mut self
134 .inner
135 .sources_with_additional_lifecycle_events
136 .borrow_mut(),
137 &mut TokenFactory::new(slot.token),
138 );
139
140 if let Err(error) = ret {
141 slot.source = None;
142 return Err(error);
143 }
144
145 Ok(RegistrationToken { inner: slot.token })
146 }
147
148 pub fn insert_idle<'i, F: FnOnce(&mut Data) + 'l + 'i>(&self, callback: F) -> Idle<'i> {
153 let mut opt_cb = Some(callback);
154 let callback = Rc::new(RefCell::new(Some(move |data: &mut Data| {
155 if let Some(cb) = opt_cb.take() {
156 cb(data);
157 }
158 })));
159 self.inner.idles.borrow_mut().push(callback.clone());
160 Idle { callback }
161 }
162
163 pub fn enable(&self, token: &RegistrationToken) -> crate::Result<()> {
169 if let &SourceEntry {
170 token: entry_token,
171 source: Some(ref source),
172 } = self.inner.sources.borrow().get(token.inner)?
173 {
174 trace!(source = entry_token.get_id(), "Registering source");
175 source.register(
176 &mut self.inner.poll.borrow_mut(),
177 &mut self
178 .inner
179 .sources_with_additional_lifecycle_events
180 .borrow_mut(),
181 &mut TokenFactory::new(entry_token),
182 )
183 } else {
184 Err(crate::Error::InvalidToken)
185 }
186 }
187
188 pub fn update(&self, token: &RegistrationToken) -> crate::Result<()> {
193 if let &SourceEntry {
194 token: entry_token,
195 source: Some(ref source),
196 } = self.inner.sources.borrow().get(token.inner)?
197 {
198 trace!(
199 source = entry_token.get_id(),
200 "Updating registration of source"
201 );
202 if !source.reregister(
203 &mut self.inner.poll.borrow_mut(),
204 &mut self
205 .inner
206 .sources_with_additional_lifecycle_events
207 .borrow_mut(),
208 &mut TokenFactory::new(entry_token),
209 )? {
210 trace!(
211 source = entry_token.get_id(),
212 "Can't update registration withing a callback, storing for later."
213 );
214 self.inner.pending_action.set(PostAction::Reregister);
216 }
217 Ok(())
218 } else {
219 Err(crate::Error::InvalidToken)
220 }
221 }
222
223 pub fn disable(&self, token: &RegistrationToken) -> crate::Result<()> {
227 if let &SourceEntry {
228 token: entry_token,
229 source: Some(ref source),
230 } = self.inner.sources.borrow().get(token.inner)?
231 {
232 if !token.inner.same_source_as(entry_token) {
233 return Err(crate::Error::InvalidToken);
235 }
236 trace!(source = entry_token.get_id(), "Unregistering source");
237 if !source.unregister(
238 &mut self.inner.poll.borrow_mut(),
239 &mut self
240 .inner
241 .sources_with_additional_lifecycle_events
242 .borrow_mut(),
243 *token,
244 )? {
245 trace!(
246 source = entry_token.get_id(),
247 "Cannot unregister source in callback, storing for later."
248 );
249 self.inner.pending_action.set(PostAction::Disable);
251 }
252 Ok(())
253 } else {
254 Err(crate::Error::InvalidToken)
255 }
256 }
257
258 pub fn remove(&self, token: RegistrationToken) {
260 if let Ok(&mut SourceEntry {
261 token: entry_token,
262 ref mut source,
263 }) = self.inner.sources.borrow_mut().get_mut(token.inner)
264 {
265 if let Some(source) = source.take() {
266 trace!(source = entry_token.get_id(), "Removing source");
267 if let Err(e) = source.unregister(
268 &mut self.inner.poll.borrow_mut(),
269 &mut self
270 .inner
271 .sources_with_additional_lifecycle_events
272 .borrow_mut(),
273 token,
274 ) {
275 warn!("Failed to unregister source from the polling system: {e:?}");
276 }
277 }
278 }
279 }
280
281 pub fn adapt_io<F: AsFd>(&self, fd: F) -> crate::Result<crate::io::Async<'l, F>> {
289 crate::io::Async::new(self.inner.clone(), fd)
290 }
291}
292
293pub struct EventLoop<'l, Data> {
297 #[allow(dead_code)]
298 poller: Arc<Poller>,
299 handle: LoopHandle<'l, Data>,
300 signals: Arc<Signals>,
301 synthetic_events: Vec<PollEvent>,
303}
304
305impl<'l, Data> std::fmt::Debug for EventLoop<'l, Data> {
306 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
307 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308 f.write_str("EventLoop { ... }")
309 }
310}
311
312struct Signals {
314 stop: AtomicBool,
316
317 #[cfg(feature = "block_on")]
319 future_ready: AtomicBool,
320}
321
322impl<'l, Data> EventLoop<'l, Data> {
323 pub fn try_new() -> crate::Result<Self> {
327 let poll = Poll::new()?;
328 let poller = poll.poller.clone();
329 let handle = LoopHandle {
330 inner: Rc::new(LoopInner {
331 poll: RefCell::new(poll),
332 sources: RefCell::new(SourceList::new()),
333 idles: RefCell::new(Vec::new()),
334 pending_action: Cell::new(PostAction::Continue),
335 sources_with_additional_lifecycle_events: Default::default(),
336 }),
337 };
338
339 Ok(EventLoop {
340 handle,
341 signals: Arc::new(Signals {
342 stop: AtomicBool::new(false),
343 #[cfg(feature = "block_on")]
344 future_ready: AtomicBool::new(false),
345 }),
346 poller,
347 synthetic_events: vec![],
348 })
349 }
350
351 pub fn handle(&self) -> LoopHandle<'l, Data> {
353 self.handle.clone()
354 }
355
356 fn dispatch_events(
357 &mut self,
358 mut timeout: Option<Duration>,
359 data: &mut Data,
360 ) -> crate::Result<()> {
361 let now = Instant::now();
362 {
363 let mut extra_lifecycle_sources = self
364 .handle
365 .inner
366 .sources_with_additional_lifecycle_events
367 .borrow_mut();
368 let sources = &self.handle.inner.sources.borrow();
369 for source in &mut *extra_lifecycle_sources.values {
370 if let Ok(SourceEntry {
371 source: Some(disp), ..
372 }) = sources.get(source.inner)
373 {
374 if let Some((readiness, token)) = disp.before_sleep()? {
375 timeout = Some(Duration::ZERO);
377 self.synthetic_events.push(PollEvent { readiness, token });
378 }
379 } else {
380 unreachable!()
381 }
382 }
383 }
384 let events = {
385 let poll = self.handle.inner.poll.borrow();
386 loop {
387 let result = poll.poll(timeout);
388
389 match result {
390 Ok(events) => break events,
391 Err(crate::Error::IoError(err)) if err.kind() == io::ErrorKind::Interrupted => {
392 if let Some(to) = timeout {
394 let elapsed = now.elapsed();
395 if elapsed >= to {
396 return Ok(());
397 } else {
398 timeout = Some(to - elapsed);
399 }
400 }
401 }
402 Err(err) => return Err(err),
403 };
404 }
405 };
406 {
407 let mut extra_lifecycle_sources = self
408 .handle
409 .inner
410 .sources_with_additional_lifecycle_events
411 .borrow_mut();
412 if !extra_lifecycle_sources.values.is_empty() {
413 for source in &mut *extra_lifecycle_sources.values {
414 if let Ok(SourceEntry {
415 source: Some(disp), ..
416 }) = self.handle.inner.sources.borrow().get(source.inner)
417 {
418 let iter = EventIterator {
419 inner: events.iter(),
420 registration_token: *source,
421 };
422 disp.before_handle_events(iter);
423 } else {
424 unreachable!()
425 }
426 }
427 }
428 }
429
430 for event in self.synthetic_events.drain(..).chain(events) {
431 let reg_token = event.token.inner.forget_sub_id();
433
434 let opt_disp = self
435 .handle
436 .inner
437 .sources
438 .borrow()
439 .get(reg_token)
440 .ok()
441 .and_then(|entry| entry.source.clone());
442
443 if let Some(disp) = opt_disp {
444 trace!(source = reg_token.get_id(), "Dispatching events for source");
445 let mut ret = disp.process_events(event.readiness, event.token, data)?;
446
447 let pending_action = self
449 .handle
450 .inner
451 .pending_action
452 .replace(PostAction::Continue);
453 if let PostAction::Continue = ret {
454 ret = pending_action;
455 }
456
457 match ret {
458 PostAction::Reregister => {
459 trace!(
460 source = reg_token.get_id(),
461 "Postaction reregister for source"
462 );
463 disp.reregister(
464 &mut self.handle.inner.poll.borrow_mut(),
465 &mut self
466 .handle
467 .inner
468 .sources_with_additional_lifecycle_events
469 .borrow_mut(),
470 &mut TokenFactory::new(reg_token),
471 )?;
472 }
473 PostAction::Disable => {
474 trace!(
475 source = reg_token.get_id(),
476 "Postaction unregister for source"
477 );
478 disp.unregister(
479 &mut self.handle.inner.poll.borrow_mut(),
480 &mut self
481 .handle
482 .inner
483 .sources_with_additional_lifecycle_events
484 .borrow_mut(),
485 RegistrationToken::new(reg_token),
486 )?;
487 }
488 PostAction::Remove => {
489 trace!(source = reg_token.get_id(), "Postaction remove for source");
490 if let Ok(entry) = self.handle.inner.sources.borrow_mut().get_mut(reg_token)
491 {
492 entry.source = None;
493 }
494 }
495 PostAction::Continue => {}
496 }
497
498 if self
499 .handle
500 .inner
501 .sources
502 .borrow()
503 .get(reg_token)
504 .ok()
505 .map(|entry| entry.source.is_none())
506 .unwrap_or(true)
507 {
508 let mut poll = self.handle.inner.poll.borrow_mut();
510 if let Err(e) = disp.unregister(
511 &mut poll,
512 &mut self
513 .handle
514 .inner
515 .sources_with_additional_lifecycle_events
516 .borrow_mut(),
517 RegistrationToken::new(reg_token),
518 ) {
519 warn!("Failed to unregister source from the polling system: {e:?}",);
520 }
521 }
522 } else {
523 warn!(?reg_token, "Received an event for non-existence source");
524 }
525 }
526
527 Ok(())
528 }
529
530 fn dispatch_idles(&mut self, data: &mut Data) {
531 let idles = std::mem::take(&mut *self.handle.inner.idles.borrow_mut());
532 for idle in idles {
533 idle.borrow_mut().dispatch(data);
534 }
535 }
536
537 pub fn dispatch<D: Into<Option<Duration>>>(
546 &mut self,
547 timeout: D,
548 data: &mut Data,
549 ) -> crate::Result<()> {
550 self.dispatch_events(timeout.into(), data)?;
551 self.dispatch_idles(data);
552
553 Ok(())
554 }
555
556 pub fn get_signal(&self) -> LoopSignal {
560 LoopSignal {
561 signal: self.signals.clone(),
562 notifier: self.handle.inner.poll.borrow().notifier(),
563 }
564 }
565
566 pub fn run<F, D: Into<Option<Duration>>>(
576 &mut self,
577 timeout: D,
578 data: &mut Data,
579 mut cb: F,
580 ) -> crate::Result<()>
581 where
582 F: FnMut(&mut Data),
583 {
584 let timeout = timeout.into();
585 self.signals.stop.store(false, Ordering::Release);
586 while !self.signals.stop.load(Ordering::Acquire) {
587 self.dispatch(timeout, data)?;
588 cb(data);
589 }
590 Ok(())
591 }
592
593 #[cfg(feature = "block_on")]
601 pub fn block_on<R>(
602 &mut self,
603 future: impl Future<Output = R>,
604 data: &mut Data,
605 mut cb: impl FnMut(&mut Data),
606 ) -> crate::Result<Option<R>> {
607 use std::task::{Context, Poll, Wake, Waker};
608
609 struct EventLoopWaker(LoopSignal);
611
612 impl Wake for EventLoopWaker {
613 fn wake(self: Arc<Self>) {
614 self.0.signal.future_ready.store(true, Ordering::Release);
616 self.0.notifier.notify().ok();
617 }
618
619 fn wake_by_ref(self: &Arc<Self>) {
620 self.0.signal.future_ready.store(true, Ordering::Release);
622 self.0.notifier.notify().ok();
623 }
624 }
625
626 pin_utils::pin_mut!(future);
628
629 let waker = {
631 let handle = EventLoopWaker(self.get_signal());
632
633 Waker::from(Arc::new(handle))
634 };
635 let mut context = Context::from_waker(&waker);
636
637 let mut output = None;
639
640 self.signals.stop.store(false, Ordering::Release);
641 self.signals.future_ready.store(true, Ordering::Release);
642
643 while !self.signals.stop.load(Ordering::Acquire) {
644 if self.signals.future_ready.swap(false, Ordering::AcqRel) {
646 if let Poll::Ready(result) = future.as_mut().poll(&mut context) {
648 output = Some(result);
649 break;
650 }
651 }
652
653 self.dispatch_events(None, data)?;
655 self.dispatch_idles(data);
656 cb(data);
657 }
658
659 Ok(output)
660 }
661}
662
663#[cfg(unix)]
664impl<'l, Data> AsRawFd for EventLoop<'l, Data> {
665 fn as_raw_fd(&self) -> RawFd {
673 self.poller.as_raw_fd()
674 }
675}
676
677#[cfg(unix)]
678impl<'l, Data> AsFd for EventLoop<'l, Data> {
679 fn as_fd(&self) -> BorrowedFd<'_> {
686 self.poller.as_fd()
687 }
688}
689
690#[cfg(windows)]
691impl<Data> AsRawHandle for EventLoop<'_, Data> {
692 fn as_raw_handle(&self) -> RawHandle {
693 self.poller.as_raw_handle()
694 }
695}
696
697#[cfg(windows)]
698impl<Data> AsHandle for EventLoop<'_, Data> {
699 fn as_handle(&self) -> BorrowedHandle<'_> {
700 self.poller.as_handle()
701 }
702}
703
704#[derive(Clone, Debug)]
705pub struct EventIterator<'a> {
713 inner: slice::Iter<'a, PollEvent>,
714 registration_token: RegistrationToken,
715}
716
717impl<'a> Iterator for EventIterator<'a> {
718 type Item = (Readiness, Token);
719
720 fn next(&mut self) -> Option<Self::Item> {
721 for next in self.inner.by_ref() {
722 if next
723 .token
724 .inner
725 .same_source_as(self.registration_token.inner)
726 {
727 return Some((next.readiness, next.token));
728 }
729 }
730 None
731 }
732}
733
734#[derive(Clone)]
737pub struct LoopSignal {
738 signal: Arc<Signals>,
739 notifier: Notifier,
740}
741
742impl std::fmt::Debug for LoopSignal {
743 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
744 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
745 f.write_str("LoopSignal { ... }")
746 }
747}
748
749impl LoopSignal {
750 pub fn stop(&self) {
757 self.signal.stop.store(true, Ordering::Release);
758 }
759
760 pub fn wakeup(&self) {
767 self.notifier.notify().ok();
768 }
769}
770
771#[cfg(test)]
772mod tests {
773 use std::{cell::Cell, rc::Rc, time::Duration};
774
775 use crate::{
776 channel::{channel, Channel},
777 ping::*,
778 EventIterator, EventSource, Poll, PostAction, Readiness, RegistrationToken, Token,
779 TokenFactory,
780 };
781
782 #[cfg(unix)]
783 use crate::{generic::Generic, Dispatcher, Interest, Mode};
784
785 use super::EventLoop;
786
787 #[test]
788 fn dispatch_idle() {
789 let mut event_loop = EventLoop::try_new().unwrap();
790
791 let mut dispatched = false;
792
793 event_loop.handle().insert_idle(|d| {
794 *d = true;
795 });
796
797 event_loop
798 .dispatch(Some(Duration::ZERO), &mut dispatched)
799 .unwrap();
800
801 assert!(dispatched);
802 }
803
804 #[test]
805 fn cancel_idle() {
806 let mut event_loop = EventLoop::try_new().unwrap();
807
808 let mut dispatched = false;
809
810 let handle = event_loop.handle();
811 let idle = handle.insert_idle(move |d| {
812 *d = true;
813 });
814
815 idle.cancel();
816
817 event_loop
818 .dispatch(Duration::ZERO, &mut dispatched)
819 .unwrap();
820
821 assert!(!dispatched);
822 }
823
824 #[test]
825 fn wakeup() {
826 let mut event_loop = EventLoop::try_new().unwrap();
827
828 let signal = event_loop.get_signal();
829
830 ::std::thread::spawn(move || {
831 ::std::thread::sleep(Duration::from_millis(500));
832 signal.wakeup();
833 });
834
835 event_loop.dispatch(None, &mut ()).unwrap();
837 }
838
839 #[test]
840 fn wakeup_stop() {
841 let mut event_loop = EventLoop::try_new().unwrap();
842
843 let signal = event_loop.get_signal();
844
845 ::std::thread::spawn(move || {
846 ::std::thread::sleep(Duration::from_millis(500));
847 signal.stop();
848 signal.wakeup();
849 });
850
851 event_loop.run(None, &mut (), |_| {}).unwrap();
853 }
854
855 #[test]
856 fn additional_events() {
857 let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
858 let mut lock = Lock {
859 lock: Rc::new((
860 Cell::new(false),
862 Cell::new(0),
864 Cell::new(0),
867 )),
868 };
869 let (sender, channel) = channel();
870 let token = event_loop
871 .handle()
872 .insert_source(
873 LockingSource {
874 channel,
875 lock: lock.clone(),
876 },
877 |_, _, lock| {
878 lock.lock();
879 lock.unlock();
880 },
881 )
882 .unwrap();
883 sender.send(()).unwrap();
884
885 event_loop.dispatch(None, &mut lock).unwrap();
886 assert_eq!(lock.lock.1.get(), 2);
888 assert_eq!(lock.lock.2.get(), 1);
890 event_loop.handle().disable(&token).unwrap();
891 event_loop
892 .dispatch(Some(Duration::ZERO), &mut lock)
893 .unwrap();
894 assert_eq!(lock.lock.1.get(), 2);
895
896 event_loop.handle().enable(&token).unwrap();
897 event_loop
898 .dispatch(Some(Duration::ZERO), &mut lock)
899 .unwrap();
900 assert_eq!(lock.lock.1.get(), 3);
901 event_loop.handle().remove(token);
902 event_loop
903 .dispatch(Some(Duration::ZERO), &mut lock)
904 .unwrap();
905 assert_eq!(lock.lock.1.get(), 3);
906 assert_eq!(lock.lock.2.get(), 1);
907
908 #[derive(Clone)]
909 struct Lock {
910 lock: Rc<(Cell<bool>, Cell<u32>, Cell<u32>)>,
911 }
912 impl Lock {
913 fn lock(&self) {
914 if self.lock.0.get() {
915 panic!();
916 }
917 self.lock.1.set(self.lock.1.get() + 1);
919 self.lock.0.set(true)
920 }
921 fn unlock(&self) {
922 if !self.lock.0.get() {
923 panic!();
924 }
925 self.lock.0.set(false);
926 }
927 }
928 struct LockingSource {
929 channel: Channel<()>,
930 lock: Lock,
931 }
932 impl EventSource for LockingSource {
933 type Event = <Channel<()> as EventSource>::Event;
934
935 type Metadata = <Channel<()> as EventSource>::Metadata;
936
937 type Ret = <Channel<()> as EventSource>::Ret;
938
939 type Error = <Channel<()> as EventSource>::Error;
940
941 fn process_events<F>(
942 &mut self,
943 readiness: Readiness,
944 token: Token,
945 callback: F,
946 ) -> Result<PostAction, Self::Error>
947 where
948 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
949 {
950 self.channel.process_events(readiness, token, callback)
951 }
952
953 fn register(
954 &mut self,
955 poll: &mut Poll,
956 token_factory: &mut TokenFactory,
957 ) -> crate::Result<()> {
958 self.channel.register(poll, token_factory)
959 }
960
961 fn reregister(
962 &mut self,
963 poll: &mut Poll,
964 token_factory: &mut TokenFactory,
965 ) -> crate::Result<()> {
966 self.channel.reregister(poll, token_factory)
967 }
968
969 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
970 self.channel.unregister(poll)
971 }
972
973 const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
974
975 fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
976 self.lock.lock();
977 Ok(None)
978 }
979
980 fn before_handle_events(&mut self, events: EventIterator) {
981 let events_count = events.count();
982 let lock = &self.lock.lock;
983 lock.2.set(lock.2.get() + events_count as u32);
984 self.lock.unlock();
985 }
986 }
987 }
988 #[test]
989 fn default_additional_events() {
990 let (sender, channel) = channel();
991 let mut test_source = NoopWithDefaultHandlers { channel };
992 let mut event_loop = EventLoop::try_new().unwrap();
993 event_loop
994 .handle()
995 .insert_source(Box::new(&mut test_source), |_, _, _| {})
996 .unwrap();
997 sender.send(()).unwrap();
998
999 event_loop.dispatch(None, &mut ()).unwrap();
1000 struct NoopWithDefaultHandlers {
1001 channel: Channel<()>,
1002 }
1003 impl EventSource for NoopWithDefaultHandlers {
1004 type Event = <Channel<()> as EventSource>::Event;
1005
1006 type Metadata = <Channel<()> as EventSource>::Metadata;
1007
1008 type Ret = <Channel<()> as EventSource>::Ret;
1009
1010 type Error = <Channel<()> as EventSource>::Error;
1011
1012 fn process_events<F>(
1013 &mut self,
1014 readiness: Readiness,
1015 token: Token,
1016 callback: F,
1017 ) -> Result<PostAction, Self::Error>
1018 where
1019 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1020 {
1021 self.channel.process_events(readiness, token, callback)
1022 }
1023
1024 fn register(
1025 &mut self,
1026 poll: &mut Poll,
1027 token_factory: &mut TokenFactory,
1028 ) -> crate::Result<()> {
1029 self.channel.register(poll, token_factory)
1030 }
1031
1032 fn reregister(
1033 &mut self,
1034 poll: &mut Poll,
1035 token_factory: &mut TokenFactory,
1036 ) -> crate::Result<()> {
1037 self.channel.reregister(poll, token_factory)
1038 }
1039
1040 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
1041 self.channel.unregister(poll)
1042 }
1043
1044 const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1045 }
1046 }
1047
1048 #[test]
1049 fn additional_events_synthetic() {
1050 let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap();
1051 let mut lock = Lock {
1052 lock: Rc::new(Cell::new(false)),
1053 };
1054 event_loop
1055 .handle()
1056 .insert_source(
1057 InstantWakeupLockingSource {
1058 lock: lock.clone(),
1059 token: None,
1060 },
1061 |_, _, lock| {
1062 lock.lock();
1063 lock.unlock();
1064 },
1065 )
1066 .unwrap();
1067
1068 event_loop.dispatch(None, &mut lock).unwrap();
1070 #[derive(Clone)]
1071 struct Lock {
1072 lock: Rc<Cell<bool>>,
1073 }
1074 impl Lock {
1075 fn lock(&self) {
1076 if self.lock.get() {
1077 panic!();
1078 }
1079 self.lock.set(true)
1080 }
1081 fn unlock(&self) {
1082 if !self.lock.get() {
1083 panic!();
1084 }
1085 self.lock.set(false);
1086 }
1087 }
1088 struct InstantWakeupLockingSource {
1089 lock: Lock,
1090 token: Option<Token>,
1091 }
1092 impl EventSource for InstantWakeupLockingSource {
1093 type Event = ();
1094
1095 type Metadata = ();
1096
1097 type Ret = ();
1098
1099 type Error = <Channel<()> as EventSource>::Error;
1100
1101 fn process_events<F>(
1102 &mut self,
1103 _: Readiness,
1104 token: Token,
1105 mut callback: F,
1106 ) -> Result<PostAction, Self::Error>
1107 where
1108 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1109 {
1110 assert_eq!(token, self.token.unwrap());
1111 callback((), &mut ());
1112 Ok(PostAction::Continue)
1113 }
1114
1115 fn register(
1116 &mut self,
1117 _: &mut Poll,
1118 token_factory: &mut TokenFactory,
1119 ) -> crate::Result<()> {
1120 self.token = Some(token_factory.token());
1121 Ok(())
1122 }
1123
1124 fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1125 unreachable!()
1126 }
1127
1128 fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1129 unreachable!()
1130 }
1131
1132 const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1133
1134 fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
1135 self.lock.lock();
1136 Ok(Some((Readiness::EMPTY, self.token.unwrap())))
1137 }
1138
1139 fn before_handle_events(&mut self, _: EventIterator) {
1140 self.lock.unlock();
1141 }
1142 }
1143 }
1144
1145 #[cfg(unix)]
1146 #[test]
1147 fn insert_bad_source() {
1148 use std::mem::ManuallyDrop;
1149 use std::os::unix::io::{AsFd, FromRawFd, OwnedFd};
1150
1151 struct LeakedFd(ManuallyDrop<OwnedFd>);
1152
1153 impl AsFd for LeakedFd {
1154 fn as_fd(&self) -> std::os::unix::prelude::BorrowedFd<'_> {
1155 self.0.as_fd()
1156 }
1157 }
1158
1159 let event_loop = EventLoop::<()>::try_new().unwrap();
1160 let fd = LeakedFd(ManuallyDrop::new(unsafe {
1161 std::os::unix::io::OwnedFd::from_raw_fd(420)
1162 }));
1163 let ret = event_loop.handle().insert_source(
1164 crate::sources::generic::Generic::new(fd, Interest::READ, Mode::Level),
1165 |_, _, _| Ok(PostAction::Continue),
1166 );
1167 assert!(ret.is_err());
1168 }
1169
1170 #[test]
1171 fn invalid_token() {
1172 let (_ping, source) = crate::sources::ping::make_ping().unwrap();
1173
1174 let event_loop = EventLoop::<()>::try_new().unwrap();
1175 let handle = event_loop.handle();
1176 let reg_token = handle.insert_source(source, |_, _, _| {}).unwrap();
1177 handle.remove(reg_token);
1178
1179 let ret = handle.enable(®_token);
1180 assert!(ret.is_err());
1181 }
1182
1183 #[cfg(unix)]
1184 #[test]
1185 fn insert_source_no_interest() {
1186 use rustix::pipe::pipe;
1187
1188 let (read, _write) = pipe().unwrap();
1190
1191 let source = crate::sources::generic::Generic::new(read, Interest::EMPTY, Mode::Level);
1192 let dispatcher = Dispatcher::new(source, |_, _, _| Ok(PostAction::Continue));
1193
1194 let event_loop = EventLoop::<()>::try_new().unwrap();
1195 let handle = event_loop.handle();
1196 let ret = handle.register_dispatcher(dispatcher.clone());
1197
1198 if let Ok(token) = ret {
1199 handle.remove(token);
1201 } else {
1202 panic!();
1204 }
1205 }
1206
1207 #[test]
1208 fn disarm_rearm() {
1209 let mut event_loop = EventLoop::<bool>::try_new().unwrap();
1210 let (ping, ping_source) = make_ping().unwrap();
1211
1212 let ping_token = event_loop
1213 .handle()
1214 .insert_source(ping_source, |(), &mut (), dispatched| {
1215 *dispatched = true;
1216 })
1217 .unwrap();
1218
1219 ping.ping();
1220 let mut dispatched = false;
1221 event_loop
1222 .dispatch(Duration::ZERO, &mut dispatched)
1223 .unwrap();
1224 assert!(dispatched);
1225
1226 ping.ping();
1228 event_loop.handle().disable(&ping_token).unwrap();
1229 let mut dispatched = false;
1230 event_loop
1231 .dispatch(Duration::ZERO, &mut dispatched)
1232 .unwrap();
1233 assert!(!dispatched);
1234
1235 event_loop.handle().enable(&ping_token).unwrap();
1237 let mut dispatched = false;
1238 event_loop
1239 .dispatch(Duration::ZERO, &mut dispatched)
1240 .unwrap();
1241 assert!(dispatched);
1242 }
1243
1244 #[test]
1245 fn multiple_tokens() {
1246 struct DoubleSource {
1247 ping1: PingSource,
1248 ping2: PingSource,
1249 }
1250
1251 impl crate::EventSource for DoubleSource {
1252 type Event = u32;
1253 type Metadata = ();
1254 type Ret = ();
1255 type Error = PingError;
1256
1257 fn process_events<F>(
1258 &mut self,
1259 readiness: Readiness,
1260 token: Token,
1261 mut callback: F,
1262 ) -> Result<PostAction, Self::Error>
1263 where
1264 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1265 {
1266 self.ping1
1267 .process_events(readiness, token, |(), &mut ()| callback(1, &mut ()))?;
1268 self.ping2
1269 .process_events(readiness, token, |(), &mut ()| callback(2, &mut ()))?;
1270 Ok(PostAction::Continue)
1271 }
1272
1273 fn register(
1274 &mut self,
1275 poll: &mut Poll,
1276 token_factory: &mut TokenFactory,
1277 ) -> crate::Result<()> {
1278 self.ping1.register(poll, token_factory)?;
1279 self.ping2.register(poll, token_factory)?;
1280 Ok(())
1281 }
1282
1283 fn reregister(
1284 &mut self,
1285 poll: &mut Poll,
1286 token_factory: &mut TokenFactory,
1287 ) -> crate::Result<()> {
1288 self.ping1.reregister(poll, token_factory)?;
1289 self.ping2.reregister(poll, token_factory)?;
1290 Ok(())
1291 }
1292
1293 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
1294 self.ping1.unregister(poll)?;
1295 self.ping2.unregister(poll)?;
1296 Ok(())
1297 }
1298 }
1299
1300 let mut event_loop = EventLoop::<u32>::try_new().unwrap();
1301
1302 let (ping1, source1) = make_ping().unwrap();
1303 let (ping2, source2) = make_ping().unwrap();
1304
1305 let source = DoubleSource {
1306 ping1: source1,
1307 ping2: source2,
1308 };
1309
1310 event_loop
1311 .handle()
1312 .insert_source(source, |i, _, d| {
1313 eprintln!("Dispatching {}", i);
1314 *d += i
1315 })
1316 .unwrap();
1317
1318 let mut dispatched = 0;
1319 ping1.ping();
1320 event_loop
1321 .dispatch(Duration::ZERO, &mut dispatched)
1322 .unwrap();
1323 assert_eq!(dispatched, 1);
1324
1325 dispatched = 0;
1326 ping2.ping();
1327 event_loop
1328 .dispatch(Duration::ZERO, &mut dispatched)
1329 .unwrap();
1330 assert_eq!(dispatched, 2);
1331
1332 dispatched = 0;
1333 ping1.ping();
1334 ping2.ping();
1335 event_loop
1336 .dispatch(Duration::ZERO, &mut dispatched)
1337 .unwrap();
1338 assert_eq!(dispatched, 3);
1339 }
1340
1341 #[cfg(unix)]
1342 #[test]
1343 fn change_interests() {
1344 use rustix::io::write;
1345 use rustix::net::{recv, socketpair, AddressFamily, RecvFlags, SocketFlags, SocketType};
1346 let mut event_loop = EventLoop::<bool>::try_new().unwrap();
1347
1348 let (sock1, sock2) = socketpair(
1349 AddressFamily::UNIX,
1350 SocketType::STREAM,
1351 SocketFlags::empty(),
1352 None, )
1354 .unwrap();
1355
1356 let source = Generic::new(sock1, Interest::READ, Mode::Level);
1357 let dispatcher = Dispatcher::new(source, |_, fd, dispatched| {
1358 *dispatched = true;
1359 let mut buf = [0u8; 32];
1361 loop {
1362 match recv(&*fd, &mut buf, RecvFlags::DONTWAIT) {
1363 Ok(0) => break, Ok(_) => {}
1365 Err(e) => {
1366 let e: std::io::Error = e.into();
1367 if e.kind() == std::io::ErrorKind::WouldBlock {
1368 break;
1369 } else {
1371 return Err(e);
1373 }
1374 }
1375 }
1376 }
1377 Ok(PostAction::Continue)
1378 });
1379
1380 let sock_token_1 = event_loop
1381 .handle()
1382 .register_dispatcher(dispatcher.clone())
1383 .unwrap();
1384
1385 let mut dispatched = false;
1387 event_loop
1388 .dispatch(Duration::ZERO, &mut dispatched)
1389 .unwrap();
1390 assert!(!dispatched);
1391
1392 write(&sock2, &[1, 2, 3]).unwrap();
1394 dispatched = false;
1395 event_loop
1396 .dispatch(Duration::ZERO, &mut dispatched)
1397 .unwrap();
1398 assert!(dispatched);
1399
1400 dispatched = false;
1402 event_loop
1403 .dispatch(Duration::ZERO, &mut dispatched)
1404 .unwrap();
1405 assert!(!dispatched);
1406
1407 dispatcher.as_source_mut().interest = Interest::WRITE;
1409 event_loop.handle().update(&sock_token_1).unwrap();
1410
1411 dispatched = false;
1413 event_loop
1414 .dispatch(Duration::ZERO, &mut dispatched)
1415 .unwrap();
1416 assert!(dispatched);
1417
1418 dispatcher.as_source_mut().interest = Interest::READ;
1420 event_loop.handle().update(&sock_token_1).unwrap();
1421
1422 dispatched = false;
1424 event_loop
1425 .dispatch(Duration::ZERO, &mut dispatched)
1426 .unwrap();
1427 assert!(!dispatched);
1428 }
1429
1430 #[test]
1431 fn kill_source() {
1432 let mut event_loop = EventLoop::<Option<RegistrationToken>>::try_new().unwrap();
1433
1434 let handle = event_loop.handle();
1435 let (ping, ping_source) = make_ping().unwrap();
1436 let ping_token = event_loop
1437 .handle()
1438 .insert_source(ping_source, move |(), &mut (), opt_src| {
1439 if let Some(src) = opt_src.take() {
1440 handle.remove(src);
1441 }
1442 })
1443 .unwrap();
1444
1445 ping.ping();
1446
1447 let mut opt_src = Some(ping_token);
1448
1449 event_loop.dispatch(Duration::ZERO, &mut opt_src).unwrap();
1450
1451 assert!(opt_src.is_none());
1452 }
1453
1454 #[test]
1455 fn non_static_data() {
1456 use std::sync::mpsc;
1457
1458 let (sender, receiver) = mpsc::channel();
1459
1460 {
1461 struct RefSender<'a>(&'a mpsc::Sender<()>);
1462 let mut ref_sender = RefSender(&sender);
1463
1464 let mut event_loop = EventLoop::<RefSender<'_>>::try_new().unwrap();
1465 let (ping, ping_source) = make_ping().unwrap();
1466 let _ping_token = event_loop
1467 .handle()
1468 .insert_source(ping_source, |_, _, ref_sender| {
1469 ref_sender.0.send(()).unwrap();
1470 })
1471 .unwrap();
1472
1473 ping.ping();
1474
1475 event_loop
1476 .dispatch(Duration::ZERO, &mut ref_sender)
1477 .unwrap();
1478 }
1479
1480 receiver.recv().unwrap();
1481 drop(sender);
1483 }
1484
1485 #[cfg(feature = "block_on")]
1486 #[test]
1487 fn block_on_test() {
1488 use crate::sources::timer::TimeoutFuture;
1489 use std::time::Duration;
1490
1491 let mut evl = EventLoop::<()>::try_new().unwrap();
1492
1493 let mut data = 22;
1494 let timeout = {
1495 let data = &mut data;
1496 let evl_handle = evl.handle();
1497
1498 async move {
1499 TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
1500 *data = 32;
1501 11
1502 }
1503 };
1504
1505 let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();
1506 assert_eq!(result, Some(11));
1507 assert_eq!(data, 32);
1508 }
1509
1510 #[cfg(feature = "block_on")]
1511 #[test]
1512 fn block_on_early_cancel() {
1513 use crate::sources::timer;
1514 use std::time::Duration;
1515
1516 let mut evl = EventLoop::<()>::try_new().unwrap();
1517
1518 let mut data = 22;
1519 let timeout = {
1520 let data = &mut data;
1521 let evl_handle = evl.handle();
1522
1523 async move {
1524 timer::TimeoutFuture::from_duration(&evl_handle, Duration::from_secs(2)).await;
1525 *data = 32;
1526 11
1527 }
1528 };
1529
1530 let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
1531 let handle = evl.get_signal();
1532 let _timer_token = evl
1533 .handle()
1534 .insert_source(timer_source, move |_, _, _| {
1535 handle.stop();
1536 timer::TimeoutAction::Drop
1537 })
1538 .unwrap();
1539
1540 let result = evl.block_on(timeout, &mut (), |&mut ()| {}).unwrap();
1541 assert_eq!(result, None);
1542 assert_eq!(data, 22);
1543 }
1544
1545 #[test]
1546 fn reuse() {
1547 use crate::sources::timer;
1548 use std::sync::{Arc, Mutex};
1549 use std::time::{Duration, Instant};
1550
1551 let mut evl = EventLoop::<RegistrationToken>::try_new().unwrap();
1552 let handle = evl.handle();
1553
1554 let data = Arc::new(Mutex::new(1));
1555 let data_cloned = data.clone();
1556
1557 let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
1558 let mut first_timer_token = evl
1559 .handle()
1560 .insert_source(timer_source, move |_, _, own_token| {
1561 handle.remove(*own_token);
1562 let data_cloned = data_cloned.clone();
1563 let _ = handle.insert_source(timer::Timer::immediate(), move |_, _, _| {
1564 *data_cloned.lock().unwrap() = 2;
1565 timer::TimeoutAction::Drop
1566 });
1567 timer::TimeoutAction::Drop
1568 })
1569 .unwrap();
1570
1571 let now = Instant::now();
1572 loop {
1573 evl.dispatch(Some(Duration::from_secs(3)), &mut first_timer_token)
1574 .unwrap();
1575 if Instant::now().duration_since(now) > Duration::from_secs(3) {
1576 break;
1577 }
1578 }
1579
1580 assert_eq!(*data.lock().unwrap(), 2);
1581 }
1582
1583 #[test]
1584 fn drop_of_subsource() {
1585 struct WithSubSource {
1586 token: Option<Token>,
1587 }
1588
1589 impl crate::EventSource for WithSubSource {
1590 type Event = ();
1591 type Metadata = ();
1592 type Ret = ();
1593 type Error = crate::Error;
1594 const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
1595
1596 fn process_events<F>(
1597 &mut self,
1598 _: Readiness,
1599 _: Token,
1600 mut callback: F,
1601 ) -> Result<PostAction, Self::Error>
1602 where
1603 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1604 {
1605 callback((), &mut ());
1606 Ok(PostAction::Remove)
1608 }
1609
1610 fn register(&mut self, _: &mut Poll, fact: &mut TokenFactory) -> crate::Result<()> {
1611 fact.token();
1613 fact.token();
1614 self.token = Some(fact.token());
1615 Ok(())
1616 }
1617
1618 fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1619 Ok(())
1620 }
1621
1622 fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1623 Ok(())
1624 }
1625
1626 fn before_sleep(&mut self) -> crate::Result<Option<(Readiness, Token)>> {
1628 Ok(self.token.map(|token| {
1629 (
1630 Readiness {
1631 readable: true,
1632 writable: false,
1633 error: false,
1634 },
1635 token,
1636 )
1637 }))
1638 }
1639 }
1640
1641 let mut evl = EventLoop::<bool>::try_new().unwrap();
1643 evl.handle()
1644 .insert_source(WithSubSource { token: None }, |_, _, ran| {
1645 *ran = true;
1646 })
1647 .unwrap();
1648
1649 let mut ran = false;
1650
1651 evl.dispatch(Some(Duration::ZERO), &mut ran).unwrap();
1652
1653 assert!(ran);
1654 }
1655
1656 struct DummySource;
1658
1659 impl crate::EventSource for DummySource {
1660 type Event = ();
1661 type Metadata = ();
1662 type Ret = ();
1663 type Error = crate::Error;
1664
1665 fn process_events<F>(
1666 &mut self,
1667 _: Readiness,
1668 _: Token,
1669 mut callback: F,
1670 ) -> Result<PostAction, Self::Error>
1671 where
1672 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1673 {
1674 callback((), &mut ());
1675 Ok(PostAction::Continue)
1676 }
1677
1678 fn register(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1679 Ok(())
1680 }
1681
1682 fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> {
1683 Ok(())
1684 }
1685
1686 fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> {
1687 Ok(())
1688 }
1689 }
1690}