diff --git a/rustbus/benches/marshal_benchmark.rs b/rustbus/benches/marshal_benchmark.rs index 7dfecc2..db81f8f 100644 --- a/rustbus/benches/marshal_benchmark.rs +++ b/rustbus/benches/marshal_benchmark.rs @@ -12,10 +12,10 @@ fn marsh(msg: &rustbus::message_builder::MarshalledMessage, buf: &mut Vec) { } fn unmarshal(buf: &[u8]) { - let (hdrbytes, header) = unmarshal_header(&buf, 0).unwrap(); - let (dynhdrbytes, dynheader) = unmarshal_dynamic_header(&header, &buf, hdrbytes).unwrap(); + let (hdrbytes, header) = unmarshal_header(buf, 0).unwrap(); + let (dynhdrbytes, dynheader) = unmarshal_dynamic_header(&header, buf, hdrbytes).unwrap(); let (_, _unmarshed_msg) = - unmarshal_next_message(&header, dynheader, &buf, hdrbytes + dynhdrbytes).unwrap(); + unmarshal_next_message(&header, dynheader, buf.to_vec(), hdrbytes + dynhdrbytes).unwrap(); } fn criterion_benchmark(c: &mut Criterion) { diff --git a/rustbus/examples/conn.rs b/rustbus/examples/conn.rs index 608165a..060c2ad 100644 --- a/rustbus/examples/conn.rs +++ b/rustbus/examples/conn.rs @@ -35,10 +35,7 @@ fn main() -> Result<(), rustbus::connection::Error> { println!("\n"); let reqname_serial = rpc_con - .send_message(&mut standard_messages::request_name( - "io.killing.spark".into(), - 0, - ))? + .send_message(&mut standard_messages::request_name("io.killing.spark", 0))? .write_all() .unwrap(); @@ -73,7 +70,7 @@ fn main() -> Result<(), rustbus::connection::Error> { println!("\n"); println!("\n"); - let mut sig_listen_msg = standard_messages::add_match("type='signal'".into()); + let mut sig_listen_msg = standard_messages::add_match("type='signal'"); //println!("Send message: {:?}", sig_listen_msg); rpc_con diff --git a/rustbus/examples/dispatch.rs b/rustbus/examples/dispatch.rs index 366dcf0..591cc8a 100644 --- a/rustbus/examples/dispatch.rs +++ b/rustbus/examples/dispatch.rs @@ -70,10 +70,10 @@ fn main() { con.send_hello(rustbus::connection::Timeout::Infinite) .unwrap(); - if std::env::args().find(|arg| "server".eq(arg)).is_some() { + if std::env::args().any(|arg| "server".eq(&arg)) { con.send .send_message(&mut rustbus::standard_messages::request_name( - "killing.spark.io".into(), + "killing.spark.io", rustbus::standard_messages::DBUS_NAME_FLAG_REPLACE_EXISTING, )) .unwrap() diff --git a/rustbus/examples/fd.rs b/rustbus/examples/fd.rs index 3565aa9..45a2b99 100644 --- a/rustbus/examples/fd.rs +++ b/rustbus/examples/fd.rs @@ -20,7 +20,7 @@ fn main() -> Result<(), rustbus::connection::Error> { .write_all() .unwrap(); - con.send_message(&mut standard_messages::add_match("type='signal'".into()))? + con.send_message(&mut standard_messages::add_match("type='signal'"))? .write_all() .unwrap(); @@ -31,10 +31,9 @@ fn main() -> Result<(), rustbus::connection::Error> { .dynheader .interface .eq(&Some("io.killing.spark".to_owned())) + && signal.dynheader.member.eq(&Some("TestSignal".to_owned())) { - if signal.dynheader.member.eq(&Some("TestSignal".to_owned())) { - break signal; - } + break signal; } }; diff --git a/rustbus/examples/server.rs b/rustbus/examples/server.rs index 150414f..f5c8de8 100644 --- a/rustbus/examples/server.rs +++ b/rustbus/examples/server.rs @@ -32,10 +32,7 @@ fn main() -> Result<(), rustbus::connection::Error> { let mut rpc_con = RpcConn::session_conn(Timeout::Infinite)?; let namereq_serial = rpc_con - .send_message(&mut standard_messages::request_name( - "io.killing.spark".into(), - 0, - ))? + .send_message(&mut standard_messages::request_name("io.killing.spark", 0))? .write_all() .unwrap(); let resp = rpc_con.wait_response(namereq_serial, Timeout::Infinite)?; diff --git a/rustbus/fuzz/fuzz_targets/fuzz_unmarshal.rs b/rustbus/fuzz/fuzz_targets/fuzz_unmarshal.rs index 778e048..1c88bfe 100644 --- a/rustbus/fuzz/fuzz_targets/fuzz_unmarshal.rs +++ b/rustbus/fuzz/fuzz_targets/fuzz_unmarshal.rs @@ -17,7 +17,7 @@ fuzz_target!(|data: &[u8]| { let (_bytes_used, msg) = match rustbus::wire::unmarshal::unmarshal_next_message( &header, dynheader, - data, + data.to_vec(), hdrbytes + dynhdrbytes, ) { Ok(msg) => msg, diff --git a/rustbus/src/bin/fuzz_artifact.rs b/rustbus/src/bin/fuzz_artifact.rs index c9058da..8bdebfc 100644 --- a/rustbus/src/bin/fuzz_artifact.rs +++ b/rustbus/src/bin/fuzz_artifact.rs @@ -34,7 +34,7 @@ fn run_artifact(path: &str) { let (_bytes_used, msg) = match rustbus::wire::unmarshal::unmarshal_next_message( &header, dynheader, - data, + data.clone(), hdrbytes + dynhdrbytes, ) { Ok(msg) => msg, diff --git a/rustbus/src/connection/ll_conn.rs b/rustbus/src/connection/ll_conn.rs index d8a5d89..c88faca 100644 --- a/rustbus/src/connection/ll_conn.rs +++ b/rustbus/src/connection/ll_conn.rs @@ -33,7 +33,9 @@ pub struct RecvConn { stream: UnixStream, msg_buf_in: Vec, + msg_buf_filled: usize, cmsgs_in: Vec, + cmsgspace: Vec, } pub struct DuplexConn { @@ -56,14 +58,13 @@ impl RecvConn { /// Reads from the source once but takes care that the internal buffer only reaches at maximum max_buffer_size /// so we can process messages separatly and avoid leaking file descriptors to wrong messages fn refill_buffer(&mut self, max_buffer_size: usize, timeout: Timeout) -> Result<()> { - let bytes_to_read = max_buffer_size - self.msg_buf_in.len(); - - const BUFSIZE: usize = 512; - let mut tmpbuf = [0u8; BUFSIZE]; + if self.msg_buf_in.len() != max_buffer_size { + self.msg_buf_in.resize(max_buffer_size, 0); + } - let iovec = IoSliceMut::new(&mut tmpbuf[..usize::min(bytes_to_read, BUFSIZE)]); + let iovec = IoSliceMut::new(&mut self.msg_buf_in[self.msg_buf_filled..max_buffer_size]); - let mut cmsgspace = cmsg_space!([RawFd; 10]); + self.cmsgspace.clear(); let flags = MsgFlags::empty(); let old_timeout = self.stream.read_timeout()?; @@ -82,7 +83,7 @@ impl RecvConn { let msg = recvmsg::( self.stream.as_raw_fd(), iovec_mut, - Some(&mut cmsgspace), + Some(&mut self.cmsgspace), flags, ) .map_err(|e| match e { @@ -101,19 +102,18 @@ impl RecvConn { self.cmsgs_in.extend(msg.cmsgs()); let bytes = msg.bytes; - self.msg_buf_in.extend_from_slice(&tmpbuf[..bytes]); + self.msg_buf_filled += bytes; Ok(()) } pub fn bytes_needed_for_current_message(&self) -> Result { - if self.msg_buf_in.len() < 16 { + if self.msg_buf_filled < 16 { return Ok(16); } - let (_, header) = unmarshal::unmarshal_header(&self.msg_buf_in, 0)?; - let (_, header_fields_len) = crate::wire::util::parse_u32( - &self.msg_buf_in[unmarshal::HEADER_LEN..], - header.byteorder, - )?; + let msg_buf_in = &self.msg_buf_in[..self.msg_buf_filled]; + let (_, header) = unmarshal::unmarshal_header(msg_buf_in, 0)?; + let (_, header_fields_len) = + crate::wire::util::parse_u32(&msg_buf_in[unmarshal::HEADER_LEN..], header.byteorder)?; let complete_header_size = unmarshal::HEADER_LEN + header_fields_len as usize + 4; // +4 because the length of the header fields does not count let padding_between_header_and_body = 8 - ((complete_header_size) % 8); @@ -130,7 +130,7 @@ impl RecvConn { // Checks if the internal buffer currently holds a complete message pub fn buffer_contains_whole_message(&self) -> Result { - if self.msg_buf_in.len() < 16 { + if self.msg_buf_filled < 16 { return Ok(false); } let bytes_needed = self.bytes_needed_for_current_message(); @@ -142,7 +142,7 @@ impl RecvConn { Err(e) } } - Ok(bytes_needed) => Ok(self.msg_buf_in.len() >= bytes_needed), + Ok(bytes_needed) => Ok(self.msg_buf_filled >= bytes_needed), } } /// Blocks until a message has been read from the conn or the timeout has been reached @@ -170,6 +170,7 @@ impl RecvConn { /// Blocks until a message has been read from the conn or the timeout has been reached pub fn get_next_message(&mut self, timeout: Timeout) -> Result { self.read_whole_message(timeout)?; + debug_assert_eq!(self.msg_buf_filled, self.msg_buf_in.len()); let (hdrbytes, header) = unmarshal::unmarshal_header(&self.msg_buf_in, 0)?; let (dynhdrbytes, dynheader) = unmarshal::unmarshal_dynamic_header(&header, &self.msg_buf_in, hdrbytes)?; @@ -177,14 +178,14 @@ impl RecvConn { let (bytes_used, mut msg) = unmarshal::unmarshal_next_message( &header, dynheader, - &self.msg_buf_in, + std::mem::take(&mut self.msg_buf_in), hdrbytes + dynhdrbytes, )?; - if self.msg_buf_in.len() != bytes_used + hdrbytes + dynhdrbytes { + if self.msg_buf_filled != bytes_used + hdrbytes + dynhdrbytes { return Err(Error::UnmarshalError(UnmarshalError::NotAllBytesUsed)); } - self.msg_buf_in.clear(); + self.msg_buf_filled = 0; for cmsg in &self.cmsgs_in { match cmsg { @@ -481,7 +482,9 @@ impl DuplexConn { }, recv: RecvConn { msg_buf_in: Vec::new(), + msg_buf_filled: 0, cmsgs_in: Vec::new(), + cmsgspace: cmsg_space!([RawFd; 10]), stream, }, }) diff --git a/rustbus/src/tests.rs b/rustbus/src/tests.rs index 84d476c..30e3586 100644 --- a/rustbus/src/tests.rs +++ b/rustbus/src/tests.rs @@ -48,7 +48,8 @@ fn test_marshal_unmarshal() { let headers_plus_padding = hdrbytes + dynhdrbytes + (8 - ((hdrbytes + dynhdrbytes) % 8)); assert_eq!(headers_plus_padding, buf.len()); - let (_, unmarshed_msg) = unmarshal_next_message(&header, dynheader, msg.get_buf(), 0).unwrap(); + let (_, unmarshed_msg) = + unmarshal_next_message(&header, dynheader, msg.get_buf().to_vec(), 0).unwrap(); let msg = unmarshed_msg.unmarshall_all().unwrap(); diff --git a/rustbus/src/tests/dbus_send.rs b/rustbus/src/tests/dbus_send.rs index 3463abf..8bf578b 100644 --- a/rustbus/src/tests/dbus_send.rs +++ b/rustbus/src/tests/dbus_send.rs @@ -33,7 +33,7 @@ fn test_dbus_send_comp() -> Result<(), crate::connection::Error> { // Request name let reqname_serial = rpc_con .send_message(&mut standard_messages::request_name( - "io.killing.spark.dbustest".into(), + "io.killing.spark.dbustest", 0, ))? .write_all() @@ -44,7 +44,7 @@ fn test_dbus_send_comp() -> Result<(), crate::connection::Error> { )?; let sig_serial = rpc_con - .send_message(&mut standard_messages::add_match("type='signal'".into()))? + .send_message(&mut standard_messages::add_match("type='signal'"))? .write_all() .map_err(force_finish_on_error)?; let _msg = rpc_con.wait_response( @@ -53,7 +53,7 @@ fn test_dbus_send_comp() -> Result<(), crate::connection::Error> { )?; std::process::Command::new("dbus-send") - .args(&[ + .args([ "--dest=io.killing.spark.dbustest", "/", "io.killing.spark.dbustest.Member", @@ -64,7 +64,7 @@ fn test_dbus_send_comp() -> Result<(), crate::connection::Error> { .unwrap(); std::process::Command::new("dbus-send") - .args(&[ + .args([ "--dest=io.killing.spark.dbustest", "/", "io.killing.spark.dbustest.Member", @@ -76,7 +76,7 @@ fn test_dbus_send_comp() -> Result<(), crate::connection::Error> { .unwrap(); std::process::Command::new("dbus-send") - .args(&[ + .args([ "--dest=io.killing.spark.dbustest", "/", "io.killing.spark.dbustest.Member", @@ -88,7 +88,7 @@ fn test_dbus_send_comp() -> Result<(), crate::connection::Error> { .unwrap(); std::process::Command::new("dbus-send") - .args(&[ + .args([ "--dest=io.killing.spark.dbustest", "/", "io.killing.spark.dbustest.Member", @@ -100,7 +100,7 @@ fn test_dbus_send_comp() -> Result<(), crate::connection::Error> { .unwrap(); std::process::Command::new("dbus-send") - .args(&[ + .args([ "--dest=io.killing.spark.dbustest", "/", "io.killing.spark.dbustest.Member", @@ -116,7 +116,7 @@ fn test_dbus_send_comp() -> Result<(), crate::connection::Error> { .unwrap(); std::process::Command::new("dbus-send") - .args(&[ + .args([ "--dest=io.killing.spark.dbustest", "/", "io.killing.spark.dbustest.Member", diff --git a/rustbus/src/tests/fdpassing.rs b/rustbus/src/tests/fdpassing.rs index 87f3934..b75bc5c 100644 --- a/rustbus/src/tests/fdpassing.rs +++ b/rustbus/src/tests/fdpassing.rs @@ -20,12 +20,10 @@ fn test_fd_passing() { .unwrap() .write_all() .unwrap(); - con2.send_message(&mut crate::standard_messages::add_match( - "type='signal'".into(), - )) - .unwrap() - .write_all() - .unwrap(); + con2.send_message(&mut crate::standard_messages::add_match("type='signal'")) + .unwrap() + .write_all() + .unwrap(); std::thread::sleep(std::time::Duration::from_secs(1)); @@ -39,10 +37,9 @@ fn test_fd_passing() { .dynheader .interface .eq(&Some("io.killing.spark".to_owned())) + && signal.dynheader.member.eq(&Some("TestSignal".to_owned())) { - if signal.dynheader.member.eq(&Some("TestSignal".to_owned())) { - break signal; - } + break signal; } }; diff --git a/rustbus/src/tests/verify_marshalling.rs b/rustbus/src/tests/verify_marshalling.rs index 9740408..4d939c9 100644 --- a/rustbus/src/tests/verify_marshalling.rs +++ b/rustbus/src/tests/verify_marshalling.rs @@ -244,7 +244,7 @@ fn verify_dict_marshalling() { byteorder: ByteOrder::LittleEndian, }; let ctx = &mut ctx; - (&map).marshal(ctx).unwrap(); + map.marshal(ctx).unwrap(); assert_eq!( ctx.buf, // Note the longer \0 chain after the length. This is the needed padding after the u32 length and the dict-entry diff --git a/rustbus/src/wire/unmarshal.rs b/rustbus/src/wire/unmarshal.rs index a3d74e6..07d765f 100644 --- a/rustbus/src/wire/unmarshal.rs +++ b/rustbus/src/wire/unmarshal.rs @@ -148,11 +148,11 @@ pub fn unmarshal_body<'a, 'e>( pub fn unmarshal_next_message( header: &Header, dynheader: DynamicHeader, - buf: &[u8], + mut buf: Vec, offset: usize, ) -> UnmarshalResult { let sig = dynheader.signature.clone().unwrap_or_else(|| "".to_owned()); - let padding = align_offset(8, buf, offset)?; + let padding = align_offset(8, &buf, offset)?; if header.body_len == 0 { let msg = MarshalledMessage { @@ -169,14 +169,12 @@ pub fn unmarshal_next_message( return Err(UnmarshalError::NotEnoughBytes); } + // TODO: keep the offset around instead of shifting the bytes. + drop(buf.drain(..offset)); + let msg = MarshalledMessage { dynheader, - body: MarshalledMessageBody::from_parts( - buf[offset..].to_vec(), - vec![], - sig, - header.byteorder, - ), + body: MarshalledMessageBody::from_parts(buf, vec![], sig, header.byteorder), typ: header.typ, flags: header.flags, }; diff --git a/rustbus/src/wire/unmarshal/traits.rs b/rustbus/src/wire/unmarshal/traits.rs index cca707e..4cc3c62 100644 --- a/rustbus/src/wire/unmarshal/traits.rs +++ b/rustbus/src/wire/unmarshal/traits.rs @@ -169,9 +169,9 @@ mod test { // annotate the receiver with a type &str to unmarshal a &str "ABCD".marshal(ctx).unwrap(); let _s: &str = unmarshal(&mut UnmarshalContext { - buf: &ctx.buf, + buf: ctx.buf, byteorder: ctx.byteorder, - fds: &ctx.fds, + fds: ctx.fds, offset: 0, }) .unwrap() @@ -181,9 +181,9 @@ mod test { ctx.buf.clear(); true.marshal(ctx).unwrap(); let _b: bool = unmarshal(&mut UnmarshalContext { - buf: &ctx.buf, + buf: ctx.buf, byteorder: ctx.byteorder, - fds: &ctx.fds, + fds: ctx.fds, offset: 0, }) .unwrap() @@ -193,9 +193,9 @@ mod test { ctx.buf.clear(); 0i32.marshal(ctx).unwrap(); let _i = unmarshal::(&mut UnmarshalContext { - buf: &ctx.buf, + buf: ctx.buf, byteorder: ctx.byteorder, - fds: &ctx.fds, + fds: ctx.fds, offset: 0, }) .unwrap() @@ -206,9 +206,9 @@ mod test { fn x(_arg: (i32, i32, &str)) {} (0, 0, "ABCD").marshal(ctx).unwrap(); let arg = unmarshal(&mut UnmarshalContext { - buf: &ctx.buf, + buf: ctx.buf, byteorder: ctx.byteorder, - fds: &ctx.fds, + fds: ctx.fds, offset: 0, }) .unwrap() @@ -480,10 +480,7 @@ mod test { SignatureWrapper::new("sy").unwrap(), parser.get::().unwrap().get().unwrap() ); - assert_eq!( - true, - parser.get::().unwrap().get::().unwrap() - ); + assert!(parser.get::().unwrap().get::().unwrap()); // check Array of variants let var_vec: Vec = parser.get().unwrap(); @@ -499,7 +496,7 @@ mod test { SignatureWrapper::new("sy").unwrap(), var_vec[8].get().unwrap() ); - assert_eq!(true, var_vec[9].get::().unwrap()); + assert!(var_vec[9].get::().unwrap()); // check Dict of {String, variants} let var_map: HashMap = parser.get().unwrap(); @@ -515,6 +512,6 @@ mod test { SignatureWrapper::new("sy").unwrap(), var_map["8"].get().unwrap() ); - assert_eq!(true, var_map["9"].get::().unwrap()); + assert!(var_map["9"].get::().unwrap()); } } diff --git a/rustbus/src/wire/variant_macros.rs b/rustbus/src/wire/variant_macros.rs index ac77403..9e2f822 100644 --- a/rustbus/src/wire/variant_macros.rs +++ b/rustbus/src/wire/variant_macros.rs @@ -157,8 +157,8 @@ fn test_variant_sig_macro() { crate::message_builder::marshal_as_variant( 0xFFFFu64, crate::ByteOrder::LittleEndian, - &mut ctx.buf, - &mut ctx.fds, + ctx.buf, + ctx.fds, ) .unwrap(); @@ -238,8 +238,8 @@ fn test_variant_sig_macro() { crate::message_builder::marshal_as_variant( ("", "", 100u8), crate::ByteOrder::LittleEndian, - &mut ctx.buf, - &mut ctx.fds, + ctx.buf, + ctx.fds, ) .unwrap(); let (_bytes, uv) = ::unmarshal(&mut UnmarshalContext { @@ -413,8 +413,8 @@ fn test_variant_var_macro() { crate::message_builder::marshal_as_variant( 0xFFFFu64, crate::ByteOrder::LittleEndian, - &mut ctx.buf, - &mut ctx.fds, + ctx.buf, + ctx.fds, ) .unwrap(); @@ -466,14 +466,11 @@ fn test_variant_var_macro() { dbus_variant_var!(MyVariant2, CaseMap => Map<'fds, 'buf>; CaseStruct => Struct<'fds, 'buf>); let mut map = Map::new(); - map.insert( - "AAAA".into(), - (100, 20, (300, MyVariant::String("BBBB".into()))), - ); + map.insert("AAAA".into(), (100, 20, (300, MyVariant::String("BBBB")))); map.insert("CCCC".into(), (400, 50, (600, MyVariant::V2(0)))); map.insert("DDDD".into(), (500, 60, (700, MyVariant::Integer(10)))); let v1 = MyVariant2::CaseMap(map); - let v2 = MyVariant2::CaseStruct((10, 20, MyVariant::String("AAAAA".into()))); + let v2 = MyVariant2::CaseStruct((10, 20, MyVariant::String("AAAAA"))); let v3 = MyVariant2::CaseStruct((30, 40, MyVariant::V2(10))); let v4 = MyVariant2::CaseStruct((30, 40, MyVariant::Integer(20))); @@ -567,8 +564,8 @@ fn test_variant_var_macro() { crate::message_builder::marshal_as_variant( ("testtext", "moretesttext", 100u8), crate::ByteOrder::LittleEndian, - &mut ctx.buf, - &mut ctx.fds, + ctx.buf, + ctx.fds, ) .unwrap(); let (_bytes, uv) = ::unmarshal(&mut UnmarshalContext { diff --git a/rustbus_derive_test/src/lib.rs b/rustbus_derive_test/src/lib.rs index 6a50808..5d28ae5 100644 --- a/rustbus_derive_test/src/lib.rs +++ b/rustbus_derive_test/src/lib.rs @@ -102,7 +102,7 @@ pub fn test_enum_derive() { } let v1 = Variant1::A("ABCD".into()); - let v2 = Variant1::B("ABCD".into(), "EFGH".into()); + let v2 = Variant1::B("ABCD", "EFGH".into()); let v3 = Variant1::C { c1: "ABCD".into(), c2: "EFGH".into(),