1use std::{
29 cell::RefCell,
30 collections::BinaryHeap,
31 rc::Rc,
32 task::Waker,
33 time::{Duration, Instant},
34};
35
36use crate::{EventSource, LoopHandle, Poll, PostAction, Readiness, Token, TokenFactory};
37
38#[derive(Debug)]
39struct Registration {
40 token: Token,
41 wheel: Rc<RefCell<TimerWheel>>,
42 counter: u32,
43}
44
45#[derive(Debug)]
51pub struct Timer {
52 registration: Option<Registration>,
53 deadline: Option<Instant>,
54}
55
56impl Timer {
57 pub fn immediate() -> Timer {
59 Self::from_deadline(Instant::now())
60 }
61
62 pub fn from_duration(duration: Duration) -> Timer {
64 Self::from_deadline_inner(Instant::now().checked_add(duration))
65 }
66
67 pub fn from_deadline(deadline: Instant) -> Timer {
69 Self::from_deadline_inner(Some(deadline))
70 }
71
72 fn from_deadline_inner(deadline: Option<Instant>) -> Timer {
73 Timer {
74 registration: None,
75 deadline,
76 }
77 }
78
79 pub fn set_deadline(&mut self, deadline: Instant) {
84 self.deadline = Some(deadline);
85 }
86
87 pub fn set_duration(&mut self, duration: Duration) {
92 self.deadline = Instant::now().checked_add(duration);
93 }
94
95 pub fn current_deadline(&self) -> Option<Instant> {
99 self.deadline
100 }
101}
102
103impl EventSource for Timer {
104 type Event = Instant;
105 type Metadata = ();
106 type Ret = TimeoutAction;
107 type Error = std::io::Error;
108
109 fn process_events<F>(
110 &mut self,
111 _: Readiness,
112 token: Token,
113 mut callback: F,
114 ) -> Result<PostAction, Self::Error>
115 where
116 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
117 {
118 if let (Some(ref registration), Some(ref deadline)) = (&self.registration, &self.deadline) {
119 if registration.token != token {
120 return Ok(PostAction::Continue);
121 }
122 let new_deadline = match callback(*deadline, &mut ()) {
123 TimeoutAction::Drop => return Ok(PostAction::Remove),
124 TimeoutAction::ToInstant(instant) => instant,
125 TimeoutAction::ToDuration(duration) => match Instant::now().checked_add(duration) {
126 Some(new_deadline) => new_deadline,
127 None => {
128 self.deadline = None;
130 return Ok(PostAction::Remove);
131 }
132 },
133 };
134 registration.wheel.borrow_mut().insert_reuse(
136 registration.counter,
137 new_deadline,
138 registration.token,
139 );
140 self.deadline = Some(new_deadline);
141 }
142 Ok(PostAction::Continue)
143 }
144
145 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
146 if let Some(deadline) = self.deadline {
148 let wheel = poll.timers.clone();
149 let token = token_factory.token();
150 let counter = wheel.borrow_mut().insert(deadline, token);
151 self.registration = Some(Registration {
152 token,
153 wheel,
154 counter,
155 });
156 }
157
158 Ok(())
159 }
160
161 fn reregister(
162 &mut self,
163 poll: &mut Poll,
164 token_factory: &mut TokenFactory,
165 ) -> crate::Result<()> {
166 self.unregister(poll)?;
167 self.register(poll, token_factory)
168 }
169
170 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
171 if let Some(registration) = self.registration.take() {
172 poll.timers.borrow_mut().cancel(registration.counter);
173 }
174 Ok(())
175 }
176}
177
/// What a [`Timer`] should do after its callback has run.
///
/// Besides `Debug`, the common value traits are derived so that callers can
/// copy, store and compare actions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutAction {
    /// Do not reschedule: the timer source is removed.
    Drop,
    /// Reschedule the timer to fire at the given instant.
    ToInstant(Instant),
    /// Reschedule the timer to fire after the given duration, counted from
    /// the moment the callback returned. If that instant cannot be
    /// represented, the timer source is removed instead.
    ToDuration(Duration),
}
188
189#[derive(Debug)]
191struct TimeoutData {
192 deadline: Instant,
193 token: RefCell<Option<Token>>,
194 counter: u32,
195}
196
197#[derive(Debug)]
199pub(crate) struct TimerWheel {
200 heap: BinaryHeap<TimeoutData>,
201 counter: u32,
202}
203
204impl TimerWheel {
205 pub(crate) fn new() -> TimerWheel {
206 TimerWheel {
207 heap: BinaryHeap::new(),
208 counter: 0,
209 }
210 }
211
212 pub(crate) fn insert(&mut self, deadline: Instant, token: Token) -> u32 {
213 self.heap.push(TimeoutData {
214 deadline,
215 token: RefCell::new(Some(token)),
216 counter: self.counter,
217 });
218 let ret = self.counter;
219 self.counter += 1;
220 ret
221 }
222
223 pub(crate) fn insert_reuse(&mut self, counter: u32, deadline: Instant, token: Token) {
224 self.heap.push(TimeoutData {
225 deadline,
226 token: RefCell::new(Some(token)),
227 counter,
228 });
229 }
230
231 pub(crate) fn cancel(&mut self, counter: u32) {
232 if self
233 .heap
234 .peek()
235 .map(|data| data.counter == counter)
236 .unwrap_or(false)
237 {
238 self.heap.pop();
239 return;
240 };
241
242 self.heap
243 .iter()
244 .rev()
245 .find(|data| data.counter == counter)
246 .map(|data| data.token.take());
247 }
248
249 pub(crate) fn next_expired(&mut self, now: Instant) -> Option<(u32, Token)> {
250 loop {
251 if let Some(data) = self.heap.peek() {
253 if data.deadline > now {
254 return None;
255 }
256 } else {
259 return None;
260 }
261
262 let data = self.heap.pop().unwrap();
264 if let Some(token) = data.token.into_inner() {
265 return Some((data.counter, token));
266 }
267 }
269 }
270
271 pub(crate) fn next_deadline(&self) -> Option<std::time::Instant> {
272 self.heap.peek().map(|data| data.deadline)
273 }
274}
275
276impl std::cmp::Ord for TimeoutData {
279 #[inline]
280 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
281 self.deadline.cmp(&other.deadline).reverse()
283 }
284}
285
286impl std::cmp::PartialOrd for TimeoutData {
287 #[inline]
288 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
289 Some(self.cmp(other))
290 }
291}
292
293impl std::cmp::PartialEq for TimeoutData {
296 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
297 #[inline]
298 fn eq(&self, other: &Self) -> bool {
299 self.deadline == other.deadline
300 }
301}
302
303impl std::cmp::Eq for TimeoutData {}
304
/// A future that resolves once a deadline has been reached.
pub struct TimeoutFuture {
    /// Instant at which the future becomes ready; `None` means the future
    /// never resolves.
    deadline: Option<Instant>,
    /// Waker of the task currently awaiting this future, shared with the
    /// timer callback that wakes it.
    waker: Rc<RefCell<Option<Waker>>>,
}
312
313impl std::fmt::Debug for TimeoutFuture {
314 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
315 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
316 f.debug_struct("TimeoutFuture")
317 .field("deadline", &self.deadline)
318 .finish_non_exhaustive()
319 }
320}
321
322impl TimeoutFuture {
323 pub fn from_duration<Data>(handle: &LoopHandle<'_, Data>, duration: Duration) -> TimeoutFuture {
325 Self::from_deadline_inner(handle, Instant::now().checked_add(duration))
326 }
327
328 pub fn from_deadline<Data>(handle: &LoopHandle<'_, Data>, deadline: Instant) -> TimeoutFuture {
330 Self::from_deadline_inner(handle, Some(deadline))
331 }
332
333 fn from_deadline_inner<Data>(
335 handle: &LoopHandle<'_, Data>,
336 deadline: Option<Instant>,
337 ) -> TimeoutFuture {
338 let timer = Timer::from_deadline_inner(deadline);
339 let waker = Rc::new(RefCell::new(None::<Waker>));
340 handle
341 .insert_source(timer, {
342 let waker = waker.clone();
343 move |_, &mut (), _| {
344 if let Some(waker) = waker.borrow_mut().clone() {
345 waker.wake()
346 }
347 TimeoutAction::Drop
348 }
349 })
350 .unwrap();
351
352 TimeoutFuture { deadline, waker }
353 }
354}
355
356impl std::future::Future for TimeoutFuture {
357 type Output = ();
358
359 fn poll(
360 self: std::pin::Pin<&mut Self>,
361 cx: &mut std::task::Context<'_>,
362 ) -> std::task::Poll<Self::Output> {
363 match self.deadline {
364 None => return std::task::Poll::Pending,
365
366 Some(deadline) => {
367 if Instant::now() >= deadline {
368 return std::task::Poll::Ready(());
369 }
370 }
371 }
372
373 *self.waker.borrow_mut() = Some(cx.waker().clone());
374 std::task::Poll::Pending
375 }
376}
377
#[cfg(test)]
mod tests {
    use super::*;
    use crate::*;
    use std::time::Duration;

    /// A timer created from a duration fires only once that duration elapsed.
    #[test]
    fn simple_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        // Not yet dispatched.
        assert!(!dispatched);

        event_loop
            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
            .unwrap();
        // Now it has fired.
        assert!(dispatched);
    }

    /// A timer created from an explicit `Instant` deadline fires once that
    /// instant has been reached.
    #[test]
    fn simple_timer_instant() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(
                // Exercise the `Instant`-based constructor, which the
                // duration-based tests do not cover.
                Timer::from_deadline(std::time::Instant::now() + Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
            .unwrap();
        // Now it has fired.
        assert!(dispatched);
    }

    /// An immediate timer fires on the first dispatch, even with no timeout.
    #[test]
    fn immediate_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(Timer::immediate(), |_, &mut (), dispatched| {
                *dispatched = true;
                TimeoutAction::Drop
            })
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        // Fired right away.
        assert!(dispatched);
    }

    /// A dispatch with a sub-deadline, non-millisecond-aligned timeout must
    /// not fire the timer early.
    #[test]
    fn high_precision_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        // Not yet dispatched.
        assert!(!dispatched);

        event_loop
            .dispatch(Some(Duration::from_micros(10200)), &mut dispatched)
            .unwrap();
        // 10.2 ms in: still well before the 100 ms deadline.
        assert!(!dispatched);

        event_loop
            .dispatch(Some(Duration::from_millis(100)), &mut dispatched)
            .unwrap();
        // Now it has fired.
        assert!(dispatched);
    }

    /// A timer removed from the loop before its deadline never fires.
    #[test]
    fn cancel_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        let token = event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        // Not yet dispatched.
        assert!(!dispatched);

        event_loop.handle().remove(token);

        event_loop
            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
            .unwrap();
        // Still not dispatched: the timer was removed.
        assert!(!dispatched);
    }

    /// A timer returning `ToDuration` keeps firing at that interval.
    #[test]
    fn repeating_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = 0;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(500)),
                |_, &mut (), dispatched| {
                    *dispatched += 1;
                    TimeoutAction::ToDuration(Duration::from_millis(500))
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::from_millis(250)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 0);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 2);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 3);
    }

    /// Timeout futures complete in deadline order; one with an overflowing
    /// duration never completes.
    #[cfg(feature = "executor")]
    #[test]
    fn timeout_future() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = 0;

        let timeout_1 =
            TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(500));
        let timeout_2 =
            TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(1500));
        // This one overflows `Instant` and must never resolve.
        let timeout_3 = TimeoutFuture::from_duration(&event_loop.handle(), Duration::MAX);

        let (exec, sched) = crate::sources::futures::executor().unwrap();
        event_loop
            .handle()
            .insert_source(exec, move |(), &mut (), got| {
                *got += 1;
            })
            .unwrap();

        sched.schedule(timeout_1).unwrap();
        sched.schedule(timeout_2).unwrap();
        sched.schedule(timeout_3).unwrap();

        // Each regular dispatch is followed by a zero-timeout dispatch,
        // presumably so that a wake-up triggered by a timer gets processed by
        // the executor source before the assertion.
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 0);

        event_loop
            .dispatch(Some(Duration::from_millis(1000)), &mut dispatched)
            .unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);

        event_loop
            .dispatch(Some(Duration::from_millis(1100)), &mut dispatched)
            .unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 2);
    }

    /// A timer whose duration overflows `Instant` never fires and does not
    /// disturb other timers.
    #[test]
    fn no_overflow() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = 0;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(500)),
                |_, &mut (), dispatched| {
                    *dispatched += 1;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .handle()
            .insert_source(Timer::from_duration(Duration::MAX), |_, &mut (), _| {
                panic!("This timer should never go off")
            })
            .unwrap();

        event_loop
            .dispatch(Some(Duration::from_millis(250)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 0);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);
    }
}