Skip to content

Commit

Permalink
Merge pull request #819 from memorysafety/pvdrz-ring-buffer
Browse files Browse the repository at this point in the history
Use a ring buffer to pipe data between IO objects
  • Loading branch information
pvdrz authored Jan 23, 2024
2 parents 23e8b0b + d2fad50 commit 208a25b
Show file tree
Hide file tree
Showing 2 changed files with 308 additions and 62 deletions.
82 changes: 20 additions & 62 deletions src/exec/use_pty/pipe.rs → src/exec/use_pty/pipe/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod ring_buffer;

use std::{
io::{self, Read, Write},
marker::PhantomData,
Expand All @@ -6,6 +8,8 @@ use std::{

use crate::exec::event::{EventHandle, EventRegistry, PollEvent, Process};

use self::ring_buffer::RingBuffer;

// A pipe able to stream data bidirectionally between two read-write types.
pub(super) struct Pipe<L, R> {
left: L,
Expand Down Expand Up @@ -100,16 +104,9 @@ impl<L: Read + Write + AsRawFd, R: Read + Write + AsRawFd> Pipe<L, R> {
}
}

/// The size of the internal buffer of the pipe.
const BUFSIZE: usize = 6 * 1024;

/// A buffer that stores the bytes read from `R` before they are written to `W`.
struct Buffer<R, W> {
buffer: [u8; BUFSIZE],
/// The start of the busy section of the buffer.
start: usize,
/// The end of the busy section of the buffer.
end: usize,
internal: RingBuffer,
/// The handle for the event of the reader.
read_handle: EventHandle,
/// The handle for the event of the writer.
Expand All @@ -128,27 +125,13 @@ impl<R: Read, W: Write> Buffer<R, W> {
write_handle.ignore(registry);

Self {
buffer: [0; BUFSIZE],
start: 0,
end: 0,
internal: RingBuffer::new(),
read_handle,
write_handle,
marker: PhantomData,
}
}

/// Return true if the buffer is empty.
fn is_empty(&self) -> bool {
self.start == self.end
}

/// Return true if the buffer is full.
fn is_full(&self) -> bool {
// FIXME: This doesn't really mean that the buffer is full but it cannot be used for writes
// anyway.
self.end == BUFSIZE
}

/// Read bytes into the buffer.
///
/// Calling this function will block until `read` is ready to be read.
Expand All @@ -157,23 +140,17 @@ impl<R: Read, W: Write> Buffer<R, W> {
read: &mut R,
registry: &mut EventRegistry<T>,
) -> io::Result<()> {
// Don't read if the buffer is full.
if self.is_full() {
// If the buffer is full, there is nothing to be read.
if self.internal.is_full() {
self.read_handle.ignore(registry);
return Ok(());
}

// This is the remaining free section that follows the busy section of the buffer.
let buffer = &mut self.buffer[self.end..];
// Read bytes and insert them into the buffer.
let inserted_len = self.internal.insert(read)?;

// Read `len` bytes from `read` into the buffer.
let len = read.read(buffer)?;

// Mark the `len` bytes after the busy section as busy too.
self.end += len;

// If we read something, the buffer is not empty anymore and we can resume writing.
if len > 0 {
// If we inserted something, the buffer is not empty anymore and we can resume writing.
if inserted_len > 0 {
self.write_handle.resume(registry);
}

Expand All @@ -188,29 +165,17 @@ impl<R: Read, W: Write> Buffer<R, W> {
write: &mut W,
registry: &mut EventRegistry<T>,
) -> io::Result<()> {
// Don't write if the buffer is empty.
if self.is_empty() {
// If the buffer is empty, there is nothing to be written.
if self.internal.is_empty() {
self.write_handle.ignore(registry);
return Ok(());
}

// This is the busy section of the buffer.
let buffer = &self.buffer[self.start..self.end];
// Remove bytes from the buffer and write them.
let removed_len = self.internal.remove(write)?;

// Write the first `len` bytes of the busy section to `write`.
let len = write.write(buffer)?;

if len == buffer.len() {
// If we were able to write all the busy section, we can mark the whole buffer as free.
self.start = 0;
self.end = 0;
} else {
// Otherwise we just free the first `len` bytes of the busy section.
self.start += len;
}

// If we wrote something, the buffer is not full anymore and we can resume reading.
if len > 0 {
// If we removed something, the buffer is not full anymore and we can resume reading.
if removed_len > 0 {
self.read_handle.resume(registry);
}

Expand All @@ -219,15 +184,8 @@ impl<R: Read, W: Write> Buffer<R, W> {

/// Flush this buffer, ensuring that all the contents of its internal buffer are written.
fn flush(&mut self, write: &mut W) -> io::Result<()> {
// This is the busy section of the buffer.
let buffer = &self.buffer[self.start..self.end];

// Write the complete busy section to `write`.
write.write_all(buffer)?;

// If we were able to write all the busy section, we can mark the whole buffer as free.
self.start = 0;
self.end = 0;
// Remove bytes from the buffer and write them.
self.internal.remove(write)?;

write.flush()
}
Expand Down
Loading

0 comments on commit 208a25b

Please sign in to comment.