Skip to content

Commit

Permalink
Add documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
asonix committed Jan 2, 2018
1 parent 9cf515e commit 7f58c4d
Show file tree
Hide file tree
Showing 28 changed files with 966 additions and 118 deletions.
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
[package]
name = "tokio-zmq"
description = "Provides Futures abstractions for ZeroMQ on the Tokio event-loop"
repository = "https://github.com/asonix/tokio-zmq"
version = "0.1.0"
authors = ["Riley Trautman <[email protected]>"]
license = "GPL-3.0"
authors = ["Riley Trautman <[email protected]>"]
repository = "https://github.com/asonix/tokio-zmq"
readme = "README.md"
keywords = ["zmq", "zeromq", "futures", "tokio"]

[dependencies]
zmq = "0.8"
Expand Down
18 changes: 16 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ Currently Supported Sockets
- SUB
- PUSH
- PULL
- XPUB
- XSUB
- PAIR

See the [examples folder](https://github.com/asonix/zmq-futures/tree/master/examples) for usage examples.

Expand All @@ -29,15 +32,16 @@ use std::convert::TryInto;

use futures::Stream;
use tokio_core::reactor::Core;
use tokio_zmq::{SinkSocket, Socket, StreamSocket, Error};
use tokio_zmq::prelude::*;
use tokio_zmq::{Socket, Error};
use tokio_zmq::Rep; // the socket type you want

fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let context = Rc::new(zmq::Context::new());
let rep: Rep = Socket::new(context, handle)
.bind("tcp://*:5560".into())
.bind("tcp://*:5560")
.try_into()
.unwrap()

Expand All @@ -53,6 +57,16 @@ fn main() {
}
```

### Running the examples
The `req.rs` and `rep.rs` examples are designed to be used together. The `rep` example starts a server with a REP socket, and the `req` example queries that server with a REQ socket.

The `zpub.rs` and `sub.rs` examples should be used togheter. `zpub` produces values that `sub` consumes.

The `push.rs`, `pull_push.rs`, and `pull.rs` files should be used together. `push` produces values, which are relayed by `pull_push` to `pull`, which consumes them and sends a stop signal to itself and to `pull_push`.

### Contributing
Feel free to open issues for anything you find an issue with. Please note that any contributed code will be licensed under the GPLv3.

### License

Copyright © 2017 Riley Trautman
Expand Down
11 changes: 6 additions & 5 deletions examples/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ use std::collections::VecDeque;

use futures::{Future, Stream};
use tokio_core::reactor::Core;
use tokio_zmq::prelude::*;
use tokio_zmq::{Pub, Pull, PullControlled, Sub};
use tokio_zmq::{ControlHandler, ControlledStreamSocket, SinkSocket, Socket};
use tokio_zmq::Socket;

pub struct Stop;

Expand All @@ -47,17 +48,17 @@ fn main() {
let handle = core.handle();
let ctx = Rc::new(zmq::Context::new());
let cmd: Sub = Socket::new(ctx.clone(), handle.clone())
.connect("tcp://localhost:5559".into())
.filter(Vec::new())
.connect("tcp://localhost:5559")
.filter(b"")
.try_into()
.unwrap();
let conn: Pull = Socket::new(ctx.clone(), handle.clone())
.bind("tcp://*:5558".into())
.bind("tcp://*:5558")
.try_into()
.unwrap();
let conn: PullControlled = conn.controlled(cmd);
let send_cmd: Pub = Socket::new(ctx, handle.clone())
.bind("tcp://*:5559".into())
.bind("tcp://*:5559")
.try_into()
.unwrap();

Expand Down
9 changes: 5 additions & 4 deletions examples/pull_push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ use std::collections::VecDeque;

use futures::Stream;
use tokio_core::reactor::Core;
use tokio_zmq::prelude::*;
use tokio_zmq::{Pull, PullControlled, Push, Sub};
use tokio_zmq::{Error as ZmqFutError, ControlHandler, ControlledStreamSocket, SinkSocket, Socket};
use tokio_zmq::{Error as ZmqFutError, Socket};

pub struct Stop;

Expand All @@ -47,12 +48,12 @@ fn main() {
let handle = core.handle();
let ctx = Rc::new(zmq::Context::new());
let cmd: Sub = Socket::new(ctx.clone(), handle.clone())
.connect("tcp://localhost:5559".into())
.filter(Vec::new())
.connect("tcp://localhost:5559")
.filter(b"")
.try_into()
.unwrap();
let stream: Pull = Socket::new(ctx.clone(), handle.clone())
.connect("tcp://localhost:5557".into())
.connect("tcp://localhost:5557")
.try_into()
.unwrap();
let stream: PullControlled = stream.controlled(cmd);
Expand Down
7 changes: 4 additions & 3 deletions examples/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use std::collections::VecDeque;
use futures::{Future, Stream};
use futures::stream::iter_ok;
use tokio_core::reactor::{Core, Interval};
use tokio_zmq::{Error as ZmqFutError, Push, SinkSocket, Socket};
use tokio_zmq::prelude::*;
use tokio_zmq::{Error as ZmqFutError, Push, Socket};

#[derive(Debug)]
enum Error {
Expand Down Expand Up @@ -65,11 +66,11 @@ fn main() {
let handle = core.handle();
let ctx = Rc::new(zmq::Context::new());
let workers: Push = Socket::new(ctx.clone(), handle.clone())
.bind("tcp://*:5557".into())
.bind("tcp://*:5557")
.try_into()
.unwrap();
let sink: Push = Socket::new(ctx, handle.clone())
.connect("tcp://localhost:5558".into())
.connect("tcp://localhost:5558")
.try_into()
.unwrap();

Expand Down
5 changes: 3 additions & 2 deletions examples/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use std::convert::TryInto;

use futures::Stream;
use tokio_core::reactor::Core;
use tokio_zmq::{SinkSocket, Socket, StreamSocket, Rep, Error};
use tokio_zmq::prelude::*;
use tokio_zmq::{Socket, Rep, Error};

fn main() {
env_logger::init().unwrap();
Expand All @@ -40,7 +41,7 @@ fn main() {
let handle = core.handle();
let ctx = Rc::new(zmq::Context::new());
let rep: Rep = Socket::new(ctx, handle)
.bind("tcp://*:5560".into())
.bind("tcp://*:5560")
.try_into()
.unwrap();

Expand Down
5 changes: 3 additions & 2 deletions examples/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use std::collections::VecDeque;
use futures::{Future, Stream};
use futures::stream::iter_ok;
use tokio_core::reactor::Core;
use tokio_zmq::{FutureSocket, Socket, Req};
use tokio_zmq::prelude::*;
use tokio_zmq::{Socket, Req};

fn main() {
env_logger::init().unwrap();
Expand All @@ -42,7 +43,7 @@ fn main() {
let handle = core.handle();
let ctx = Rc::new(zmq::Context::new());
let req: Req = Socket::new(ctx, handle)
.connect("tcp://localhost:5560".into())
.connect("tcp://localhost:5560")
.try_into()
.unwrap();

Expand Down
7 changes: 4 additions & 3 deletions examples/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ use std::convert::TryInto;

use futures::Stream;
use tokio_core::reactor::Core;
use tokio_zmq::{Socket, StreamSocket, Sub};
use tokio_zmq::prelude::*;
use tokio_zmq::{Socket, Sub};

fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let ctx = Rc::new(zmq::Context::new());
let sub: Sub = Socket::new(ctx, handle)
.connect("tcp://localhost:5556".into())
.filter(Vec::new())
.connect("tcp://localhost:5556")
.filter(b"")
.try_into()
.unwrap();

Expand Down
5 changes: 3 additions & 2 deletions examples/zpub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use std::collections::VecDeque;

use futures::Stream;
use tokio_core::reactor::{Core, Interval};
use tokio_zmq::{Error as ZmqFutError, Pub, SinkSocket, Socket};
use tokio_zmq::prelude::*;
use tokio_zmq::{Error as ZmqFutError, Pub, Socket};

#[derive(Debug)]
enum Error {
Expand Down Expand Up @@ -64,7 +65,7 @@ fn main() {
let handle = core.handle();
let ctx = Rc::new(zmq::Context::new());
let zpub: Pub = Socket::new(ctx, handle)
.bind("tcp://*:5556".into())
.bind("tcp://*:5556")
.try_into()
.unwrap();

Expand Down
97 changes: 93 additions & 4 deletions src/async/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
* along with Tokio ZMQ. If not, see <http://www.gnu.org/licenses/>.
*/

//! This module contains definitions for `MultipartRequest` and `MultipartResponse`, the two types that
//! implement `futures::Future`.
use std::collections::VecDeque;
use std::rc::Rc;

Expand All @@ -26,11 +29,55 @@ use tokio_file_unix::File;
use futures::{Async, Future, Poll};
use futures::task;

use async::sink::MsgPlace;
use error::Error;
use super::Multipart;
use ZmqFile;

use super::{MsgPlace, Multipart};
use file::ZmqFile;

/// The `MultipartRequest` Future handles asynchronously sending data to a socket.
///
/// You shouldn't ever need to manually create one, but if you do, the following will suffice.
/// ### Example
/// ```rust
/// # #![feature(conservative_impl_trait)]
/// # #![feature(try_from)]
/// #
/// # extern crate zmq;
/// # extern crate futures;
/// # extern crate tokio_core;
/// # extern crate tokio_zmq;
/// #
/// # use std::rc::Rc;
/// # use std::convert::TryInto;
/// # use std::collections::VecDeque;
/// #
/// # use futures::Future;
/// # use tokio_core::reactor::Core;
/// # use tokio_zmq::prelude::*;
/// # use tokio_zmq::async::MultipartRequest;
/// # use tokio_zmq::{Error, Rep, Socket};
/// #
/// # fn main() {
/// # get_sock();
/// # }
/// # fn get_sock() -> impl Future<Item = (), Error = Error> {
/// # let core = Core::new().unwrap();
/// # let ctx = Rc::new(zmq::Context::new());
/// # let rep: Rep = Socket::new(ctx, core.handle())
/// # .bind("tcp://*:5567")
/// # .try_into()
/// # .unwrap();
/// # let socket = rep.socket();
/// # let sock = socket.inner_sock();
/// # let file = socket.inner_file();
/// # let mut multipart = VecDeque::new();
/// # let msg = zmq::Message::from_slice(format!("Hey").as_bytes()).unwrap();
/// # multipart.push_back(msg);
/// MultipartRequest::new(sock, file, multipart).and_then(|_| {
/// // succesfull request
/// # Ok(())
/// })
/// # }
/// ```
pub struct MultipartRequest {
sock: Rc<zmq::Socket>,
file: Rc<PollEvented<File<ZmqFile>>>,
Expand Down Expand Up @@ -148,6 +195,48 @@ impl Future for MultipartRequest {
}
}

/// The `MultipartResponse` Future handles asynchronously getting data from a socket.
///
/// You shouldn't ever need to manually create one, but if you do, the following will suffice.
/// ### Example
/// ```rust
/// # #![feature(conservative_impl_trait)]
/// # #![feature(try_from)]
/// #
/// # extern crate zmq;
/// # extern crate futures;
/// # extern crate tokio_core;
/// # extern crate tokio_zmq;
/// #
/// # use std::rc::Rc;
/// # use std::convert::TryInto;
/// # use std::collections::VecDeque;
/// #
/// # use futures::Future;
/// # use tokio_core::reactor::Core;
/// # use tokio_zmq::prelude::*;
/// # use tokio_zmq::async::{Multipart, MultipartResponse};
/// # use tokio_zmq::{Error, Rep, Socket};
/// #
/// # fn main() {
/// # get_sock();
/// # }
/// # fn get_sock() -> impl Future<Item = Multipart, Error = Error> {
/// # let core = Core::new().unwrap();
/// # let ctx = Rc::new(zmq::Context::new());
/// # let rep: Rep = Socket::new(ctx, core.handle())
/// # .bind("tcp://*:5567")
/// # .try_into()
/// # .unwrap();
/// # let socket = rep.socket();
/// # let sock = socket.inner_sock();
/// # let file = socket.inner_file();
/// MultipartResponse::new(sock, file).and_then(|multipart| {
/// // handle multipart response
/// # Ok(multipart)
/// })
/// # }
/// ```
pub struct MultipartResponse {
sock: Rc<zmq::Socket>,
file: Rc<PollEvented<File<ZmqFile>>>,
Expand Down
Loading

0 comments on commit 7f58c4d

Please sign in to comment.