x11rb/rust_connection/
mod.rs

1//! A pure-rust implementation of a connection to an X11 server.
2
3use std::io::IoSlice;
4use std::sync::{Condvar, Mutex, MutexGuard, TryLockError};
5use std::time::Instant;
6
7use crate::connection::{
8    compute_length_field, Connection, ReplyOrError, RequestConnection, RequestKind,
9};
10use crate::cookie::{Cookie, CookieWithFds, VoidCookie};
11use crate::errors::DisplayParsingError;
12pub use crate::errors::{ConnectError, ConnectionError, ParseError, ReplyError, ReplyOrIdError};
13use crate::extension_manager::ExtensionManager;
14use crate::protocol::bigreq::{ConnectionExt as _, EnableReply};
15use crate::protocol::xproto::{Setup, GET_INPUT_FOCUS_REQUEST, QUERY_EXTENSION_REQUEST};
16use crate::utils::RawFdContainer;
17use crate::x11_utils::{ExtensionInformation, TryParse, TryParseFd};
18use x11rb_protocol::connect::Connect;
19use x11rb_protocol::connection::{Connection as ProtoConnection, PollReply, ReplyFdKind};
20use x11rb_protocol::id_allocator::IdAllocator;
21use x11rb_protocol::{xauth::get_auth, DiscardMode, RawEventAndSeqNumber, SequenceNumber};
22
23mod packet_reader;
24mod stream;
25mod write_buffer;
26
27use packet_reader::PacketReader;
28pub use stream::{DefaultStream, PollMode, Stream};
29use write_buffer::WriteBuffer;
30
31type Buffer = <RustConnection as RequestConnection>::Buf;
32/// A combination of a buffer and a list of file descriptors for use by [`RustConnection`].
33pub type BufWithFds = crate::connection::BufWithFds<Buffer>;
34
35#[derive(Debug)]
36enum MaxRequestBytes {
37    Unknown,
38    Requested(Option<SequenceNumber>),
39    Known(usize),
40}
41
42#[derive(Debug)]
43struct ConnectionInner {
44    inner: ProtoConnection,
45    write_buffer: WriteBuffer,
46}
47
48type MutexGuardInner<'a> = MutexGuard<'a, ConnectionInner>;
49
50#[derive(Debug, Copy, Clone, PartialEq, Eq)]
51pub(crate) enum BlockingMode {
52    Blocking,
53    NonBlocking,
54}
55
56/// A connection to an X11 server implemented in pure rust
57///
58/// This type is generic over `S`, which allows to use a generic stream to communicate with the
59/// server. This stream can written to and read from, but it can also be polled, meaning that one
60/// checks if new data can be read or written.
61///
62/// `RustConnection` always used an internal buffer for reading, so `R` does not need
63/// to be buffered.
64#[derive(Debug)]
65pub struct RustConnection<S: Stream = DefaultStream> {
66    inner: Mutex<ConnectionInner>,
67    stream: S,
68    // This mutex is only locked with `try_lock` (never blocks), so a simpler
69    // lock based only on a atomic variable would be more efficient.
70    packet_reader: Mutex<PacketReader>,
71    reader_condition: Condvar,
72    setup: Setup,
73    extension_manager: Mutex<ExtensionManager>,
74    maximum_request_bytes: Mutex<MaxRequestBytes>,
75    id_allocator: Mutex<IdAllocator>,
76}
77
78// Locking rules
79// =============
80//
81// To avoid deadlocks, it is important to have a defined ordering about mutexes:
82//
83// Mutexes that may be locked when no other mutex is held:
84// - maximum_request_bytes
85// - extension_manager
86// - id_allocator
87//
88// Then comes `inner`. This mutex protects the information about in-flight requests and packets
89// that were already read from the connection but not given out to callers. This mutex also
90// contains the write buffer and has to be locked in order to write something to the X11 server.
91// In this case, the mutex has to be kept locked until writing the request has finished. This is
92// necessary to ensure correct sync insertion without threads interfering with each other. When
93// this mutex is locked for operations other than writing, the lock should be kept only for a
94// short time.
95//
96// The inner level is `packet_reader`. This mutex is only locked when `inner` is already held and
97// only with `try_lock()`. This ensures that there is only one reader. While actually reading, the
98// lock on `inner` is released so that other threads can make progress. If more threads want to
99// read while `read` is already locked, they sleep on `reader_condition`. The actual reader will
100// then notify this condition variable once it is done reading.
101//
102// n.b. notgull: write_buffer follows the same rules
103//
104// The condition variable is necessary since one thread may read packets that another thread waits
105// for. Thus, after reading something from the connection, all threads that wait for something have
106// to check if they are the intended recipient.
107
108impl RustConnection<DefaultStream> {
109    /// Establish a new connection.
110    ///
111    /// If no `dpy_name` is provided, the value from `$DISPLAY` is used.
112    pub fn connect(dpy_name: Option<&str>) -> Result<(Self, usize), ConnectError> {
113        // Parse display information
114        let parsed_display = x11rb_protocol::parse_display::parse_display(dpy_name)?;
115        let screen = parsed_display.screen.into();
116
117        // Establish connection by iterating over ConnectAddresses until we find one that
118        // works.
119        let mut error = None;
120        for addr in parsed_display.connect_instruction() {
121            let start = Instant::now();
122            match DefaultStream::connect(&addr) {
123                Ok((stream, (family, address))) => {
124                    crate::trace!(
125                        "Connected to X11 server via {:?} in {:?}",
126                        addr,
127                        start.elapsed()
128                    );
129
130                    // we found a stream, get auth information
131                    let (auth_name, auth_data) = get_auth(family, &address, parsed_display.display)
132                        // Ignore all errors while determining auth; instead we just try without auth info.
133                        .unwrap_or(None)
134                        .unwrap_or_else(|| (Vec::new(), Vec::new()));
135                    crate::trace!("Picked authentication via auth mechanism {:?}", auth_name);
136
137                    // finish connecting to server
138                    return Ok((
139                        Self::connect_to_stream_with_auth_info(
140                            stream, screen, auth_name, auth_data,
141                        )?,
142                        screen,
143                    ));
144                }
145                Err(e) => {
146                    crate::debug!("Failed to connect to X11 server via {:?}: {:?}", addr, e);
147                    error = Some(e);
148                    continue;
149                }
150            }
151        }
152
153        // none of the addresses worked
154        Err(match error {
155            Some(e) => ConnectError::IoError(e),
156            None => DisplayParsingError::Unknown.into(),
157        })
158    }
159}
160
161impl<S: Stream> RustConnection<S> {
162    /// Establish a new connection to the given streams.
163    ///
164    /// `read` is used for reading data from the X11 server and `write` is used for writing.
165    /// `screen` is the number of the screen that should be used. This function checks that a
166    /// screen with that number exists.
167    pub fn connect_to_stream(stream: S, screen: usize) -> Result<Self, ConnectError> {
168        Self::connect_to_stream_with_auth_info(stream, screen, Vec::new(), Vec::new())
169    }
170
171    /// Establish a new connection to the given streams.
172    ///
173    /// `read` is used for reading data from the X11 server and `write` is used for writing.
174    /// `screen` is the number of the screen that should be used. This function checks that a
175    /// screen with that number exists.
176    ///
177    /// The parameters `auth_name` and `auth_data` are used for the members
178    /// `authorization_protocol_name` and `authorization_protocol_data` of the `SetupRequest` that
179    /// is sent to the X11 server.
180    pub fn connect_to_stream_with_auth_info(
181        stream: S,
182        screen: usize,
183        auth_name: Vec<u8>,
184        auth_data: Vec<u8>,
185    ) -> Result<Self, ConnectError> {
186        let (mut connect, setup_request) = Connect::with_authorization(auth_name, auth_data);
187
188        // write the connect() setup request
189        let mut nwritten = 0;
190        let mut fds = vec![];
191
192        crate::trace!(
193            "Writing connection setup with {} bytes",
194            setup_request.len()
195        );
196        while nwritten != setup_request.len() {
197            stream.poll(PollMode::Writable)?;
198            // poll returned successfully, so the stream is writable.
199            match stream.write(&setup_request[nwritten..], &mut fds) {
200                Ok(0) => {
201                    return Err(std::io::Error::new(
202                        std::io::ErrorKind::WriteZero,
203                        "failed to write whole buffer",
204                    )
205                    .into())
206                }
207                Ok(n) => nwritten += n,
208                // Spurious wakeup from poll, try again
209                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
210                Err(e) => return Err(e.into()),
211            }
212        }
213
214        // read in the setup
215        loop {
216            stream.poll(PollMode::Readable)?;
217            crate::trace!(
218                "Reading connection setup with at least {} bytes remaining",
219                connect.buffer().len()
220            );
221            let adv = match stream.read(connect.buffer(), &mut fds) {
222                Ok(0) => {
223                    return Err(std::io::Error::new(
224                        std::io::ErrorKind::UnexpectedEof,
225                        "failed to read whole buffer",
226                    )
227                    .into())
228                }
229                Ok(n) => n,
230                // Spurious wakeup from poll, try again
231                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
232                Err(e) => return Err(e.into()),
233            };
234            crate::trace!("Read {} bytes", adv);
235
236            // advance the internal buffer
237            if connect.advance(adv) {
238                break;
239            }
240        }
241
242        // resolve the setup
243        let setup = connect.into_setup()?;
244
245        // Check that we got a valid screen number
246        if screen >= setup.roots.len() {
247            return Err(ConnectError::InvalidScreen);
248        }
249
250        // Success! Set up our state
251        Self::for_connected_stream(stream, setup)
252    }
253
254    /// Establish a new connection for an already connected stream.
255    ///
256    /// The given `stream` is used for communicating with the X11 server.
257    /// It is assumed that `setup` was just received from the server. Thus, the first reply to a
258    /// request that is sent will have sequence number one.
259    pub fn for_connected_stream(stream: S, setup: Setup) -> Result<Self, ConnectError> {
260        let id_allocator = IdAllocator::new(setup.resource_id_base, setup.resource_id_mask)?;
261
262        Ok(RustConnection {
263            inner: Mutex::new(ConnectionInner {
264                inner: ProtoConnection::new(),
265                write_buffer: WriteBuffer::new(),
266            }),
267            stream,
268            packet_reader: Mutex::new(PacketReader::new()),
269            reader_condition: Condvar::new(),
270            setup,
271            extension_manager: Default::default(),
272            maximum_request_bytes: Mutex::new(MaxRequestBytes::Unknown),
273            id_allocator: Mutex::new(id_allocator),
274        })
275    }
276
277    /// Internal function for actually sending a request.
278    ///
279    /// This function "does the actual work" for `send_request_with_reply()` and
280    /// `send_request_without_reply()`.
281    fn send_request(
282        &self,
283        bufs: &[IoSlice<'_>],
284        fds: Vec<RawFdContainer>,
285        kind: ReplyFdKind,
286    ) -> Result<SequenceNumber, ConnectionError> {
287        let _guard = crate::debug_span!("send_request").entered();
288
289        let request_info = RequestInfo {
290            extension_manager: &self.extension_manager,
291            major_opcode: bufs[0][0],
292            minor_opcode: bufs[0][1],
293        };
294        crate::debug!("Sending {}", request_info);
295
296        let mut storage = Default::default();
297        let bufs = compute_length_field(self, bufs, &mut storage)?;
298
299        // Note: `inner` must be kept blocked until the request has been completely written
300        // or buffered to avoid sending the data of different requests interleaved. For this
301        // reason, `read_packet_and_enqueue` must always be called with `BlockingMode::NonBlocking`
302        // during a write, otherwise `inner` would be temporarily released.
303        let mut inner = self.inner.lock().unwrap();
304
305        loop {
306            let send_result = inner.inner.send_request(kind);
307            match send_result {
308                Some(seqno) => {
309                    // Now actually send the buffers
310                    let _inner = self.write_all_vectored(inner, bufs, fds)?;
311                    return Ok(seqno);
312                }
313                None => {
314                    crate::trace!("Syncing with the X11 server since there are too many outstanding void requests");
315                    inner = self.send_sync(inner)?;
316                }
317            }
318        }
319    }
320
321    /// Send a synchronisation packet to the X11 server.
322    ///
323    /// This function sends a `GetInputFocus` request to the X11 server and arranges for its reply
324    /// to be ignored. This ensures that a reply is expected (`ConnectionInner.next_reply_expected`
325    /// increases).
326    fn send_sync<'a>(
327        &'a self,
328        mut inner: MutexGuardInner<'a>,
329    ) -> Result<MutexGuardInner<'a>, std::io::Error> {
330        let length = 1u16.to_ne_bytes();
331        let request = [
332            GET_INPUT_FOCUS_REQUEST,
333            0, /* pad */
334            length[0],
335            length[1],
336        ];
337
338        let seqno = inner
339            .inner
340            .send_request(ReplyFdKind::ReplyWithoutFDs)
341            .expect("Sending a HasResponse request should not be blocked by syncs");
342        inner
343            .inner
344            .discard_reply(seqno, DiscardMode::DiscardReplyAndError);
345        let inner = self.write_all_vectored(inner, &[IoSlice::new(&request)], Vec::new())?;
346
347        Ok(inner)
348    }
349
350    /// Write a set of buffers on a `writer`. May also read packets
351    /// from the server.
352    fn write_all_vectored<'a>(
353        &'a self,
354        mut inner: MutexGuardInner<'a>,
355        mut bufs: &[IoSlice<'_>],
356        mut fds: Vec<RawFdContainer>,
357    ) -> std::io::Result<MutexGuardInner<'a>> {
358        let mut partial_buf: &[u8] = &[];
359        while !partial_buf.is_empty() || !bufs.is_empty() {
360            self.stream.poll(PollMode::ReadAndWritable)?;
361            let write_result = if !partial_buf.is_empty() {
362                // "inner" is held, passed into this function, so this should never be held
363                inner
364                    .write_buffer
365                    .write(&self.stream, partial_buf, &mut fds)
366            } else {
367                // same as above
368                inner
369                    .write_buffer
370                    .write_vectored(&self.stream, bufs, &mut fds)
371            };
372            match write_result {
373                Ok(0) => {
374                    return Err(std::io::Error::new(
375                        std::io::ErrorKind::WriteZero,
376                        "failed to write anything",
377                    ));
378                }
379                Ok(mut count) => {
380                    // Successful write
381                    if count >= partial_buf.len() {
382                        count -= partial_buf.len();
383                        partial_buf = &[];
384                    } else {
385                        partial_buf = &partial_buf[count..];
386                        count = 0;
387                    }
388                    while count > 0 {
389                        if count >= bufs[0].len() {
390                            count -= bufs[0].len();
391                        } else {
392                            partial_buf = &bufs[0][count..];
393                            count = 0;
394                        }
395                        bufs = &bufs[1..];
396                        // Skip empty slices
397                        while bufs.first().map(|s| s.len()) == Some(0) {
398                            bufs = &bufs[1..];
399                        }
400                    }
401                }
402                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
403                    crate::trace!("Writing more data would block for now");
404                    // Writing would block, try to read instead because the
405                    // server might not accept new requests after its
406                    // buffered replies have been read.
407                    inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?;
408                }
409                Err(e) => return Err(e),
410            }
411        }
412        if !fds.is_empty() {
413            return Err(std::io::Error::new(
414                std::io::ErrorKind::Other,
415                "Left over FDs after sending the request",
416            ));
417        }
418        Ok(inner)
419    }
420
421    fn flush_impl<'a>(
422        &'a self,
423        mut inner: MutexGuardInner<'a>,
424    ) -> std::io::Result<MutexGuardInner<'a>> {
425        // n.b. notgull: inner guard is held
426        while inner.write_buffer.needs_flush() {
427            self.stream.poll(PollMode::ReadAndWritable)?;
428            let flush_result = inner.write_buffer.flush(&self.stream);
429            match flush_result {
430                // Flush completed
431                Ok(()) => break,
432                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
433                    crate::trace!("Flushing more data would block for now");
434                    // Writing would block, try to read instead because the
435                    // server might not accept new requests after its
436                    // buffered replies have been read.
437                    inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?;
438                }
439                Err(e) => return Err(e),
440            }
441        }
442        Ok(inner)
443    }
444
445    /// Read a packet from the connection.
446    ///
447    /// This function waits for an X11 packet to be received. It drops the mutex protecting the
448    /// inner data while waiting for a packet so that other threads can make progress. For this
449    /// reason, you need to pass in a `MutexGuard` to be dropped. This function locks the mutex
450    /// again and returns a new `MutexGuard`.
451    ///
452    /// Note: If `mode` is `BlockingMode::Blocking`, the lock on `inner` will be temporarily
453    /// released. While sending a request, `inner` must be kept locked to avoid sending the data
454    /// of different requests interleaved. So, when `read_packet_and_enqueue` is called as part
455    /// of a write, it must always be done with `mode` set to `BlockingMode::NonBlocking`.
456    fn read_packet_and_enqueue<'a>(
457        &'a self,
458        mut inner: MutexGuardInner<'a>,
459        mode: BlockingMode,
460    ) -> Result<MutexGuardInner<'a>, std::io::Error> {
461        // 0.1. Try to lock the `packet_reader` mutex.
462        match self.packet_reader.try_lock() {
463            Err(TryLockError::WouldBlock) => {
464                // In non-blocking mode, we just return immediately
465                match mode {
466                    BlockingMode::NonBlocking => {
467                        crate::trace!("read_packet_and_enqueue in NonBlocking mode doing nothing since reader is already locked");
468                        return Ok(inner);
469                    }
470                    BlockingMode::Blocking => {
471                        crate::trace!("read_packet_and_enqueue in Blocking mode waiting for pre-existing reader");
472                    }
473                }
474
475                // 1.1. Someone else is reading (other thread is at 2.2);
476                // wait for it. `Condvar::wait` will unlock `inner`, so
477                // the other thread can relock `inner` at 2.1.3 (and to allow
478                // other threads to arrive 0.1).
479                //
480                // When `wait` finishes, other thread has enqueued a packet,
481                // so the purpose of this function has been fulfilled. `wait`
482                // will relock `inner` when it returns.
483                Ok(self.reader_condition.wait(inner).unwrap())
484            }
485            Err(TryLockError::Poisoned(e)) => panic!("{}", e),
486            Ok(mut packet_reader) => {
487                // Make sure sleeping readers are woken up when we return
488                // (Even in case of errors)
489                let notify_on_drop = NotifyOnDrop(&self.reader_condition);
490
491                // 2.1. Poll for read if mode is blocking.
492                if mode == BlockingMode::Blocking {
493                    // 2.1.1. Unlock `inner`, so other threads can use it while
494                    // during the poll.
495                    drop(inner);
496                    // 2.1.2. Do the actual poll
497                    self.stream.poll(PollMode::Readable)?;
498                    // 2.1.3. Relock inner
499                    inner = self.inner.lock().unwrap();
500                }
501
502                // 2.2. Try to read as many packets as possible without blocking.
503                let mut fds = Vec::new();
504                let mut packets = Vec::new();
505                packet_reader.try_read_packets(&self.stream, &mut packets, &mut fds)?;
506
507                // 2.3. Once `inner` has been relocked, drop the
508                // lock on `packet_reader`. While inner is locked, other
509                // threads cannot arrive at 0.1 anyways.
510                //
511                // `packet_reader` must be unlocked with `inner` is locked,
512                // otherwise it could let another thread wait on 2.1
513                // for a reply that has been read but not enqueued yet.
514                drop(packet_reader);
515
516                // 2.4. Actually enqueue the read packets.
517                inner.inner.enqueue_fds(fds);
518                packets
519                    .into_iter()
520                    .for_each(|packet| inner.inner.enqueue_packet(packet));
521
522                // 2.5. Notify the condvar by dropping the `notify_on_drop` object.
523                // The object would have been dropped when the function returns, so
524                // the explicit drop is not really needed. The purpose of having a
525                // explicit drop is to... make it explicit.
526                drop(notify_on_drop);
527
528                // 2.6. Return the locked `inner` back to the caller.
529                Ok(inner)
530            }
531        }
532    }
533
534    fn prefetch_maximum_request_bytes_impl(&self, max_bytes: &mut MutexGuard<'_, MaxRequestBytes>) {
535        if let MaxRequestBytes::Unknown = **max_bytes {
536            crate::info!("Prefetching maximum request length");
537            let request = self
538                .bigreq_enable()
539                .map(|cookie| cookie.into_sequence_number())
540                .ok();
541            **max_bytes = MaxRequestBytes::Requested(request);
542        }
543    }
544
545    /// Returns a reference to the contained stream.
546    pub fn stream(&self) -> &S {
547        &self.stream
548    }
549}
550
551impl<S: Stream> RequestConnection for RustConnection<S> {
552    type Buf = Vec<u8>;
553
554    fn send_request_with_reply<Reply>(
555        &self,
556        bufs: &[IoSlice<'_>],
557        fds: Vec<RawFdContainer>,
558    ) -> Result<Cookie<'_, Self, Reply>, ConnectionError>
559    where
560        Reply: TryParse,
561    {
562        Ok(Cookie::new(
563            self,
564            self.send_request(bufs, fds, ReplyFdKind::ReplyWithoutFDs)?,
565        ))
566    }
567
568    fn send_request_with_reply_with_fds<Reply>(
569        &self,
570        bufs: &[IoSlice<'_>],
571        fds: Vec<RawFdContainer>,
572    ) -> Result<CookieWithFds<'_, Self, Reply>, ConnectionError>
573    where
574        Reply: TryParseFd,
575    {
576        Ok(CookieWithFds::new(
577            self,
578            self.send_request(bufs, fds, ReplyFdKind::ReplyWithFDs)?,
579        ))
580    }
581
582    fn send_request_without_reply(
583        &self,
584        bufs: &[IoSlice<'_>],
585        fds: Vec<RawFdContainer>,
586    ) -> Result<VoidCookie<'_, Self>, ConnectionError> {
587        Ok(VoidCookie::new(
588            self,
589            self.send_request(bufs, fds, ReplyFdKind::NoReply)?,
590        ))
591    }
592
593    fn discard_reply(&self, sequence: SequenceNumber, _kind: RequestKind, mode: DiscardMode) {
594        crate::debug!(
595            "Discarding reply to request {} in mode {:?}",
596            sequence,
597            mode
598        );
599        self.inner
600            .lock()
601            .unwrap()
602            .inner
603            .discard_reply(sequence, mode);
604    }
605
606    fn prefetch_extension_information(
607        &self,
608        extension_name: &'static str,
609    ) -> Result<(), ConnectionError> {
610        self.extension_manager
611            .lock()
612            .unwrap()
613            .prefetch_extension_information(self, extension_name)
614    }
615
616    fn extension_information(
617        &self,
618        extension_name: &'static str,
619    ) -> Result<Option<ExtensionInformation>, ConnectionError> {
620        self.extension_manager
621            .lock()
622            .unwrap()
623            .extension_information(self, extension_name)
624    }
625
626    fn wait_for_reply_or_raw_error(
627        &self,
628        sequence: SequenceNumber,
629    ) -> Result<ReplyOrError<Vec<u8>>, ConnectionError> {
630        match self.wait_for_reply_with_fds_raw(sequence)? {
631            ReplyOrError::Reply((reply, _fds)) => Ok(ReplyOrError::Reply(reply)),
632            ReplyOrError::Error(e) => Ok(ReplyOrError::Error(e)),
633        }
634    }
635
636    fn wait_for_reply(&self, sequence: SequenceNumber) -> Result<Option<Vec<u8>>, ConnectionError> {
637        let _guard = crate::debug_span!("wait_for_reply", sequence).entered();
638
639        let mut inner = self.inner.lock().unwrap();
640        inner = self.flush_impl(inner)?;
641        loop {
642            crate::trace!({ sequence }, "Polling for reply");
643            let poll_result = inner.inner.poll_for_reply(sequence);
644            match poll_result {
645                PollReply::TryAgain => {}
646                PollReply::NoReply => return Ok(None),
647                PollReply::Reply(buffer) => return Ok(Some(buffer)),
648            }
649            inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?;
650        }
651    }
652
653    fn check_for_raw_error(
654        &self,
655        sequence: SequenceNumber,
656    ) -> Result<Option<Buffer>, ConnectionError> {
657        let _guard = crate::debug_span!("check_for_raw_error", sequence).entered();
658
659        let mut inner = self.inner.lock().unwrap();
660        if inner.inner.prepare_check_for_reply_or_error(sequence) {
661            crate::trace!("Inserting sync with the X11 server");
662            inner = self.send_sync(inner)?;
663            assert!(!inner.inner.prepare_check_for_reply_or_error(sequence));
664        }
665        // Ensure the request is sent
666        inner = self.flush_impl(inner)?;
667        loop {
668            crate::trace!({ sequence }, "Polling for reply or error");
669            let poll_result = inner.inner.poll_check_for_reply_or_error(sequence);
670            match poll_result {
671                PollReply::TryAgain => {}
672                PollReply::NoReply => return Ok(None),
673                PollReply::Reply(buffer) => return Ok(Some(buffer)),
674            }
675            inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?;
676        }
677    }
678
679    fn wait_for_reply_with_fds_raw(
680        &self,
681        sequence: SequenceNumber,
682    ) -> Result<ReplyOrError<BufWithFds, Buffer>, ConnectionError> {
683        let _guard = crate::debug_span!("wait_for_reply_with_fds_raw", sequence).entered();
684
685        let mut inner = self.inner.lock().unwrap();
686        // Ensure the request is sent
687        inner = self.flush_impl(inner)?;
688        loop {
689            crate::trace!({ sequence }, "Polling for reply or error");
690            if let Some(reply) = inner.inner.poll_for_reply_or_error(sequence) {
691                if reply.0[0] == 0 {
692                    crate::trace!("Got error");
693                    return Ok(ReplyOrError::Error(reply.0));
694                } else {
695                    crate::trace!("Got reply");
696                    return Ok(ReplyOrError::Reply(reply));
697                }
698            }
699            inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?;
700        }
701    }
702
703    fn maximum_request_bytes(&self) -> usize {
704        let mut max_bytes = self.maximum_request_bytes.lock().unwrap();
705        self.prefetch_maximum_request_bytes_impl(&mut max_bytes);
706        use MaxRequestBytes::*;
707        let max_bytes = &mut *max_bytes;
708        match max_bytes {
709            Unknown => unreachable!("We just prefetched this"),
710            Requested(seqno) => {
711                let _guard = crate::info_span!("maximum_request_bytes").entered();
712
713                let length = seqno
714                    // If prefetching the request succeeded, get a cookie
715                    .and_then(|seqno| {
716                        Cookie::<_, EnableReply>::new(self, seqno)
717                            // and then get the reply to the request
718                            .reply()
719                            .map(|reply| reply.maximum_request_length)
720                            .ok()
721                    })
722                    // If anything failed (sending the request, getting the reply), use Setup
723                    .unwrap_or_else(|| self.setup.maximum_request_length.into())
724                    // Turn the u32 into usize, using the max value in case of overflow
725                    .try_into()
726                    .unwrap_or(usize::max_value());
727                let length = length * 4;
728                *max_bytes = Known(length);
729                crate::info!("Maximum request length is {} bytes", length);
730                length
731            }
732            Known(length) => *length,
733        }
734    }
735
736    fn prefetch_maximum_request_bytes(&self) {
737        let mut max_bytes = self.maximum_request_bytes.lock().unwrap();
738        self.prefetch_maximum_request_bytes_impl(&mut max_bytes);
739    }
740
741    fn parse_error(&self, error: &[u8]) -> Result<crate::x11_utils::X11Error, ParseError> {
742        let ext_mgr = self.extension_manager.lock().unwrap();
743        crate::x11_utils::X11Error::try_parse(error, &*ext_mgr)
744    }
745
746    fn parse_event(&self, event: &[u8]) -> Result<crate::protocol::Event, ParseError> {
747        let ext_mgr = self.extension_manager.lock().unwrap();
748        crate::protocol::Event::parse(event, &*ext_mgr)
749    }
750}
751
752impl<S: Stream> Connection for RustConnection<S> {
753    fn wait_for_raw_event_with_sequence(
754        &self,
755    ) -> Result<RawEventAndSeqNumber<Vec<u8>>, ConnectionError> {
756        let _guard = crate::trace_span!("wait_for_raw_event_with_sequence").entered();
757
758        let mut inner = self.inner.lock().unwrap();
759        loop {
760            if let Some(event) = inner.inner.poll_for_event_with_sequence() {
761                return Ok(event);
762            }
763            inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?;
764        }
765    }
766
767    fn poll_for_raw_event_with_sequence(
768        &self,
769    ) -> Result<Option<RawEventAndSeqNumber<Vec<u8>>>, ConnectionError> {
770        let _guard = crate::trace_span!("poll_for_raw_event_with_sequence").entered();
771
772        let mut inner = self.inner.lock().unwrap();
773        if let Some(event) = inner.inner.poll_for_event_with_sequence() {
774            Ok(Some(event))
775        } else {
776            inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?;
777            Ok(inner.inner.poll_for_event_with_sequence())
778        }
779    }
780
781    fn flush(&self) -> Result<(), ConnectionError> {
782        let inner = self.inner.lock().unwrap();
783        let _inner = self.flush_impl(inner)?;
784        Ok(())
785    }
786
787    fn setup(&self) -> &Setup {
788        &self.setup
789    }
790
791    fn generate_id(&self) -> Result<u32, ReplyOrIdError> {
792        let mut id_allocator = self.id_allocator.lock().unwrap();
793        if let Some(id) = id_allocator.generate_id() {
794            Ok(id)
795        } else {
796            use crate::protocol::xc_misc::{self, ConnectionExt as _};
797
798            if self
799                .extension_information(xc_misc::X11_EXTENSION_NAME)?
800                .is_none()
801            {
802                crate::error!("XIDs are exhausted and XC-MISC extension is not available");
803                Err(ReplyOrIdError::IdsExhausted)
804            } else {
805                crate::info!("XIDs are exhausted; fetching free range via XC-MISC");
806                id_allocator.update_xid_range(&self.xc_misc_get_xid_range()?.reply()?)?;
807                id_allocator
808                    .generate_id()
809                    .ok_or(ReplyOrIdError::IdsExhausted)
810            }
811        }
812    }
813}
814
815/// Call `notify_all` on a condition variable when dropped.
816#[derive(Debug)]
817struct NotifyOnDrop<'a>(&'a Condvar);
818
819impl Drop for NotifyOnDrop<'_> {
820    fn drop(&mut self) {
821        self.0.notify_all();
822    }
823}
824
825/// Format information about a request in a Display impl
826struct RequestInfo<'a> {
827    extension_manager: &'a Mutex<ExtensionManager>,
828    major_opcode: u8,
829    minor_opcode: u8,
830}
831
832impl std::fmt::Display for RequestInfo<'_> {
833    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
834        // QueryExtension is used by the extension manager. We would deadlock if we
835        // tried to lock it again. Hence, this case is hardcoded here.
836        if self.major_opcode == QUERY_EXTENSION_REQUEST {
837            write!(f, "QueryExtension request")
838        } else {
839            let guard = self.extension_manager.lock().unwrap();
840            write!(
841                f,
842                "{} request",
843                x11rb_protocol::protocol::get_request_name(
844                    &*guard,
845                    self.major_opcode,
846                    self.minor_opcode
847                )
848            )
849        }
850    }
851}