Skip to content

Client http transport #420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
May 21, 2019
Merged

Client http transport #420

merged 31 commits into from
May 21, 2019

Conversation

ascjones
Copy link
Contributor

@ascjones ascjones commented Apr 16, 2019

Rel: paritytech/substrate#2255.

Still todo:

  • Improve ergonomics of channel impl - helper for the transport Future
    • Use a combinator e.g. join or wrap in a struct which impls Future
  • Tests/examples of http client in use
  • Fix error handling in core client, channels shouldn't be dropped because of rpc errors

@ascjones
Copy link
Contributor Author

ascjones commented May 2, 2019

I've changed my approach here: initially I copied @tomusdrw's approach of creating a pair of channels for the Sink/Stream and creating the client with those.

let (rpc_client, sender) = request_response(8, move |request: String| {
let request = Request::post(&url)
.header(http::header::CONTENT_TYPE, http::header::HeaderValue::from_static("application/json"))
.body(request.into())
.unwrap();
client
.request(request)
.map_err(|e| RpcError::Other(e.into()))
.and_then(|res| {
// TODO [ToDr] Handle non-200
res.into_body()
.map_err(|e| RpcError::ParseError(e.to_string(), e.into()))
.concat2()
.map(|chunk| String::from_utf8_lossy(chunk.as_ref()).into_owned())
})
});

pub fn request_response<F, R>(request_buffer: usize, f: F) -> (
impl Future<Item=(), Error=RpcError>,
RpcChannel,
) where
F: Fn(String) -> R,
R: Future<Item=String, Error=RpcError>,
{
let (requests, requests_rx) = mpsc::channel(request_buffer);
let (responses, responses_rx) = mpsc::channel(0);
let run= requests_rx
.map(f)
.forward(responses.sink_map_err(|_e| ()))
.map(|_| ());
let sink = requests
.sink_map_err(|e| RpcError::Other(e.into()));
let stream = responses_rx
.map_err(|()| unreachable!())
.and_then(|res| res);
let (rpc_client, sender) = RpcClient::with_channel(sink, stream);
let client = run
.map_err(|()| RpcError::Other(format_err!("Transport error"))) // todo: [AJ] check unifying of error types
.join(rpc_client)
.map(|((), ())| ());
(client, sender)
}

However this ran into problems where HTTP errors would not be returned to the caller, because there is no way to correlate the error response with the sender in RpcClient, since the request Id is only available in a successful response from the server.

So I've created a "standalone" HTTP transport, which can handle the errors properly since it is request/response instead of duplex.

I've extracted the former RpcClient into transports and renamed it Duplex. Shared functionality between that and the HTTP transport (id assignment, request building. response handling) I've extracted out into some helpers.

Still a bit more work to do on this - but would be good to have some feedback to see whether this is the right direction.

@ascjones ascjones marked this pull request as ready for review May 2, 2019 15:13
@dvc94ch
Copy link
Contributor

dvc94ch commented May 3, 2019

However this ran into problems where HTTP errors would not be returned to the caller, because there is no way to correlate the error response with the sender in RpcClient, since the request Id is only available in a successful response from the server.

What happens currently? Is the error at least logged?

I see it's logged in the ws client which is suboptimal.

@ascjones
Copy link
Contributor Author

ascjones commented May 3, 2019

I think it would only be logged if you attach a logger to the client future error e.g. https://github.com/paritytech/substrate/pull/2255/files#diff-74c6a3e10a271935aa30a9981463e681R76

Copy link
Contributor

@tomusdrw tomusdrw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good after the first glance, couple of small nits.

channel: Some(channel),
outgoing: VecDeque::new(),
}
impl Into<RpcChannel> for mpsc::Sender<RpcMessage> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should rather implement From:

Library authors should not directly implement this[Into] trait, but should prefer implementing the From trait, which offers greater flexibility and provides an equivalent Into implementation for free, thanks to a blanket implementation in the standard library.

}

/// Creates a new `Duplex`, along with a channel to communicate
pub fn with_channel(sink: TSink, stream: TStream) -> (Self, RpcChannel) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method is a bit confusing, but seeing with_channel I think I was expecting the channel to be passed as parameter instead, I'd swap the names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've pulled this out to a public function duplex. Not sure about the name but perhaps more idiomatic for creating the (Duplex, RpcChannel) tuple.

if self.channel.is_none() {
break;
}
let msg = match self.channel.as_mut().expect("channel is some; qed").poll() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could match on self.channel.as_mut() to avoid expect

break;
}
Ok(Async::NotReady) => break,
Err(()) => continue,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will lead to an endless loop, why do we continue if the Sender is dropped? We should rather resolve with an error after processing the outgoing queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the futures impl of mpsc returns Ready(None) when the channel is closed - which is handled above.

And indeed that is documented as the expected behaviour for Stream, and for future versions.

// Handle outgoing rpc requests.
loop {
match self.outgoing.pop_front() {
Some(request) => match self.sink.start_send(request)? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be easier to use send_all instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task in #432

Some(tx) => tx
.send(result)
.map_err(|_| RpcError::Other(format_err!("oneshot channel closed")))?,
None => Err(RpcError::UnknownId)?,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it should be an error - this means the server can terminate the entire stream by sending a malformed response. I'd rather handle that gracefuly with a warning and continue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task in #432

debug!("client finished");
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is correct, are we sure that in every case where we go into this branch we poll something upstream? Otherwise the task is not going to be woken up, I was looking at different cases, but it seems correct, so just leaving a notice here :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task in #432

http::header::HeaderValue::from_static("application/json"),
)
.body(request.into())
.unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could do with a proof

.into_iter()
.map(|output| {
let id = output.id().clone();
let value: Result<Value, Error> = output.into();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conversion here might be a bit wasteful, can't we have some more strict type than Value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasteful in what way? The output contains the json Value and it is just transformed to a Result

Output::Success(s) => Ok(s.result),

@ascjones
Copy link
Contributor Author

I've opened an issue to address some of the points with duplex. I think those changes will be better as a separate PR since I've just moved/renamed that code in this PR. #432

@tomusdrw tomusdrw merged commit 8c760e3 into master May 21, 2019
@tomusdrw tomusdrw deleted the aj-client-http-transport branch May 21, 2019 07:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants