Skip to content

Commit

Permalink
Merge branch 'master' of github.com:asonix/tokio-zmq
Browse files Browse the repository at this point in the history
  • Loading branch information
asonix committed Jan 3, 2018
2 parents 4455570 + 7352fc3 commit 45172e8
Show file tree
Hide file tree
Showing 18 changed files with 994 additions and 846 deletions.
266 changes: 266 additions & 0 deletions examples/dealer_router.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/*
* This file is part of Tokio ZMQ.
*
* Copyright © 2017 Riley Trautman
*
* Tokio ZMQ is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Tokio ZMQ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Tokio ZMQ. If not, see <http://www.gnu.org/licenses/>.
*/

#![feature(try_from)]

extern crate futures;
extern crate tokio_core;
extern crate zmq;
extern crate tokio_zmq;
extern crate log;
extern crate env_logger;

use std::rc::Rc;
use std::convert::TryInto;
use std::collections::VecDeque;
use std::thread;
use std::env;

use futures::stream::iter_ok;
use futures::{Future, Stream};
use tokio_core::reactor::Core;
use tokio_zmq::prelude::*;
use tokio_zmq::{Dealer, Rep, Req, Router, Pub, Sub};
use tokio_zmq::{Socket, Error};

pub struct Stop;

impl ControlHandler for Stop {
fn should_stop(&self, _: VecDeque<zmq::Message>) -> bool {
println!("Got stop signal");
true
}
}

fn client() {
let mut core = Core::new().unwrap();
let ctx = Rc::new(zmq::Context::new());
let req: Req = Socket::new(Rc::clone(&ctx), core.handle())
.connect("tcp://localhost:5559")
.try_into()
.unwrap();

let zpub: Pub = Socket::new(Rc::clone(&ctx), core.handle())
.bind("tcp://*:5561")
.try_into()
.unwrap();

let runner = iter_ok(0..10)
.and_then(|request_nbr| {
let msg = zmq::Message::from_slice(b"Hewwo?").unwrap();

let mut multipart = VecDeque::new();
multipart.push_back(msg);

println!("Sending 'Hewwo?' for {}", request_nbr);

let response = req.recv();
let request = req.send(multipart);

request.and_then(move |_| {
response.map(move |multipart| (request_nbr, multipart))
})
})
.for_each(|(request_nbr, multipart)| {
for msg in multipart {
if let Some(msg) = msg.as_str() {
println!("Received reply {} {}", request_nbr, msg);
}
}

Ok(())
})
.and_then(|_| {
let msg = zmq::Message::from_slice(b"").unwrap();

let mut multipart = VecDeque::new();
multipart.push_back(msg);

zpub.send(multipart)
});

let res = core.run(runner);
if let Err(e) = res {
println!("client bailed: {:?}", e);
}
}

fn worker() {
let mut core = Core::new().unwrap();
let ctx = Rc::new(zmq::Context::new());

let rep: Rep = Socket::new(Rc::clone(&ctx), core.handle())
.connect("tcp://localhost:5560")
.try_into()
.unwrap();

let sub: Sub = Socket::new(Rc::clone(&ctx), core.handle())
.connect("tcp://localhost:5561")
.filter(b"")
.try_into()
.unwrap();

let rep = rep.controlled(sub);

let runner = rep.stream(Stop)
.map(|multipart| {
for msg in multipart {
if let Some(msg) = msg.as_str() {
println!("Received request: {}", msg);
}
}

let msg = zmq::Message::from_slice(b"Mr Obama???").unwrap();
let mut multipart = VecDeque::new();
multipart.push_back(msg);

multipart
})
.forward(rep.sink::<Error>());

let res = core.run(runner);

if let Err(e) = res {
println!("worker bailed: {:?}", e);
}
}

fn broker() {
let mut core = Core::new().unwrap();
let ctx = Rc::new(zmq::Context::new());

let router: Router = Socket::new(Rc::clone(&ctx), core.handle())
.bind("tcp://*:5559")
.try_into()
.unwrap();
let sub: Sub = Socket::new(Rc::clone(&ctx), core.handle())
.connect("tcp://localhost:5561")
.filter(b"")
.try_into()
.unwrap();
let router = router.controlled(sub);

let dealer: Dealer = Socket::new(Rc::clone(&ctx), core.handle())
.bind("tcp://*:5560")
.try_into()
.unwrap();
let sub: Sub = Socket::new(Rc::clone(&ctx), core.handle())
.connect("tcp://localhost:5561")
.filter(b"")
.try_into()
.unwrap();
let dealer = dealer.controlled(sub);

let d2r = dealer
.stream(Stop)
.map(|multipart| {
for msg in &multipart {
if let Some(msg) = msg.as_str() {
println!("Relaying message '{}' to router", msg);
} else {
println!("Relaying unknown message to router");
}
}
multipart
})
.forward(router.sink::<Error>());
let r2d = router
.stream(Stop)
.map(|multipart| {
for msg in &multipart {
if let Some(msg) = msg.as_str() {
println!("Relaying message '{}' to dealer", msg);
} else {
println!("Relaying unknown message to dealer");
}
}
multipart
})
.forward(dealer.sink::<Error>());

core.handle().spawn(d2r.map(|_| ()).map_err(|e| {
println!("d2r bailed: {:?}", e)
}));
let res = core.run(r2d);

if let Err(e) = res {
println!("broker bailed: {:?}", e);
}
}

#[derive(Debug, PartialEq)]
enum Selection {
All,
Broker,
Worker,
Client,
}

impl Selection {
fn broker(&self) -> bool {
*self == Selection::All || *self == Selection::Broker
}

fn worker(&self) -> bool {
*self == Selection::All || *self == Selection::Worker
}

fn client(&self) -> bool {
*self == Selection::All || *self == Selection::Client
}
}

fn main() {
env_logger::init().unwrap();

let selection = env::var("SELECTION").unwrap_or("all".into());

let selection = match selection.as_ref() {
"broker" => Selection::Broker,
"worker" => Selection::Worker,
"client" => Selection::Client,
_ => Selection::All,
};

println!("SELECTION: {:?}", selection);

let mut broker_thread = None;
let mut worker_thread = None;
let mut client_thread = None;

if selection.broker() {
broker_thread = Some(thread::spawn(broker));
}
if selection.worker() {
worker_thread = Some(thread::spawn(worker));
}
if selection.client() {
client_thread = Some(thread::spawn(client));
}

if let Some(broker_thread) = broker_thread {
broker_thread.join().unwrap();
}
if let Some(worker_thread) = worker_thread {
worker_thread.join().unwrap();
}
if let Some(client_thread) = client_thread {
client_thread.join().unwrap();
}
}
2 changes: 1 addition & 1 deletion examples/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ fn main() {
.try_into()
.unwrap();

let runner = iter_ok(0..10)
let runner = iter_ok(0..10000)
.and_then(|i| {
let mut multipart = VecDeque::new();
let msg1 = zmq::Message::from_slice(format!("Hewwo? {}", i).as_bytes()).unwrap();
Expand Down
Loading

0 comments on commit 45172e8

Please sign in to comment.