Skip to content

Commit

Permalink
get half-duplex bidi stream operations working - timeouts aren't work…
Browse files Browse the repository at this point in the history
…ing though(?)
  • Loading branch information
jhump committed Jan 12, 2024
1 parent 0df45eb commit 798285b
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 52 deletions.
4 changes: 1 addition & 3 deletions conformance/client/lite-stream-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,5 @@ features:
# This config file only runs stream RPC test cases.
- STREAM_TYPE_CLIENT_STREAM
- STREAM_TYPE_SERVER_STREAM
#- STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM
- STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM
#- STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM
#supportsTlsClientCerts: true
supportsHalfDuplexBidiOverHttp1: true
2 changes: 0 additions & 2 deletions conformance/client/lite-unary-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,3 @@ features:
# so that we can run them all three ways: suspend,
# callback, and blocking.
- STREAM_TYPE_UNARY
supportsTlsClientCerts: true
supportsHalfDuplexBidiOverHttp1: true
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.connectrpc.conformance.client.adapt.ClientCompatRequest.StreamType
import com.connectrpc.conformance.client.adapt.ClientResponseResult
import com.connectrpc.conformance.client.adapt.ClientStreamClient
import com.connectrpc.conformance.client.adapt.Invoker
import com.connectrpc.conformance.client.adapt.ResponseStream
import com.connectrpc.conformance.client.adapt.ServerStreamClient
import com.connectrpc.conformance.client.adapt.UnaryClient
import com.connectrpc.impl.ProtocolClient
Expand Down Expand Up @@ -215,32 +216,7 @@ class Client(
delay(cancel.millis.toLong())
stream.close()
}
val payloads : MutableList<MessageLite> = mutableListOf()
var connEx : ConnectException? = null
var trailers : Headers
try {
if (cancel is Cancel.AfterNumResponses && cancel.num == 0) {
stream.close()
}
for (resp in stream.messages()) {
payloads.add(payloadExtractor(resp))
if (cancel is Cancel.AfterNumResponses && payloads.size == cancel.num) {
stream.close()
}
}
trailers = stream.trailers()
} catch (ex: ConnectException) {
connEx = ex
trailers = ex.metadata
} finally {
stream.close()
}
return ClientResponseResult(
headers = stream.headers(),
payloads = payloads,
error = connEx,
trailers = trailers,
)
return streamResult(0, stream, cancel)
}

private suspend fun <Req : MessageLite, Resp : MessageLite> handleBidi(
Expand All @@ -261,7 +237,36 @@ class Client(
client: BidiStreamClient<Req, Resp>,
req: ClientCompatRequest,
): ClientResponseResult {
TODO("implement me")
val stream = client.execute(req.requestHeaders)
var numUnsent = 0
for (i in req.requestMessages.indices) {
if (req.requestDelayMs > 0) {
delay(req.requestDelayMs.toLong())
}
val msg = fromAny(req.requestMessages[i], client.reqTemplate, BIDI_STREAM_REQUEST_NAME)
try {
stream.requests.send(msg)
} catch (_: Exception) {
numUnsent = req.requestMessages.size - i
break
}
}
val cancel = req.cancel
when (cancel) {
is Cancel.BeforeCloseSend -> {
stream.responses.close() // cancel
stream.requests.close() // close send
}
is Cancel.AfterCloseSendMs -> {
stream.requests.close() // close send
delay(cancel.millis.toLong())
stream.responses.close() // cancel
}
else -> {
stream.requests.close() // close send
}
}
return streamResult(numUnsent, stream.responses, cancel)
}

private suspend fun <Req : MessageLite, Resp : MessageLite> handleFullDuplexBidi(
Expand Down Expand Up @@ -298,6 +303,36 @@ class Client(
}
}

private suspend fun streamResult(numUnsent: Int, stream: ResponseStream<out MessageLite>, cancel: Cancel?): ClientResponseResult {
val payloads : MutableList<MessageLite> = mutableListOf()
var connEx : ConnectException? = null
var trailers : Headers
try {
if (cancel is Cancel.AfterNumResponses && cancel.num == 0) {
stream.close()
}
for (resp in stream.messages) {
payloads.add(payloadExtractor(resp))
if (cancel is Cancel.AfterNumResponses && cancel.num == payloads.size) {
stream.close()
}
}
trailers = stream.trailers()
} catch (ex: ConnectException) {
connEx = ex
trailers = ex.metadata
} finally {
stream.close()
}
return ClientResponseResult(
headers = stream.headers(),
payloads = payloads,
error = connEx,
trailers = trailers,
numUnsentRequests = numUnsent,
)
}

private fun getClient(req: ClientCompatRequest): Pair<OkHttpClient, ProtocolClient> {
// TODO: cache/re-use clients instead of creating a new one for every request
val serializationStrategy = serializationFactory(req.codec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,18 @@ abstract class BidiStreamClient<Req : MessageLite, Resp : MessageLite>(
* @param Resp The response message type
*/
interface BidiStream<Req : MessageLite, Resp : MessageLite> {
fun requests(): RequestStream<Req>
fun responses(): ResponseStream<Resp>
val requests: RequestStream<Req>
val responses: ResponseStream<Resp>
companion object {
fun <Req : MessageLite, Resp : MessageLite> new(underlying: BidirectionalStreamInterface<Req, Resp>): BidiStream<Req, Resp> {
val reqStream = RequestStream.new(underlying)
val respStream = ResponseStream.new(underlying)
return object : BidiStream<Req, Resp> {
override fun requests(): RequestStream<Req> {
return reqStream
}
override val requests: RequestStream<Req>
get() = reqStream

override fun responses(): ResponseStream<Resp> {
return respStream
}
override val responses: ResponseStream<Resp>
get() = respStream
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import kotlinx.coroutines.channels.ReceiveChannel
* @param Resp The response message type
*/
interface ResponseStream<Resp : MessageLite> {
fun messages(): ReceiveChannel<Resp>
val messages: ReceiveChannel<Resp>

suspend fun headers(): Headers

Expand All @@ -41,9 +41,8 @@ interface ResponseStream<Resp : MessageLite> {
companion object {
fun <Req : MessageLite, Resp : MessageLite> new(underlying: BidirectionalStreamInterface<Req, Resp>): ResponseStream<Resp> {
return object : ResponseStream<Resp> {
override fun messages(): ReceiveChannel<Resp> {
return underlying.responseChannel()
}
override val messages: ReceiveChannel<Resp>
get() = underlying.responseChannel()

override suspend fun headers(): Headers {
return underlying.responseHeaders().await()
Expand All @@ -61,9 +60,8 @@ interface ResponseStream<Resp : MessageLite> {

fun <Req : MessageLite, Resp : MessageLite> new(underlying: ServerOnlyStreamInterface<Req, Resp>): ResponseStream<Resp> {
return object : ResponseStream<Resp> {
override fun messages(): ReceiveChannel<Resp> {
return underlying.responseChannel()
}
override val messages: ReceiveChannel<Resp>
get() = underlying.responseChannel()

override suspend fun headers(): Headers {
return underlying.responseHeaders().await()
Expand Down
4 changes: 1 addition & 3 deletions conformance/client/standard-stream-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,5 @@ features:
# This config file only runs stream RPC test cases.
- STREAM_TYPE_CLIENT_STREAM
- STREAM_TYPE_SERVER_STREAM
#- STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM
- STREAM_TYPE_HALF_DUPLEX_BIDI_STREAM
#- STREAM_TYPE_FULL_DUPLEX_BIDI_STREAM
#supportsTlsClientCerts: true
supportsHalfDuplexBidiOverHttp1: true
2 changes: 0 additions & 2 deletions conformance/client/standard-unary-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,3 @@ features:
# so that we can run them all three ways: suspend,
# callback, and blocking.
- STREAM_TYPE_UNARY
supportsTlsClientCerts: true
supportsHalfDuplexBidiOverHttp1: true

0 comments on commit 798285b

Please sign in to comment.