Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: KillingSpark/rustbus
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 6feb4453830f8d15ec5b9daf4829b6dfd813a4ae
Choose a base ref
..
head repository: KillingSpark/rustbus
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 0b80e385941567f4fbef1aeadb7c30e9a7720c18
Choose a head ref
Showing with 14 additions and 13 deletions.
  1. +14 −13 rustbus/src/connection/ll_conn.rs
27 changes: 14 additions & 13 deletions rustbus/src/connection/ll_conn.rs
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@ pub struct RecvConn {
stream: UnixStream,

msg_buf_in: Vec<u8>,
msg_buf_filled: usize,
cmsgs_in: Vec<ControlMessageOwned>,
cmsgspace: Vec<u8>,
}
@@ -57,12 +58,11 @@ impl RecvConn {
/// Reads from the source once but takes care that the internal buffer only reaches at maximum max_buffer_size
/// so we can process messages separatly and avoid leaking file descriptors to wrong messages
fn refill_buffer(&mut self, max_buffer_size: usize, timeout: Timeout) -> Result<()> {
let old_buf_size = self.msg_buf_in.len();
if self.msg_buf_in.len() != max_buffer_size {
self.msg_buf_in.resize(max_buffer_size, 0);
}

let iovec = IoSliceMut::new(&mut self.msg_buf_in[old_buf_size..]);
let iovec = IoSliceMut::new(&mut self.msg_buf_in[self.msg_buf_filled..max_buffer_size]);

self.cmsgspace.clear();
let flags = MsgFlags::empty();
@@ -102,19 +102,18 @@ impl RecvConn {

self.cmsgs_in.extend(msg.cmsgs());
let bytes = msg.bytes;
self.msg_buf_in.resize(old_buf_size + bytes, 0);
self.msg_buf_filled += bytes;
Ok(())
}

pub fn bytes_needed_for_current_message(&self) -> Result<usize> {
if self.msg_buf_in.len() < 16 {
if self.msg_buf_filled < 16 {
return Ok(16);
}
let (_, header) = unmarshal::unmarshal_header(&self.msg_buf_in, 0)?;
let (_, header_fields_len) = crate::wire::util::parse_u32(
&self.msg_buf_in[unmarshal::HEADER_LEN..],
header.byteorder,
)?;
let msg_buf_in = &self.msg_buf_in[..self.msg_buf_filled];
let (_, header) = unmarshal::unmarshal_header(msg_buf_in, 0)?;
let (_, header_fields_len) =
crate::wire::util::parse_u32(&msg_buf_in[unmarshal::HEADER_LEN..], header.byteorder)?;
let complete_header_size = unmarshal::HEADER_LEN + header_fields_len as usize + 4; // +4 because the length of the header fields does not count

let padding_between_header_and_body = 8 - ((complete_header_size) % 8);
@@ -131,7 +130,7 @@ impl RecvConn {

// Checks if the internal buffer currently holds a complete message
pub fn buffer_contains_whole_message(&self) -> Result<bool> {
if self.msg_buf_in.len() < 16 {
if self.msg_buf_filled < 16 {
return Ok(false);
}
let bytes_needed = self.bytes_needed_for_current_message();
@@ -143,7 +142,7 @@ impl RecvConn {
Err(e)
}
}
Ok(bytes_needed) => Ok(self.msg_buf_in.len() >= bytes_needed),
Ok(bytes_needed) => Ok(self.msg_buf_filled >= bytes_needed),
}
}
/// Blocks until a message has been read from the conn or the timeout has been reached
@@ -171,21 +170,22 @@ impl RecvConn {
/// Blocks until a message has been read from the conn or the timeout has been reached
pub fn get_next_message(&mut self, timeout: Timeout) -> Result<MarshalledMessage> {
self.read_whole_message(timeout)?;
debug_assert_eq!(self.msg_buf_filled, self.msg_buf_in.len());
let (hdrbytes, header) = unmarshal::unmarshal_header(&self.msg_buf_in, 0)?;
let (dynhdrbytes, dynheader) =
unmarshal::unmarshal_dynamic_header(&header, &self.msg_buf_in, hdrbytes)?;

let buf_size = self.msg_buf_in.len();
let (bytes_used, mut msg) = unmarshal::unmarshal_next_message(
&header,
dynheader,
std::mem::take(&mut self.msg_buf_in),
hdrbytes + dynhdrbytes,
)?;

if buf_size != bytes_used + hdrbytes + dynhdrbytes {
if self.msg_buf_filled != bytes_used + hdrbytes + dynhdrbytes {
return Err(Error::UnmarshalError(UnmarshalError::NotAllBytesUsed));
}
self.msg_buf_filled = 0;

for cmsg in &self.cmsgs_in {
match cmsg {
@@ -482,6 +482,7 @@ impl DuplexConn {
},
recv: RecvConn {
msg_buf_in: Vec::new(),
msg_buf_filled: 0,
cmsgs_in: Vec::new(),
cmsgspace: cmsg_space!([RawFd; 10]),
stream,