1use std::cmp;
12use std::fmt;
13use std::sync::mpsc;
14
15use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory};
16
17use super::ping::{make_ping, Ping, PingError, PingSource};
18
/// Upper bound on the number of `try_recv` attempts `Channel::process_events`
/// makes during a single wake-up. When that many attempts go by without the
/// queue being seen empty or disconnected, the source pings itself and returns,
/// so the remaining messages are handled on a later dispatch.
const MAX_EVENTS_CHECK: usize = 1024;
20
/// Events handed to the callback of a [`Channel`] event source.
#[derive(Debug)]
pub enum Event<T> {
    /// A value was taken out of the channel and is carried here.
    Msg(T),
    /// The underlying `mpsc` receiver reported that it is disconnected.
    ///
    /// After delivering this event the source asks the event loop to remove
    /// it, so no further events follow.
    Closed,
}
32
/// Sending half of a channel created by [`channel`].
///
/// Bundles a `std::sync::mpsc::Sender` with a `Ping` handle; the ping is
/// triggered after every successful send and whenever a sender is dropped.
#[derive(Debug)]
pub struct Sender<T> {
    /// Standard-library sender that actually carries the values.
    sender: mpsc::Sender<T>,
    /// Handle used to wake the `PingSource` owned by the paired [`Channel`].
    ping: Ping,
}
41
42impl<T> Clone for Sender<T> {
43 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
44 fn clone(&self) -> Sender<T> {
45 Sender {
46 sender: self.sender.clone(),
47 ping: self.ping.clone(),
48 }
49 }
50}
51
52impl<T> Sender<T> {
53 pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
58 self.sender.send(t).map(|()| self.ping.ping())
59 }
60}
61
impl<T> Drop for Sender<T> {
    /// Pings the receiving side every time a sender handle goes away.
    fn drop(&mut self) {
        // Wake the paired `Channel` so it polls the receiver; once the last
        // sender is gone that poll observes the disconnection and the
        // channel can report `Event::Closed`.
        self.ping.ping();
    }
}
68
/// Sending half of a bounded channel created by [`sync_channel`].
///
/// Bundles a `std::sync::mpsc::SyncSender` with a `Ping` handle used to wake
/// the receiving side.
#[derive(Debug)]
pub struct SyncSender<T> {
    /// Standard-library bounded sender that actually carries the values.
    sender: mpsc::SyncSender<T>,
    /// Handle used to wake the `PingSource` owned by the paired [`Channel`].
    ping: Ping,
}
77
78impl<T> Clone for SyncSender<T> {
79 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
80 fn clone(&self) -> SyncSender<T> {
81 SyncSender {
82 sender: self.sender.clone(),
83 ping: self.ping.clone(),
84 }
85 }
86}
87
88impl<T> SyncSender<T> {
89 pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
99 let ret = self.try_send(t);
100 match ret {
101 Ok(()) => Ok(()),
102 Err(mpsc::TrySendError::Full(t)) => self.sender.send(t).map(|()| self.ping.ping()),
103 Err(mpsc::TrySendError::Disconnected(t)) => Err(mpsc::SendError(t)),
104 }
105 }
106
107 pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
114 let ret = self.sender.try_send(t);
115 if let Ok(()) | Err(mpsc::TrySendError::Full(_)) = ret {
116 self.ping.ping();
117 }
118 ret
119 }
120}
121
/// Receiving half of the channel, usable as an event source.
///
/// Returned by both [`channel`] and [`sync_channel`]. Its `EventSource`
/// implementation turns received values into [`Event::Msg`] and a
/// disconnection into [`Event::Closed`].
#[derive(Debug)]
pub struct Channel<T> {
    /// Standard-library receiver the values are read from.
    receiver: mpsc::Receiver<T>,
    /// Ping source that is registered with the poll and signals wake-ups.
    source: PingSource,
    /// Ping handle used by `process_events` to wake this source again when it
    /// stops before the queue was seen empty.
    ping: Ping,
    /// Bound given to [`sync_channel`], or `usize::MAX` for [`channel`];
    /// limits how many receive attempts are made per wake-up.
    capacity: usize,
}
132
// Manually asserts that a `Channel<T>` may be moved to another thread whenever
// `T: Send`. `mpsc::Receiver<T>` is itself `Send` under that bound.
// NOTE(review): the justification for the `PingSource` and `Ping` fields is not
// visible in this file — confirm that moving them across threads is sound and
// record the invariant here as a `SAFETY:` comment.
unsafe impl<T: Send> Send for Channel<T> {}
139
impl<T> Channel<T> {
    /// Receives a value directly from the underlying `mpsc` receiver,
    /// blocking until one is available.
    ///
    /// Values taken this way come out of the same receiver that
    /// `process_events` reads from, so they are not delivered to the event
    /// source callback.
    ///
    /// # Errors
    ///
    /// Returns the `mpsc::RecvError` produced by `mpsc::Receiver::recv`.
    pub fn recv(&self) -> Result<T, mpsc::RecvError> {
        self.receiver.recv()
    }

    /// Attempts to receive a value directly from the underlying `mpsc`
    /// receiver without blocking.
    ///
    /// # Errors
    ///
    /// Returns the `mpsc::TryRecvError` produced by
    /// `mpsc::Receiver::try_recv` (`Empty` or `Disconnected`).
    pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
        self.receiver.try_recv()
    }
}
159
160pub fn channel<T>() -> (Sender<T>, Channel<T>) {
162 let (sender, receiver) = mpsc::channel();
163 let (ping, source) = make_ping().expect("Failed to create a Ping.");
164 (
165 Sender {
166 sender,
167 ping: ping.clone(),
168 },
169 Channel {
170 receiver,
171 ping,
172 source,
173 capacity: usize::MAX,
174 },
175 )
176}
177
178pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) {
180 let (sender, receiver) = mpsc::sync_channel(bound);
181 let (ping, source) = make_ping().expect("Failed to create a Ping.");
182 (
183 SyncSender {
184 sender,
185 ping: ping.clone(),
186 },
187 Channel {
188 receiver,
189 source,
190 ping,
191 capacity: bound,
192 },
193 )
194}
195
196impl<T> EventSource for Channel<T> {
197 type Event = Event<T>;
198 type Metadata = ();
199 type Ret = ();
200 type Error = ChannelError;
201
202 fn process_events<C>(
203 &mut self,
204 readiness: Readiness,
205 token: Token,
206 mut callback: C,
207 ) -> Result<PostAction, Self::Error>
208 where
209 C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
210 {
211 let receiver = &self.receiver;
212 let capacity = self.capacity;
213 let mut clear_readiness = false;
214 let mut disconnected = false;
215
216 let action = self
217 .source
218 .process_events(readiness, token, |(), &mut ()| {
219 let max = cmp::min(capacity.saturating_add(1), MAX_EVENTS_CHECK);
221 for _ in 0..max {
222 match receiver.try_recv() {
223 Ok(val) => callback(Event::Msg(val), &mut ()),
224 Err(mpsc::TryRecvError::Empty) => {
225 clear_readiness = true;
226 break;
227 }
228 Err(mpsc::TryRecvError::Disconnected) => {
229 callback(Event::Closed, &mut ());
230 disconnected = true;
231 break;
232 }
233 }
234 }
235 })
236 .map_err(ChannelError)?;
237
238 if disconnected {
239 Ok(PostAction::Remove)
240 } else if clear_readiness {
241 Ok(action)
242 } else {
243 self.ping.ping();
245 Ok(PostAction::Continue)
246 }
247 }
248
249 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
250 self.source.register(poll, token_factory)
251 }
252
253 fn reregister(
254 &mut self,
255 poll: &mut Poll,
256 token_factory: &mut TokenFactory,
257 ) -> crate::Result<()> {
258 self.source.reregister(poll, token_factory)
259 }
260
261 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
262 self.source.unregister(poll)
263 }
264}
265
/// Error produced while processing a [`Channel`]; wraps the `PingError`
/// reported by the inner ping source.
#[derive(Debug)]
pub struct ChannelError(PingError);
269
270impl fmt::Display for ChannelError {
271 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
272 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273 fmt::Display::fmt(&self.0, f)
274 }
275}
276
277impl std::error::Error for ChannelError {
278 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
279 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
280 Some(&self.0)
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Zero timeout: handle whatever is already pending, then return.
    const NO_WAIT: Option<Duration> = Some(Duration::ZERO);

    #[test]
    fn basic_channel() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let loop_handle = event_loop.handle();

        let (tx, rx) = channel::<()>();

        // (a message was seen, the closure was seen)
        let mut seen = (false, false);

        let _channel_token = loop_handle
            .insert_source(rx, move |evt, &mut (), seen: &mut (bool, bool)| match evt {
                Event::Msg(()) => seen.0 = true,
                Event::Closed => seen.1 = true,
            })
            .unwrap();

        // Nothing sent yet, so nothing may be reported.
        event_loop.dispatch(NO_WAIT, &mut seen).unwrap();
        assert_eq!(seen, (false, false));

        // One message goes through.
        tx.send(()).unwrap();
        event_loop.dispatch(NO_WAIT, &mut seen).unwrap();
        assert_eq!(seen, (true, false));

        // Dropping the only sender closes the channel.
        ::std::mem::drop(tx);
        event_loop.dispatch(NO_WAIT, &mut seen).unwrap();
        assert_eq!(seen, (true, true));
    }

    #[test]
    fn basic_sync_channel() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let loop_handle = event_loop.handle();

        let (tx, rx) = sync_channel::<()>(2);

        // (number of messages seen, the closure was seen)
        let mut tally = (0, false);

        let _channel_token = loop_handle
            .insert_source(rx, move |evt, &mut (), tally: &mut (u32, bool)| match evt {
                Event::Msg(()) => tally.0 += 1,
                Event::Closed => tally.1 = true,
            })
            .unwrap();

        // Nothing sent yet, so nothing may be reported.
        event_loop.dispatch(NO_WAIT, &mut tally).unwrap();
        assert_eq!(tally.0, 0);
        assert!(!tally.1);

        // Fill the buffer; a third non-blocking send must then be refused.
        tx.send(()).unwrap();
        tx.send(()).unwrap();
        assert!(tx.try_send(()).is_err());

        // Only the two buffered messages arrive.
        event_loop.dispatch(NO_WAIT, &mut tally).unwrap();
        assert_eq!(tally.0, 2);
        assert!(!tally.1);

        // One more message, then the sender goes away.
        tx.send(()).unwrap();
        std::mem::drop(tx);

        // The last message and the closure are both reported.
        event_loop.dispatch(NO_WAIT, &mut tally).unwrap();
        assert_eq!(tally.0, 3);
        assert!(tally.1);
    }

    #[test]
    fn test_more_than_1024() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let loop_handle = event_loop.handle();

        let (tx, rx) = channel::<()>();

        // (number of messages seen, the closure was seen)
        let mut tally = (0u32, false);

        loop_handle
            .insert_source(rx, move |evt, &mut (), tally: &mut (u32, bool)| match evt {
                Event::Msg(()) => tally.0 += 1,
                Event::Closed => tally.1 = true,
            })
            .unwrap();

        // Nothing sent yet, so nothing may be reported.
        event_loop.dispatch(NO_WAIT, &mut tally).unwrap();
        assert_eq!(tally.0, 0);
        assert!(!tally.1);

        // Queue one message more than a single pass is allowed to handle.
        for _ in 0..=MAX_EVENTS_CHECK {
            tx.send(()).unwrap();
        }

        // The first pass stops at the per-wake-up limit...
        event_loop.dispatch(NO_WAIT, &mut tally).unwrap();
        assert_eq!(tally.0, MAX_EVENTS_CHECK as u32);
        assert!(!tally.1);

        // ...and the leftover message is picked up by the next one.
        event_loop.dispatch(NO_WAIT, &mut tally).unwrap();
        assert_eq!(tally.0, (MAX_EVENTS_CHECK + 1) as u32);
        assert!(!tally.1);
    }
}