From c034ef398efef278188a609ebdaf3bec01e6ca0c Mon Sep 17 00:00:00 2001 From: Jeff Hiner Date: Thu, 2 Jan 2020 12:27:19 -0700 Subject: [PATCH 1/5] Initial 2015 deprecation fixes --- examples/pub-client.rs | 4 ++-- src/packet/mod.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/pub-client.rs b/examples/pub-client.rs index 5216cad..664db0e 100644 --- a/examples/pub-client.rs +++ b/examples/pub-client.rs @@ -147,11 +147,11 @@ fn main() { let mut line = String::new(); stdin.read_line(&mut line).unwrap(); - if line.trim_right() == "" { + if line.trim_end() == "" { continue; } - let message = format!("{}: {}", user_name, line.trim_right()); + let message = format!("{}: {}", user_name, line.trim_end()); for chan in &channels { let publish_packet = PublishPacket::new(chan.clone(), QoSWithPacketIdentifier::Level0, message.clone()); diff --git a/src/packet/mod.rs b/src/packet/mod.rs index 3fd5d9a..bbec48f 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -298,13 +298,13 @@ macro_rules! impl_variable_packet { Err(FixedHeaderError::Unrecognized(code, length)) => { let reader = &mut reader.take(length as u64); let mut buf = Vec::with_capacity(length as usize); - try!(reader.read_to_end(&mut buf)); + reader.read_to_end(&mut buf)?; return Err(VariablePacketError::UnrecognizedPacket(code, buf)); }, Err(FixedHeaderError::ReservedType(code, length)) => { let reader = &mut reader.take(length as u64); let mut buf = Vec::with_capacity(length as usize); - try!(reader.read_to_end(&mut buf)); + reader.read_to_end(&mut buf)?; return Err(VariablePacketError::ReservedPacket(code, buf)); }, Err(err) => return Err(From::from(err)) @@ -316,7 +316,7 @@ macro_rules! impl_variable_packet { match fixed_header.packet_type.control_type { $( ControlType::$hdr => { - let pk = try!(<$name as Packet>::decode_packet(reader, fixed_header)); + let pk = <$name as Packet>::decode_packet(reader, fixed_header)?; Ok(VariablePacket::$name(pk)) } )+ From 39f3bc2e5dc86b2ec5b7d03b0418694ccd9a7f92 Mon Sep 17 00:00:00 2001 From: Jeff Hiner Date: Thu, 2 Jan 2020 12:31:24 -0700 Subject: [PATCH 2/5] Update to 2018 edition --- Cargo.toml | 1 + src/control/fixed_header.rs | 8 ++++---- src/control/variable_header/connect_ack_flags.rs | 4 ++-- src/control/variable_header/connect_flags.rs | 4 ++-- src/control/variable_header/connect_ret_code.rs | 4 ++-- src/control/variable_header/keep_alive.rs | 4 ++-- src/control/variable_header/mod.rs | 4 ++-- src/control/variable_header/packet_identifier.rs | 4 ++-- src/control/variable_header/protocol_level.rs | 4 ++-- src/control/variable_header/protocol_name.rs | 4 ++-- src/control/variable_header/topic_name.rs | 6 +++--- src/lib.rs | 8 -------- src/packet/connack.rs | 12 ++++++------ src/packet/connect.rs | 16 ++++++++-------- src/packet/disconnect.rs | 4 ++-- src/packet/mod.rs | 16 ++++++++-------- src/packet/pingreq.rs | 4 ++-- src/packet/pingresp.rs | 4 ++-- src/packet/puback.rs | 8 ++++---- src/packet/pubcomp.rs | 8 ++++---- src/packet/publish.rs | 16 ++++++++-------- src/packet/pubrec.rs | 8 ++++---- src/packet/pubrel.rs | 8 ++++---- src/packet/suback.rs | 10 +++++----- src/packet/subscribe.rs | 12 ++++++------ src/packet/unsuback.rs | 8 ++++---- src/packet/unsubscribe.rs | 12 ++++++------ src/qos.rs | 2 +- src/topic_filter.rs | 7 ++++--- src/topic_name.rs | 5 +++-- 30 files changed, 105 insertions(+), 110 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e6daf3a..f4906cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ description = "MQTT Protocol Library" keywords = ["mqtt", "protocol"] repository = "https://github.com/zonyitoo/mqtt-rs" documentation = "https://docs.rs/mqtt-protocol" +edition = "2018" [dependencies] byteorder = "1.2" diff --git a/src/control/fixed_header.rs b/src/control/fixed_header.rs index 4b2cb7c..c8baddc 100644 --- a/src/control/fixed_header.rs +++ b/src/control/fixed_header.rs @@ -10,8 +10,8 @@ use byteorder::{ReadBytesExt, WriteBytesExt}; use futures::{future, Future}; use tokio_io::{io as async_io, AsyncRead}; -use {Decodable, Encodable}; -use control::packet_type::{PacketType, PacketTypeError}; +use crate::{Decodable, Encodable}; +use crate::control::packet_type::{PacketType, PacketTypeError}; /// Fixed header for each MQTT control packet /// @@ -214,8 +214,8 @@ impl Error for FixedHeaderError { mod test { use super::*; - use {Decodable, Encodable}; - use control::packet_type::{ControlType, PacketType}; + use crate::{Decodable, Encodable}; + use crate::control::packet_type::{ControlType, PacketType}; use std::io::Cursor; #[test] diff --git a/src/control/variable_header/connect_ack_flags.rs b/src/control/variable_header/connect_ack_flags.rs index 8163d12..44ad572 100644 --- a/src/control/variable_header/connect_ack_flags.rs +++ b/src/control/variable_header/connect_ack_flags.rs @@ -3,8 +3,8 @@ use std::io::{Read, Write}; use byteorder::{ReadBytesExt, WriteBytesExt}; -use {Decodable, Encodable}; -use control::variable_header::VariableHeaderError; +use crate::{Decodable, Encodable}; +use crate::control::variable_header::VariableHeaderError; /// Flags in `CONNACK` packet #[derive(Debug, Eq, PartialEq, Copy, Clone)] diff --git a/src/control/variable_header/connect_flags.rs b/src/control/variable_header/connect_flags.rs index 1fbc11a..74ece49 100644 --- a/src/control/variable_header/connect_flags.rs +++ b/src/control/variable_header/connect_flags.rs @@ -3,8 +3,8 @@ use std::io::{Read, Write}; use byteorder::{ReadBytesExt, WriteBytesExt}; -use {Decodable, Encodable}; -use control::variable_header::VariableHeaderError; +use crate::{Decodable, Encodable}; +use crate::control::variable_header::VariableHeaderError; /// Flags for `CONNECT` packet #[derive(Debug, Eq, PartialEq, Copy, Clone)] diff --git a/src/control/variable_header/connect_ret_code.rs b/src/control/variable_header/connect_ret_code.rs index 432f792..a85a60e 100644 --- a/src/control/variable_header/connect_ret_code.rs +++ b/src/control/variable_header/connect_ret_code.rs @@ -3,8 +3,8 @@ use std::io::{Read, Write}; use byteorder::{ReadBytesExt, WriteBytesExt}; -use {Decodable, Encodable}; -use control::variable_header::VariableHeaderError; +use crate::{Decodable, Encodable}; +use crate::control::variable_header::VariableHeaderError; pub const CONNECTION_ACCEPTED: u8 = 0x00; pub const UNACCEPTABLE_PROTOCOL_VERSION: u8 = 0x01; diff --git a/src/control/variable_header/keep_alive.rs b/src/control/variable_header/keep_alive.rs index dab9b54..102fc72 100644 --- a/src/control/variable_header/keep_alive.rs +++ b/src/control/variable_header/keep_alive.rs @@ -3,8 +3,8 @@ use std::io::{Read, Write}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use {Decodable, Encodable}; -use control::variable_header::VariableHeaderError; +use crate::{Decodable, Encodable}; +use crate::control::variable_header::VariableHeaderError; /// Keep alive time interval #[derive(Debug, Eq, PartialEq, Copy, Clone)] diff --git a/src/control/variable_header/mod.rs b/src/control/variable_header/mod.rs index 3f4d7b0..1e425a9 100644 --- a/src/control/variable_header/mod.rs +++ b/src/control/variable_header/mod.rs @@ -6,8 +6,8 @@ use std::fmt; use std::io; use std::string::FromUtf8Error; -use encodable::StringEncodeError; -use topic_name::TopicNameError; +use crate::encodable::StringEncodeError; +use crate::topic_name::TopicNameError; pub use self::connect_ack_flags::ConnackFlags; pub use self::connect_flags::ConnectFlags; diff --git a/src/control/variable_header/packet_identifier.rs b/src/control/variable_header/packet_identifier.rs index fbe5a49..99dba39 100644 --- a/src/control/variable_header/packet_identifier.rs +++ b/src/control/variable_header/packet_identifier.rs @@ -3,8 +3,8 @@ use std::io::{Read, Write}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use {Decodable, Encodable}; -use control::variable_header::VariableHeaderError; +use crate::{Decodable, Encodable}; +use crate::control::variable_header::VariableHeaderError; /// Packet identifier #[derive(Debug, Eq, PartialEq, Copy, Clone)] diff --git a/src/control/variable_header/protocol_level.rs b/src/control/variable_header/protocol_level.rs index 44363d1..ff878e7 100644 --- a/src/control/variable_header/protocol_level.rs +++ b/src/control/variable_header/protocol_level.rs @@ -5,8 +5,8 @@ use std::io::{Read, Write}; use byteorder::{ReadBytesExt, WriteBytesExt}; -use {Decodable, Encodable}; -use control::variable_header::VariableHeaderError; +use crate::{Decodable, Encodable}; +use crate::control::variable_header::VariableHeaderError; pub const SPEC_3_1_1: u8 = 0x04; diff --git a/src/control/variable_header/protocol_name.rs b/src/control/variable_header/protocol_name.rs index 5021e01..71e3863 100644 --- a/src/control/variable_header/protocol_name.rs +++ b/src/control/variable_header/protocol_name.rs @@ -1,8 +1,8 @@ use std::convert::From; use std::io::{Read, Write}; -use {Decodable, Encodable}; -use control::variable_header::VariableHeaderError; +use crate::{Decodable, Encodable}; +use crate::control::variable_header::VariableHeaderError; /// Protocol name in variable header /// diff --git a/src/control/variable_header/topic_name.rs b/src/control/variable_header/topic_name.rs index eef1d6a..a875af0 100644 --- a/src/control/variable_header/topic_name.rs +++ b/src/control/variable_header/topic_name.rs @@ -1,9 +1,9 @@ use std::convert::{From, Into}; use std::io::{Read, Write}; -use {Decodable, Encodable}; -use control::variable_header::VariableHeaderError; -use topic_name::TopicName; +use crate::{Decodable, Encodable}; +use crate::control::variable_header::VariableHeaderError; +use crate::topic_name::TopicName; /// Topic name wrapper #[derive(Debug, Eq, PartialEq, Clone)] diff --git a/src/lib.rs b/src/lib.rs index 33edd66..989ff4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,14 +34,6 @@ //! assert_eq!(VariablePacket::PublishPacket(packet), auto_decode); //! ``` -extern crate log; -extern crate byteorder; -extern crate regex; -#[macro_use] -extern crate lazy_static; -extern crate futures; -extern crate tokio_io; - pub use self::encodable::{Decodable, Encodable}; pub use self::qos::QualityOfService; pub use self::topic_filter::{TopicFilter, TopicFilterRef}; diff --git a/src/packet/connack.rs b/src/packet/connack.rs index 5a7cef9..ea78f0f 100644 --- a/src/packet/connack.rs +++ b/src/packet/connack.rs @@ -2,10 +2,10 @@ use std::io::{Read, Write}; -use {Decodable, Encodable}; -use control::{ControlType, FixedHeader, PacketType}; -use control::variable_header::{ConnackFlags, ConnectReturnCode}; -use packet::{Packet, PacketError}; +use crate::{Decodable, Encodable}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::control::variable_header::{ConnackFlags, ConnectReturnCode}; +use crate::packet::{Packet, PacketError}; /// `CONNACK` packet #[derive(Debug, Eq, PartialEq, Clone)] @@ -79,8 +79,8 @@ mod test { use std::io::Cursor; - use {Decodable, Encodable}; - use control::variable_header::ConnectReturnCode; + use crate::{Decodable, Encodable}; + use crate::control::variable_header::ConnectReturnCode; #[test] pub fn test_connack_packet_basic() { diff --git a/src/packet/connect.rs b/src/packet/connect.rs index e5b296b..bbbfbe2 100644 --- a/src/packet/connect.rs +++ b/src/packet/connect.rs @@ -4,13 +4,13 @@ use std::error::Error; use std::fmt; use std::io::{self, Read, Write}; -use {Decodable, Encodable}; -use control::{ControlType, FixedHeader, PacketType}; -use control::variable_header::{ConnectFlags, KeepAlive, ProtocolLevel, ProtocolName}; -use control::variable_header::protocol_level::SPEC_3_1_1; -use encodable::{StringEncodeError, VarBytes}; -use packet::{Packet, PacketError}; -use topic_name::{TopicName, TopicNameError}; +use crate::{Decodable, Encodable}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::control::variable_header::{ConnectFlags, KeepAlive, ProtocolLevel, ProtocolName}; +use crate::control::variable_header::protocol_level::SPEC_3_1_1; +use crate::encodable::{StringEncodeError, VarBytes}; +use crate::packet::{Packet, PacketError}; +use crate::topic_name::{TopicName, TopicNameError}; /// `CONNECT` packet #[derive(Debug, Eq, PartialEq, Clone)] @@ -373,7 +373,7 @@ mod test { use std::io::Cursor; - use {Decodable, Encodable}; + use crate::{Decodable, Encodable}; #[test] fn test_connect_packet_encode_basic() { diff --git a/src/packet/disconnect.rs b/src/packet/disconnect.rs index 3453dd2..85932f1 100644 --- a/src/packet/disconnect.rs +++ b/src/packet/disconnect.rs @@ -2,8 +2,8 @@ use std::io::{Read, Write}; -use control::{ControlType, FixedHeader, PacketType}; -use packet::{Packet, PacketError}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::packet::{Packet, PacketError}; /// `DISCONNECT` packet #[derive(Debug, Eq, PartialEq, Clone)] diff --git a/src/packet/mod.rs b/src/packet/mod.rs index bbec48f..def124f 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -8,13 +8,13 @@ use std::io::{self, Read, Write, Cursor}; use futures::Future; use tokio_io::{io as async_io, AsyncRead}; -use {Decodable, Encodable}; -use control::ControlType; -use control::FixedHeader; -use control::fixed_header::FixedHeaderError; -use control::variable_header::VariableHeaderError; -use encodable::StringEncodeError; -use topic_name::TopicNameError; +use crate::{Decodable, Encodable}; +use crate::control::ControlType; +use crate::control::FixedHeader; +use crate::control::fixed_header::FixedHeaderError; +use crate::control::variable_header::VariableHeaderError; +use crate::encodable::StringEncodeError; +use crate::topic_name::TopicNameError; pub use self::connack::ConnackPacket; pub use self::connect::ConnectPacket; @@ -437,7 +437,7 @@ mod test { use std::io::Cursor; - use {Decodable, Encodable}; + use crate::{Decodable, Encodable}; #[test] fn test_variable_packet_basic() { diff --git a/src/packet/pingreq.rs b/src/packet/pingreq.rs index 756a38f..48ff382 100644 --- a/src/packet/pingreq.rs +++ b/src/packet/pingreq.rs @@ -2,8 +2,8 @@ use std::io::{Read, Write}; -use control::{ControlType, FixedHeader, PacketType}; -use packet::{Packet, PacketError}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::packet::{Packet, PacketError}; /// `PINGREQ` packet #[derive(Debug, Eq, PartialEq, Clone)] diff --git a/src/packet/pingresp.rs b/src/packet/pingresp.rs index 7e8ea2f..bb18571 100644 --- a/src/packet/pingresp.rs +++ b/src/packet/pingresp.rs @@ -2,8 +2,8 @@ use std::io::{Read, Write}; -use control::{ControlType, FixedHeader, PacketType}; -use packet::{Packet, PacketError}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::packet::{Packet, PacketError}; /// `PINGRESP` packet #[derive(Debug, Eq, PartialEq, Clone)] diff --git a/src/packet/puback.rs b/src/packet/puback.rs index a9e4201..6e260b6 100644 --- a/src/packet/puback.rs +++ b/src/packet/puback.rs @@ -2,10 +2,10 @@ use std::io::{Read, Write}; -use {Decodable, Encodable}; -use control::{ControlType, FixedHeader, PacketType}; -use control::variable_header::PacketIdentifier; -use packet::{Packet, PacketError}; +use crate::{Decodable, Encodable}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::control::variable_header::PacketIdentifier; +use crate::packet::{Packet, PacketError}; /// `PUBACK` packet #[derive(Debug, Eq, PartialEq, Clone)] diff --git a/src/packet/pubcomp.rs b/src/packet/pubcomp.rs index a5071ad..48cd132 100644 --- a/src/packet/pubcomp.rs +++ b/src/packet/pubcomp.rs @@ -2,10 +2,10 @@ use std::io::{Read, Write}; -use {Decodable, Encodable}; -use control::{ControlType, FixedHeader, PacketType}; -use control::variable_header::PacketIdentifier; -use packet::{Packet, PacketError}; +use crate::{Decodable, Encodable}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::control::variable_header::PacketIdentifier; +use crate::packet::{Packet, PacketError}; /// `PUBCOMP` packet #[derive(Debug, Eq, PartialEq, Clone)] diff --git a/src/packet/publish.rs b/src/packet/publish.rs index 9ceb37d..eee44a9 100644 --- a/src/packet/publish.rs +++ b/src/packet/publish.rs @@ -2,12 +2,12 @@ use std::io::{Read, Write}; -use {Decodable, Encodable}; -use control::{ControlType, FixedHeader, PacketType}; -use control::variable_header::PacketIdentifier; -use packet::{Packet, PacketError}; -use qos::QualityOfService; -use topic_name::TopicName; +use crate::{Decodable, Encodable}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::control::variable_header::PacketIdentifier; +use crate::packet::{Packet, PacketError}; +use crate::qos::QualityOfService; +use crate::topic_name::TopicName; /// QoS with identifier pairs #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Copy, Clone)] @@ -175,8 +175,8 @@ mod test { use std::io::Cursor; - use {Decodable, Encodable}; - use topic_name::TopicName; + use crate::{Decodable, Encodable}; + use crate::topic_name::TopicName; #[test] fn test_publish_packet_basic() { diff --git a/src/packet/pubrec.rs b/src/packet/pubrec.rs index 569f6fe..360e3bf 100644 --- a/src/packet/pubrec.rs +++ b/src/packet/pubrec.rs @@ -2,10 +2,10 @@ use std::io::{Read, Write}; -use {Decodable, Encodable}; -use control::{ControlType, FixedHeader, PacketType}; -use control::variable_header::PacketIdentifier; -use packet::{Packet, PacketError}; +use crate::{Decodable, Encodable}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::control::variable_header::PacketIdentifier; +use crate::packet::{Packet, PacketError}; /// `PUBREC` packet #[derive(Debug, Eq, PartialEq, Clone)] diff --git a/src/packet/pubrel.rs b/src/packet/pubrel.rs index 30dd1ba..2260f1e 100644 --- a/src/packet/pubrel.rs +++ b/src/packet/pubrel.rs @@ -2,10 +2,10 @@ use std::io::{Read, Write}; -use {Decodable, Encodable}; -use control::{ControlType, FixedHeader, PacketType}; -use control::variable_header::PacketIdentifier; -use packet::{Packet, PacketError}; +use crate::{Decodable, Encodable}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::control::variable_header::PacketIdentifier; +use crate::packet::{Packet, PacketError}; /// `PUBREL` packet #[derive(Debug, Eq, PartialEq, Clone)] diff --git a/src/packet/suback.rs b/src/packet/suback.rs index d28f148..4297dd9 100644 --- a/src/packet/suback.rs +++ b/src/packet/suback.rs @@ -8,11 +8,11 @@ use std::io::{self, Read, Write}; use byteorder::{ReadBytesExt, WriteBytesExt}; -use {Decodable, Encodable}; -use control::{ControlType, FixedHeader, PacketType}; -use control::variable_header::PacketIdentifier; -use packet::{Packet, PacketError}; -use qos::QualityOfService; +use crate::{Decodable, Encodable}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::control::variable_header::PacketIdentifier; +use crate::packet::{Packet, PacketError}; +use crate::qos::QualityOfService; /// Subscribe code #[repr(u8)] diff --git a/src/packet/subscribe.rs b/src/packet/subscribe.rs index ac3f7b8..5c99710 100644 --- a/src/packet/subscribe.rs +++ b/src/packet/subscribe.rs @@ -8,12 +8,12 @@ use std::string::FromUtf8Error; use byteorder::{ReadBytesExt, WriteBytesExt}; -use {Decodable, Encodable, QualityOfService}; -use control::{ControlType, FixedHeader, PacketType}; -use control::variable_header::PacketIdentifier; -use encodable::StringEncodeError; -use packet::{Packet, PacketError}; -use topic_filter::{TopicFilter, TopicFilterError}; +use crate::{Decodable, Encodable, QualityOfService}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::control::variable_header::PacketIdentifier; +use crate::encodable::StringEncodeError; +use crate::packet::{Packet, PacketError}; +use crate::topic_filter::{TopicFilter, TopicFilterError}; /// `SUBSCRIBE` packet #[derive(Debug, Eq, PartialEq, Clone)] diff --git a/src/packet/unsuback.rs b/src/packet/unsuback.rs index 710cd1f..0b0b39b 100644 --- a/src/packet/unsuback.rs +++ b/src/packet/unsuback.rs @@ -2,10 +2,10 @@ use std::io::{Read, Write}; -use {Decodable, Encodable}; -use control::{ControlType, FixedHeader, PacketType}; -use control::variable_header::PacketIdentifier; -use packet::{Packet, PacketError}; +use crate::{Decodable, Encodable}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::control::variable_header::PacketIdentifier; +use crate::packet::{Packet, PacketError}; /// `UNSUBACK` packet #[derive(Debug, Eq, PartialEq, Clone)] diff --git a/src/packet/unsubscribe.rs b/src/packet/unsubscribe.rs index b9a8680..17dfbab 100644 --- a/src/packet/unsubscribe.rs +++ b/src/packet/unsubscribe.rs @@ -6,12 +6,12 @@ use std::fmt; use std::io::{self, Read, Write}; use std::string::FromUtf8Error; -use {Decodable, Encodable}; -use control::{ControlType, FixedHeader, PacketType}; -use control::variable_header::PacketIdentifier; -use encodable::StringEncodeError; -use packet::{Packet, PacketError}; -use topic_filter::{TopicFilter, TopicFilterError}; +use crate::{Decodable, Encodable}; +use crate::control::{ControlType, FixedHeader, PacketType}; +use crate::control::variable_header::PacketIdentifier; +use crate::encodable::StringEncodeError; +use crate::packet::{Packet, PacketError}; +use crate::topic_filter::{TopicFilter, TopicFilterError}; /// `UNSUBSCRIBE` packet #[derive(Debug, Eq, PartialEq, Clone)] diff --git a/src/qos.rs b/src/qos.rs index 9928ea6..ee6bbeb 100644 --- a/src/qos.rs +++ b/src/qos.rs @@ -1,6 +1,6 @@ //! QoS (Quality of Services) -use packet::publish::QoSWithPacketIdentifier; +use crate::packet::publish::QoSWithPacketIdentifier; #[repr(u8)] #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Copy, Clone)] diff --git a/src/topic_filter.rs b/src/topic_filter.rs index bc97bad..21de971 100644 --- a/src/topic_filter.rs +++ b/src/topic_filter.rs @@ -7,11 +7,12 @@ use std::io::{Read, Write}; use std::mem; use std::ops::Deref; +use lazy_static::lazy_static; use regex::Regex; -use {Decodable, Encodable}; -use encodable::StringEncodeError; -use topic_name::TopicNameRef; +use crate::{Decodable, Encodable}; +use crate::encodable::StringEncodeError; +use crate::topic_name::TopicNameRef; const VALIDATE_TOPIC_FILTER_REGEX: &'static str = r"^(([^+#]*|\+)(/([^+#]*|\+))*(/#)?|#)$"; diff --git a/src/topic_name.rs b/src/topic_name.rs index 56f6c1c..4abd5cf 100644 --- a/src/topic_name.rs +++ b/src/topic_name.rs @@ -7,10 +7,11 @@ use std::io::{Read, Write}; use std::mem; use std::ops::Deref; +use lazy_static::lazy_static; use regex::Regex; -use {Decodable, Encodable}; -use encodable::StringEncodeError; +use crate::{Decodable, Encodable}; +use crate::encodable::StringEncodeError; const TOPIC_NAME_VALIDATE_REGEX: &'static str = r"^[^#+]+$"; From 64052fdedf3d0aafea8f101ac17c1818fc1098df Mon Sep 17 00:00:00 2001 From: Jeff Hiner Date: Thu, 2 Jan 2020 12:35:32 -0700 Subject: [PATCH 3/5] Update straightforward dependencies --- Cargo.toml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f4906cc..d629be6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,17 +10,17 @@ documentation = "https://docs.rs/mqtt-protocol" edition = "2018" [dependencies] -byteorder = "1.2" +byteorder = "1.3" log = "0.4" -regex = "1.0" -lazy_static = "1.1" +regex = "1.3" +lazy_static = "1.4" tokio-io = "0.1" futures = "0.1" [dev-dependencies] clap = "2" -env_logger = "0.5" -uuid = { version = "0.7", features = ["v4"] } +env_logger = "0.7" +uuid = { version = "0.8", features = ["v4"] } time = "0.1" tokio = "0.1" From ff6765eb0f90ea5e5eee1a16475f2c7cad093681 Mon Sep 17 00:00:00 2001 From: Jeff Hiner Date: Fri, 3 Jan 2020 00:19:51 -0700 Subject: [PATCH 4/5] Update futures/tokio to async compatible versions --- Cargo.toml | 18 +++- examples/sub-client-async.rs | 122 ++++++++++----------- src/control/fixed_header.rs | 109 +++++++++++-------- src/packet/mod.rs | 200 ++++++++++++++++++----------------- 4 files changed, 238 insertions(+), 211 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d629be6..b075665 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] authors = ["Y. T. Chung "] name = "mqtt-protocol" -version = "0.7.0" +version = "0.8.0" license = "MIT/Apache-2.0" description = "MQTT Protocol Library" keywords = ["mqtt", "protocol"] @@ -14,15 +14,23 @@ byteorder = "1.3" log = "0.4" regex = "1.3" lazy_static = "1.4" -tokio-io = "0.1" -futures = "0.1" +tokio = { version = "0.2", optional = true } [dev-dependencies] clap = "2" env_logger = "0.7" -uuid = { version = "0.8", features = ["v4"] } time = "0.1" -tokio = "0.1" +tokio = { version = "0.2", features = ["macros", "rt-threaded", "net", "time", "io-util", "stream"] } +futures = { version = "0.3" } +uuid = { version = "0.8", features = ["v4"] } + +[features] +async = ["tokio"] +default = [] [lib] name = "mqtt" + +[[example]] +name = "sub-client-async" +required-features = ["async"] diff --git a/examples/sub-client-async.rs b/examples/sub-client-async.rs index 2638e80..505c3ed 100644 --- a/examples/sub-client-async.rs +++ b/examples/sub-client-async.rs @@ -1,28 +1,18 @@ -extern crate mqtt; -#[macro_use] -extern crate log; -extern crate clap; -extern crate env_logger; -extern crate futures; -extern crate tokio; -extern crate uuid; - use std::env; -use std::fmt::Debug; use std::io::Write; use std::net; use std::str; -use std::time::{Duration, Instant}; +use std::time::Duration; use clap::{App, Arg}; +use log::{error, info, trace}; use uuid::Uuid; -use futures::{future, Future, Stream}; - -use tokio::io::{self, AsyncRead}; +use futures::join; +use futures::prelude::*; use tokio::net::TcpStream; -use tokio::timer::Interval; +use tokio::prelude::*; use mqtt::control::variable_header::ConnectReturnCode; use mqtt::packet::*; @@ -33,13 +23,13 @@ fn generate_client_id() -> String { format!("/MQTT/rust/{}", Uuid::new_v4()) } -fn alt_drop(err: E) { - warn!("{:?}", err); -} - -fn main() { +#[tokio::main] +async fn main() { // configure logging - env::set_var("RUST_LOG", env::var_os("RUST_LOG").unwrap_or_else(|| "info".into())); + env::set_var( + "RUST_LOG", + env::var_os("RUST_LOG").unwrap_or_else(|| "info".into()), + ); env_logger::init(); let matches = App::new("sub-client") @@ -51,7 +41,8 @@ fn main() { .takes_value(true) .required(true) .help("MQTT server address (host:port)"), - ).arg( + ) + .arg( Arg::with_name("SUBSCRIBE") .short("s") .long("subscribe") @@ -59,25 +50,29 @@ fn main() { .multiple(true) .required(true) .help("Channel filter to subscribe"), - ).arg( + ) + .arg( Arg::with_name("USER_NAME") .short("u") .long("username") .takes_value(true) .help("Login user name"), - ).arg( + ) + .arg( Arg::with_name("PASSWORD") .short("p") .long("password") .takes_value(true) .help("Password"), - ).arg( + ) + .arg( Arg::with_name("CLIENT_ID") .short("i") .long("client-identifier") .takes_value(true) .help("Client identifier"), - ).get_matches(); + ) + .get_matches(); let server_addr = matches.value_of("SERVER").unwrap(); let client_id = matches @@ -87,7 +82,12 @@ fn main() { let channel_filters: Vec<(TopicFilter, QualityOfService)> = matches .values_of("SUBSCRIBE") .unwrap() - .map(|c| (TopicFilter::new(c.to_string()).unwrap(), QualityOfService::Level0)) + .map(|c| { + ( + TopicFilter::new(c.to_string()).unwrap(), + QualityOfService::Level0, + ) + }) .collect(); let keep_alive = 10; @@ -142,52 +142,46 @@ fn main() { } // connection made, start the async work - let program = future::ok(()).and_then(move |()| { - let stream = TcpStream::from_std(stream, &Default::default()).unwrap(); - let (mqtt_read, mqtt_write) = stream.split(); + let mut stream = TcpStream::from_std(stream).unwrap(); + let (mut mqtt_read, mut mqtt_write) = stream.split(); - let ping_time = Duration::new((keep_alive / 2) as u64, 0); - let ping_stream = Interval::new(Instant::now() + ping_time, ping_time); + let ping_time = Duration::new((keep_alive / 2) as u64, 0); + let mut ping_stream = tokio::time::interval(ping_time); - let ping_sender = ping_stream.map_err(alt_drop).fold(mqtt_write, |mqtt_write, _| { + let ping_sender = async move { + while let Some(_) = ping_stream.next().await { info!("Sending PINGREQ to broker"); let pingreq_packet = PingreqPacket::new(); let mut buf = Vec::new(); pingreq_packet.encode(&mut buf).unwrap(); - io::write_all(mqtt_write, buf) - .map(|(mqtt_write, _buf)| mqtt_write) - .map_err(alt_drop) - }); - - let receiver = future::loop_fn::<_, (), _, _>(mqtt_read, |mqtt_read| { - VariablePacket::parse(mqtt_read).map(|(mqtt_read, packet)| { - trace!("PACKET {:?}", packet); - - match packet { - VariablePacket::PingrespPacket(..) => { - info!("Receiving PINGRESP from broker .."); - } - VariablePacket::PublishPacket(ref publ) => { - let msg = match str::from_utf8(&publ.payload_ref()[..]) { - Ok(msg) => msg, - Err(err) => { - error!("Failed to decode publish message {:?}", err); - return future::Loop::Continue(mqtt_read); - } - }; - info!("PUBLISH ({}): {}", publ.topic_name(), msg); - } - _ => {} - } + mqtt_write.write_all(&buf).await.unwrap(); + } + }; - future::Loop::Continue(mqtt_read) - }) - }).map_err(alt_drop); + let receiver = async move { + while let Ok(packet) = VariablePacket::parse(&mut mqtt_read).await { + trace!("PACKET {:?}", packet); - ping_sender.join(receiver).map(alt_drop) - }); + match packet { + VariablePacket::PingrespPacket(..) => { + info!("Receiving PINGRESP from broker .."); + } + VariablePacket::PublishPacket(ref publ) => { + let msg = match str::from_utf8(&publ.payload_ref()[..]) { + Ok(msg) => msg, + Err(err) => { + error!("Failed to decode publish message {:?}", err); + continue; + } + }; + info!("PUBLISH ({}): {}", publ.topic_name(), msg); + } + _ => {} + } + } + }; - tokio::run(program); + join!(ping_sender, receiver); } diff --git a/src/control/fixed_header.rs b/src/control/fixed_header.rs index c8baddc..8157e5b 100644 --- a/src/control/fixed_header.rs +++ b/src/control/fixed_header.rs @@ -7,11 +7,11 @@ use std::io::{self, Read, Write}; use byteorder::{ReadBytesExt, WriteBytesExt}; -use futures::{future, Future}; -use tokio_io::{io as async_io, AsyncRead}; +#[cfg(feature = "async")] +use tokio::io::{AsyncRead, AsyncReadExt}; -use crate::{Decodable, Encodable}; use crate::control::packet_type::{PacketType, PacketTypeError}; +use crate::{Decodable, Encodable}; /// Fixed header for each MQTT control packet /// @@ -45,41 +45,50 @@ impl FixedHeader { } } + #[cfg(feature = "async")] /// Asynchronously parse a single fixed header from an AsyncRead type, such as a network /// socket. - pub fn parse(rdr: A) -> impl Future), Error = FixedHeaderError> { - async_io::read_exact(rdr, [0u8]) - .from_err() - .and_then(|(rdr, [type_val])| { - let mut data: Vec = Vec::new(); - data.push(type_val); - future::loop_fn((rdr, 0, 0, data), move |(rdr, mut cur, i, mut data)| { - async_io::read_exact(rdr, [0u8]) - .from_err() - .and_then(move |(rdr, [byte])| { - data.push(byte); - - cur |= (u32::from(byte) & 0x7F) << (7 * i); - - if i >= 4 { - return Err(FixedHeaderError::MalformedRemainingLength); - } - - if byte & 0x80 == 0 { - Ok(future::Loop::Break((rdr, cur, data))) - } else { - Ok(future::Loop::Continue((rdr, cur, i + 1, data))) - } - }) - }).and_then(move |(rdr, remaining_len, data)| match PacketType::from_u8(type_val) { - Ok(packet_type) => Ok((rdr, FixedHeader::new(packet_type, remaining_len), data)), - Err(PacketTypeError::UndefinedType(ty, _)) => { - Err(FixedHeaderError::Unrecognized(ty, remaining_len)) - } - Err(PacketTypeError::ReservedType(ty, _)) => Err(FixedHeaderError::ReservedType(ty, remaining_len)), - Err(err) => Err(From::from(err)), - }) - }) + pub async fn parse( + rdr: &mut A, + ) -> Result<(Self, Vec), FixedHeaderError> { + use std::slice; + let mut type_val = 0u8; + rdr.read_exact(slice::from_mut(&mut type_val)).await?; + + let mut data: Vec = Vec::new(); + data.push(type_val); + let mut remaining_len = 0; + let mut i = 0; + + loop { + let mut byte = 0u8; + rdr.read_exact(slice::from_mut(&mut byte)).await?; + + data.push(byte); + + remaining_len |= (u32::from(byte) & 0x7F) << (7 * i); + + if i >= 4 { + return Err(FixedHeaderError::MalformedRemainingLength); + } + + if byte & 0x80 == 0 { + break; + } else { + i += 1; + } + } + + match PacketType::from_u8(type_val) { + Ok(packet_type) => Ok((FixedHeader::new(packet_type, remaining_len), data)), + Err(PacketTypeError::UndefinedType(ty, _)) => { + Err(FixedHeaderError::Unrecognized(ty, remaining_len)) + } + Err(PacketTypeError::ReservedType(ty, _)) => { + Err(FixedHeaderError::ReservedType(ty, remaining_len)) + } + Err(err) => Err(From::from(err)), + } } } @@ -126,7 +135,10 @@ impl Decodable for FixedHeader { type Err = FixedHeaderError; type Cond = (); - fn decode_with(rdr: &mut R, _rest: Option<()>) -> Result { + fn decode_with( + rdr: &mut R, + _rest: Option<()>, + ) -> Result { let type_val = rdr.read_u8()?; let remaining_len = { let mut cur = 0u32; @@ -148,8 +160,12 @@ impl Decodable for FixedHeader { match PacketType::from_u8(type_val) { Ok(packet_type) => Ok(FixedHeader::new(packet_type, remaining_len)), - Err(PacketTypeError::UndefinedType(ty, _)) => Err(FixedHeaderError::Unrecognized(ty, remaining_len)), - Err(PacketTypeError::ReservedType(ty, _)) => Err(FixedHeaderError::ReservedType(ty, remaining_len)), + Err(PacketTypeError::UndefinedType(ty, _)) => { + Err(FixedHeaderError::Unrecognized(ty, remaining_len)) + } + Err(PacketTypeError::ReservedType(ty, _)) => { + Err(FixedHeaderError::ReservedType(ty, remaining_len)) + } Err(err) => Err(From::from(err)), } } @@ -180,8 +196,12 @@ impl fmt::Display for FixedHeaderError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { &FixedHeaderError::MalformedRemainingLength => write!(f, "Malformed remaining length"), - &FixedHeaderError::Unrecognized(code, length) => write!(f, "Unrecognized header ({}, {})", code, length), - &FixedHeaderError::ReservedType(code, length) => write!(f, "Reserved header ({}, {})", code, length), + &FixedHeaderError::Unrecognized(code, length) => { + write!(f, "Unrecognized header ({}, {})", code, length) + } + &FixedHeaderError::ReservedType(code, length) => { + write!(f, "Reserved header ({}, {})", code, length) + } &FixedHeaderError::PacketTypeError(ref err) => write!(f, "{}", err), &FixedHeaderError::IoError(ref err) => write!(f, "{}", err), } @@ -214,8 +234,8 @@ impl Error for FixedHeaderError { mod test { use super::*; - use crate::{Decodable, Encodable}; use crate::control::packet_type::{ControlType, PacketType}; + use crate::{Decodable, Encodable}; use std::io::Cursor; #[test] @@ -233,7 +253,10 @@ mod test { let stream = b"\x10\xc1\x02"; let mut cursor = Cursor::new(&stream[..]); let header = FixedHeader::decode(&mut cursor).unwrap(); - assert_eq!(header.packet_type, PacketType::with_default(ControlType::Connect)); + assert_eq!( + header.packet_type, + PacketType::with_default(ControlType::Connect) + ); assert_eq!(header.remaining_length, 321); } diff --git a/src/packet/mod.rs b/src/packet/mod.rs index def124f..483aff9 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -3,18 +3,18 @@ use std::convert::From; use std::error::Error; use std::fmt; -use std::io::{self, Read, Write, Cursor}; +use std::io::{self, Read, Write}; -use futures::Future; -use tokio_io::{io as async_io, AsyncRead}; +#[cfg(feature = "async")] +use tokio::io::{AsyncRead, AsyncReadExt}; -use crate::{Decodable, Encodable}; -use crate::control::ControlType; -use crate::control::FixedHeader; use crate::control::fixed_header::FixedHeaderError; use crate::control::variable_header::VariableHeaderError; +use crate::control::ControlType; +use crate::control::FixedHeader; use crate::encodable::StringEncodeError; use crate::topic_name::TopicNameError; +use crate::{Decodable, Encodable}; pub use self::connack::ConnackPacket; pub use self::connect::ConnectPacket; @@ -33,18 +33,18 @@ pub use self::unsubscribe::UnsubscribePacket; pub use self::publish::QoSWithPacketIdentifier; -pub mod connect; pub mod connack; -pub mod publish; +pub mod connect; +pub mod disconnect; +pub mod pingreq; +pub mod pingresp; pub mod puback; +pub mod pubcomp; +pub mod publish; pub mod pubrec; pub mod pubrel; -pub mod pubcomp; -pub mod pingreq; -pub mod pingresp; -pub mod disconnect; -pub mod subscribe; pub mod suback; +pub mod subscribe; pub mod unsuback; pub mod unsubscribe; @@ -64,7 +64,10 @@ pub trait Packet: Sized { /// Length of bytes after encoding variable header fn encoded_variable_headers_length(&self) -> u32; /// Deocde packet with a `FixedHeader` - fn decode_packet(reader: &mut R, fixed_header: FixedHeader) -> Result>; + fn decode_packet( + reader: &mut R, + fixed_header: FixedHeader, + ) -> Result>; } impl Encodable for T { @@ -80,7 +83,9 @@ impl Encodable for T { } fn encoded_length(&self) -> u32 { - self.fixed_header().encoded_length() + self.encoded_variable_headers_length() + self.payload_ref().encoded_length() + self.fixed_header().encoded_length() + + self.encoded_variable_headers_length() + + self.payload_ref().encoded_length() } } @@ -88,7 +93,10 @@ impl Decodable for T { type Err = PacketError; type Cond = FixedHeader; - fn decode_with(reader: &mut R, fixed_header: Option) -> Result> { + fn decode_with( + reader: &mut R, + fixed_header: Option, + ) -> Result> { let fixed_header: FixedHeader = if let Some(hdr) = fixed_header { hdr } else { @@ -191,68 +199,66 @@ macro_rules! impl_variable_packet { )+ } + #[cfg(feature = "async")] impl VariablePacket { - pub fn peek(rdr: A) -> impl Future), Error = VariablePacketError> { - FixedHeader::parse(rdr).then(|result| { - let (rdr, fixed_header, data) = match result { - Ok((rdr, header, data)) => (rdr, header, data), - Err(FixedHeaderError::Unrecognized(code, _length)) => { - // can't read excess bytes from rdr as it was dropped when an error - // occurred - return Err(VariablePacketError::UnrecognizedPacket(code, Vec::new())); - }, - Err(FixedHeaderError::ReservedType(code, _length)) => { - // can't read excess bytes from rdr as it was dropped when an error - // occurred - return Err(VariablePacketError::ReservedPacket(code, Vec::new())); - }, - Err(err) => return Err(From::from(err)) - }; - - Ok((rdr, fixed_header, data)) - }) + pub async fn peek(rdr: &mut A) -> Result<(FixedHeader, Vec), VariablePacketError> { + let result = FixedHeader::parse(rdr).await; + + let (fixed_header, data) = match result { + Ok((header, data)) => (header, data), + Err(FixedHeaderError::Unrecognized(code, _length)) => { + // TODO: Could read/drop excess bytes from rdr, if you want + return Err(VariablePacketError::UnrecognizedPacket(code, Vec::new())); + }, + Err(FixedHeaderError::ReservedType(code, _length)) => { + // TODO: Could read/drop excess bytes from rdr, if you want + return Err(VariablePacketError::ReservedPacket(code, Vec::new())); + }, + Err(err) => return Err(From::from(err)) + }; + + Ok((fixed_header, data)) } - pub fn peek_finalize(rdr: A) -> impl Future, Self), Error = VariablePacketError> { - Self::peek(rdr).and_then(|(rdr, fixed_header, header_buffer)| { - let packet = vec![0u8; fixed_header.remaining_length as usize]; - async_io::read_exact(rdr, packet) - .from_err() - .and_then(move |(rdr, packet)| { - let mut buff_rdr = Cursor::new(packet.clone()); - let output = match fixed_header.packet_type.control_type { - $( - ControlType::$hdr => { - let pk = <$name as Packet>::decode_packet(&mut buff_rdr, fixed_header)?; - VariablePacket::$name(pk) - } - )+ - }; - let mut result = Vec::new(); - result.extend(header_buffer); - result.extend(packet); - Ok((rdr, result, output)) - }) - }) + + pub async fn peek_finalize(rdr: &mut A) -> Result<(Vec, Self), VariablePacketError> { + use std::io::Cursor; + let (fixed_header, mut result) = Self::peek(rdr).await?; + + let mut packet = vec![0u8; fixed_header.remaining_length as usize]; + rdr.read_exact(&mut packet).await?; + + let mut buff_rdr = Cursor::new(packet.clone()); + let output = match fixed_header.packet_type.control_type { + $( + ControlType::$hdr => { + let pk = <$name as Packet>::decode_packet(&mut buff_rdr, fixed_header)?; + VariablePacket::$name(pk) + } + )+ + }; + + result.extend(packet); + Ok((result, output)) } - pub fn parse(rdr: A) -> impl Future { - Self::peek(rdr).and_then(|(rdr, fixed_header, _)| { - let buffer = vec![0u8; fixed_header.remaining_length as usize]; - async_io::read_exact(rdr, buffer) - .from_err() - .and_then(move |(rdr, buffer)| { - let mut buff_rdr = Cursor::new(buffer); - let output = match fixed_header.packet_type.control_type { - $( - ControlType::$hdr => { - let pk = <$name as Packet>::decode_packet(&mut buff_rdr, fixed_header)?; - VariablePacket::$name(pk) - } - )+ - }; - - Ok((rdr, output)) - }) - }) + + pub async fn parse(rdr: &mut A) -> Result { + use std::io::Cursor; + let (fixed_header, _) = Self::peek(rdr).await?; + + let mut buffer = vec![0u8; fixed_header.remaining_length as usize]; + rdr.read_exact(&mut buffer).await?; + + let mut buff_rdr = Cursor::new(buffer); + let output = match fixed_header.packet_type.control_type { + $( + ControlType::$hdr => { + let pk = <$name as Packet>::decode_packet(&mut buff_rdr, fixed_header)?; + VariablePacket::$name(pk) + } + )+ + }; + + Ok(output) } } @@ -457,9 +463,9 @@ mod test { assert_eq!(var_packet, decoded_packet); } - #[test] - fn test_variable_packet_async_parse() { - use std::io::Cursor; + #[cfg(feature = "async")] + #[tokio::test] + async fn test_variable_packet_async_parse() { let packet = ConnectPacket::new("MQTT".to_owned(), "1234".to_owned()); // Wrap it @@ -470,16 +476,15 @@ mod test { var_packet.encode(&mut buf).unwrap(); // Parse - let async_buf = Cursor::new(buf); - match VariablePacket::parse(async_buf).wait() { - Err(_) => assert!(false), - Ok((_, decoded_packet)) => assert_eq!(var_packet, decoded_packet), - } + let mut async_buf = buf.as_slice(); + let decoded_packet = VariablePacket::parse(&mut async_buf).await.unwrap(); + + assert_eq!(var_packet, decoded_packet); } - #[test] - fn test_variable_packet_async_peek() { - use std::io::Cursor; + #[cfg(feature = "async")] + #[tokio::test] + async fn test_variable_packet_async_peek() { let packet = ConnectPacket::new("MQTT".to_owned(), "1234".to_owned()); // Wrap it @@ -490,20 +495,17 @@ mod test { var_packet.encode(&mut buf).unwrap(); // Peek - let async_buf = Cursor::new(buf.clone()); - match VariablePacket::peek(async_buf.clone()).wait() { - Err(_) => assert!(false), - Ok((_, fixed_header, _)) => assert_eq!(fixed_header.packet_type.control_type, ControlType::Connect), - } + let mut async_buf = buf.as_slice(); + let (fixed_header, _) = VariablePacket::peek(&mut async_buf).await.unwrap(); + + assert_eq!(fixed_header.packet_type.control_type, ControlType::Connect); // Read the rest - match VariablePacket::peek_finalize(async_buf).wait() { - Err(_) => assert!(false), - Ok((_, peeked_buffer, peeked_packet)) => { - assert_eq!(peeked_buffer, buf); - assert_eq!(peeked_packet, var_packet); - } - } - } + let mut async_buf = buf.as_slice(); + let (peeked_buffer, peeked_packet) = + VariablePacket::peek_finalize(&mut async_buf).await.unwrap(); + assert_eq!(peeked_buffer, buf); + assert_eq!(peeked_packet, var_packet); + } } From 2da5e08c23aef030401e37269420b85496992c3e Mon Sep 17 00:00:00 2001 From: Jeff Hiner Date: Fri, 3 Jan 2020 09:43:34 -0700 Subject: [PATCH 5/5] Update documentation --- src/control/fixed_header.rs | 2 ++ src/packet/mod.rs | 13 ++++++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/control/fixed_header.rs b/src/control/fixed_header.rs index 8157e5b..cce640d 100644 --- a/src/control/fixed_header.rs +++ b/src/control/fixed_header.rs @@ -48,6 +48,8 @@ impl FixedHeader { #[cfg(feature = "async")] /// Asynchronously parse a single fixed header from an AsyncRead type, such as a network /// socket. + /// + /// This requires mqtt-rs to be built with `feature = "async"` pub async fn parse( rdr: &mut A, ) -> Result<(Self, Vec), FixedHeaderError> { diff --git a/src/packet/mod.rs b/src/packet/mod.rs index 483aff9..9141853 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -201,7 +201,10 @@ macro_rules! impl_variable_packet { #[cfg(feature = "async")] impl VariablePacket { - pub async fn peek(rdr: &mut A) -> Result<(FixedHeader, Vec), VariablePacketError> { + pub(crate) async fn peek(rdr: &mut A) -> Result<(FixedHeader, Vec), VariablePacketError> { + // TODO: This doesn't really "peek" the stream without modifying it, and it's unclear what + // the returned Vec is supposed to be. Perhaps change the name or change the functionality + // before making public. let result = FixedHeader::parse(rdr).await; let (fixed_header, data) = match result { @@ -220,7 +223,8 @@ macro_rules! impl_variable_packet { Ok((fixed_header, data)) } - pub async fn peek_finalize(rdr: &mut A) -> Result<(Vec, Self), VariablePacketError> { + #[allow(dead_code)] + pub(crate) async fn peek_finalize(rdr: &mut A) -> Result<(Vec, Self), VariablePacketError> { use std::io::Cursor; let (fixed_header, mut result) = Self::peek(rdr).await?; @@ -241,6 +245,9 @@ macro_rules! impl_variable_packet { Ok((result, output)) } + /// Asynchronously parse a packet from a `tokio::io::AsyncRead` + /// + /// This requires mqtt-rs to be built with `feature = "async"` pub async fn parse(rdr: &mut A) -> Result { use std::io::Cursor; let (fixed_header, _) = Self::peek(rdr).await?; @@ -477,7 +484,7 @@ mod test { // Parse let mut async_buf = buf.as_slice(); - let decoded_packet = VariablePacket::parse(&mut async_buf).await.unwrap(); + let decoded_packet = VariablePacket::parse(&mut async_buf).await.unwrap(); assert_eq!(var_packet, decoded_packet); }