Skip to content

Commit

Permalink
Merge pull request #37 from jeff-at-dwelo/update_futures
Browse files Browse the repository at this point in the history
Update async/await with tokio
  • Loading branch information
zonyitoo authored Jan 3, 2020
2 parents 004a1ee + 2da5e08 commit 8f45d33
Show file tree
Hide file tree
Showing 32 changed files with 356 additions and 325 deletions.
27 changes: 18 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,27 +1,36 @@
[package]
authors = ["Y. T. Chung <[email protected]>"]
name = "mqtt-protocol"
version = "0.7.0"
version = "0.8.0"
license = "MIT/Apache-2.0"
description = "MQTT Protocol Library"
keywords = ["mqtt", "protocol"]
repository = "https://github.com/zonyitoo/mqtt-rs"
documentation = "https://docs.rs/mqtt-protocol"
edition = "2018"

[dependencies]
byteorder = "1.2"
byteorder = "1.3"
log = "0.4"
regex = "1.0"
lazy_static = "1.1"
tokio-io = "0.1"
futures = "0.1"
regex = "1.3"
lazy_static = "1.4"
tokio = { version = "0.2", optional = true }

[dev-dependencies]
clap = "2"
env_logger = "0.5"
uuid = { version = "0.7", features = ["v4"] }
env_logger = "0.7"
time = "0.1"
tokio = "0.1"
tokio = { version = "0.2", features = ["macros", "rt-threaded", "net", "time", "io-util", "stream"] }
futures = { version = "0.3" }
uuid = { version = "0.8", features = ["v4"] }

[features]
async = ["tokio"]
default = []

[lib]
name = "mqtt"

[[example]]
name = "sub-client-async"
required-features = ["async"]
4 changes: 2 additions & 2 deletions examples/pub-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,11 @@ fn main() {
let mut line = String::new();
stdin.read_line(&mut line).unwrap();

if line.trim_right() == "" {
if line.trim_end() == "" {
continue;
}

let message = format!("{}: {}", user_name, line.trim_right());
let message = format!("{}: {}", user_name, line.trim_end());

for chan in &channels {
let publish_packet = PublishPacket::new(chan.clone(), QoSWithPacketIdentifier::Level0, message.clone());
Expand Down
122 changes: 58 additions & 64 deletions examples/sub-client-async.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,18 @@
extern crate mqtt;
#[macro_use]
extern crate log;
extern crate clap;
extern crate env_logger;
extern crate futures;
extern crate tokio;
extern crate uuid;

use std::env;
use std::fmt::Debug;
use std::io::Write;
use std::net;
use std::str;
use std::time::{Duration, Instant};
use std::time::Duration;

use clap::{App, Arg};
use log::{error, info, trace};

use uuid::Uuid;

use futures::{future, Future, Stream};

use tokio::io::{self, AsyncRead};
use futures::join;
use futures::prelude::*;
use tokio::net::TcpStream;
use tokio::timer::Interval;
use tokio::prelude::*;

use mqtt::control::variable_header::ConnectReturnCode;
use mqtt::packet::*;
Expand All @@ -33,13 +23,13 @@ fn generate_client_id() -> String {
format!("/MQTT/rust/{}", Uuid::new_v4())
}

fn alt_drop<E: Debug>(err: E) {
warn!("{:?}", err);
}

fn main() {
#[tokio::main]
async fn main() {
// configure logging
env::set_var("RUST_LOG", env::var_os("RUST_LOG").unwrap_or_else(|| "info".into()));
env::set_var(
"RUST_LOG",
env::var_os("RUST_LOG").unwrap_or_else(|| "info".into()),
);
env_logger::init();

let matches = App::new("sub-client")
Expand All @@ -51,33 +41,38 @@ fn main() {
.takes_value(true)
.required(true)
.help("MQTT server address (host:port)"),
).arg(
)
.arg(
Arg::with_name("SUBSCRIBE")
.short("s")
.long("subscribe")
.takes_value(true)
.multiple(true)
.required(true)
.help("Channel filter to subscribe"),
).arg(
)
.arg(
Arg::with_name("USER_NAME")
.short("u")
.long("username")
.takes_value(true)
.help("Login user name"),
).arg(
)
.arg(
Arg::with_name("PASSWORD")
.short("p")
.long("password")
.takes_value(true)
.help("Password"),
).arg(
)
.arg(
Arg::with_name("CLIENT_ID")
.short("i")
.long("client-identifier")
.takes_value(true)
.help("Client identifier"),
).get_matches();
)
.get_matches();

let server_addr = matches.value_of("SERVER").unwrap();
let client_id = matches
Expand All @@ -87,7 +82,12 @@ fn main() {
let channel_filters: Vec<(TopicFilter, QualityOfService)> = matches
.values_of("SUBSCRIBE")
.unwrap()
.map(|c| (TopicFilter::new(c.to_string()).unwrap(), QualityOfService::Level0))
.map(|c| {
(
TopicFilter::new(c.to_string()).unwrap(),
QualityOfService::Level0,
)
})
.collect();

let keep_alive = 10;
Expand Down Expand Up @@ -142,52 +142,46 @@ fn main() {
}

// connection made, start the async work
let program = future::ok(()).and_then(move |()| {
let stream = TcpStream::from_std(stream, &Default::default()).unwrap();
let (mqtt_read, mqtt_write) = stream.split();
let mut stream = TcpStream::from_std(stream).unwrap();
let (mut mqtt_read, mut mqtt_write) = stream.split();

let ping_time = Duration::new((keep_alive / 2) as u64, 0);
let ping_stream = Interval::new(Instant::now() + ping_time, ping_time);
let ping_time = Duration::new((keep_alive / 2) as u64, 0);
let mut ping_stream = tokio::time::interval(ping_time);

let ping_sender = ping_stream.map_err(alt_drop).fold(mqtt_write, |mqtt_write, _| {
let ping_sender = async move {
while let Some(_) = ping_stream.next().await {
info!("Sending PINGREQ to broker");

let pingreq_packet = PingreqPacket::new();

let mut buf = Vec::new();
pingreq_packet.encode(&mut buf).unwrap();
io::write_all(mqtt_write, buf)
.map(|(mqtt_write, _buf)| mqtt_write)
.map_err(alt_drop)
});

let receiver = future::loop_fn::<_, (), _, _>(mqtt_read, |mqtt_read| {
VariablePacket::parse(mqtt_read).map(|(mqtt_read, packet)| {
trace!("PACKET {:?}", packet);

match packet {
VariablePacket::PingrespPacket(..) => {
info!("Receiving PINGRESP from broker ..");
}
VariablePacket::PublishPacket(ref publ) => {
let msg = match str::from_utf8(&publ.payload_ref()[..]) {
Ok(msg) => msg,
Err(err) => {
error!("Failed to decode publish message {:?}", err);
return future::Loop::Continue(mqtt_read);
}
};
info!("PUBLISH ({}): {}", publ.topic_name(), msg);
}
_ => {}
}
mqtt_write.write_all(&buf).await.unwrap();
}
};

future::Loop::Continue(mqtt_read)
})
}).map_err(alt_drop);
let receiver = async move {
while let Ok(packet) = VariablePacket::parse(&mut mqtt_read).await {
trace!("PACKET {:?}", packet);

ping_sender.join(receiver).map(alt_drop)
});
match packet {
VariablePacket::PingrespPacket(..) => {
info!("Receiving PINGRESP from broker ..");
}
VariablePacket::PublishPacket(ref publ) => {
let msg = match str::from_utf8(&publ.payload_ref()[..]) {
Ok(msg) => msg,
Err(err) => {
error!("Failed to decode publish message {:?}", err);
continue;
}
};
info!("PUBLISH ({}): {}", publ.topic_name(), msg);
}
_ => {}
}
}
};

tokio::run(program);
join!(ping_sender, receiver);
}
Loading

0 comments on commit 8f45d33

Please sign in to comment.