diff --git a/core/src/lib.rs b/core/src/lib.rs index be0fa7b..3a9dc34 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -57,6 +57,7 @@ #[macro_use] extern crate error_chain; +#[macro_use] extern crate futures; extern crate jsonrpc_core; #[macro_use] @@ -65,6 +66,7 @@ extern crate serde; #[cfg_attr(test, macro_use)] extern crate serde_json; +use futures::Async; use futures::future::Future; use jsonrpc_core::types::{Id, MethodCall, Params, Version}; use serde_json::Value as JsonValue; @@ -102,34 +104,91 @@ error_chain! { } } -/// A `Future` trait object. -pub type BoxFuture = Box + Send>; /// A lazy RPC call `Future`. The actual call has not been sent when an instance of this type /// is returned from a client generated by the macro in this crate. This is a `Future` that, when /// executed, performs the RPC call. -pub struct RpcRequest(BoxFuture); +pub struct RpcRequest(::std::result::Result, Option>); -impl RpcRequest { +impl RpcRequest +where + T: serde::de::DeserializeOwned + Send + 'static, + E: ::std::error::Error + Send + 'static, + F: Future, Error = E> + Send + 'static, +{ /// Consume this RPC request and run it synchronously. This blocks until the RPC call is done, /// then the result of the call is returned. pub fn call(self) -> Result { - self.0.wait() + self.wait() + } +} + +impl Future for RpcRequest +where + T: serde::de::DeserializeOwned + Send + 'static, + E: ::std::error::Error + Send + 'static, + F: Future, Error = E> + Send + 'static, +{ + type Item = T; + type Error = Error; + + fn poll(&mut self) -> futures::Poll { + match self.0 { + Ok(ref mut inner) => inner.poll(), + Err(ref mut error_option) => Err(error_option + .take() + .expect("Cannot call RpcRequest poll twice when in error state")), + } + } +} + +struct InnerRpcRequest { + transport_future: F, + id: Id, + _marker: ::std::marker::PhantomData, +} + +impl InnerRpcRequest { + fn new(transport_future: F, id: Id) -> Self { + Self { + transport_future, + id, + _marker: ::std::marker::PhantomData, + } } } -impl Future for RpcRequest { +impl Future for InnerRpcRequest +where + T: serde::de::DeserializeOwned + Send + 'static, + E: ::std::error::Error + Send + 'static, + F: Future, Error = E> + Send + 'static, +{ type Item = T; type Error = Error; fn poll(&mut self) -> futures::Poll { - self.0.poll() + let response_raw = try_ready!( + self.transport_future + .poll() + .chain_err(|| ErrorKind::TransportError) + ); + trace!( + "Deserializing {} byte response to request with id {:?}", + response_raw.len(), + self.id + ); + response::parse(&response_raw, &self.id).map(|t| Async::Ready(t)) } } + /// Trait for types acting as a transport layer for the JSON-RPC 2.0 clients generated by the /// `jsonrpc_client` macro. pub trait Transport { + /// The future type this transport returns on send operations. + type Future: Future, Error = Self::Error> + Send + 'static; + /// The type of error that this transport emits if it fails. type Error: ::std::error::Error + Send + 'static; @@ -139,7 +198,7 @@ pub trait Transport { /// Sends the given data over the transport and returns a future that will complete with the /// response to the request, or the transport specific error if something went wrong. - fn send(&self, json_data: Vec) -> BoxFuture, Self::Error>; + fn send(&self, json_data: Vec) -> Self::Future; } @@ -150,41 +209,25 @@ pub trait Transport { /// # Not intended for direct use /// This is being called from the client structs generated by the `jsonrpc_client` macro. This /// function is not intended to be used directly, only the generated structs should call this. -pub fn call_method(transport: &mut T, method: String, params: P) -> RpcRequest +pub fn call_method( + transport: &mut T, + method: String, + params: P, +) -> RpcRequest where T: Transport, P: serde::Serialize, R: serde::de::DeserializeOwned + Send + 'static, { - let raw_id = transport.get_next_id(); - let id = Id::Num(raw_id); - trace!( - "Serializing call to method \"{}\" with id {}", - method, - raw_id - ); - let request_serialization_result = serialize_request(id.clone(), method.clone(), params) + let id = Id::Num(transport.get_next_id()); + trace!("Serializing call to method \"{}\" with id {:?}", method, id); + let request_serialization_result = serialize_request(id.clone(), method, params) .chain_err(|| ErrorKind::SerializeError); match request_serialization_result { - Err(e) => RpcRequest(Box::new(futures::future::err(e))), + Err(e) => RpcRequest(Err(Some(e))), Ok(request_raw) => { - trace!( - "Sending call to method \"{}\" with id {} to transport", - method, - raw_id - ); - let future = transport - .send(request_raw) - .map_err(|e| Error::with_chain(e, ErrorKind::TransportError)) - .and_then(move |response_raw: Vec| { - trace!( - "Deserializing response to method \"{}\" with id {}", - method, - raw_id - ); - response::parse::(&response_raw, id) - }); - RpcRequest(Box::new(future)) + let transport_future = transport.send(request_raw); + RpcRequest(Ok(InnerRpcRequest::new(transport_future, id))) } } } @@ -220,19 +263,22 @@ mod tests { use super::*; use std::io; + pub type BoxFuture = Box + Send>; + /// A test transport that just echoes back a response containing the entire request as the /// result. #[derive(Clone)] struct EchoTransport; impl Transport for EchoTransport { + type Future = BoxFuture, io::Error>; type Error = io::Error; fn get_next_id(&mut self) -> u64 { 1 } - fn send(&self, json_data: Vec) -> BoxFuture, io::Error> { + fn send(&self, json_data: Vec) -> Self::Future { let json = json!({ "jsonrpc": "2.0", "id": 1, @@ -247,13 +293,14 @@ mod tests { struct ErrorTransport; impl Transport for ErrorTransport { + type Future = BoxFuture, io::Error>; type Error = io::Error; fn get_next_id(&mut self) -> u64 { 1 } - fn send(&self, _json_data: Vec) -> BoxFuture, io::Error> { + fn send(&self, _json_data: Vec) -> Self::Future { let json = json!({ "jsonrpc": "2.0", "id": 1, diff --git a/core/src/macros.rs b/core/src/macros.rs index 8989bbf..e6f8ee5 100644 --- a/core/src/macros.rs +++ b/core/src/macros.rs @@ -11,14 +11,14 @@ #[macro_export] macro_rules! jsonrpc_client { ( - $(#[$struct_doc:meta])* + $(#[$struct_attr:meta])* pub struct $struct_name:ident {$( - $(#[$doc:meta])* + $(#[$attr:meta])* pub fn $method:ident(&mut $selff:ident $(, $arg_name:ident: $arg_ty:ty)*) -> RpcRequest<$return_ty:ty>; )*} ) => ( - $(#[$struct_doc])* + $(#[$struct_attr])* pub struct $struct_name { transport: T, } @@ -30,9 +30,9 @@ macro_rules! jsonrpc_client { } $( - $(#[$doc])* + $(#[$attr])* pub fn $method(&mut $selff $(, $arg_name: $arg_ty)*) - -> $crate::RpcRequest<$return_ty> + -> $crate::RpcRequest<$return_ty, T::Future> { let method = String::from(stringify!($method)); let params = expand_params!($($arg_name,)*); diff --git a/core/src/response.rs b/core/src/response.rs index 58ee026..4a7059f 100644 --- a/core/src/response.rs +++ b/core/src/response.rs @@ -13,7 +13,7 @@ use serde_json; /// Parses a binary response into json, extracts the "result" field and tries to deserialize that /// to the desired type. -pub fn parse(response_raw: &[u8], expected_id: Id) -> Result +pub fn parse(response_raw: &[u8], expected_id: &Id) -> Result where R: serde::de::DeserializeOwned, { @@ -24,17 +24,15 @@ where ErrorKind::ResponseError("Not JSON-RPC 2.0 compatible") ); ensure!( - response.id() == &expected_id, + response.id() == expected_id, ErrorKind::ResponseError("Response id not equal to request id") ); match response { Output::Success(success) => { trace!("Received json result: {}", success.result); - serde_json::from_value::(success.result) + serde_json::from_value(success.result) .chain_err(|| ErrorKind::ResponseError("Not valid for target type")) } - Output::Failure(failure) => { - Err(ErrorKind::JsonRpcError(failure.error).into()) - } + Output::Failure(failure) => bail!(ErrorKind::JsonRpcError(failure.error)), } } diff --git a/http/src/lib.rs b/http/src/lib.rs index 52e36ee..44b3fd5 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -81,7 +81,7 @@ extern crate native_tls; use futures::{future, Future, Stream}; use futures::sync::{mpsc, oneshot}; use hyper::{Client, Request, StatusCode, Uri}; -use jsonrpc_client_core::{BoxFuture, Transport}; +use jsonrpc_client_core::Transport; use std::str::FromStr; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -119,6 +119,7 @@ error_chain! { type CoreSender = mpsc::UnboundedSender<(Request, oneshot::Sender>>)>; type CoreReceiver = mpsc::UnboundedReceiver<(Request, oneshot::Sender>>)>; + /// The main struct of the HTTP transport implementation for /// [`jsonrpc_client_core`](../jsonrpc_client_core). /// @@ -336,14 +337,15 @@ impl HttpHandle { } impl Transport for HttpHandle { + type Future = Box, Error = Self::Error> + Send>; type Error = Error; fn get_next_id(&mut self) -> u64 { self.id.fetch_add(1, Ordering::SeqCst) as u64 } - fn send(&self, json_data: Vec) -> BoxFuture, Error> { - let request = self.create_request(json_data.clone()); + fn send(&self, json_data: Vec) -> Self::Future { + let request = self.create_request(json_data); let (response_tx, response_rx) = oneshot::channel(); let future = future::result(self.request_tx.unbounded_send((request, response_tx))) .map_err(|e| {