Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix all conformance failures other than timeouts/deadlines #274

Merged
merged 6 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions conformance/client/known-failing-stream-cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,3 @@ Timeouts/HTTPVersion:2/**/bidi-stream/**

# Deadline headers are not currently set.
Deadline Propagation/**

# Bug: incorrect code attribution for these failures (UNKNOWN instead of INTERNAL)
Connect Unexpected Responses/**/unexpected-stream-codec
11 changes: 0 additions & 11 deletions conformance/client/known-failing-unary-cases.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,2 @@
# Deadline headers are not currently set.
Deadline Propagation/**

# Bug: response content-type is not correctly checked
**/unexpected-content-type

# Bug: "trailers-only" responses are not correctly identified.
# If headers contain "grpc-status", this client assumes it is a
# trailers-only response. However, a trailers-only response should
# instead be identified by lack of body or HTTP trailers.
gRPC Unexpected Responses/**/trailers-only/*
gRPC-Web Unexpected Responses/**/trailers-only/ignore-header-if-body-present

3 changes: 2 additions & 1 deletion library/src/main/kotlin/com/connectrpc/AnyError.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ package com.connectrpc
import okio.ByteString

/**
* This is an Any adapter for various base data types.
* This is a protobuf-runtime-agnostic representation of google.protobuf.Any
* messages, which are used to represent error details in gRPC.
*/
class AnyError(
val typeUrl: String,
Expand Down
2 changes: 1 addition & 1 deletion library/src/main/kotlin/com/connectrpc/Code.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ enum class Code(val codeName: String, val value: Int) {
ABORTED("aborted", 10),
OUT_OF_RANGE("out_of_range", 11),
UNIMPLEMENTED("unimplemented", 12),
INTERNAL_ERROR("internal", 13),
INTERNAL_ERROR("internal", 13), // TODO: rename enum value to INTERNAL
UNAVAILABLE("unavailable", 14),
DATA_LOSS("data_loss", 15),
UNAUTHENTICATED("unauthenticated", 16),
Expand Down
1 change: 1 addition & 0 deletions library/src/main/kotlin/com/connectrpc/Codec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const val codecNameJSON = CODEC_NAME_JSON
* Defines a type that is capable of encoding and decoding messages using a specific format.
*/
interface Codec<E> {
// TODO: remove this method or unify somehow with SerializationStrategy.serializationName?
/**
* @return The name of the codec's format (e.g., "json", "proto"). Usually consumed
* in the form of adding the `content-type` header via "application/{name}".
Expand Down
6 changes: 4 additions & 2 deletions library/src/main/kotlin/com/connectrpc/ErrorDetailParser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import kotlin.reflect.KClass
*/
interface ErrorDetailParser {
/**
* Unpack the underlying payload into the input class type.
* Unpack the given Any payload into the input class type.
*/
fun <E : Any> unpack(any: AnyError, clazz: KClass<E>): E?

/**
* Parse payload for a list of error details.
* Parse the given bytes for a list of error details. The given
* bytes will be the serialized form of a google.rpc.Status
* Protobuf message.
*/
fun parseDetails(bytes: ByteArray): List<ConnectErrorDetail>
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import kotlin.reflect.KClass
interface SerializationStrategy {

/**
* The name of the serialization. Used in the content-encoding
* The name of the serialization. Used in the content-type
* header.
*/
fun serializationName(): String
Expand Down
3 changes: 3 additions & 0 deletions library/src/main/kotlin/com/connectrpc/StreamResult.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ package com.connectrpc
sealed class StreamResult<Output> {
// Headers have been received over the stream.
class Headers<Output>(val headers: com.connectrpc.Headers) : StreamResult<Output>() {
// TODO: This should include an HTTP status code, too. Computing an RPC code
// from the HTTP status code should be part of the protocol impl, not
// pushed down to the HTTPClientInterface impl.
override fun toString(): String {
return "Headers{headers=$headers}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package com.connectrpc.protocols

const val ACCEPT_ENCODING = "accept-encoding"
const val CONTENT_ENCODING = "content-encoding"
const val CONTENT_TYPE = "content-type"
const val CONNECT_STREAMING_CONTENT_ENCODING = "connect-content-encoding"
const val CONNECT_STREAMING_ACCEPT_ENCODING = "connect-accept-encoding"
const val CONNECT_PROTOCOL_VERSION_KEY = "connect-protocol-version"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,37 +94,53 @@ internal class ConnectInterceptor(
}
val trailers = mutableMapOf<String, List<String>>()
trailers.putAll(response.headers.toTrailers())
trailers.putAll(response.trailers)
val responseHeaders =
val headers =
response.headers.filter { entry -> !entry.key.startsWith("trailer-") }
val compressionPool = clientConfig.compressionPool(responseHeaders[CONTENT_ENCODING]?.first())
val compressionPool = clientConfig.compressionPool(headers[CONTENT_ENCODING]?.first())
val responseBody = try {
compressionPool?.decompress(response.message.buffer) ?: response.message.buffer
} catch (e: Exception) {
return@UnaryFunction response.clone(
message = Buffer(),
headers = responseHeaders,
headers = headers,
trailers = trailers,
cause = ConnectException(
code = Code.INTERNAL_ERROR,
message = e.message,
exception = e,
metadata = headers.plus(trailers),
),
)
}
val contentType = headers[CONTENT_TYPE]?.first() ?: ""
val exception: ConnectException?
val message: Buffer
if (response.status != 200) {
exception = parseConnectUnaryException(response.status, responseHeaders.plus(trailers), responseBody)
exception = parseConnectUnaryException(response.status, contentType, headers.plus(trailers), responseBody)
// We've already read the response body to parse an error - don't read again.
message = Buffer()
} else {
exception = null
message = responseBody
val isValidContentType =
(serializationStrategy.serializationName() == "json" && contentTypeIsJSON(contentType)) ||
contentType == "application/" + serializationStrategy.serializationName()
if (isValidContentType) {
exception = null
} else {
// If content-type looks like it could be an RPC server's response, consider
// this an internal error. Otherwise, we infer a code from the HTTP status,
// which means a code of UNKNOWN since HTTP status is 200.
val code = if (contentType.startsWith("application/")) Code.INTERNAL_ERROR else Code.UNKNOWN
exception = ConnectException(
code = code,
message = "unexpected content-type: $contentType",
metadata = headers.plus(trailers),
)
}
}
response.clone(
message = message,
headers = responseHeaders,
headers = headers,
trailers = trailers,
cause = exception,
)
Expand Down Expand Up @@ -161,9 +177,25 @@ internal class ConnectInterceptor(
val streamResult: StreamResult<Buffer> = res.fold(
onHeaders = { result ->
responseHeaders = result.headers
responseCompressionPool =
clientConfig.compressionPool(responseHeaders[CONNECT_STREAMING_CONTENT_ENCODING]?.first())
StreamResult.Headers(responseHeaders)
val contentType = responseHeaders[CONTENT_TYPE]?.first() ?: ""
val isValidContentType = contentType == "application/connect+" + serializationStrategy.serializationName()
if (!isValidContentType) {
// If content-type looks like it could be an RPC server's response, consider
// this an internal error. Otherwise, we infer a code from the HTTP status,
// which means a code of UNKNOWN since HTTP status is 200.
val code = if (contentType.startsWith("application/connect+")) Code.INTERNAL_ERROR else Code.UNKNOWN
StreamResult.Complete(
ConnectException(
code = code,
message = "unexpected content-type: $contentType",
metadata = responseHeaders,
),
)
} else {
responseCompressionPool =
clientConfig.compressionPool(responseHeaders[CONNECT_STREAMING_CONTENT_ENCODING]?.first())
StreamResult.Headers(responseHeaders)
}
},
onMessage = { result ->
val (headerByte, unpackedMessage) = Envelope.unpackWithHeaderByte(
Expand Down Expand Up @@ -196,15 +228,15 @@ internal class ConnectInterceptor(
): UnaryHTTPRequest {
val serializationStrategy = clientConfig.serializationStrategy
val requestCodec = serializationStrategy.codec(request.methodSpec.requestClass)
val url = getUrlFromMethodSpec(
val url = constructURLForGETRequest(
request,
requestCodec,
finalRequestBody,
requestCompression,
)
return request.clone(
url = url,
contentType = "application/${requestCodec.encodingName()}",
contentType = "",
headers = request.headers,
methodSpec = request.methodSpec,
httpMethod = HTTPMethod.GET,
Expand Down Expand Up @@ -244,9 +276,9 @@ internal class ConnectInterceptor(
}
}

private fun parseConnectUnaryException(httpStatus: Int?, metadata: Headers, source: Buffer?): ConnectException {
private fun parseConnectUnaryException(httpStatus: Int?, contentType: String, metadata: Headers, source: Buffer?): ConnectException {
val code = Code.fromHTTPStatus(httpStatus)
if (source == null) {
if (source == null || !contentTypeIsJSON(contentType)) {
return ConnectException(code, "unexpected status code: $httpStatus")
}
return source.use { bufferedSource ->
Expand Down Expand Up @@ -298,7 +330,7 @@ private fun Headers.toTrailers(): Trailers {
return trailers
}

private fun getUrlFromMethodSpec(
private fun constructURLForGETRequest(
httpRequest: HTTPRequest,
codec: Codec<*>,
payload: Buffer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ internal class EndStreamResponseJSON(
@Json(name = "error") val error: ErrorPayloadJSON?,
@Json(name = "metadata") val metadata: Headers?,
)

internal fun contentTypeIsJSON(contentType: String): Boolean {
return contentType == "application/json" || contentType == "application/json; charset=utf-8"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use MediaType to parse these reliably (we use this in ConnectOkHttpClient.kt)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, that is an okhttp type and helper function. But this is the library module, which intentionally does not have a dependency on okhttp.

This is what the Go implementation does, so despite being hacky, should be adequate. I'll add a TODO that this could be made more robust in the future.

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,24 @@ internal class GRPCCompletionParser(
*
* Returns an "absent" completion if unable to be parsed.
*/
internal fun parse(headers: Headers, trailers: Trailers): GRPCCompletion {
internal fun parse(headers: Headers, hasBody: Boolean, trailers: Trailers): GRPCCompletion {
val statusCode: Int
val statusMetadata: Map<String, List<String>>
val statusFromHeaders = parseStatus(headers)
val trailersOnly: Boolean
if (statusFromHeaders == null) {
statusCode = parseStatus(trailers)
?: return GRPCCompletion(
present = false,
code = Code.INTERNAL_ERROR,
message = "protocol error: status is missing from trailers",
metadata = trailers,
)
statusMetadata = trailers
trailersOnly = false
} else {
statusCode = statusFromHeaders
if (!hasBody && trailers.isEmpty()) {
statusMetadata = headers
trailersOnly = true
} else {
statusMetadata = trailers
trailersOnly = false
}
statusCode = parseStatus(statusMetadata)
?: return GRPCCompletion(
present = false,
code = Code.UNKNOWN,
message = "protocol error: status is missing from trailers",
metadata = statusMetadata,
)
// Note: we report combined headers and trailers as exception meta, so
// caller doesn't have to check both, which is particularly important
// since server could actually serialize them together in a single bucket
Expand Down
61 changes: 48 additions & 13 deletions library/src/main/kotlin/com/connectrpc/protocols/GRPCInterceptor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ internal class GRPCInterceptor(
private val completionParser = GRPCCompletionParser(serializationStrategy.errorDetailParser())
private var responseCompressionPool: CompressionPool? = null
private var responseHeaders: Headers = emptyMap()
private var streamEmpty: Boolean = true

override fun unaryFunction(): UnaryFunction {
return UnaryFunction(
Expand Down Expand Up @@ -66,30 +67,47 @@ internal class GRPCInterceptor(
if (response.cause != null) {
return@UnaryFunction response.clone(message = Buffer())
}
val headers = response.headers
if (response.status != 200) {
return@UnaryFunction response.clone(
message = Buffer(),
cause = ConnectException(
code = Code.fromHTTPStatus(response.status),
message = "unexpected status code: ${response.status}",
metadata = headers,
),
)
}
val contentType = headers[CONTENT_TYPE]?.first() ?: ""
if (!contentTypeIsExpectedGRPC(contentType, serializationStrategy.serializationName())) {
// If content-type looks like it could be a gRPC server's response, consider
// this an internal error. Otherwise, we infer a code from the HTTP status,
// which means a code of UNKNOWN since HTTP status is 200.
val code = if (contentTypeIsGRPC(contentType)) Code.INTERNAL_ERROR else Code.UNKNOWN
return@UnaryFunction response.clone(
message = Buffer(),
cause = ConnectException(
code = code,
message = "unexpected content-type: $contentType",
metadata = headers,
),
)
}
val headers = response.headers
var trailers = response.trailers
val completion = completionParser
.parse(headers, trailers)
val hasBody = !response.message.buffer.exhausted()
val completion = completionParser.parse(headers, hasBody, trailers)
if (completion.trailersOnly) {
trailers = headers // report the headers also as trailers
}
val exception = completion.toConnectExceptionOrNull(serializationStrategy)
val message = if (exception == null) {
if (response.message.buffer.exhausted()) {
if (!hasBody) {
return@UnaryFunction response.clone(
message = Buffer(),
cause = ConnectException(
code = Code.UNIMPLEMENTED,
message = "unary stream has no messages",
metadata = headers.plus(trailers),
),
)
}
Expand All @@ -105,6 +123,7 @@ internal class GRPCInterceptor(
cause = ConnectException(
code = Code.UNIMPLEMENTED,
message = "unary stream has multiple messages",
metadata = headers.plus(trailers),
),
)
}
Expand Down Expand Up @@ -137,21 +156,28 @@ internal class GRPCInterceptor(
streamResultFunction = { res ->
res.fold(
onHeaders = { result ->
val headers = result.headers
val completion = completionParser.parse(headers, emptyMap())
if (completion.present) {
responseHeaders = result.headers
val contentType = responseHeaders[CONTENT_TYPE]?.first() ?: ""
if (!contentTypeIsExpectedGRPC(contentType, serializationStrategy.serializationName())) {
// If content-type looks like it could be a gRPC server's response, consider
// this an internal error. Otherwise, we infer a code from the HTTP status,
// which means a code of UNKNOWN since HTTP status is 200.
val code = if (contentTypeIsGRPC(contentType)) Code.INTERNAL_ERROR else Code.UNKNOWN
StreamResult.Complete(
cause = completion.toConnectExceptionOrNull(serializationStrategy),
trailers = headers,
cause = ConnectException(
code = code,
message = "unexpected content-type: $contentType",
metadata = responseHeaders,
),
)
} else {
responseHeaders = headers
responseCompressionPool = clientConfig
.compressionPool(headers[GRPC_ENCODING]?.first())
StreamResult.Headers(headers)
.compressionPool(responseHeaders[GRPC_ENCODING]?.first())
StreamResult.Headers(responseHeaders)
}
},
onMessage = { result ->
streamEmpty = false
val (_, unpackedMessage) = Envelope.unpackWithHeaderByte(
result.message,
responseCompressionPool,
Expand All @@ -164,7 +190,7 @@ internal class GRPCInterceptor(
}
val trailers = result.trailers
val exception = completionParser
.parse(responseHeaders, trailers)
.parse(responseHeaders, !streamEmpty, trailers)
.toConnectExceptionOrNull(serializationStrategy)
StreamResult.Complete(
cause = exception,
Expand All @@ -190,3 +216,12 @@ internal class GRPCInterceptor(
return headers
}
}

internal fun contentTypeIsGRPC(contentType: String): Boolean {
return contentType == "application/grpc" || contentType.startsWith("application/grpc+")
}

internal fun contentTypeIsExpectedGRPC(contentType: String, expectCodec: String): Boolean {
return (expectCodec == "proto" && contentType == "application/grpc") ||
contentType == "application/grpc+$expectCodec"
}
Loading