x11rb/rust_connection/
write_buffer.rs

1use std::collections::VecDeque;
2use std::io::IoSlice;
3
4use super::Stream;
5use crate::utils::RawFdContainer;
6
7#[derive(Debug)]
8pub(super) struct WriteBuffer {
9    data_buf: VecDeque<u8>,
10    fd_buf: Vec<RawFdContainer>,
11}
12
13impl WriteBuffer {
14    pub(super) fn new() -> Self {
15        // Buffer size chosen by checking what libxcb does
16        Self::with_capacity(16384)
17    }
18
19    fn with_capacity(capacity: usize) -> Self {
20        Self {
21            data_buf: VecDeque::with_capacity(capacity),
22            fd_buf: Vec::new(),
23        }
24    }
25
26    fn flush_buffer(&mut self, stream: &impl Stream) -> std::io::Result<()> {
27        while self.needs_flush() {
28            crate::trace!(
29                "Trying to flush {} bytes of data and {} FDs",
30                self.data_buf.len(),
31                self.fd_buf.len()
32            );
33            let (data_buf_1, data_buf_2) = self.data_buf.as_slices();
34            let data_bufs = [IoSlice::new(data_buf_1), IoSlice::new(data_buf_2)];
35            match stream.write_vectored(&data_bufs, &mut self.fd_buf) {
36                Ok(0) => {
37                    if self.data_buf.is_empty() {
38                        assert!(!self.fd_buf.is_empty());
39                        return Err(std::io::Error::new(
40                            std::io::ErrorKind::WriteZero,
41                            "failed to write the buffered FDs",
42                        ));
43                    } else {
44                        return Err(std::io::Error::new(
45                            std::io::ErrorKind::WriteZero,
46                            "failed to write the buffered data",
47                        ));
48                    }
49                }
50                Ok(n) => {
51                    crate::trace!("Flushing wrote {} bytes of data", n);
52                    let _ = self.data_buf.drain(..n);
53                }
54                Err(e) => return Err(e),
55            }
56        }
57        Ok(())
58    }
59
60    fn write_helper<W: Stream, F, G>(
61        &mut self,
62        stream: &W,
63        fds: &mut Vec<RawFdContainer>,
64        write_buffer: F,
65        write_inner: G,
66        first_buffer: &[u8],
67        to_write_length: usize,
68    ) -> std::io::Result<usize>
69    where
70        F: FnOnce(&mut VecDeque<u8>),
71        G: FnOnce(&W, &mut Vec<RawFdContainer>) -> std::io::Result<usize>,
72    {
73        crate::trace!(
74            "Writing {} FDs and {} bytes of data",
75            fds.len(),
76            to_write_length
77        );
78        self.fd_buf.append(fds);
79
80        // Is there enough buffer space left for this write?
81        if (self.data_buf.capacity() - self.data_buf.len()) < to_write_length {
82            // Not enough space, try to flush
83            match self.flush_buffer(stream) {
84                Ok(_) => {}
85                Err(e) => {
86                    if e.kind() == std::io::ErrorKind::WouldBlock {
87                        let available_buf = self.data_buf.capacity() - self.data_buf.len();
88                        if available_buf == 0 {
89                            // Buffer filled and cannot flush anything without
90                            // blocking, so return `WouldBlock`.
91                            crate::trace!("Writing failed due to full buffer: {:?}", e);
92                            return Err(e);
93                        } else {
94                            let n_to_write = first_buffer.len().min(available_buf);
95                            self.data_buf.extend(&first_buffer[..n_to_write]);
96                            // Return `Ok` because some or all data has been buffered,
97                            // so from the outside it is seen as a successful write.
98                            crate::trace!("Writing appended {} bytes to the buffer", n_to_write);
99                            return Ok(n_to_write);
100                        }
101                    } else {
102                        return Err(e);
103                    }
104                }
105            }
106        }
107
108        if to_write_length >= self.data_buf.capacity() {
109            // Write is larger than the buffer capacity, thus we just flushed the buffer. This
110            // means that at this point the buffer is empty. Write directly to self.inner. No data
111            // is copied into the buffer, since that would just mean that the large write gets
112            // split into multiple smaller ones.
113            assert!(self.data_buf.is_empty());
114            crate::trace!("Large write is written directly to the stream");
115            write_inner(stream, &mut self.fd_buf)
116        } else {
117            // At this point there is enough space available in the buffer.
118            crate::trace!("Data to write is appended to the buffer");
119            write_buffer(&mut self.data_buf);
120            Ok(to_write_length)
121        }
122    }
123
124    pub(super) fn write(
125        &mut self,
126        stream: &impl Stream,
127        buf: &[u8],
128        fds: &mut Vec<RawFdContainer>,
129    ) -> std::io::Result<usize> {
130        self.write_helper(
131            stream,
132            fds,
133            |w| w.extend(buf),
134            |w, fd| w.write(buf, fd),
135            buf,
136            buf.len(),
137        )
138    }
139
140    pub(super) fn write_vectored(
141        &mut self,
142        stream: &impl Stream,
143        bufs: &[IoSlice<'_>],
144        fds: &mut Vec<RawFdContainer>,
145    ) -> std::io::Result<usize> {
146        let first_nonempty = bufs
147            .iter()
148            .find(|b| !b.is_empty())
149            .map_or(&[][..], |b| &**b);
150        let total_len = bufs.iter().map(|b| b.len()).sum();
151        self.write_helper(
152            stream,
153            fds,
154            |w| {
155                for buf in bufs.iter() {
156                    w.extend(&**buf);
157                }
158            },
159            |w, fd| w.write_vectored(bufs, fd),
160            first_nonempty,
161            total_len,
162        )
163    }
164
165    /// Returns `true` if there is buffered data or FDs.
166    pub(super) fn needs_flush(&self) -> bool {
167        !self.data_buf.is_empty() || !self.fd_buf.is_empty()
168    }
169
170    pub(super) fn flush(&mut self, stream: &impl Stream) -> std::io::Result<()> {
171        self.flush_buffer(stream)
172    }
173}
174
#[cfg(test)]
mod test {
    use std::io::{Error, ErrorKind, IoSlice, Result};

    use super::super::{PollMode, Stream};
    use super::WriteBuffer;
    use crate::utils::RawFdContainer;

    /// Stream stub whose `write` always fails with `WouldBlock`. Polling and
    /// reading are not implemented.
    struct WouldBlockWriter;

    impl Stream for WouldBlockWriter {
        fn poll(&self, _mode: PollMode) -> Result<()> {
            unimplemented!();
        }

        fn read(&self, _buf: &mut [u8], _fd_storage: &mut Vec<RawFdContainer>) -> Result<usize> {
            unimplemented!();
        }

        fn write(&self, _buf: &[u8], _fds: &mut Vec<RawFdContainer>) -> Result<usize> {
            let would_block = Error::new(ErrorKind::WouldBlock, "would block");
            Err(would_block)
        }
    }

    // Regression test: this used to panic because the code indexed `bufs[0]`
    // without checking that any buffer was present.
    #[test]
    fn empty_write() {
        let no_slices: &[IoSlice<'_>] = &[];
        let mut no_fds = Vec::new();
        let mut buffer = WriteBuffer::new();
        let _ = buffer
            .write_vectored(&WouldBlockWriter, no_slices, &mut no_fds)
            .unwrap();
    }

    // Regression test: BufWriteFD used to fall back to writing only the first
    // buffer. With an empty first buffer, as used here, the resulting zero-length
    // write could be mistaken for EOF.
    #[test]
    fn incorrect_eof() {
        let slices = [IoSlice::new(&[]), IoSlice::new(b"fooo")];
        let mut no_fds = Vec::new();
        let mut buffer = WriteBuffer::with_capacity(1);

        let outcome = buffer.write_vectored(&WouldBlockWriter, &slices, &mut no_fds);
        match outcome {
            Ok(written) => {
                if written == 0 {
                    panic!("This looks like EOF!?");
                }
            }
            Err(err) => {
                // `WouldBlock` is acceptable; anything else is a failure.
                if err.kind() != ErrorKind::WouldBlock {
                    panic!("Unexpected error: {:?}", err);
                }
            }
        }
    }
}