Skip to content

Commit

Permalink
Some cleanup, mostly in the HTTP request representations (#211)
Browse files Browse the repository at this point in the history
1. The biggest chunk clarifies the representation of HTTP requests.
Previously the `HTTPRequest` class _might_ have a request message.
This message was really required for unary operations (and, if absent,
would be treated as empty request) and ignored for stream operations.
So now the type is split into two: a base`HTTPRequest` which has no
request message, and a `UnaryHTTPRequest` which has a non-optional
message type. Also, since everything in the framework works with
`Buffer` for messages, this changes the type of the message from
`ByteArray` to `Buffer`.
2. The next change renames some variables/parameters in the
compression stuff to make it a little easier to read.
3. Another small change tries to make `Envelope.pack` a little more
DRY.
4. The final change is a bug fix: when a gRPC operation completes,
this was treating missing trailers as a successful RPC, as if the trailers
were present and indicated a status of "ok". But that is not correct as
missing trailers in the gRPC protocol means something is definitely
wrong (even if it's a unary operation that includes the singular
response message). So now the client will consider this case to
be an RPC error.
  • Loading branch information
jhump authored Jan 31, 2024
1 parent 868181b commit 3204d07
Show file tree
Hide file tree
Showing 17 changed files with 222 additions and 176 deletions.
3 changes: 2 additions & 1 deletion library/src/main/kotlin/com/connectrpc/Interceptor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package com.connectrpc

import com.connectrpc.http.HTTPRequest
import com.connectrpc.http.HTTPResponse
import com.connectrpc.http.UnaryHTTPRequest
import okio.Buffer

/**
Expand Down Expand Up @@ -52,7 +53,7 @@ interface Interceptor {
}

class UnaryFunction(
val requestFunction: (HTTPRequest) -> HTTPRequest = { it },
val requestFunction: (UnaryHTTPRequest) -> UnaryHTTPRequest = { it },
val responseFunction: (HTTPResponse) -> HTTPResponse = { it },
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ interface CompressionPool {

/**
* Compress an outbound request message.
* @param buffer: The uncompressed request message.
* @param input: The uncompressed request message.
* @return The compressed request message.
*/
fun compress(buffer: Buffer): Buffer
fun compress(input: Buffer): Buffer

/**
* Decompress an inbound response message.
* @param buffer: The compressed response message.
* @param input: The compressed response message.
* @return The uncompressed response message.
*/
fun decompress(buffer: Buffer): Buffer
fun decompress(input: Buffer): Buffer
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,23 @@ object GzipCompressionPool : CompressionPool {
return "gzip"
}

override fun compress(buffer: Buffer): Buffer {
val gzippedSink = Buffer()
GzipSink(gzippedSink).use { source ->
source.write(buffer, buffer.size)
override fun compress(input: Buffer): Buffer {
val result = Buffer()
GzipSink(result).use { gzippedSink ->
gzippedSink.write(input, input.size)
}
return gzippedSink
return result
}

override fun decompress(buffer: Buffer): Buffer {
override fun decompress(input: Buffer): Buffer {
val result = Buffer()
if (buffer.size == 0L) return result
// We're lenient and will allow an empty payload to be
// interpreted as a compressed empty payload (even though
// it's missing the gzip format preamble/metadata).
if (input.size == 0L) return result

GzipSource(buffer).use {
while (it.read(result, Int.MAX_VALUE.toLong()) != -1L) {
GzipSource(input).use { gzippedSource ->
while (gzippedSource.read(result, Int.MAX_VALUE.toLong()) != -1L) {
// continue reading.
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ interface HTTPClientInterface {
*
* @return A function to cancel the underlying network call.
*/
fun unary(request: HTTPRequest, onResult: (HTTPResponse) -> Unit): Cancelable
fun unary(request: UnaryHTTPRequest, onResult: (HTTPResponse) -> Unit): Cancelable

/**
* Initialize a new HTTP stream.
Expand Down
112 changes: 74 additions & 38 deletions library/src/main/kotlin/com/connectrpc/http/HTTPRequest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,58 +16,94 @@ package com.connectrpc.http

import com.connectrpc.Headers
import com.connectrpc.MethodSpec
import okio.Buffer
import java.net.URL

internal object HTTPMethod {
internal const val GET = "GET"
internal const val POST = "POST"
enum class HTTPMethod(
val string: String,
) {
GET("GET"),
POST("POST"),
}

/**
* HTTP request used for sending primitive data to the server.
* HTTP request used to initiate RPCs.
*/
class HTTPRequest internal constructor(
open class HTTPRequest internal constructor(
// The URL for the request.
val url: URL,
// Value to assign to the `content-type` header.
val contentType: String,
// Additional outbound headers for the request.
val headers: Headers,
// Body data to send with the request.
val message: ByteArray? = null,
// The method spec associated with the request.
val methodSpec: MethodSpec<*, *>,
)

/**
* Clones the [HTTPRequest] with override values.
*
* Intended to make mutations for [HTTPRequest] safe for
* [com.connectrpc.Interceptor] implementation.
*/
fun HTTPRequest.clone(
// The URL for the request.
url: URL = this.url,
// Value to assign to the `content-type` header.
contentType: String = this.contentType,
// Additional outbound headers for the request.
headers: Headers = this.headers,
// The method spec associated with the request.
methodSpec: MethodSpec<*, *> = this.methodSpec,
): HTTPRequest {
return HTTPRequest(
url,
contentType,
headers,
methodSpec,
)
}

/**
* HTTP request used to initiate unary RPCs. In addition
* to RPC metadata, this also includes the request data.
*/
class UnaryHTTPRequest(
// The URL for the request.
url: URL,
// Value to assign to the `content-type` header.
contentType: String,
// Additional outbound headers for the request.
headers: Headers,
// The method spec associated with the request.
methodSpec: MethodSpec<*, *>,
// Body data for the request.
val message: Buffer,
// HTTP method to use with the request.
// Almost always POST, but side effect free unary RPCs may be made with GET.
val httpMethod: String = HTTPMethod.POST,
) {
/**
* Clones the [HTTPRequest] with override values.
*
* Intended to make mutations for [HTTPRequest] safe for
* [com.connectrpc.Interceptor] implementation.
*/
fun clone(
// The URL for the request.
url: URL = this.url,
// Value to assign to the `content-type` header.
contentType: String = this.contentType,
// Additional outbound headers for the request.
headers: Headers = this.headers,
// Body data to send with the request.
message: ByteArray? = this.message,
// The method spec associated with the request.
methodSpec: MethodSpec<*, *> = this.methodSpec,
// The HTTP method to use with the request.
httpMethod: String = this.httpMethod,
): HTTPRequest {
return HTTPRequest(
url,
contentType,
headers,
message,
methodSpec,
httpMethod,
)
}
val httpMethod: HTTPMethod = HTTPMethod.POST,
) : HTTPRequest(url, contentType, headers, methodSpec)

fun UnaryHTTPRequest.clone(
// The URL for the request.
url: URL = this.url,
// Value to assign to the `content-type` header.
contentType: String = this.contentType,
// Additional outbound headers for the request.
headers: Headers = this.headers,
// The method spec associated with the request.
methodSpec: MethodSpec<*, *> = this.methodSpec,
// Body data for the request.
message: Buffer = this.message,
// The HTTP method to use with the request.
httpMethod: HTTPMethod = this.httpMethod,
): UnaryHTTPRequest {
return UnaryHTTPRequest(
url,
contentType,
headers,
methodSpec,
message,
httpMethod,
)
}
5 changes: 3 additions & 2 deletions library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import com.connectrpc.UnaryBlockingCall
import com.connectrpc.http.Cancelable
import com.connectrpc.http.HTTPClientInterface
import com.connectrpc.http.HTTPRequest
import com.connectrpc.http.UnaryHTTPRequest
import com.connectrpc.http.transform
import com.connectrpc.protocols.GETConfiguration
import kotlinx.coroutines.CompletableDeferred
Expand Down Expand Up @@ -79,12 +80,12 @@ class ProtocolClient(
} else {
requestCodec.serialize(request)
}
val unaryRequest = HTTPRequest(
val unaryRequest = UnaryHTTPRequest(
url = urlFromMethodSpec(methodSpec),
contentType = "application/${requestCodec.encodingName()}",
headers = headers,
message = requestMessage.readByteArray(),
methodSpec = methodSpec,
message = requestMessage,
)
val unaryFunc = config.createInterceptorChain()
val finalRequest = unaryFunc.requestFunction(unaryRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import com.connectrpc.compression.CompressionPool
import com.connectrpc.http.HTTPMethod
import com.connectrpc.http.HTTPRequest
import com.connectrpc.http.HTTPResponse
import com.connectrpc.http.UnaryHTTPRequest
import com.connectrpc.http.clone
import com.connectrpc.toLowercase
import com.squareup.moshi.Moshi
import okio.Buffer
Expand Down Expand Up @@ -67,15 +69,11 @@ internal class ConnectInterceptor(
requestHeaders[USER_AGENT] = listOf("connect-kotlin/${ConnectConstants.VERSION}")
}
val requestCompression = clientConfig.requestCompression
val requestMessage = Buffer()
if (request.message != null) {
requestMessage.write(request.message)
}
val finalRequestBody = if (requestCompression?.shouldCompress(requestMessage) == true) {
val finalRequestBody = if (requestCompression?.shouldCompress(request.message) == true) {
requestHeaders.put(CONTENT_ENCODING, listOf(requestCompression.compressionPool.name()))
requestCompression.compressionPool.compress(requestMessage)
requestCompression.compressionPool.compress(request.message)
} else {
requestMessage
request.message
}
if (shouldUseGETRequest(request, finalRequestBody)) {
constructGETRequest(request, finalRequestBody, requestCompression)
Expand All @@ -84,8 +82,8 @@ internal class ConnectInterceptor(
url = request.url,
contentType = request.contentType,
headers = requestHeaders,
message = finalRequestBody.readByteArray(),
methodSpec = request.methodSpec,
message = finalRequestBody,
)
}
},
Expand Down Expand Up @@ -153,7 +151,6 @@ internal class ConnectInterceptor(
url = request.url,
contentType = request.contentType,
headers = requestHeaders,
message = request.message,
methodSpec = request.methodSpec,
)
},
Expand Down Expand Up @@ -196,10 +193,10 @@ internal class ConnectInterceptor(
}

private fun constructGETRequest(
request: HTTPRequest,
request: UnaryHTTPRequest,
finalRequestBody: Buffer,
requestCompression: RequestCompression?,
): HTTPRequest {
): UnaryHTTPRequest {
val serializationStrategy = clientConfig.serializationStrategy
val requestCodec = serializationStrategy.codec(request.methodSpec.requestClass)
val url = getUrlFromMethodSpec(
Expand Down
27 changes: 12 additions & 15 deletions library/src/main/kotlin/com/connectrpc/protocols/Envelope.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,23 @@ class Envelope {
* @param compressionMinBytes The minimum bytes the source needs to be in order to be compressed.
*/
fun pack(source: Buffer, compressionPool: CompressionPool? = null, compressionMinBytes: Int? = null): Buffer {
val flags: Int
val payload: Buffer
if (compressionMinBytes == null ||
source.size < compressionMinBytes ||
compressionPool == null
) {
return source.use {
val result = Buffer()
result.writeByte(0)
result.writeInt(source.buffer.size.toInt())
result.writeAll(source)
result
}
}
return source.use { buffer ->
val result = Buffer()
result.writeByte(1)
val compressedBuffer = compressionPool.compress(buffer)
result.writeInt(compressedBuffer.size.toInt())
result.writeAll(compressedBuffer)
result
flags = 0
payload = source
} else {
flags = 1
payload = compressionPool.compress(source)
}
val result = Buffer()
result.writeByte(flags)
result.writeInt(payload.buffer.size.toInt())
result.writeAll(payload)
return result
}

/**
Expand Down
Loading

0 comments on commit 3204d07

Please sign in to comment.