
Provide way to auto-close streams #213

Merged: 3 commits into main from jh/improvements on Feb 6, 2024
Conversation

@jhump (Member) commented Jan 31, 2024:

This is mostly just me riffing on stuff that would be more idiomatic Kotlin.

In particular, this makes the three stream types Closeable. They aren't java.io.Closeable because I want the close to be a suspending function. (Not currently necessary, but I think it might be in the future to implement some other ideas I have.)

Anyhow, this also introduces extension functions that let you execute an RPC and use the stream in a block, so that the stream is automatically closed for you. I think that works especially well for the ServerStream, which could actually throw before you get back a stream (since it could fail to send the initial request). Without something like this, it could be gnarly: a separate try/catch on the main invocation and then another try/finally around the stream operations. (Admittedly, most client code probably won't look quite like the conformance client, which is trying to capture a lot of info to package back into a response to send to the test runner...)
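
To sketch the general idea (illustrative shapes only; the use-style helper below is not necessarily the exact extension in this PR):

// A Closeable whose close is a suspend function (unlike java.io.Closeable).
interface Closeable {
    suspend fun close()
}

// A use-style helper: runs the block with the stream and always closes it,
// even if the block throws.
suspend fun <S : Closeable, R> S.use(block: suspend (S) -> R): R {
    try {
        return block(this)
    } finally {
        close()
    }
}

With something like that, a stream gets consumed inside a single block and cleanup happens in one place.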

This is still all just in the conformance client. Once we're happy with the shape of the APIs there, I'd like to re-do the actual interfaces in the main connectrpc package. So I consider the internal APIs of the conformance "adapter" package my playground :)

Anyhow, just figured I'd explore a little and see what you think.

@jhump requested a review from pkwarren on January 31, 2024 at 21:53
@kohenkatz (Contributor) commented:

I think that works especially well for the ServerStream, which could actually throw before you get back a stream (since it could fail to send the initial request). So, without something like this, it could be gnarly with a separate try/catch on the main invocation and then another try/finally around the stream operations.

This sounds like a noble goal. Is it also true for bidirectional streams? I'm looking forward to seeing where this PR goes.

@jhump (Member, Author) commented Feb 1, 2024:

Is it also true for bidirectional streams?

If you are referring to the point about it throwing an exception before you get back a stream, no.

In fact, with the current interfaces (e.g., com.connectrpc.ServerOnlyStreamInterface), this is not an issue, because (1) you get back the stream first and then must send the one and only request, and (2) the send method returns Result<Unit> instead of throwing.

However, I'm exploring some alternate interface shapes in the conformance client (link). In these, the call to get a server stream accepts the singular request message and sends it before returning the stream (link). Also, the send methods in these interfaces return Unit and throw on error instead of returning Result<Unit>, because I've found Result<Unit> to be error-prone: since it has no actual return value (just Unit), it is easy to forget to check whether the result was successful.
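
To make the hazard concrete, here is a small illustration (the ResultSender and ThrowingSender interfaces are hypothetical stand-ins, not the actual interfaces):

// Hypothetical interfaces, just to contrast the two shapes.
interface ResultSender<Req> {
    suspend fun send(request: Req): Result<Unit>
}
interface ThrowingSender<Req> {
    suspend fun send(request: Req) // throws on failure
}

suspend fun example(a: ResultSender<String>, b: ThrowingSender<String>) {
    a.send("hello")               // compiles, but a failed send goes unnoticed
    a.send("hello").getOrThrow()  // correct usage requires remembering this
    b.send("hello")               // a failure here can't be ignored accidentally
}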

Anyhow, @kohenkatz, I'm interested if you have any thoughts or feedback to provide. You can see these new interfaces in use in the conformance client code (sadly, the actual logic in there is pretty boring, but hopefully you can get an idea for the feel of these APIs).

@kohenkatz (Contributor) commented:

As you noted at the top:

most client code probably won't look quite like the conformance client

... so it is a little difficult to generalize that code back to what we might see in a real app.

That said, I like what I'm seeing here so far. Some of our developers are very bad at error handling and cleaning up - they tend to assume that everything will always be successful - so any change that looks like it would simplify that is welcome.

@jhump (Member, Author) commented Feb 1, 2024:

One thing I am considering: instead of an extension function that takes a block, have the primary RPC methods (in the generated client stubs) take a block. That way, you must do all of the operations on the stream inside the block, so there's no risk that the stream is leaked, even in the face of exceptions (a rough sketch of what I mean is below). But I'm worried there might be some uses of long-lived streams that the block would make awkward and clunky. @kohenkatz, @pkwarren, any thoughts or concerns with an approach like that?
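
Roughly (illustrative names and types, not actual generated code):

// Hypothetical stand-in for a bidi stream, just to show the shape.
interface BidiStream<Req, Resp> {
    suspend fun send(request: Req)
    suspend fun close()
}

// Hypothetical generated stub: instead of returning the stream, the method
// runs the caller's block and guarantees the stream is closed afterwards.
interface SomeServiceClient {
    suspend fun <R> someBidiMethod(
        headers: Map<String, List<String>> = emptyMap(),
        block: suspend (BidiStream<String, String>) -> R,
    ): R
}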

@kohenkatz (Contributor) commented:

But I'm worried there might be some uses of long-lived streams that the block might make awkward and clunky.

We use long-lived streams in two ways that I can think of offhand. In both of these cases, we also have logic to reconnect the stream, since we find that our users have very poor Internet connection quality and/or are moving between different network connections (Wi-Fi and cellular).

  1. When a multi-part task is being performed by several users, a server stream is used to keep all users updated about which parts of the task have been performed by another user. For this use-case, the block should be fine, since it is just dispatching UI updates which are handled on a different thread anyway. Additionally, reconnection means just re-executing the same block with the new connection.
  2. When a user is having a video or chat conversation, a bidirectional stream is used for the media (e.g., chat message text, WebRTC stream setup information). For this use-case, I'm not sure how well a block would work, since the sending side of the connection needs to be available to send messages on it. Effectively, this means that the first action inside the block needs to be passing a reference out of the block back into the parent object.

In truth, in the second case, there is probably a way to use a block elegantly as well, but it might require some more work. The issue with the description I gave above is that the application needs a way to buffer messages that are waiting to be sent while the connection is being (re)established anyway. This probably means that the best way to handle sending messages is to push them into some kind of queue/flow/channel and then read from that queue inside the block.

Perhaps for long-lived bidirectional streams, it might make sense to have a version that requires a block for handling received messages and listens to some kind of queue for sending?

@jhump (Member, Author) commented Feb 1, 2024:

@pkwarren pointed out to me that the Ktor library also has a block-based API, even for full-duplex bidirectional calls like web sockets: https://ktor.io/docs/getting-started-ktor-client-chat.html#wire-it-together

One interesting thing is that the receiver for the block in that library is CoroutineScope, which is convenient since it lets you readily use launch or async from inside the block.

For the case you describe, I think the channel approach is the way to go. I think it would look something like this (pseudo-code-ish, so there may be typos and it probably wouldn't compile):

val requestChannel = Channel<Request>()

// TODO: Capture result of this as Deferred<T> so we
//       can do something with its result and also
//       take action if it fails. (Though if it fails
//       while requestChannel is still in use, the
//       channel will throw an exception.)
async {
    client.someBidiMethod(headers) { stream ->
        // separate coroutine to send requests from the channel to the server
        val sendJob = launch {
            try {
                for (req in requestChannel) {
                    stream.requests.send(req)
                }
            } catch (ex: Throwable) {
                // we can't accept any more requests
                requestChannel.close(ex)
            } finally {
                stream.requests.close()
            }
        }

        var failure: Throwable? = null
        try {
            for (resp in stream.responses.messages) {
                // process each response
            }
        } catch (ex: Throwable) {
            failure = ex
            throw ex
        } finally {
            // no-op if channel already closed or job already finished
            requestChannel.close(failure)
            sendJob.cancel()
        }
    }
}

// caller can then write requests to the channel, and they get sent on the stream
return requestChannel

WDYT? That doesn't seem too bad.

(Commit pushed: "…ected, make block use CoroutineScope as this for easy access to launch and async")
@jhump (Member, Author) commented Feb 2, 2024:

Based on that last comment, I pushed another commit. The block-based methods are now the way to use the streams (the non-block-based methods are now protected and serve as implementation details of the block-based ones). Following what Ktor does, I've also made the block receiver a CoroutineScope, so it's easier to use launch and async from within the block.
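
In signature terms, the shape is roughly like this (a simplified sketch, not the exact conformance-adapter code):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope

// Stand-in for a stream type, just to make the sketch self-contained.
interface BidiStream<Req, Resp> {
    suspend fun close()
}

// The block's receiver is a CoroutineScope, so launch and async are available
// directly inside it, and the stream is always closed when the block exits.
suspend fun <Req, Resp, R> withBidiStream(
    open: suspend () -> BidiStream<Req, Resp>,
    block: suspend CoroutineScope.(BidiStream<Req, Resp>) -> R,
): R = coroutineScope {
    val stream = open()
    try {
        this.block(stream)
    } finally {
        stream.close()
    }
}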

Again, this is just for the conformance client for now. But I think this is starting to look pretty good for eventually applying to the interfaces in com.connectrpc (which would be a backwards-incompatible API change, but IMO worth it as a big improvement to the API).

@kohenkatz (Contributor) commented:

@jhump I like this approach with a channel and a block. It forces the stream-handling to be done in one place, instead of being spread around in many places.

package com.connectrpc.conformance.client.adapt

// Like java.io.Closeable, but the close operation is suspendable.
interface Closeable {
    suspend fun close()
}
A Contributor commented on this code:

This all looks great - might just want to name this SuspendCloseable or AsyncCloseable or something like that so people aren't confused with multiple Closeable types.

@jhump merged commit 3f69420 into main on Feb 6, 2024
7 checks passed
@jhump deleted the jh/improvements branch on February 6, 2024 at 01:39