calloop/sources/
channel.rs

1//! An MPSC channel whose receiving end is an event source
2//!
3//! Create a channel using [`channel()`](channel), which returns a
4//! [`Sender`] that can be cloned and sent accross threads if `T: Send`,
5//! and a [`Channel`] that can be inserted into an [`EventLoop`](crate::EventLoop).
6//! It will generate one event per message.
7//!
8//! A synchronous version of the channel is provided by [`sync_channel`], in which
9//! the [`SyncSender`] will block when the channel is full.
10
11use std::cmp;
12use std::fmt;
13use std::sync::mpsc;
14
15use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory};
16
17use super::ping::{make_ping, Ping, PingError, PingSource};
18
19const MAX_EVENTS_CHECK: usize = 1024;
20
21/// The events generated by the channel event source
22#[derive(Debug)]
23pub enum Event<T> {
24    /// A message was received and is bundled here
25    Msg(T),
26    /// The channel was closed
27    ///
28    /// This means all the `Sender`s associated with this channel
29    /// have been dropped, no more messages will ever be received.
30    Closed,
31}
32
33/// The sender end of a channel
34///
35/// It can be cloned and sent accross threads (if `T` is).
36#[derive(Debug)]
37pub struct Sender<T> {
38    sender: mpsc::Sender<T>,
39    ping: Ping,
40}
41
42impl<T> Clone for Sender<T> {
43    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
44    fn clone(&self) -> Sender<T> {
45        Sender {
46            sender: self.sender.clone(),
47            ping: self.ping.clone(),
48        }
49    }
50}
51
52impl<T> Sender<T> {
53    /// Send a message to the channel
54    ///
55    /// This will wake the event loop and deliver an `Event::Msg` to
56    /// it containing the provided value.
57    pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
58        self.sender.send(t).map(|()| self.ping.ping())
59    }
60}
61
62impl<T> Drop for Sender<T> {
63    fn drop(&mut self) {
64        // ping on drop, to notify about channel closure
65        self.ping.ping();
66    }
67}
68
69/// The sender end of a synchronous channel
70///
71/// It can be cloned and sent accross threads (if `T` is).
72#[derive(Debug)]
73pub struct SyncSender<T> {
74    sender: mpsc::SyncSender<T>,
75    ping: Ping,
76}
77
78impl<T> Clone for SyncSender<T> {
79    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
80    fn clone(&self) -> SyncSender<T> {
81        SyncSender {
82            sender: self.sender.clone(),
83            ping: self.ping.clone(),
84        }
85    }
86}
87
88impl<T> SyncSender<T> {
89    /// Send a message to the synchronous channel
90    ///
91    /// This will wake the event loop and deliver an `Event::Msg` to
92    /// it containing the provided value. If the channel is full, this
93    /// function will block until the event loop empties it and it can
94    /// deliver the message.
95    ///
96    /// Due to the blocking behavior, this method should not be used on the
97    /// same thread as the one running the event loop, as it could cause deadlocks.
98    pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
99        let ret = self.try_send(t);
100        match ret {
101            Ok(()) => Ok(()),
102            Err(mpsc::TrySendError::Full(t)) => self.sender.send(t).map(|()| self.ping.ping()),
103            Err(mpsc::TrySendError::Disconnected(t)) => Err(mpsc::SendError(t)),
104        }
105    }
106
107    /// Send a message to the synchronous channel
108    ///
109    /// This will wake the event loop and deliver an `Event::Msg` to
110    /// it containing the provided value. If the channel is full, this
111    /// function will return an error, but the event loop will still be
112    /// signaled for readiness.
113    pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
114        let ret = self.sender.try_send(t);
115        if let Ok(()) | Err(mpsc::TrySendError::Full(_)) = ret {
116            self.ping.ping();
117        }
118        ret
119    }
120}
121
122/// The receiving end of the channel
123///
124/// This is the event source to be inserted into your `EventLoop`.
125#[derive(Debug)]
126pub struct Channel<T> {
127    receiver: mpsc::Receiver<T>,
128    source: PingSource,
129    ping: Ping,
130    capacity: usize,
131}
132
133// This impl is safe because the Channel is only able to move around threads
134// when it is not inserted into an event loop. (Otherwise it is stuck into
135// a Source<_> and the internals of calloop, which are not Send).
136// At this point, the Arc<Receiver> has a count of 1, and it is obviously
137// safe to Send between threads.
138unsafe impl<T: Send> Send for Channel<T> {}
139
140impl<T> Channel<T> {
141    /// Proxy for [`mpsc::Receiver::recv`] to manually poll events.
142    ///
143    /// *Note*: Normally you would want to use the `Channel` by inserting
144    /// it into an event loop instead. Use this for example to immediately
145    /// dispatch pending events after creation.
146    pub fn recv(&self) -> Result<T, mpsc::RecvError> {
147        self.receiver.recv()
148    }
149
150    /// Proxy for [`mpsc::Receiver::try_recv`] to manually poll events.
151    ///
152    /// *Note*: Normally you would want to use the `Channel` by inserting
153    /// it into an event loop instead. Use this for example to immediately
154    /// dispatch pending events after creation.
155    pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
156        self.receiver.try_recv()
157    }
158}
159
160/// Create a new asynchronous channel
161pub fn channel<T>() -> (Sender<T>, Channel<T>) {
162    let (sender, receiver) = mpsc::channel();
163    let (ping, source) = make_ping().expect("Failed to create a Ping.");
164    (
165        Sender {
166            sender,
167            ping: ping.clone(),
168        },
169        Channel {
170            receiver,
171            ping,
172            source,
173            capacity: usize::MAX,
174        },
175    )
176}
177
178/// Create a new synchronous, bounded channel
179pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) {
180    let (sender, receiver) = mpsc::sync_channel(bound);
181    let (ping, source) = make_ping().expect("Failed to create a Ping.");
182    (
183        SyncSender {
184            sender,
185            ping: ping.clone(),
186        },
187        Channel {
188            receiver,
189            source,
190            ping,
191            capacity: bound,
192        },
193    )
194}
195
196impl<T> EventSource for Channel<T> {
197    type Event = Event<T>;
198    type Metadata = ();
199    type Ret = ();
200    type Error = ChannelError;
201
202    fn process_events<C>(
203        &mut self,
204        readiness: Readiness,
205        token: Token,
206        mut callback: C,
207    ) -> Result<PostAction, Self::Error>
208    where
209        C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
210    {
211        let receiver = &self.receiver;
212        let capacity = self.capacity;
213        let mut clear_readiness = false;
214        let mut disconnected = false;
215
216        let action = self
217            .source
218            .process_events(readiness, token, |(), &mut ()| {
219                // Limit the number of elements we process at a time to the channel's capacity, or 1024.
220                let max = cmp::min(capacity.saturating_add(1), MAX_EVENTS_CHECK);
221                for _ in 0..max {
222                    match receiver.try_recv() {
223                        Ok(val) => callback(Event::Msg(val), &mut ()),
224                        Err(mpsc::TryRecvError::Empty) => {
225                            clear_readiness = true;
226                            break;
227                        }
228                        Err(mpsc::TryRecvError::Disconnected) => {
229                            callback(Event::Closed, &mut ());
230                            disconnected = true;
231                            break;
232                        }
233                    }
234                }
235            })
236            .map_err(ChannelError)?;
237
238        if disconnected {
239            Ok(PostAction::Remove)
240        } else if clear_readiness {
241            Ok(action)
242        } else {
243            // Re-notify the ping source so we can try again.
244            self.ping.ping();
245            Ok(PostAction::Continue)
246        }
247    }
248
249    fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
250        self.source.register(poll, token_factory)
251    }
252
253    fn reregister(
254        &mut self,
255        poll: &mut Poll,
256        token_factory: &mut TokenFactory,
257    ) -> crate::Result<()> {
258        self.source.reregister(poll, token_factory)
259    }
260
261    fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
262        self.source.unregister(poll)
263    }
264}
265
266/// An error arising from processing events for a channel.
267#[derive(Debug)]
268pub struct ChannelError(PingError);
269
270impl fmt::Display for ChannelError {
271    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
272    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273        fmt::Display::fmt(&self.0, f)
274    }
275}
276
277impl std::error::Error for ChannelError {
278    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
279    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
280        Some(&self.0)
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use super::*;
287
288    #[test]
289    fn basic_channel() {
290        let mut event_loop = crate::EventLoop::try_new().unwrap();
291
292        let handle = event_loop.handle();
293
294        let (tx, rx) = channel::<()>();
295
296        // (got_msg, got_closed)
297        let mut got = (false, false);
298
299        let _channel_token = handle
300            .insert_source(rx, move |evt, &mut (), got: &mut (bool, bool)| match evt {
301                Event::Msg(()) => {
302                    got.0 = true;
303                }
304                Event::Closed => {
305                    got.1 = true;
306                }
307            })
308            .unwrap();
309
310        // nothing is sent, nothing is received
311        event_loop
312            .dispatch(Some(::std::time::Duration::ZERO), &mut got)
313            .unwrap();
314
315        assert_eq!(got, (false, false));
316
317        // a message is send
318        tx.send(()).unwrap();
319        event_loop
320            .dispatch(Some(::std::time::Duration::ZERO), &mut got)
321            .unwrap();
322
323        assert_eq!(got, (true, false));
324
325        // the sender is dropped
326        ::std::mem::drop(tx);
327        event_loop
328            .dispatch(Some(::std::time::Duration::ZERO), &mut got)
329            .unwrap();
330
331        assert_eq!(got, (true, true));
332    }
333
334    #[test]
335    fn basic_sync_channel() {
336        let mut event_loop = crate::EventLoop::try_new().unwrap();
337
338        let handle = event_loop.handle();
339
340        let (tx, rx) = sync_channel::<()>(2);
341
342        let mut received = (0, false);
343
344        let _channel_token = handle
345            .insert_source(
346                rx,
347                move |evt, &mut (), received: &mut (u32, bool)| match evt {
348                    Event::Msg(()) => {
349                        received.0 += 1;
350                    }
351                    Event::Closed => {
352                        received.1 = true;
353                    }
354                },
355            )
356            .unwrap();
357
358        // nothing is sent, nothing is received
359        event_loop
360            .dispatch(Some(::std::time::Duration::ZERO), &mut received)
361            .unwrap();
362
363        assert_eq!(received.0, 0);
364        assert!(!received.1);
365
366        // fill the channel
367        tx.send(()).unwrap();
368        tx.send(()).unwrap();
369        assert!(tx.try_send(()).is_err());
370
371        // empty it
372        event_loop
373            .dispatch(Some(::std::time::Duration::ZERO), &mut received)
374            .unwrap();
375
376        assert_eq!(received.0, 2);
377        assert!(!received.1);
378
379        // send a final message and drop the sender
380        tx.send(()).unwrap();
381        std::mem::drop(tx);
382
383        // final read of the channel
384        event_loop
385            .dispatch(Some(::std::time::Duration::ZERO), &mut received)
386            .unwrap();
387
388        assert_eq!(received.0, 3);
389        assert!(received.1);
390    }
391
392    #[test]
393    fn test_more_than_1024() {
394        let mut event_loop = crate::EventLoop::try_new().unwrap();
395        let handle = event_loop.handle();
396
397        let (tx, rx) = channel::<()>();
398        let mut received = (0u32, false);
399
400        handle
401            .insert_source(
402                rx,
403                move |evt, &mut (), received: &mut (u32, bool)| match evt {
404                    Event::Msg(()) => received.0 += 1,
405                    Event::Closed => received.1 = true,
406                },
407            )
408            .unwrap();
409
410        event_loop
411            .dispatch(Some(std::time::Duration::ZERO), &mut received)
412            .unwrap();
413
414        assert_eq!(received.0, 0);
415        assert!(!received.1);
416
417        // Send 1025 elements into the channel.
418        for _ in 0..MAX_EVENTS_CHECK + 1 {
419            tx.send(()).unwrap();
420        }
421
422        event_loop
423            .dispatch(Some(std::time::Duration::ZERO), &mut received)
424            .unwrap();
425
426        assert_eq!(received.0, MAX_EVENTS_CHECK as u32);
427        assert!(!received.1);
428
429        event_loop
430            .dispatch(Some(std::time::Duration::ZERO), &mut received)
431            .unwrap();
432
433        assert_eq!(received.0, (MAX_EVENTS_CHECK + 1) as u32);
434        assert!(!received.1);
435    }
436}