Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement stream operations in the conformance client #196

Merged
merged 7 commits into from
Jan 16, 2024

Conversation

jhump
Copy link
Member

@jhump jhump commented Jan 12, 2024

I've got each stream type in its own commit, to hopefully make this easier to review.

We still don't run the conformance tests for stream operations in CI because there are some flaky failures that I need to troubleshoot. In some circumstances, the reference servers are sending back unexpected errors for server stream RPCs, so I'll need to use a debugger with the reference server Go code to figure it out.

I want to dig a little bit more into the timeout issues with bidi RPCs, but I think this is a real bug in okhttp and, if so, will file an issue.

Comment on lines -23 to -24
supportsTlsClientCerts: true
supportsHalfDuplexBidiOverHttp1: true
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the line about client certs since this property defaults to false. An error is occurring when this code tries to instantiate a client cert from the data in the conformance request. I haven't dug into it, and decided to just omit it for now.

It turns out that we actually can't do half-duplex stuff over HTTP 1.1, because we don't actually know ahead of time, for a given bidi RPC, if it's half-duplex or full-duplex (in the framework, based on generated protobuf metadata). But the request body must be marked as "duplex" if it might be full-duplex. So have to mark the body as duplex for all bidi RPCs, and okhttp then disallows them to be used with HTTP 1.1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to comment these properties out with a TODO to know to come back to them instead of removing them? Up to you but might help identify remaining work since it sounds like there are several things to track down.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I think we can leave the supportsHalfDuplexBidiOverHttp1 out since there's no realistic way of changing that. But, yes, I should add back supportsTlsClientCerts with a TODO to get them working.

Comment on lines +47 to +48
val requests: RequestStream<Req>
val responses: ResponseStream<Resp>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed a few things in these interfaces to be property getters instead of methods since they really are lightweight properties.

@@ -46,6 +46,7 @@ abstract class ClientStreamClient<Req : MessageLite, Resp : MessageLite>(
interface ClientStream<Req : MessageLite, Resp : MessageLite> {
suspend fun send(req: Req)
suspend fun closeAndReceive(): ResponseMessage<Resp>
suspend fun cancel()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to add this in order to expose the ability to cancel a client-stream RPC. I could have also called it close for consistency with other methods. What do you think? Should I change it?

TBH, I kind of don't like that the methods that cancel the RPC are named like "receive close" / "close response", since that's not very clear as to what they're really doing. Maybe when overhauling these interfaces (more-or-less making them look like the interfaces in the conformance client's adapt package), we can revisit some of the names and name this operation cancel instead of close 🤷.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine for now - maybe there's a better API we could put together that would hook into coroutine cancelation instead of having to expose separate methods for this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is already hooked into coroutine cancellation:

But, I'm not sure I 100% understand exactly how that actually works. I don't think that alone does what we want it to do: the continuation in question for streaming simply returns the bidirectional stream object. So I think the only thing that this would cancel is the actual call to the streaming RPC method. Once it returns the stream object, that continuation is complete and the cancellation no longer does anything.

So I guess we'd have to do something similar in the app code, to tie its coroutine's cancellation to cancelling the stream (which I think means we still need a cancel method of some sort). I think it would be challenging to somehow have the framework automatically manage coroutine lifetimes and tie them to the stream since ownership is unclear -- i.e. it's possible that one coroutine could pass the stream to another that runs on a different thread/worker, in which case it's unclear what coroutine cancellation semantics should be.

@@ -71,7 +74,8 @@ enum class Code(val codeName: String, val value: Int) {
if (value == null) {
return UNKNOWN
}
return values().first { code -> code.value == value }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would throw if the given value was not known, which seemed wrong.

@@ -175,7 +172,8 @@ private class ResponseCallback(
}
}

internal class PipeDuplexRequestBody(
internal class PipeRequestBody(
private val duplex: Boolean,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was necessary in order to do client or server streaming over HTTP 1.1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I expected this might be a bit more tricky - especially for client streaming calls.

@jhump jhump requested a review from pkwarren January 12, 2024 17:49
Comment on lines -23 to -24
supportsTlsClientCerts: true
supportsHalfDuplexBidiOverHttp1: true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to comment these properties out with a TODO to know to come back to them instead of removing them? Up to you but might help identify remaining work since it sounds like there are several things to track down.

@@ -46,6 +46,7 @@ abstract class ClientStreamClient<Req : MessageLite, Resp : MessageLite>(
interface ClientStream<Req : MessageLite, Resp : MessageLite> {
suspend fun send(req: Req)
suspend fun closeAndReceive(): ResponseMessage<Resp>
suspend fun cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine for now - maybe there's a better API we could put together that would hook into coroutine cancelation instead of having to expose separate methods for this.

@@ -175,7 +172,8 @@ private class ResponseCallback(
}
}

internal class PipeDuplexRequestBody(
internal class PipeRequestBody(
private val duplex: Boolean,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I expected this might be a bit more tricky - especially for client streaming calls.

@jhump jhump merged commit 30c7c69 into main Jan 16, 2024
7 checks passed
@jhump jhump deleted the jh/stream-conformance branch January 16, 2024 14:50
jhump added a commit that referenced this pull request Jan 31, 2024
This includes a fix to the recently observed flakiness in client
streams, which was introduced (in #196).

The problems are described below. The fixes are each in their own
commit, so reviewing commit-by-commit might make this PR easier to read.

1. There was an issue in the use of `Result<Unit>` as the return type
for `send` operations. Since the return type (`Unit`) is really a
void/no-return type, calling code was never checking the result. That
means that when the operation failed, it was never noticed. I'm a little
surprised that we don't have linters or warnings for calls to functions
that return `Result` where that return value is ignored.

It turns out that this was not just a problem in my calling code,
failing to check the return value for failure, but in the framework
itself: the stream wrapper in `ProtocolClient` (wrapping underlying
stream returned by `ConnectOkHttpClient`) was using an `onSend` callback
that called the underlying stream's `send`. But the `onSend` callback
simply returned `Unit` instead of `Result<Unit>`, and the method that
propagated the result wasn't checking the result and throwing.
    
I think this is the biggest commit here, and it's because I did some
overhauling of `Stream`. For one, I changed it to an interface -- mainly
so that we could apply a decorator pattern to it and
`HTTPClientInterface` (more on that in a later PR). This makes the
wrapper in `ProtocolClient` simpler -- instead of it being a full
implementation, with its own atomic booleans to guard/track the close
operations, it just delegates to the underlying implementation.
    
2. The Connect unary protocol can return a 408 status code for
"canceled" and "deadline exceeded" RPC errors. But okhttp auto-retries
this status code, even though the requests are not idempotent (i.e. even
for POST calls). This isn't an issue with the stream conformance tests,
but was noticed later after I added an extra check to the reference
server so that it catches cases where a client sends an RPC for the same
test case more than once. This commit adds a network interceptor to the
`OkHttpClient` that will identify 408 responses that look like Connect
unary responses and change their status code to 499. That is the only
way I could find to prevent the retries.

3. The recently introduced flakiness in client streams is actually a
rather severe issue. It was mainly observed in the new conformance suite
with server streams when gzip was used, because it was all due to race
conditions and the gzip operations would slow down the producer thread
just enough to tickle the issue. The problem is that the
`RequestBody.writeTo` function should not return before the request body
is finished when the request body is not duplex. But it was calling
`pipe.fold` and then immediately returning. The `fold` method swaps in a
new sink in place of the read-side of the pipe and then returns, without
waiting for the pipe's write side to complete. So now we use a
`CountDownLatch` to wait until the writer is complete (which is signaled
via a call to `close`).

4. The last issue I encountered was much less frequent, and also turned
out to be a race condition. It was caused by a concurrency bug in
`okio.Pipe` (square/okio#1412). Basically,
some duplex operations (i.e. bidi RPCs) would infrequently timeout
because, even though the stream writer had closed the pipe, the HTTP
request body incorrectly remained open. I've opened a PR with a fix in
the `okio` library, but I've also added a work-around for now in the
code here, by using extra synchronization between the calls to `write`,
`close`, and `fold`.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants