calloop/sources/
timer.rs

1//! Timer event source
2//!
3//! The [`Timer`] is an event source that will fire its event after a certain amount of time
4//! specified at creation. Its timing is tracked directly by the event loop core logic, and it does
5//! not consume any system resource.
6//!
7//! As of calloop v0.11.0, the event loop always uses high-precision timers. However, the timer
8//! precision varies between operating systems; for instance, the scheduler granularity on Windows
9//! is about 16 milliseconds. If you need to rely on good precision timers in general, you may need
10//! to enable realtime features of your OS to ensure your thread is quickly woken up by the system
11//! scheduler.
12//!
13//! The provided event is an [`Instant`] representing the deadline for which this timer has fired
14//! (which can be earlier than the current time depending on the event loop congestion).
15//!
16//! The callback associated with this event source is expected to return a [`TimeoutAction`], which
17//! can be used to implement self-repeating timers by telling calloop to reprogram the same timer
18//! for a later timeout after it has fired.
19
20/*
21 * This module provides two main types:
22 *
23 * - `Timer` is the user-facing type that represents a timer event source
24 * - `TimerWheel` is an internal data structure for tracking registered timeouts, it is used by
25 *   the polling logic in sys/mod.rs
26 */
27
28use std::{
29    cell::RefCell,
30    collections::BinaryHeap,
31    rc::Rc,
32    task::Waker,
33    time::{Duration, Instant},
34};
35
36use crate::{EventSource, LoopHandle, Poll, PostAction, Readiness, Token, TokenFactory};
37
38#[derive(Debug)]
39struct Registration {
40    token: Token,
41    wheel: Rc<RefCell<TimerWheel>>,
42    counter: u32,
43}
44
45/// A timer event source
46///
47/// When registered to the event loop, it will trigger an event once its deadline is reached.
48/// If the deadline is in the past relative to the moment of its insertion in the event loop,
49/// the `TImer` will trigger an event as soon as the event loop is dispatched.
50#[derive(Debug)]
51pub struct Timer {
52    registration: Option<Registration>,
53    deadline: Option<Instant>,
54}
55
56impl Timer {
57    /// Create a timer that will fire immediately when inserted in the event loop
58    pub fn immediate() -> Timer {
59        Self::from_deadline(Instant::now())
60    }
61
62    /// Create a timer that will fire after a given duration from now
63    pub fn from_duration(duration: Duration) -> Timer {
64        Self::from_deadline_inner(Instant::now().checked_add(duration))
65    }
66
67    /// Create a timer that will fire at a given instant
68    pub fn from_deadline(deadline: Instant) -> Timer {
69        Self::from_deadline_inner(Some(deadline))
70    }
71
72    fn from_deadline_inner(deadline: Option<Instant>) -> Timer {
73        Timer {
74            registration: None,
75            deadline,
76        }
77    }
78
79    /// Changes the deadline of this timer to an [`Instant`]
80    ///
81    /// If the `Timer` is currently registered in the event loop, it needs to be
82    /// re-registered for this change to take effect.
83    pub fn set_deadline(&mut self, deadline: Instant) {
84        self.deadline = Some(deadline);
85    }
86
87    /// Changes the deadline of this timer to a [`Duration`] from now
88    ///
89    /// If the `Timer` is currently registered in the event loop, it needs to be
90    /// re-registered for this change to take effect.
91    pub fn set_duration(&mut self, duration: Duration) {
92        self.deadline = Instant::now().checked_add(duration);
93    }
94
95    /// Get the current deadline of this `Timer`
96    ///
97    /// Returns `None` if the timer has overflowed.
98    pub fn current_deadline(&self) -> Option<Instant> {
99        self.deadline
100    }
101}
102
103impl EventSource for Timer {
104    type Event = Instant;
105    type Metadata = ();
106    type Ret = TimeoutAction;
107    type Error = std::io::Error;
108
109    fn process_events<F>(
110        &mut self,
111        _: Readiness,
112        token: Token,
113        mut callback: F,
114    ) -> Result<PostAction, Self::Error>
115    where
116        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
117    {
118        if let (Some(ref registration), Some(ref deadline)) = (&self.registration, &self.deadline) {
119            if registration.token != token {
120                return Ok(PostAction::Continue);
121            }
122            let new_deadline = match callback(*deadline, &mut ()) {
123                TimeoutAction::Drop => return Ok(PostAction::Remove),
124                TimeoutAction::ToInstant(instant) => instant,
125                TimeoutAction::ToDuration(duration) => match Instant::now().checked_add(duration) {
126                    Some(new_deadline) => new_deadline,
127                    None => {
128                        // The timer has overflowed, meaning we have no choice but to drop it.
129                        self.deadline = None;
130                        return Ok(PostAction::Remove);
131                    }
132                },
133            };
134            // If we received an event, we MUST have a valid counter value
135            registration.wheel.borrow_mut().insert_reuse(
136                registration.counter,
137                new_deadline,
138                registration.token,
139            );
140            self.deadline = Some(new_deadline);
141        }
142        Ok(PostAction::Continue)
143    }
144
145    fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
146        // Only register a deadline if we haven't overflowed.
147        if let Some(deadline) = self.deadline {
148            let wheel = poll.timers.clone();
149            let token = token_factory.token();
150            let counter = wheel.borrow_mut().insert(deadline, token);
151            self.registration = Some(Registration {
152                token,
153                wheel,
154                counter,
155            });
156        }
157
158        Ok(())
159    }
160
161    fn reregister(
162        &mut self,
163        poll: &mut Poll,
164        token_factory: &mut TokenFactory,
165    ) -> crate::Result<()> {
166        self.unregister(poll)?;
167        self.register(poll, token_factory)
168    }
169
170    fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
171        if let Some(registration) = self.registration.take() {
172            poll.timers.borrow_mut().cancel(registration.counter);
173        }
174        Ok(())
175    }
176}
177
178/// Action to reschedule a timeout if necessary
179#[derive(Debug)]
180pub enum TimeoutAction {
181    /// Don't reschedule this timer
182    Drop,
183    /// Reschedule this timer to a given [`Instant`]
184    ToInstant(Instant),
185    /// Reschedule this timer to a given [`Duration`] in the future
186    ToDuration(Duration),
187}
188
189// Internal representation of a timeout registered in the TimerWheel
190#[derive(Debug)]
191struct TimeoutData {
192    deadline: Instant,
193    token: RefCell<Option<Token>>,
194    counter: u32,
195}
196
197// A data structure for tracking registered timeouts
198#[derive(Debug)]
199pub(crate) struct TimerWheel {
200    heap: BinaryHeap<TimeoutData>,
201    counter: u32,
202}
203
204impl TimerWheel {
205    pub(crate) fn new() -> TimerWheel {
206        TimerWheel {
207            heap: BinaryHeap::new(),
208            counter: 0,
209        }
210    }
211
212    pub(crate) fn insert(&mut self, deadline: Instant, token: Token) -> u32 {
213        self.heap.push(TimeoutData {
214            deadline,
215            token: RefCell::new(Some(token)),
216            counter: self.counter,
217        });
218        let ret = self.counter;
219        self.counter += 1;
220        ret
221    }
222
223    pub(crate) fn insert_reuse(&mut self, counter: u32, deadline: Instant, token: Token) {
224        self.heap.push(TimeoutData {
225            deadline,
226            token: RefCell::new(Some(token)),
227            counter,
228        });
229    }
230
231    pub(crate) fn cancel(&mut self, counter: u32) {
232        if self
233            .heap
234            .peek()
235            .map(|data| data.counter == counter)
236            .unwrap_or(false)
237        {
238            self.heap.pop();
239            return;
240        };
241
242        self.heap
243            .iter()
244            .rev()
245            .find(|data| data.counter == counter)
246            .map(|data| data.token.take());
247    }
248
249    pub(crate) fn next_expired(&mut self, now: Instant) -> Option<(u32, Token)> {
250        loop {
251            // check if there is an expired item
252            if let Some(data) = self.heap.peek() {
253                if data.deadline > now {
254                    return None;
255                }
256                // there is an expired timeout, continue the
257                // loop body
258            } else {
259                return None;
260            }
261
262            // There is an item in the heap, this unwrap cannot blow
263            let data = self.heap.pop().unwrap();
264            if let Some(token) = data.token.into_inner() {
265                return Some((data.counter, token));
266            }
267            // otherwise this timeout was cancelled, continue looping
268        }
269    }
270
271    pub(crate) fn next_deadline(&self) -> Option<std::time::Instant> {
272        self.heap.peek().map(|data| data.deadline)
273    }
274}
275
276// trait implementations for TimeoutData
277
278impl std::cmp::Ord for TimeoutData {
279    #[inline]
280    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
281        // earlier values have priority
282        self.deadline.cmp(&other.deadline).reverse()
283    }
284}
285
286impl std::cmp::PartialOrd for TimeoutData {
287    #[inline]
288    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
289        Some(self.cmp(other))
290    }
291}
292
293// This impl is required for PartialOrd but actually never used
294// and the type is private, so ignore its coverage
295impl std::cmp::PartialEq for TimeoutData {
296    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
297    #[inline]
298    fn eq(&self, other: &Self) -> bool {
299        self.deadline == other.deadline
300    }
301}
302
303impl std::cmp::Eq for TimeoutData {}
304
305// Logic for timer futures
306
307/// A future that resolves once a certain timeout is expired
308pub struct TimeoutFuture {
309    deadline: Option<Instant>,
310    waker: Rc<RefCell<Option<Waker>>>,
311}
312
313impl std::fmt::Debug for TimeoutFuture {
314    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
315    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
316        f.debug_struct("TimeoutFuture")
317            .field("deadline", &self.deadline)
318            .finish_non_exhaustive()
319    }
320}
321
322impl TimeoutFuture {
323    /// Create a future that resolves after a given duration
324    pub fn from_duration<Data>(handle: &LoopHandle<'_, Data>, duration: Duration) -> TimeoutFuture {
325        Self::from_deadline_inner(handle, Instant::now().checked_add(duration))
326    }
327
328    /// Create a future that resolves at a given instant
329    pub fn from_deadline<Data>(handle: &LoopHandle<'_, Data>, deadline: Instant) -> TimeoutFuture {
330        Self::from_deadline_inner(handle, Some(deadline))
331    }
332
333    /// Create a future that resolves at a given instant
334    fn from_deadline_inner<Data>(
335        handle: &LoopHandle<'_, Data>,
336        deadline: Option<Instant>,
337    ) -> TimeoutFuture {
338        let timer = Timer::from_deadline_inner(deadline);
339        let waker = Rc::new(RefCell::new(None::<Waker>));
340        handle
341            .insert_source(timer, {
342                let waker = waker.clone();
343                move |_, &mut (), _| {
344                    if let Some(waker) = waker.borrow_mut().clone() {
345                        waker.wake()
346                    }
347                    TimeoutAction::Drop
348                }
349            })
350            .unwrap();
351
352        TimeoutFuture { deadline, waker }
353    }
354}
355
356impl std::future::Future for TimeoutFuture {
357    type Output = ();
358
359    fn poll(
360        self: std::pin::Pin<&mut Self>,
361        cx: &mut std::task::Context<'_>,
362    ) -> std::task::Poll<Self::Output> {
363        match self.deadline {
364            None => return std::task::Poll::Pending,
365
366            Some(deadline) => {
367                if Instant::now() >= deadline {
368                    return std::task::Poll::Ready(());
369                }
370            }
371        }
372
373        *self.waker.borrow_mut() = Some(cx.waker().clone());
374        std::task::Poll::Pending
375    }
376}
377
378#[cfg(test)]
379mod tests {
380    use super::*;
381    use crate::*;
382    use std::time::Duration;
383
384    #[test]
385    fn simple_timer() {
386        let mut event_loop = EventLoop::try_new().unwrap();
387
388        let mut dispatched = false;
389
390        event_loop
391            .handle()
392            .insert_source(
393                Timer::from_duration(Duration::from_millis(100)),
394                |_, &mut (), dispatched| {
395                    *dispatched = true;
396                    TimeoutAction::Drop
397                },
398            )
399            .unwrap();
400
401        event_loop
402            .dispatch(Some(Duration::ZERO), &mut dispatched)
403            .unwrap();
404        // not yet dispatched
405        assert!(!dispatched);
406
407        event_loop
408            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
409            .unwrap();
410        // now dispatched
411        assert!(dispatched);
412    }
413
414    #[test]
415    fn simple_timer_instant() {
416        let mut event_loop = EventLoop::try_new().unwrap();
417
418        let mut dispatched = false;
419
420        event_loop
421            .handle()
422            .insert_source(
423                Timer::from_duration(Duration::from_millis(100)),
424                |_, &mut (), dispatched| {
425                    *dispatched = true;
426                    TimeoutAction::Drop
427                },
428            )
429            .unwrap();
430
431        event_loop
432            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
433            .unwrap();
434        // now dispatched
435        assert!(dispatched);
436    }
437
438    #[test]
439    fn immediate_timer() {
440        let mut event_loop = EventLoop::try_new().unwrap();
441
442        let mut dispatched = false;
443
444        event_loop
445            .handle()
446            .insert_source(Timer::immediate(), |_, &mut (), dispatched| {
447                *dispatched = true;
448                TimeoutAction::Drop
449            })
450            .unwrap();
451
452        event_loop
453            .dispatch(Some(Duration::ZERO), &mut dispatched)
454            .unwrap();
455        // now dispatched
456        assert!(dispatched);
457    }
458
459    // We cannot actually test high precision timers, as they are only high precision in release mode
460    // This test is here to ensure that the high-precision codepath are executed and work as intended
461    // even if we cannot test if they are actually high precision
462    #[test]
463    fn high_precision_timer() {
464        let mut event_loop = EventLoop::try_new().unwrap();
465
466        let mut dispatched = false;
467
468        event_loop
469            .handle()
470            .insert_source(
471                Timer::from_duration(Duration::from_millis(100)),
472                |_, &mut (), dispatched| {
473                    *dispatched = true;
474                    TimeoutAction::Drop
475                },
476            )
477            .unwrap();
478
479        event_loop
480            .dispatch(Some(Duration::ZERO), &mut dispatched)
481            .unwrap();
482        // not yet dispatched
483        assert!(!dispatched);
484
485        event_loop
486            .dispatch(Some(Duration::from_micros(10200)), &mut dispatched)
487            .unwrap();
488        // yet not dispatched
489        assert!(!dispatched);
490
491        event_loop
492            .dispatch(Some(Duration::from_millis(100)), &mut dispatched)
493            .unwrap();
494        // now dispatched
495        assert!(dispatched);
496    }
497
498    #[test]
499    fn cancel_timer() {
500        let mut event_loop = EventLoop::try_new().unwrap();
501
502        let mut dispatched = false;
503
504        let token = event_loop
505            .handle()
506            .insert_source(
507                Timer::from_duration(Duration::from_millis(100)),
508                |_, &mut (), dispatched| {
509                    *dispatched = true;
510                    TimeoutAction::Drop
511                },
512            )
513            .unwrap();
514
515        event_loop
516            .dispatch(Some(Duration::ZERO), &mut dispatched)
517            .unwrap();
518        // not yet dispatched
519        assert!(!dispatched);
520
521        event_loop.handle().remove(token);
522
523        event_loop
524            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
525            .unwrap();
526        // still not dispatched
527        assert!(!dispatched);
528    }
529
530    #[test]
531    fn repeating_timer() {
532        let mut event_loop = EventLoop::try_new().unwrap();
533
534        let mut dispatched = 0;
535
536        event_loop
537            .handle()
538            .insert_source(
539                Timer::from_duration(Duration::from_millis(500)),
540                |_, &mut (), dispatched| {
541                    *dispatched += 1;
542                    TimeoutAction::ToDuration(Duration::from_millis(500))
543                },
544            )
545            .unwrap();
546
547        event_loop
548            .dispatch(Some(Duration::from_millis(250)), &mut dispatched)
549            .unwrap();
550        assert_eq!(dispatched, 0);
551
552        event_loop
553            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
554            .unwrap();
555        assert_eq!(dispatched, 1);
556
557        event_loop
558            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
559            .unwrap();
560        assert_eq!(dispatched, 2);
561
562        event_loop
563            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
564            .unwrap();
565        assert_eq!(dispatched, 3);
566    }
567
568    #[cfg(feature = "executor")]
569    #[test]
570    fn timeout_future() {
571        let mut event_loop = EventLoop::try_new().unwrap();
572
573        let mut dispatched = 0;
574
575        let timeout_1 =
576            TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(500));
577        let timeout_2 =
578            TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(1500));
579        // This one should never go off.
580        let timeout_3 = TimeoutFuture::from_duration(&event_loop.handle(), Duration::MAX);
581
582        let (exec, sched) = crate::sources::futures::executor().unwrap();
583        event_loop
584            .handle()
585            .insert_source(exec, move |(), &mut (), got| {
586                *got += 1;
587            })
588            .unwrap();
589
590        sched.schedule(timeout_1).unwrap();
591        sched.schedule(timeout_2).unwrap();
592        sched.schedule(timeout_3).unwrap();
593
594        // We do a 0-timeout dispatch after every regular dispatch to let the timeout triggers
595        // flow back to the executor
596
597        event_loop
598            .dispatch(Some(Duration::ZERO), &mut dispatched)
599            .unwrap();
600        event_loop
601            .dispatch(Some(Duration::ZERO), &mut dispatched)
602            .unwrap();
603        assert_eq!(dispatched, 0);
604
605        event_loop
606            .dispatch(Some(Duration::from_millis(1000)), &mut dispatched)
607            .unwrap();
608        event_loop
609            .dispatch(Some(Duration::ZERO), &mut dispatched)
610            .unwrap();
611        assert_eq!(dispatched, 1);
612
613        event_loop
614            .dispatch(Some(Duration::from_millis(1100)), &mut dispatched)
615            .unwrap();
616        event_loop
617            .dispatch(Some(Duration::ZERO), &mut dispatched)
618            .unwrap();
619        assert_eq!(dispatched, 2);
620    }
621
622    #[test]
623    fn no_overflow() {
624        let mut event_loop = EventLoop::try_new().unwrap();
625
626        let mut dispatched = 0;
627
628        event_loop
629            .handle()
630            .insert_source(
631                Timer::from_duration(Duration::from_millis(500)),
632                |_, &mut (), dispatched| {
633                    *dispatched += 1;
634                    TimeoutAction::Drop
635                },
636            )
637            .unwrap();
638
639        event_loop
640            .handle()
641            .insert_source(Timer::from_duration(Duration::MAX), |_, &mut (), _| {
642                panic!("This timer should never go off")
643            })
644            .unwrap();
645
646        event_loop
647            .dispatch(Some(Duration::from_millis(250)), &mut dispatched)
648            .unwrap();
649        assert_eq!(dispatched, 0);
650
651        event_loop
652            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
653            .unwrap();
654        assert_eq!(dispatched, 1);
655
656        event_loop
657            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
658            .unwrap();
659        assert_eq!(dispatched, 1);
660    }
661}