Skip to content

Commit 44c501b

Browse files
authored
Implement subscription support in the client (#436)
* Rustfmt fixes. * Enable logging in tests. * Improve pubsub example documentation. * Allow Response to contain a Notification. * Add jsonrpc-pubsub as a core-client dependency. * Add pubsub support to local transport. * Turn RpcMessage into an enum. * Add SubscriptionStream. * Add subscription support to RawClient. * Add subscription support to RequestBuilder. * Add subscription support to Duplex. * Add subscription test. * Add TypedSubscriptionStream. * Add subscription support to TypedClient. * Test typed client subscription. * Add subscription support to procmacro. * Handle typed::Subscriber. * Address grumbles. * rustfmt fixes. * Fix tests. * Avoid unwrapping. * Fix doc tests. * Impl From instead of Into. * Improve code. * Deny warnings and missing docs. * Fix explicit dyn warning. * Implement Debug for Duplex. * Remove allow(deprecated). * Fix build. * Fix build on windows. * Add constructor to LocalRpc. * Parse output into subscription id. * Should handle multiple subscriptions. * Add Deserialize bound for subscription args in client * Remove deny(warnings) * Rewrite using less maps. * Bounds for wrapped Subscriber generic types (#3) * Handle Subscriber wrapped generic type * Pubsub Subscriber wrapper with multiple generic types
1 parent 3bee17b commit 44c501b

40 files changed

+941
-252
lines changed

core-client/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,6 @@
55
//!
66
//! See documentation of [`jsonrpc-client-transports`](../jsonrpc_client_transports/) for more details.
77
8+
#![deny(missing_docs)]
9+
810
pub use jsonrpc_client_transports::*;

core-client/transports/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ futures = "0.1.26"
3333
hyper = { version = "0.12", optional = true }
3434
hyper-tls = { version = "0.3.2", optional = true }
3535
jsonrpc-core = { version = "12.0", path = "../../core" }
36+
jsonrpc-pubsub = { version = "12.0", path = "../../pubsub" }
3637
log = "0.4"
3738
serde = "1.0"
3839
serde_json = "1.0"

core-client/transports/src/lib.rs

Lines changed: 229 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use jsonrpc_core::{Error, Params};
99
use serde::de::DeserializeOwned;
1010
use serde::Serialize;
1111
use serde_json::Value;
12+
use std::marker::PhantomData;
1213

1314
pub mod transports;
1415

@@ -27,9 +28,6 @@ pub enum RpcError {
2728
/// Request timed out.
2829
#[fail(display = "Request timed out")]
2930
Timeout,
30-
/// The server returned a response with an unknown id.
31-
#[fail(display = "Server returned a response with an unknown id")]
32-
UnknownId,
3331
/// Not rpc specific errors.
3432
#[fail(display = "{}", _0)]
3533
Other(failure::Error),
@@ -41,9 +39,8 @@ impl From<Error> for RpcError {
4139
}
4240
}
4341

44-
/// A message sent to the `RpcClient`. This is public so that
45-
/// the derive crate can generate a client.
46-
struct RpcMessage {
42+
/// A rpc call message.
43+
struct CallMessage {
4744
/// The rpc method name.
4845
method: String,
4946
/// The rpc method parameters.
@@ -53,6 +50,46 @@ struct RpcMessage {
5350
sender: oneshot::Sender<Result<Value, RpcError>>,
5451
}
5552

53+
/// A rpc subscription.
54+
struct Subscription {
55+
/// The subscribe method name.
56+
subscribe: String,
57+
/// The subscribe method parameters.
58+
subscribe_params: Params,
59+
/// The name of the notification.
60+
notification: String,
61+
/// The unsubscribe method name.
62+
unsubscribe: String,
63+
}
64+
65+
/// A rpc subscribe message.
66+
struct SubscribeMessage {
67+
/// The subscription to subscribe to.
68+
subscription: Subscription,
69+
/// The channel to send notifications to.
70+
sender: mpsc::Sender<Result<Value, RpcError>>,
71+
}
72+
73+
/// A message sent to the `RpcClient`.
74+
enum RpcMessage {
75+
/// Make a rpc call.
76+
Call(CallMessage),
77+
/// Subscribe to a notification.
78+
Subscribe(SubscribeMessage),
79+
}
80+
81+
impl From<CallMessage> for RpcMessage {
82+
fn from(msg: CallMessage) -> Self {
83+
RpcMessage::Call(msg)
84+
}
85+
}
86+
87+
impl From<SubscribeMessage> for RpcMessage {
88+
fn from(msg: SubscribeMessage) -> Self {
89+
RpcMessage::Subscribe(msg)
90+
}
91+
}
92+
5693
/// A channel to a `RpcClient`.
5794
#[derive(Clone)]
5895
pub struct RpcChannel(mpsc::Sender<RpcMessage>);
@@ -99,6 +136,67 @@ impl Future for RpcFuture {
99136
}
100137
}
101138

139+
/// The stream returned by a subscribe.
140+
pub struct SubscriptionStream {
141+
recv: mpsc::Receiver<Result<Value, RpcError>>,
142+
}
143+
144+
impl SubscriptionStream {
145+
/// Crates a new `SubscriptionStream`.
146+
pub fn new(recv: mpsc::Receiver<Result<Value, RpcError>>) -> Self {
147+
SubscriptionStream { recv }
148+
}
149+
}
150+
151+
impl Stream for SubscriptionStream {
152+
type Item = Value;
153+
type Error = RpcError;
154+
155+
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
156+
match self.recv.poll() {
157+
Ok(Async::Ready(Some(Ok(value)))) => Ok(Async::Ready(Some(value))),
158+
Ok(Async::Ready(Some(Err(error)))) => Err(error),
159+
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
160+
Ok(Async::NotReady) => Ok(Async::NotReady),
161+
Err(()) => Err(RpcError::Other(format_err!("mpsc channel returned an error."))),
162+
}
163+
}
164+
}
165+
166+
/// A typed subscription stream.
167+
pub struct TypedSubscriptionStream<T> {
168+
_marker: PhantomData<T>,
169+
returns: &'static str,
170+
stream: SubscriptionStream,
171+
}
172+
173+
impl<T> TypedSubscriptionStream<T> {
174+
/// Creates a new `TypedSubscriptionStream`.
175+
pub fn new(stream: SubscriptionStream, returns: &'static str) -> Self {
176+
TypedSubscriptionStream {
177+
_marker: PhantomData,
178+
returns,
179+
stream,
180+
}
181+
}
182+
}
183+
184+
impl<T: DeserializeOwned + 'static> Stream for TypedSubscriptionStream<T> {
185+
type Item = T;
186+
type Error = RpcError;
187+
188+
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
189+
let result = match self.stream.poll()? {
190+
Async::Ready(Some(value)) => serde_json::from_value::<T>(value)
191+
.map(|result| Async::Ready(Some(result)))
192+
.map_err(|error| RpcError::ParseError(self.returns.into(), error.into()))?,
193+
Async::Ready(None) => Async::Ready(None),
194+
Async::NotReady => Async::NotReady,
195+
};
196+
Ok(result)
197+
}
198+
}
199+
102200
/// Client for raw JSON RPC requests
103201
#[derive(Clone)]
104202
pub struct RawClient(RpcChannel);
@@ -113,16 +211,40 @@ impl RawClient {
113211
/// Call RPC with raw JSON
114212
pub fn call_method(&self, method: &str, params: Params) -> impl Future<Item = Value, Error = RpcError> {
115213
let (sender, receiver) = oneshot::channel();
116-
let msg = RpcMessage {
214+
let msg = CallMessage {
117215
method: method.into(),
118216
params,
119217
sender,
120218
};
121219
self.0
122-
.send(msg)
220+
.send(msg.into())
123221
.map_err(|error| RpcError::Other(error.into()))
124222
.and_then(|_| RpcFuture::new(receiver))
125223
}
224+
225+
/// Subscribe to topic with raw JSON
226+
pub fn subscribe(
227+
&self,
228+
subscribe: &str,
229+
subscribe_params: Params,
230+
notification: &str,
231+
unsubscribe: &str,
232+
) -> impl Future<Item = SubscriptionStream, Error = RpcError> {
233+
let (sender, receiver) = mpsc::channel(0);
234+
let msg = SubscribeMessage {
235+
subscription: Subscription {
236+
subscribe: subscribe.into(),
237+
subscribe_params,
238+
notification: notification.into(),
239+
unsubscribe: unsubscribe.into(),
240+
},
241+
sender,
242+
};
243+
self.0
244+
.send(msg.into())
245+
.map_err(|error| RpcError::Other(error.into()))
246+
.map(|_| SubscriptionStream::new(receiver))
247+
}
126248
}
127249

128250
/// Client for typed JSON RPC requests
@@ -167,14 +289,46 @@ impl TypedClient {
167289
future::done(result)
168290
}))
169291
}
292+
293+
/// Subscribe with serialization of request and deserialization of response
294+
pub fn subscribe<T: Serialize, R: DeserializeOwned + 'static>(
295+
&self,
296+
subscribe: &str,
297+
subscribe_params: T,
298+
topic: &str,
299+
unsubscribe: &str,
300+
returns: &'static str,
301+
) -> impl Future<Item = TypedSubscriptionStream<R>, Error = RpcError> {
302+
let args = serde_json::to_value(subscribe_params)
303+
.expect("Only types with infallible serialisation can be used for JSON-RPC");
304+
305+
let params = match args {
306+
Value::Array(vec) => Params::Array(vec),
307+
Value::Null => Params::None,
308+
_ => {
309+
return future::Either::A(future::err(RpcError::Other(format_err!(
310+
"RPC params should serialize to a JSON array, or null"
311+
))))
312+
}
313+
};
314+
315+
let typed_stream = self
316+
.0
317+
.subscribe(subscribe, params, topic, unsubscribe)
318+
.map(move |stream| TypedSubscriptionStream::new(stream, returns));
319+
future::Either::B(typed_stream)
320+
}
170321
}
171322

172323
#[cfg(test)]
173324
mod tests {
174325
use super::*;
175326
use crate::transports::local;
176327
use crate::{RpcChannel, RpcError, TypedClient};
177-
use jsonrpc_core::{self, IoHandler};
328+
use jsonrpc_core::{self as core, IoHandler};
329+
use jsonrpc_pubsub::{PubSubHandler, Subscriber, SubscriptionId};
330+
use std::sync::atomic::{AtomicBool, Ordering};
331+
use std::sync::Arc;
178332

179333
#[derive(Clone)]
180334
struct AddClient(TypedClient);
@@ -193,6 +347,7 @@ mod tests {
193347

194348
#[test]
195349
fn test_client_terminates() {
350+
crate::logger::init_log();
196351
let mut handler = IoHandler::new();
197352
handler.add_method("add", |params: Params| {
198353
let (a, b) = params.parse::<(u64, u64)>()?;
@@ -215,4 +370,69 @@ mod tests {
215370
});
216371
tokio::run(fut);
217372
}
373+
374+
#[test]
375+
fn should_handle_subscription() {
376+
crate::logger::init_log();
377+
// given
378+
let mut handler = PubSubHandler::<local::LocalMeta, _>::default();
379+
let called = Arc::new(AtomicBool::new(false));
380+
let called2 = called.clone();
381+
handler.add_subscription(
382+
"hello",
383+
("subscribe_hello", |params, _meta, subscriber: Subscriber| {
384+
assert_eq!(params, core::Params::None);
385+
let sink = subscriber
386+
.assign_id(SubscriptionId::Number(5))
387+
.expect("assigned subscription id");
388+
std::thread::spawn(move || {
389+
for i in 0..3 {
390+
std::thread::sleep(std::time::Duration::from_millis(100));
391+
let value = serde_json::json!({
392+
"subscription": 5,
393+
"result": vec![i],
394+
});
395+
sink.notify(serde_json::from_value(value).unwrap())
396+
.wait()
397+
.expect("sent notification");
398+
}
399+
});
400+
}),
401+
("unsubscribe_hello", move |id, _meta| {
402+
// Should be called because session is dropped.
403+
called2.store(true, Ordering::SeqCst);
404+
assert_eq!(id, SubscriptionId::Number(5));
405+
future::ok(core::Value::Bool(true))
406+
}),
407+
);
408+
409+
// when
410+
let (client, rpc_client) = local::connect_with_pubsub::<TypedClient, _>(handler);
411+
let received = Arc::new(std::sync::Mutex::new(vec![]));
412+
let r2 = received.clone();
413+
let fut = client
414+
.subscribe::<_, (u32,)>("subscribe_hello", (), "hello", "unsubscribe_hello", "u32")
415+
.and_then(|stream| {
416+
stream
417+
.into_future()
418+
.map(move |(result, _)| {
419+
drop(client);
420+
r2.lock().unwrap().push(result.unwrap());
421+
})
422+
.map_err(|_| {
423+
panic!("Expected message not received.");
424+
})
425+
})
426+
.join(rpc_client)
427+
.map(|(res, _)| {
428+
log::info!("ok {:?}", res);
429+
})
430+
.map_err(|err| {
431+
log::error!("err {:?}", err);
432+
});
433+
tokio::run(fut);
434+
assert_eq!(called.load(Ordering::SeqCst), true);
435+
assert!(!received.lock().unwrap().is_empty(), "Expected at least one received item.");
436+
}
437+
218438
}

0 commit comments

Comments
 (0)