1use std::{
29 cell::RefCell,
30 collections::BinaryHeap,
31 rc::Rc,
32 task::Waker,
33 time::{Duration, Instant},
34};
35
36use crate::{EventSource, LoopHandle, Poll, PostAction, Readiness, Token, TokenFactory};
37
38#[derive(Debug)]
39struct Registration {
40 token: Token,
41 wheel: Rc<RefCell<TimerWheel>>,
42 counter: u32,
43}
44
45#[derive(Debug)]
51pub struct Timer {
52 registration: Option<Registration>,
53 deadline: Option<Instant>,
54}
55
56impl Timer {
57 pub fn immediate() -> Timer {
59 Self::from_deadline(Instant::now())
60 }
61
62 pub fn from_duration(duration: Duration) -> Timer {
64 Self::from_deadline_inner(Instant::now().checked_add(duration))
65 }
66
67 pub fn from_deadline(deadline: Instant) -> Timer {
69 Self::from_deadline_inner(Some(deadline))
70 }
71
72 fn from_deadline_inner(deadline: Option<Instant>) -> Timer {
73 Timer {
74 registration: None,
75 deadline,
76 }
77 }
78
79 pub fn set_deadline(&mut self, deadline: Instant) {
84 self.deadline = Some(deadline);
85 }
86
87 pub fn set_duration(&mut self, duration: Duration) {
92 self.deadline = Instant::now().checked_add(duration);
93 }
94
95 pub fn current_deadline(&self) -> Option<Instant> {
99 self.deadline
100 }
101}
102
103impl EventSource for Timer {
104 type Event = Instant;
105 type Metadata = ();
106 type Ret = TimeoutAction;
107 type Error = std::io::Error;
108
109 fn process_events<F>(
110 &mut self,
111 _: Readiness,
112 token: Token,
113 mut callback: F,
114 ) -> Result<PostAction, Self::Error>
115 where
116 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
117 {
118 if let (Some(ref registration), Some(ref deadline)) = (&self.registration, &self.deadline) {
119 if registration.token != token {
120 return Ok(PostAction::Continue);
121 }
122 let new_deadline = match callback(*deadline, &mut ()) {
123 TimeoutAction::Drop => return Ok(PostAction::Remove),
124 TimeoutAction::ToInstant(instant) => instant,
125 TimeoutAction::ToDuration(duration) => match Instant::now().checked_add(duration) {
126 Some(new_deadline) => new_deadline,
127 None => {
128 self.deadline = None;
130 return Ok(PostAction::Remove);
131 }
132 },
133 };
134 registration.wheel.borrow_mut().insert_reuse(
136 registration.counter,
137 new_deadline,
138 registration.token,
139 );
140 self.deadline = Some(new_deadline);
141 }
142 Ok(PostAction::Continue)
143 }
144
145 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
146 if let Some(deadline) = self.deadline {
148 let wheel = poll.timers.clone();
149 let token = token_factory.token();
150 let counter = wheel.borrow_mut().insert(deadline, token);
151 self.registration = Some(Registration {
152 token,
153 wheel,
154 counter,
155 });
156 }
157
158 Ok(())
159 }
160
161 fn reregister(
162 &mut self,
163 poll: &mut Poll,
164 token_factory: &mut TokenFactory,
165 ) -> crate::Result<()> {
166 self.unregister(poll)?;
167 self.register(poll, token_factory)
168 }
169
170 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
171 if let Some(registration) = self.registration.take() {
172 poll.timers.borrow_mut().cancel(registration.counter);
173 }
174 Ok(())
175 }
176}
177
178#[derive(Debug)]
180pub enum TimeoutAction {
181 Drop,
183 ToInstant(Instant),
185 ToDuration(Duration),
187}
188
189#[derive(Debug)]
191struct TimeoutData {
192 deadline: Instant,
193 token: Token,
194 counter: u32,
195}
196
197#[derive(Debug)]
199pub(crate) struct TimerWheel {
200 heap: BinaryHeap<TimeoutData>,
201 counter: u32,
202}
203
204impl TimerWheel {
205 pub(crate) fn new() -> TimerWheel {
206 TimerWheel {
207 heap: BinaryHeap::new(),
208 counter: 0,
209 }
210 }
211
212 pub(crate) fn insert(&mut self, deadline: Instant, token: Token) -> u32 {
213 self.heap.push(TimeoutData {
214 deadline,
215 token,
216 counter: self.counter,
217 });
218 let ret = self.counter;
219 self.counter += 1;
220 ret
221 }
222
223 pub(crate) fn insert_reuse(&mut self, counter: u32, deadline: Instant, token: Token) {
224 self.heap.push(TimeoutData {
225 deadline,
226 token,
227 counter,
228 });
229 }
230
231 pub(crate) fn cancel(&mut self, counter: u32) {
232 if self
233 .heap
234 .peek()
235 .map(|data| data.counter == counter)
236 .unwrap_or(false)
237 {
238 self.heap.pop();
239 return;
240 };
241
242 self.heap.retain(|data| data.counter != counter);
243 }
244
245 pub(crate) fn next_expired(&mut self, now: Instant) -> Option<(u32, Token)> {
246 self.heap.peek().filter(|data| now >= data.deadline)?;
248
249 let data = self.heap.pop().unwrap();
251 Some((data.counter, data.token))
252 }
253
254 pub(crate) fn next_deadline(&self) -> Option<std::time::Instant> {
255 self.heap.peek().map(|data| data.deadline)
256 }
257}
258
259impl std::cmp::Ord for TimeoutData {
262 #[inline]
263 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
264 self.deadline.cmp(&other.deadline).reverse()
266 }
267}
268
269impl std::cmp::PartialOrd for TimeoutData {
270 #[inline]
271 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
272 Some(self.cmp(other))
273 }
274}
275
276impl std::cmp::PartialEq for TimeoutData {
279 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
280 #[inline]
281 fn eq(&self, other: &Self) -> bool {
282 self.deadline == other.deadline
283 }
284}
285
286impl std::cmp::Eq for TimeoutData {}
287
288pub struct TimeoutFuture {
292 deadline: Option<Instant>,
293 waker: Rc<RefCell<Option<Waker>>>,
294}
295
296impl std::fmt::Debug for TimeoutFuture {
297 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
298 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
299 f.debug_struct("TimeoutFuture")
300 .field("deadline", &self.deadline)
301 .finish_non_exhaustive()
302 }
303}
304
305impl TimeoutFuture {
306 pub fn from_duration<Data>(handle: &LoopHandle<'_, Data>, duration: Duration) -> TimeoutFuture {
308 Self::from_deadline_inner(handle, Instant::now().checked_add(duration))
309 }
310
311 pub fn from_deadline<Data>(handle: &LoopHandle<'_, Data>, deadline: Instant) -> TimeoutFuture {
313 Self::from_deadline_inner(handle, Some(deadline))
314 }
315
316 fn from_deadline_inner<Data>(
318 handle: &LoopHandle<'_, Data>,
319 deadline: Option<Instant>,
320 ) -> TimeoutFuture {
321 let timer = Timer::from_deadline_inner(deadline);
322 let waker = Rc::new(RefCell::new(None::<Waker>));
323 handle
324 .insert_source(timer, {
325 let waker = waker.clone();
326 move |_, &mut (), _| {
327 if let Some(waker) = waker.borrow_mut().clone() {
328 waker.wake()
329 }
330 TimeoutAction::Drop
331 }
332 })
333 .unwrap();
334
335 TimeoutFuture { deadline, waker }
336 }
337}
338
339impl std::future::Future for TimeoutFuture {
340 type Output = ();
341
342 fn poll(
343 self: std::pin::Pin<&mut Self>,
344 cx: &mut std::task::Context<'_>,
345 ) -> std::task::Poll<Self::Output> {
346 match self.deadline {
347 None => return std::task::Poll::Pending,
348
349 Some(deadline) => {
350 if Instant::now() >= deadline {
351 return std::task::Poll::Ready(());
352 }
353 }
354 }
355
356 *self.waker.borrow_mut() = Some(cx.waker().clone());
357 std::task::Poll::Pending
358 }
359}
360
361#[cfg(test)]
362mod tests {
363 use super::*;
364 use crate::*;
365 use std::time::Duration;
366
367 #[test]
368 fn simple_timer() {
369 let mut event_loop = EventLoop::try_new().unwrap();
370
371 let mut dispatched = false;
372
373 event_loop
374 .handle()
375 .insert_source(
376 Timer::from_duration(Duration::from_millis(100)),
377 |_, &mut (), dispatched| {
378 *dispatched = true;
379 TimeoutAction::Drop
380 },
381 )
382 .unwrap();
383
384 event_loop
385 .dispatch(Some(Duration::ZERO), &mut dispatched)
386 .unwrap();
387 assert!(!dispatched);
389
390 event_loop
391 .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
392 .unwrap();
393 assert!(dispatched);
395 }
396
397 #[test]
398 fn simple_timer_instant() {
399 let mut event_loop = EventLoop::try_new().unwrap();
400
401 let mut dispatched = false;
402
403 event_loop
404 .handle()
405 .insert_source(
406 Timer::from_duration(Duration::from_millis(100)),
407 |_, &mut (), dispatched| {
408 *dispatched = true;
409 TimeoutAction::Drop
410 },
411 )
412 .unwrap();
413
414 event_loop
415 .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
416 .unwrap();
417 assert!(dispatched);
419 }
420
421 #[test]
422 fn immediate_timer() {
423 let mut event_loop = EventLoop::try_new().unwrap();
424
425 let mut dispatched = false;
426
427 event_loop
428 .handle()
429 .insert_source(Timer::immediate(), |_, &mut (), dispatched| {
430 *dispatched = true;
431 TimeoutAction::Drop
432 })
433 .unwrap();
434
435 event_loop
436 .dispatch(Some(Duration::ZERO), &mut dispatched)
437 .unwrap();
438 assert!(dispatched);
440 }
441
442 #[test]
446 fn high_precision_timer() {
447 let mut event_loop = EventLoop::try_new().unwrap();
448
449 let mut dispatched = false;
450
451 event_loop
452 .handle()
453 .insert_source(
454 Timer::from_duration(Duration::from_millis(100)),
455 |_, &mut (), dispatched| {
456 *dispatched = true;
457 TimeoutAction::Drop
458 },
459 )
460 .unwrap();
461
462 event_loop
463 .dispatch(Some(Duration::ZERO), &mut dispatched)
464 .unwrap();
465 assert!(!dispatched);
467
468 event_loop
469 .dispatch(Some(Duration::from_micros(10200)), &mut dispatched)
470 .unwrap();
471 assert!(!dispatched);
473
474 event_loop
475 .dispatch(Some(Duration::from_millis(100)), &mut dispatched)
476 .unwrap();
477 assert!(dispatched);
479 }
480
481 #[test]
482 fn cancel_timer() {
483 let mut event_loop = EventLoop::try_new().unwrap();
484
485 let mut dispatched = false;
486
487 let token = event_loop
488 .handle()
489 .insert_source(
490 Timer::from_duration(Duration::from_millis(100)),
491 |_, &mut (), dispatched| {
492 *dispatched = true;
493 TimeoutAction::Drop
494 },
495 )
496 .unwrap();
497
498 event_loop
499 .dispatch(Some(Duration::ZERO), &mut dispatched)
500 .unwrap();
501 assert!(!dispatched);
503
504 event_loop.handle().remove(token);
505
506 event_loop
507 .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
508 .unwrap();
509 assert!(!dispatched);
511 }
512
513 #[test]
514 fn repeating_timer() {
515 let mut event_loop = EventLoop::try_new().unwrap();
516
517 let mut dispatched = 0;
518
519 event_loop
520 .handle()
521 .insert_source(
522 Timer::from_duration(Duration::from_millis(500)),
523 |_, &mut (), dispatched| {
524 *dispatched += 1;
525 TimeoutAction::ToDuration(Duration::from_millis(500))
526 },
527 )
528 .unwrap();
529
530 event_loop
531 .dispatch(Some(Duration::from_millis(250)), &mut dispatched)
532 .unwrap();
533 assert_eq!(dispatched, 0);
534
535 event_loop
536 .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
537 .unwrap();
538 assert_eq!(dispatched, 1);
539
540 event_loop
541 .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
542 .unwrap();
543 assert_eq!(dispatched, 2);
544
545 event_loop
546 .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
547 .unwrap();
548 assert_eq!(dispatched, 3);
549 }
550
551 #[cfg(feature = "executor")]
552 #[test]
553 fn timeout_future() {
554 let mut event_loop = EventLoop::try_new().unwrap();
555
556 let mut dispatched = 0;
557
558 let timeout_1 =
559 TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(500));
560 let timeout_2 =
561 TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(1500));
562 let timeout_3 = TimeoutFuture::from_duration(&event_loop.handle(), Duration::MAX);
564
565 let (exec, sched) = crate::sources::futures::executor().unwrap();
566 event_loop
567 .handle()
568 .insert_source(exec, move |(), &mut (), got| {
569 *got += 1;
570 })
571 .unwrap();
572
573 sched.schedule(timeout_1).unwrap();
574 sched.schedule(timeout_2).unwrap();
575 sched.schedule(timeout_3).unwrap();
576
577 event_loop
581 .dispatch(Some(Duration::ZERO), &mut dispatched)
582 .unwrap();
583 event_loop
584 .dispatch(Some(Duration::ZERO), &mut dispatched)
585 .unwrap();
586 assert_eq!(dispatched, 0);
587
588 event_loop
589 .dispatch(Some(Duration::from_millis(1000)), &mut dispatched)
590 .unwrap();
591 event_loop
592 .dispatch(Some(Duration::ZERO), &mut dispatched)
593 .unwrap();
594 assert_eq!(dispatched, 1);
595
596 event_loop
597 .dispatch(Some(Duration::from_millis(1100)), &mut dispatched)
598 .unwrap();
599 event_loop
600 .dispatch(Some(Duration::ZERO), &mut dispatched)
601 .unwrap();
602 assert_eq!(dispatched, 2);
603 }
604
605 #[test]
606 fn no_overflow() {
607 let mut event_loop = EventLoop::try_new().unwrap();
608
609 let mut dispatched = 0;
610
611 event_loop
612 .handle()
613 .insert_source(
614 Timer::from_duration(Duration::from_millis(500)),
615 |_, &mut (), dispatched| {
616 *dispatched += 1;
617 TimeoutAction::Drop
618 },
619 )
620 .unwrap();
621
622 event_loop
623 .handle()
624 .insert_source(Timer::from_duration(Duration::MAX), |_, &mut (), _| {
625 panic!("This timer should never go off")
626 })
627 .unwrap();
628
629 event_loop
630 .dispatch(Some(Duration::from_millis(250)), &mut dispatched)
631 .unwrap();
632 assert_eq!(dispatched, 0);
633
634 event_loop
635 .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
636 .unwrap();
637 assert_eq!(dispatched, 1);
638
639 event_loop
640 .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
641 .unwrap();
642 assert_eq!(dispatched, 1);
643 }
644}