1#![cfg_attr(unix, doc = "```")]
11#![cfg_attr(not(unix), doc = "```no_run")]
12use polling::Poller;
45use std::{borrow, marker::PhantomData, ops, panic::AssertUnwindSafe, sync::Arc};
46
47#[cfg(unix)]
48use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd};
49#[cfg(windows)]
50use std::os::windows::io::{
51 AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd,
52};
53
54use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};
55
56#[derive(Debug)]
58pub struct FdWrapper<T: AsRawFd>(T);
59
60impl<T: AsRawFd> FdWrapper<T> {
61 pub unsafe fn new(inner: T) -> Self {
68 Self(inner)
69 }
70}
71
72impl<T: AsRawFd> ops::Deref for FdWrapper<T> {
73 type Target = T;
74
75 fn deref(&self) -> &Self::Target {
76 &self.0
77 }
78}
79
80impl<T: AsRawFd> ops::DerefMut for FdWrapper<T> {
81 fn deref_mut(&mut self) -> &mut Self::Target {
82 &mut self.0
83 }
84}
85
86impl<T: AsRawFd> AsFd for FdWrapper<T> {
87 #[cfg(unix)]
88 fn as_fd(&self) -> BorrowedFd {
89 unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) }
90 }
91
92 #[cfg(windows)]
93 fn as_socket(&self) -> BorrowedFd {
94 unsafe { BorrowedFd::borrow_raw(self.0.as_raw_socket()) }
95 }
96}
97
98#[derive(Debug)]
114pub struct NoIoDrop<T>(T);
115
116impl<T> NoIoDrop<T> {
117 pub unsafe fn get_mut(&mut self) -> &mut T {
123 &mut self.0
124 }
125}
126
127impl<T> AsRef<T> for NoIoDrop<T> {
128 fn as_ref(&self) -> &T {
129 &self.0
130 }
131}
132
133impl<T> borrow::Borrow<T> for NoIoDrop<T> {
134 fn borrow(&self) -> &T {
135 &self.0
136 }
137}
138
139impl<T> ops::Deref for NoIoDrop<T> {
140 type Target = T;
141
142 fn deref(&self) -> &Self::Target {
143 &self.0
144 }
145}
146
147impl<T: AsFd> AsFd for NoIoDrop<T> {
148 #[cfg(unix)]
149 fn as_fd(&self) -> BorrowedFd<'_> {
150 self.0.as_fd()
152 }
153
154 #[cfg(windows)]
155 fn as_socket(&self) -> BorrowedFd<'_> {
156 self.0.as_socket()
158 }
159}
160
161#[derive(Debug)]
163pub struct Generic<F: AsFd, E = std::io::Error> {
164 file: Option<NoIoDrop<F>>,
168 pub interest: Interest,
170 pub mode: Mode,
172
173 poller: Option<Arc<Poller>>,
177
178 token: Option<Token>,
181
182 _error_type: PhantomData<AssertUnwindSafe<E>>,
184}
185
186impl<F: AsFd> Generic<F, std::io::Error> {
187 pub fn new(file: F, interest: Interest, mode: Mode) -> Generic<F, std::io::Error> {
190 Generic {
191 file: Some(NoIoDrop(file)),
192 interest,
193 mode,
194 token: None,
195 poller: None,
196 _error_type: PhantomData,
197 }
198 }
199
200 pub fn new_with_error<E>(file: F, interest: Interest, mode: Mode) -> Generic<F, E> {
202 Generic {
203 file: Some(NoIoDrop(file)),
204 interest,
205 mode,
206 token: None,
207 poller: None,
208 _error_type: PhantomData,
209 }
210 }
211}
212
213impl<F: AsFd, E> Generic<F, E> {
214 pub fn unwrap(mut self) -> F {
216 let NoIoDrop(file) = self.file.take().unwrap();
217
218 if let Some(poller) = self.poller.take() {
220 poller
221 .delete(
222 #[cfg(unix)]
223 file.as_fd(),
224 #[cfg(windows)]
225 file.as_socket(),
226 )
227 .ok();
228 }
229
230 file
231 }
232
233 pub fn get_ref(&self) -> &F {
235 &self.file.as_ref().unwrap().0
236 }
237
238 pub unsafe fn get_mut(&mut self) -> &mut F {
246 self.file.as_mut().unwrap().get_mut()
247 }
248}
249
250impl<F: AsFd, E> Drop for Generic<F, E> {
251 fn drop(&mut self) {
252 if let (Some(file), Some(poller)) = (self.file.take(), self.poller.take()) {
254 poller
255 .delete(
256 #[cfg(unix)]
257 file.as_fd(),
258 #[cfg(windows)]
259 file.as_socket(),
260 )
261 .ok();
262 }
263 }
264}
265
266impl<F, E> EventSource for Generic<F, E>
267where
268 F: AsFd,
269 E: Into<Box<dyn std::error::Error + Send + Sync>>,
270{
271 type Event = Readiness;
272 type Metadata = NoIoDrop<F>;
273 type Ret = Result<PostAction, E>;
274 type Error = E;
275
276 fn process_events<C>(
277 &mut self,
278 readiness: Readiness,
279 token: Token,
280 mut callback: C,
281 ) -> Result<PostAction, Self::Error>
282 where
283 C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
284 {
285 if self.token != Some(token) {
287 return Ok(PostAction::Continue);
288 }
289
290 callback(readiness, self.file.as_mut().unwrap())
291 }
292
293 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
294 let token = token_factory.token();
295
296 unsafe {
298 poll.register(
299 &self.file.as_ref().unwrap().0,
300 self.interest,
301 self.mode,
302 token,
303 )?;
304 }
305
306 self.poller = Some(poll.poller().clone());
311 self.token = Some(token);
312
313 Ok(())
314 }
315
316 fn reregister(
317 &mut self,
318 poll: &mut Poll,
319 token_factory: &mut TokenFactory,
320 ) -> crate::Result<()> {
321 let token = token_factory.token();
322
323 poll.reregister(
324 &self.file.as_ref().unwrap().0,
325 self.interest,
326 self.mode,
327 token,
328 )?;
329
330 self.token = Some(token);
331 Ok(())
332 }
333
334 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
335 poll.unregister(&self.file.as_ref().unwrap().0)?;
336 self.poller = None;
337 self.token = None;
338 Ok(())
339 }
340}
341
342#[cfg(all(unix, test))]
343mod tests {
344 use std::io::{Read, Write};
345
346 use super::Generic;
347 use crate::{Dispatcher, Interest, Mode, PostAction};
348 #[cfg(unix)]
349 #[test]
350 fn dispatch_unix() {
351 use std::os::unix::net::UnixStream;
352
353 let mut event_loop = crate::EventLoop::try_new().unwrap();
354
355 let handle = event_loop.handle();
356
357 let (mut tx, rx) = UnixStream::pair().unwrap();
358
359 let generic = Generic::new(rx, Interest::READ, Mode::Level);
360
361 let mut dispached = false;
362
363 let _generic_token = handle
364 .insert_source(generic, move |readiness, file, d| {
365 assert!(readiness.readable);
366 assert!(!readiness.writable);
368 let mut buffer = vec![0; 10];
369 let ret = (&**file).read(&mut buffer).unwrap();
370 assert_eq!(ret, 6);
371 assert_eq!(&buffer[..6], &[1, 2, 3, 4, 5, 6]);
372
373 *d = true;
374 Ok(PostAction::Continue)
375 })
376 .unwrap();
377
378 event_loop
379 .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
380 .unwrap();
381
382 assert!(!dispached);
383
384 let ret = tx.write(&[1, 2, 3, 4, 5, 6]).unwrap();
385 assert_eq!(ret, 6);
386 tx.flush().unwrap();
387
388 event_loop
389 .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
390 .unwrap();
391
392 assert!(dispached);
393 }
394
395 #[test]
396 fn register_deregister_unix() {
397 use std::os::unix::net::UnixStream;
398
399 let mut event_loop = crate::EventLoop::try_new().unwrap();
400
401 let handle = event_loop.handle();
402
403 let (mut tx, rx) = UnixStream::pair().unwrap();
404
405 let generic = Generic::new(rx, Interest::READ, Mode::Level);
406 let dispatcher = Dispatcher::new(generic, move |_, _, d| {
407 *d = true;
408 Ok(PostAction::Continue)
409 });
410
411 let mut dispached = false;
412
413 let generic_token = handle.register_dispatcher(dispatcher.clone()).unwrap();
414
415 event_loop
416 .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
417 .unwrap();
418
419 assert!(!dispached);
420
421 event_loop.handle().remove(generic_token);
424
425 let ret = tx.write(&[1, 2, 3, 4, 5, 6]).unwrap();
426 assert_eq!(ret, 6);
427 tx.flush().unwrap();
428
429 event_loop
430 .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
431 .unwrap();
432
433 assert!(!dispached);
435
436 let generic = dispatcher.into_source_inner();
438 let _generic_token = handle
439 .insert_source(generic, move |readiness, file, d| {
440 assert!(readiness.readable);
441 assert!(!readiness.writable);
443 let mut buffer = vec![0; 10];
444 let ret = (&**file).read(&mut buffer).unwrap();
445 assert_eq!(ret, 6);
446 assert_eq!(&buffer[..6], &[1, 2, 3, 4, 5, 6]);
447
448 *d = true;
449 Ok(PostAction::Continue)
450 })
451 .unwrap();
452
453 event_loop
454 .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
455 .unwrap();
456
457 assert!(dispached);
459 }
460
461 #[cfg(target_os = "linux")]
463 #[test]
464 fn duplicate_insert() {
465 use std::os::unix::{
466 io::{AsFd, BorrowedFd},
467 net::UnixStream,
468 };
469 let event_loop = crate::EventLoop::<()>::try_new().unwrap();
470
471 let handle = event_loop.handle();
472
473 let (_, rx) = UnixStream::pair().unwrap();
474
475 struct RcFd<T> {
477 rc: std::rc::Rc<T>,
478 }
479
480 impl<T: AsFd> AsFd for RcFd<T> {
481 fn as_fd(&self) -> BorrowedFd<'_> {
482 self.rc.as_fd()
483 }
484 }
485
486 let rx = std::rc::Rc::new(rx);
487
488 let token = handle
489 .insert_source(
490 Generic::new(RcFd { rc: rx.clone() }, Interest::READ, Mode::Level),
491 |_, _, _| Ok(PostAction::Continue),
492 )
493 .unwrap();
494
495 let ret = handle.insert_source(
497 Generic::new(RcFd { rc: rx.clone() }, Interest::READ, Mode::Level),
498 |_, _, _| Ok(PostAction::Continue),
499 );
500 assert!(ret.is_err());
501 std::mem::drop(ret);
502
503 handle.update(&token).unwrap();
505 }
506}