diff --git a/packages/connect/lib/connect.dart b/packages/connect/lib/connect.dart index 19ef349..c028594 100644 --- a/packages/connect/lib/connect.dart +++ b/packages/connect/lib/connect.dart @@ -18,6 +18,7 @@ export 'src/abort.dart'; export 'src/client.dart'; export 'src/code.dart'; export 'src/codec.dart'; +export 'src/compression.dart'; export 'src/exception.dart'; export 'src/headers.dart'; export 'src/http.dart'; diff --git a/packages/connect/lib/io.dart b/packages/connect/lib/io.dart index b1b954c..b7a9ee7 100644 --- a/packages/connect/lib/io.dart +++ b/packages/connect/lib/io.dart @@ -12,4 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +export 'src/gzip.dart'; export 'src/io.dart'; diff --git a/packages/connect/lib/src/compression.dart b/packages/connect/lib/src/compression.dart new file mode 100644 index 0000000..a0a7d52 --- /dev/null +++ b/packages/connect/lib/src/compression.dart @@ -0,0 +1,22 @@ +// Copyright 2024 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:convert'; +import 'dart:typed_data'; + +/// Compression is specialized [Codec] for a certain compression algorithm. +abstract interface class Compression implements Codec { + /// Name of the compression algorithm. + abstract final String name; +} diff --git a/packages/connect/lib/src/connect/header.dart b/packages/connect/lib/src/connect/header.dart index c2ef820..a7a623e 100644 --- a/packages/connect/lib/src/connect/header.dart +++ b/packages/connect/lib/src/connect/header.dart @@ -14,6 +14,7 @@ import '../abort.dart'; import '../codec.dart'; +import '../compression.dart'; import '../headers.dart'; import '../spec.dart'; import 'version.dart'; @@ -22,12 +23,18 @@ const headerContentType = "content-type"; const headerContentLength = "content-length"; const headerTimeout = "connect-timeout-ms"; const headerProtocolVersion = "connect-protocol-version"; +const headerUnaryEncoding = "content-encoding"; +const headerStreamEncoding = "connect-content-encoding"; +const headerUnaryAcceptEncoding = "accept-encoding"; +const headerStreamAcceptEncoding = "connect-accept-encoding"; Headers requestHeader( Codec codec, StreamType streamType, Headers? userProvidedHeaders, AbortSignal? signal, + Compression? sendCompression, + List acceptCompressions, ) { final header = userProvidedHeaders == null ? Headers() @@ -41,5 +48,16 @@ Headers requestHeader( ? 'application/${codec.name}' : 'application/connect+${codec.name}'; // TODO: User agent headers. + if (sendCompression != null) { + header[streamType == StreamType.unary + ? headerUnaryEncoding + : headerStreamEncoding] = sendCompression.name; + } + if (acceptCompressions.isNotEmpty) { + header[streamType == StreamType.unary + ? headerUnaryAcceptEncoding + : headerStreamAcceptEncoding] = + acceptCompressions.map((c) => c.name).join(","); + } return header; } diff --git a/packages/connect/lib/src/connect/protocol.dart b/packages/connect/lib/src/connect/protocol.dart index 29345d3..d14cecc 100644 --- a/packages/connect/lib/src/connect/protocol.dart +++ b/packages/connect/lib/src/connect/protocol.dart @@ -14,6 +14,7 @@ import '../code.dart'; import '../codec.dart'; +import '../compression.dart'; import '../exception.dart'; import '../headers.dart'; import '../http.dart'; @@ -42,6 +43,8 @@ final class Protocol implements base.Protocol { Headers requestHeaders( Spec spec, Codec codec, + Compression? sendCompression, + List acceptCompressions, CallOptions? options, ) { return requestHeader( @@ -49,6 +52,8 @@ final class Protocol implements base.Protocol { spec.streamType, options?.headers, options?.signal, + sendCompression, + acceptCompressions, ); } @@ -57,6 +62,8 @@ final class Protocol implements base.Protocol { UnaryRequest req, Codec codec, HttpClient httpClient, + Compression? sendCompression, + List acceptCompressions, ) async { final HttpRequest hReq; if (useHttpGet && @@ -73,20 +80,28 @@ final class Protocol implements base.Protocol { req.signal, ); } else { + final body = codec.encode(req.message); hReq = HttpRequest( req.url, "POST", req.headers, - Stream.fromIterable([codec.encode(req.message)]), + Stream.fromIterable([sendCompression?.encode(body) ?? body]), req.signal, ); } final res = await httpClient(hReq); - final unaryError = res.validate(req.spec.streamType, codec); + final (:compression, :unaryError) = res.validate( + req.spec.streamType, + codec, + acceptCompressions, + ); final (:headers, :trailers) = res.header.demux(); - final body = await res.body.toBytes( + var body = await res.body.toBytes( int.tryParse(headers[headerContentLength] ?? ''), ); + if (compression != null) { + body = compression.decode(body); + } if (unaryError != null) { throw errorFromJsonBytes( body, @@ -107,6 +122,8 @@ final class Protocol implements base.Protocol { StreamRequest req, Codec codec, HttpClient httpClient, + Compression? sendCompression, + List acceptCompressions, ) async { final res = await httpClient( HttpRequest( @@ -117,14 +134,15 @@ final class Protocol implements base.Protocol { req.signal, ), ); - res.validate(req.spec.streamType, codec); + final (:compression, unaryError: _) = + res.validate(req.spec.streamType, codec, acceptCompressions); final trailer = Headers(); return StreamResponse( req.spec, res.header, res.body .splitEnvelope() - .decompress() + .decompress(compression) .parse( codec, req.spec.outputFactory, diff --git a/packages/connect/lib/src/connect/response.dart b/packages/connect/lib/src/connect/response.dart index 4ad638f..d85e58c 100644 --- a/packages/connect/lib/src/connect/response.dart +++ b/packages/connect/lib/src/connect/response.dart @@ -14,9 +14,11 @@ import '../code.dart'; import '../codec.dart'; +import '../compression.dart'; import '../exception.dart'; import '../headers.dart'; import '../http.dart'; +import '../protocol/compression.dart'; import '../spec.dart'; import 'header.dart'; import 'http_status.dart'; @@ -28,10 +30,17 @@ extension ResponseValidation on HttpResponse { /// For unary RPCs with an HTTP error status, this returns an error /// derived from the HTTP status instead of throwing it, giving an /// implementation a chance to parse a Connect error from the wire. - ConnectException? validate( + ({ConnectException? unaryError, Compression? compression}) validate( StreamType streamType, Codec codec, + List acceptCompressions, ) { + final compression = findCompression( + acceptCompressions, + streamType == StreamType.unary + ? headerUnaryEncoding + : headerStreamEncoding, + ); final contentType = header[headerContentType] ?? ''; if (status != 200) { final statusErr = ConnectException( @@ -41,7 +50,7 @@ extension ResponseValidation on HttpResponse { if (streamType == StreamType.unary && contentType.startsWith("application/json")) { // Unary JSON response - return statusErr; + return (unaryError: statusErr, compression: compression); } throw statusErr; } @@ -58,6 +67,6 @@ extension ResponseValidation on HttpResponse { metadata: header, ); } - return null; + return (unaryError: null, compression: compression); } } diff --git a/packages/connect/lib/src/connect/transport.dart b/packages/connect/lib/src/connect/transport.dart index a16d9a1..83468f1 100644 --- a/packages/connect/lib/src/connect/transport.dart +++ b/packages/connect/lib/src/connect/transport.dart @@ -13,6 +13,7 @@ // limitations under the License. import "../codec.dart"; +import "../compression.dart"; import "../http.dart"; import "../interceptor.dart"; import "../protocol/transport.dart"; @@ -26,11 +27,15 @@ final class Transport extends ProtocolTransport { required HttpClient httpClient, List? interceptors, bool useHttpGet = false, + Compression? sendCompression, + List? acceptCompressions, }) : super( baseUrl, codec, Protocol(useHttpGet), httpClient, interceptors ?? [], + sendCompression, + acceptCompressions ?? [], ); } diff --git a/packages/connect/lib/src/fake.dart b/packages/connect/lib/src/fake.dart index 58aa531..cc28d4f 100644 --- a/packages/connect/lib/src/fake.dart +++ b/packages/connect/lib/src/fake.dart @@ -17,6 +17,7 @@ import 'dart:typed_data'; import 'abort.dart'; import 'codec.dart'; +import 'compression.dart'; import 'headers.dart'; import 'http.dart'; import 'interceptor.dart'; @@ -155,6 +156,8 @@ final class _FakeTransport extends ProtocolTransport { _FakeProtocol(handlers), (req) => throw UnimplementedError(), interceptors ?? [], + null, + [], ); } @@ -167,6 +170,8 @@ final class _FakeProtocol implements Protocol { Headers requestHeaders( Spec spec, Codec codec, + Compression? sendCompression, + List acceptCompressions, CallOptions? options, ) { final headers = Headers(); @@ -181,6 +186,8 @@ final class _FakeProtocol implements Protocol { UnaryRequest req, Codec codec, HttpClient client, + Compression? sendCompression, + List acceptCompressions, ) async { final handler = handlers[req.spec]; if (handler == null) { @@ -211,6 +218,8 @@ final class _FakeProtocol implements Protocol { StreamRequest req, Codec codec, HttpClient client, + Compression? sendCompression, + List acceptCompressions, ) async { final handler = handlers[req.spec]; if (handler == null) { diff --git a/packages/connect/lib/src/grpc/protocol.dart b/packages/connect/lib/src/grpc/protocol.dart index 731ad87..2146527 100644 --- a/packages/connect/lib/src/grpc/protocol.dart +++ b/packages/connect/lib/src/grpc/protocol.dart @@ -14,6 +14,7 @@ import '../code.dart'; import '../codec.dart'; +import '../compression.dart'; import '../exception.dart'; import '../headers.dart'; import '../http.dart'; @@ -32,16 +33,25 @@ import 'trailer.dart'; final class Protocol implements base.Protocol { final StatusParser statusParser; + final Compression? sendCompression; - const Protocol(this.statusParser); + const Protocol(this.statusParser, this.sendCompression); @override Headers requestHeaders( Spec spec, Codec codec, + Compression? sendCompression, + List acceptCompressions, CallOptions? options, ) { - return requestHeader(codec, options?.headers, options?.signal); + return requestHeader( + codec, + options?.headers, + options?.signal, + sendCompression, + acceptCompressions, + ); } @override @@ -49,20 +59,28 @@ final class Protocol implements base.Protocol { UnaryRequest req, Codec codec, HttpClient httpClient, + Compression? sendCompression, + List acceptCompressions, ) async { final res = await httpClient( HttpRequest( req.url, "POST", req.headers, - Stream.fromIterable([encodeEnvelope(0, codec.encode(req.message))]), + Stream.fromIterable([req.message]) + .serialize(codec) + .compress(sendCompression) + .joinEnvelope(), req.signal, ), ); - final (foundStatus: _, :headerError) = res.validate(statusParser); + final (foundStatus: _, :headerError, :compression) = res.validate( + statusParser, + acceptCompressions, + ); final message = await res.body .splitEnvelope() - .decompress() + .decompress(compression) .parse(codec, req.spec.outputFactory) .tryReadingSingleMessage(); res.trailer.validateTrailer(res.header, statusParser); @@ -98,17 +116,22 @@ final class Protocol implements base.Protocol { StreamRequest req, Codec codec, HttpClient httpClient, + Compression? sendCompression, + List acceptCompressions, ) async { final res = await httpClient( HttpRequest( req.url, "POST", req.headers, - req.message.serialize(codec).joinEnvelope(), + req.message.serialize(codec).compress(sendCompression).joinEnvelope(), req.signal, ), ); - final (:foundStatus, :headerError) = res.validate(statusParser); + final (:foundStatus, :headerError, :compression) = res.validate( + statusParser, + acceptCompressions, + ); if (headerError != null) { // Trailers only response. throw headerError; @@ -118,7 +141,7 @@ final class Protocol implements base.Protocol { res.header, res.body .splitEnvelope() - .decompress() + .decompress(compression) .parse(codec, req.spec.outputFactory) .onDone(() { if (!foundStatus) { diff --git a/packages/connect/lib/src/grpc/request_header.dart b/packages/connect/lib/src/grpc/request_header.dart index 7f75c80..103f9d0 100644 --- a/packages/connect/lib/src/grpc/request_header.dart +++ b/packages/connect/lib/src/grpc/request_header.dart @@ -14,6 +14,7 @@ import '../abort.dart'; import '../codec.dart'; +import '../compression.dart'; import '../headers.dart'; import 'headers.dart'; @@ -23,6 +24,8 @@ Headers requestHeader( Codec codec, Headers? userProvidedHeaders, AbortSignal? signal, + Compression? sendCompression, + List acceptCompressions, ) { final header = userProvidedHeaders == null ? Headers() @@ -36,5 +39,12 @@ Headers requestHeader( // don't support HTTP trailers. header["Te"] = "trailers"; // TODO: User agent headers. + if (sendCompression != null) { + header[headerEncoding] = sendCompression.name; + } + if (acceptCompressions.isNotEmpty) { + header[headerAcceptEncoding] = + acceptCompressions.map((c) => c.name).join(","); + } return header; } diff --git a/packages/connect/lib/src/grpc/response.dart b/packages/connect/lib/src/grpc/response.dart index 1bd5726..1489902 100644 --- a/packages/connect/lib/src/grpc/response.dart +++ b/packages/connect/lib/src/grpc/response.dart @@ -12,15 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -import 'package:connectrpc/src/grpc/request_header.dart'; - import '../code.dart'; +import '../compression.dart'; import '../exception.dart'; -import '../grpc/headers.dart'; -import '../grpc/status.dart'; -import '../grpc/trailer_error.dart'; import '../headers.dart'; import '../http.dart'; +import '../protocol/compression.dart'; +import './headers.dart'; +import './request_header.dart'; +import './status.dart'; +import './trailer_error.dart'; import 'http_status.dart'; extension ResponseValidation on HttpResponse { @@ -32,8 +33,13 @@ extension ResponseValidation on HttpResponse { /// Returns an object that indicates whether a gRPC status was found /// in the response header. In this case, clients can not expect a /// trailer. - ({bool foundStatus, ConnectException? headerError}) validate( + ({ + bool foundStatus, + ConnectException? headerError, + Compression? compression, + }) validate( StatusParser statusParser, + List acceptCompressions, ) { if (status != 200) { throw ConnectException( @@ -42,14 +48,6 @@ extension ResponseValidation on HttpResponse { metadata: header, ); } - final encoding = header[headerEncoding]; - if (encoding != null && header[headerEncoding] != "identity") { - throw ConnectException( - Code.internal, - 'unsupported response encoding "$encoding"', - metadata: header, - ); - } final contentType = header[headerContentType] ?? ''; if (!contentType.startsWith(contentTypePrefix)) { throw ConnectException( @@ -60,7 +58,8 @@ extension ResponseValidation on HttpResponse { } return ( foundStatus: header.contains(headerGrpcStatus), - headerError: header.findError(statusParser) + headerError: header.findError(statusParser), + compression: findCompression(acceptCompressions, headerEncoding) ); } } diff --git a/packages/connect/lib/src/grpc/transport.dart b/packages/connect/lib/src/grpc/transport.dart index 05b4e2d..a3434cb 100644 --- a/packages/connect/lib/src/grpc/transport.dart +++ b/packages/connect/lib/src/grpc/transport.dart @@ -13,6 +13,7 @@ // limitations under the License. import '../codec.dart'; +import '../compression.dart'; import '../http.dart'; import '../interceptor.dart'; import '../protocol/transport.dart'; @@ -30,11 +31,15 @@ final class Transport extends ProtocolTransport { required HttpClient httpClient, required StatusParser statusParser, List? interceptors, + Compression? sendCompression, + List? acceptCompressions, }) : super( baseUrl, codec, - Protocol(statusParser), + Protocol(statusParser, sendCompression), httpClient, interceptors ?? [], + sendCompression, + acceptCompressions ?? [], ); } diff --git a/packages/connect/lib/src/grpc_web/protocol.dart b/packages/connect/lib/src/grpc_web/protocol.dart index 81617a4..69d4525 100644 --- a/packages/connect/lib/src/grpc_web/protocol.dart +++ b/packages/connect/lib/src/grpc_web/protocol.dart @@ -16,6 +16,7 @@ import 'dart:async'; import '../code.dart'; import '../codec.dart'; +import '../compression.dart'; import '../exception.dart'; import '../grpc/headers.dart'; import '../grpc/status.dart'; @@ -42,9 +43,17 @@ final class Protocol implements base.Protocol { Headers requestHeaders( Spec spec, Codec codec, + Compression? sendCompression, + List acceptCompressions, CallOptions? options, ) { - return requestHeader(codec, options?.headers, options?.signal); + return requestHeader( + codec, + options?.headers, + options?.signal, + sendCompression, + acceptCompressions, + ); } @override @@ -52,20 +61,28 @@ final class Protocol implements base.Protocol { UnaryRequest req, Codec codec, HttpClient httpClient, + Compression? sendCompression, + List acceptCompressions, ) async { final res = await httpClient( HttpRequest( req.url, "POST", req.headers, - Stream.fromIterable([encodeEnvelope(0, codec.encode(req.message))]), + Stream.fromIterable([req.message]) + .serialize(codec) + .compress(sendCompression) + .joinEnvelope(), req.signal, ), ); - final (:foundStatus, :headerError) = res.validate(statusParser); + final (:foundStatus, :headerError, :compression) = res.validate( + statusParser, + acceptCompressions, + ); final (message, trailer) = await res.body .splitEnvelope() - .decompress() + .decompress(compression) .parse( codec, req.spec.outputFactory, @@ -102,6 +119,8 @@ final class Protocol implements base.Protocol { StreamRequest req, Codec codec, HttpClient httpClient, + Compression? sendCompression, + List acceptCompressions, ) async { final res = await httpClient( HttpRequest( @@ -112,7 +131,10 @@ final class Protocol implements base.Protocol { req.signal, ), ); - final (:foundStatus, :headerError) = res.validate(statusParser); + final (:foundStatus, :headerError, :compression) = res.validate( + statusParser, + acceptCompressions, + ); if (headerError != null) { throw headerError; } @@ -122,7 +144,7 @@ final class Protocol implements base.Protocol { res.header, res.body .splitEnvelope() - .decompress() + .decompress(compression) .parse( codec, req.spec.outputFactory, diff --git a/packages/connect/lib/src/grpc_web/request_header.dart b/packages/connect/lib/src/grpc_web/request_header.dart index 1a943d0..2331c31 100644 --- a/packages/connect/lib/src/grpc_web/request_header.dart +++ b/packages/connect/lib/src/grpc_web/request_header.dart @@ -14,6 +14,7 @@ import '../abort.dart'; import '../codec.dart'; +import '../compression.dart'; import '../grpc/headers.dart'; import '../headers.dart'; @@ -30,6 +31,8 @@ Headers requestHeader( Codec codec, Headers? userProvidedHeaders, AbortSignal? signal, + Compression? sendCompression, + List acceptCompressions, ) { final header = userProvidedHeaders == null ? Headers() @@ -43,5 +46,12 @@ Headers requestHeader( '${deadline.difference(DateTime.now()).inMilliseconds}m'; } // TODO: User agent headers. + if (sendCompression != null) { + header[headerEncoding] = sendCompression.name; + } + if (acceptCompressions.isNotEmpty) { + header[headerAcceptEncoding] = + acceptCompressions.map((c) => c.name).join(","); + } return header; } diff --git a/packages/connect/lib/src/grpc_web/response.dart b/packages/connect/lib/src/grpc_web/response.dart index 13815cc..46b755e 100644 --- a/packages/connect/lib/src/grpc_web/response.dart +++ b/packages/connect/lib/src/grpc_web/response.dart @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -import '../code.dart'; +import '../compression.dart'; import '../exception.dart'; import '../grpc/headers.dart'; import '../grpc/http_status.dart'; @@ -20,6 +20,7 @@ import '../grpc/status.dart'; import '../grpc/trailer_error.dart'; import '../headers.dart'; import '../http.dart'; +import '../protocol/compression.dart'; extension ResponseValidation on HttpResponse { /// Validates response status and header for the gRPC-web protocol. @@ -30,23 +31,21 @@ extension ResponseValidation on HttpResponse { /// Returns an object that indicates whether a gRPC status was found /// in the response header. In this case, clients can not expect a /// trailer. - ({bool foundStatus, ConnectException? headerError}) validate( + ({ + bool foundStatus, + ConnectException? headerError, + Compression? compression, + }) validate( StatusParser statusParser, + List acceptCompression, ) { - final encoding = header[headerEncoding]; - if (encoding != null && header[headerEncoding] != "identity") { - throw ConnectException( - Code.internal, - 'unsupported response encoding "$encoding"', - metadata: header, - ); - } // For compatibility with the `grpc-web` package, we treat all HTTP status // codes in the 200 range as valid, not just HTTP 200. if (status >= 200 && status < 300) { return ( foundStatus: header.contains(headerGrpcStatus), - headerError: header.findError(statusParser) + headerError: header.findError(statusParser), + compression: findCompression(acceptCompression, headerEncoding), ); } throw ConnectException( diff --git a/packages/connect/lib/src/grpc_web/transport.dart b/packages/connect/lib/src/grpc_web/transport.dart index 11b1533..5f093fa 100644 --- a/packages/connect/lib/src/grpc_web/transport.dart +++ b/packages/connect/lib/src/grpc_web/transport.dart @@ -13,6 +13,7 @@ // limitations under the License. import "../codec.dart"; +import "../compression.dart"; import "../grpc/status.dart"; import "../http.dart"; import "../interceptor.dart"; @@ -27,11 +28,15 @@ final class Transport extends ProtocolTransport { required HttpClient httpClient, required StatusParser statusParser, List? interceptors, + Compression? sendCompression, + List? acceptCompressions, }) : super( baseUrl, codec, Protocol(statusParser), httpClient, interceptors ?? [], + sendCompression, + acceptCompressions ?? [], ); } diff --git a/packages/connect/lib/src/gzip.dart b/packages/connect/lib/src/gzip.dart new file mode 100644 index 0000000..1beeabb --- /dev/null +++ b/packages/connect/lib/src/gzip.dart @@ -0,0 +1,46 @@ +// Copyright 2024 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:convert'; +import 'dart:io'; +import 'dart:typed_data'; + +import 'compression.dart'; + +/// [Compression] that supports gzip compression algorithm. +final class GzipCompression extends Codec + implements Compression { + final GZipCodec _codec; + late final _Converter _encoder = _Converter(_codec.encoder); + late final _Converter _decoder = _Converter(_codec.decoder); + + GzipCompression({GZipCodec? codec}) : _codec = codec ?? gzip; + @override + String get name => "gzip"; + + @override + Converter get decoder => _decoder; + + @override + Converter get encoder => _encoder; +} + +final class _Converter extends Converter { + final Converter, List> converter; + const _Converter(this.converter); + @override + Uint8List convert(Uint8List input) { + return Uint8List.fromList(converter.convert(input)); + } +} diff --git a/packages/connect/lib/src/io.dart b/packages/connect/lib/src/io.dart index b0fc6de..73abaae 100644 --- a/packages/connect/lib/src/io.dart +++ b/packages/connect/lib/src/io.dart @@ -28,8 +28,10 @@ import 'sentinel.dart'; HttpClient createHttpClient(io.HttpClient client) { return (creq) async { final req = await client.openUrl(creq.method, Uri.parse(creq.url)); - // Remove default values. - req.headers.removeAll("accept-encoding"); + // We don't want compression unless it came from upstream. + // + // Ref: https://api.dart.dev/dart-io/HttpClient-class.html + req.headers.removeAll(io.HttpHeaders.acceptEncodingHeader); for (final header in creq.header.entries) { req.headers.add(header.name, header.value); } @@ -44,8 +46,19 @@ HttpClient createHttpClient(io.HttpClient client) { } } final res = await sentinel.race(req.close()); + final compressed = + res.headers.value(io.HttpHeaders.contentEncodingHeader) == 'gzip'; final headers = Headers(); res.headers.forEach((key, values) { + // It automatically decompresses gzip responses, but keeps the + // original content-length and accept-encoding headers. + // + // Ref: https://api.dart.dev/dart-io/HttpClient-class.html + if (compressed && + (key == io.HttpHeaders.contentLengthHeader || + key == io.HttpHeaders.contentEncodingHeader)) { + return; + } for (final value in values) { headers.add(key, value); } diff --git a/packages/connect/lib/src/protocol/compression.dart b/packages/connect/lib/src/protocol/compression.dart index 719f81b..8118cb7 100644 --- a/packages/connect/lib/src/protocol/compression.dart +++ b/packages/connect/lib/src/protocol/compression.dart @@ -15,7 +15,10 @@ import 'dart:async'; import '../code.dart'; +import '../compression.dart'; import '../exception.dart'; +import '../headers.dart'; +import '../http.dart'; import 'envelope.dart'; /// Indicates that the data in a EnvelopedMessage is @@ -24,17 +27,63 @@ import 'envelope.dart'; /// const compressedFlag = 1; // 0b00000001 -extension DecompressEnvelope on Stream { - /// Noop for now. Just ensures everything is uncompressed. - Stream decompress() async* { +extension EnvelopeStreamCompression on Stream { + /// Decompressess the envoloped messaged if the [compression] is not null and + /// the envelope is compressed. + Stream decompress( + Compression? compression, + ) async* { await for (final env in this) { - if ((env.flags & compressedFlag) == compressedFlag) { + if (env.flags & compressedFlag == compressedFlag) { + if (compression == null) { + throw ConnectException( + Code.internal, + "received compressed envelope, but do not know how to decompress", + ); + } + yield EnvelopedMessage( + env.flags ^ compressedFlag, + compression.decode(env.data), + ); + } else { + yield env; + } + } + } + + /// Compresses an uncompressed enveloped message using the given + /// [compression]. + Stream compress( + Compression? compression, + ) { + if (compression == null) { + return this; + } + return (() async* { + await for (final env in this) { + yield EnvelopedMessage( + env.flags | compressedFlag, + compression.encode(env.data), + ); + } + })(); + } +} + +extension FindCompression on HttpResponse { + Compression? findCompression(List accept, String headerKey) { + final encoding = header[headerKey]; + if (encoding != null && encoding.toLowerCase() != 'identity') { + final i = accept.indexWhere((c) => c.name == encoding); + if (i < 0) { throw ConnectException( Code.internal, - "received compressed envelope, but do not know how to decompress", + 'unsupported response encoding "$encoding"', + metadata: header, ); } - yield env; + return accept[i]; } + return null; } } diff --git a/packages/connect/lib/src/protocol/protocol.dart b/packages/connect/lib/src/protocol/protocol.dart index 78733ef..c287d59 100644 --- a/packages/connect/lib/src/protocol/protocol.dart +++ b/packages/connect/lib/src/protocol/protocol.dart @@ -12,7 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -import 'package:connectrpc/connect.dart'; +import '../codec.dart'; +import '../compression.dart'; +import '../headers.dart'; +import '../http.dart'; +import '../interceptor.dart'; +import '../spec.dart'; +import '../transport.dart'; /// Protocol is common abstraction of an RPC protocol over HTTP. abstract interface class Protocol { @@ -20,6 +26,8 @@ abstract interface class Protocol { Headers requestHeaders( Spec spec, Codec codec, + Compression? sendCompression, + List acceptCompressions, CallOptions? options, ); @@ -29,6 +37,8 @@ abstract interface class Protocol { UnaryRequest req, Codec codec, HttpClient client, + Compression? sendCompression, + List acceptCompressions, ); /// Performs a stream request using the given [client] according @@ -37,5 +47,7 @@ abstract interface class Protocol { StreamRequest req, Codec codec, HttpClient client, + Compression? sendCompression, + List acceptCompressions, ); } diff --git a/packages/connect/lib/src/protocol/transport.dart b/packages/connect/lib/src/protocol/transport.dart index d590ecb..4714d88 100644 --- a/packages/connect/lib/src/protocol/transport.dart +++ b/packages/connect/lib/src/protocol/transport.dart @@ -14,6 +14,7 @@ import '../abort.dart'; import '../codec.dart'; +import '../compression.dart'; import '../http.dart'; import '../interceptor.dart'; import '../spec.dart'; @@ -28,6 +29,8 @@ abstract base class ProtocolTransport implements Transport { final Protocol _protocol; final HttpClient _httpClient; final List _interceptors; + final Compression? sendCompression; + final List acceptCompressions; ProtocolTransport( String baseUrl, @@ -35,6 +38,8 @@ abstract base class ProtocolTransport implements Transport { this._protocol, this._httpClient, this._interceptors, + this.sendCompression, + this.acceptCompressions, ) : _baseUrl = baseUrl.replaceAll(RegExp(r'/?$'), ""); @override @@ -48,12 +53,24 @@ abstract base class ProtocolTransport implements Transport { final req = UnaryRequest( spec, _baseUrl + spec.procedure, - _protocol.requestHeaders(spec, _codec, options), + _protocol.requestHeaders( + spec, + _codec, + sendCompression, + acceptCompressions, + options, + ), input, signal, ); if (_interceptors.isEmpty) { - return await _protocol.unary(req, _codec, _httpClient); + return await _protocol.unary( + req, + _codec, + _httpClient, + sendCompression, + acceptCompressions, + ); } final first = _interceptors.apply( (req) async { @@ -61,6 +78,8 @@ abstract base class ProtocolTransport implements Transport { req as UnaryRequest, _codec, _httpClient, + sendCompression, + acceptCompressions, ); }, ); @@ -82,13 +101,25 @@ abstract base class ProtocolTransport implements Transport { final req = StreamRequest( spec, _baseUrl + spec.procedure, - _protocol.requestHeaders(spec, _codec, options), + _protocol.requestHeaders( + spec, + _codec, + sendCompression, + acceptCompressions, + options, + ), input, signal, ); late final StreamResponse res; if (_interceptors.isEmpty) { - res = await _protocol.stream(req, _codec, _httpClient); + res = await _protocol.stream( + req, + _codec, + _httpClient, + sendCompression, + acceptCompressions, + ); } else { final first = _interceptors.apply( (req) async { @@ -96,6 +127,8 @@ abstract base class ProtocolTransport implements Transport { req as StreamRequest, _codec, _httpClient, + sendCompression, + acceptCompressions, ); }, ); diff --git a/packages/connect/test/conformance/conformance.io.yaml b/packages/connect/test/conformance/conformance.io.yaml index f066cfc..6377aaf 100644 --- a/packages/connect/test/conformance/conformance.io.yaml +++ b/packages/connect/test/conformance/conformance.io.yaml @@ -11,6 +11,7 @@ features: - CODEC_JSON compressions: - COMPRESSION_IDENTITY + - COMPRESSION_GZIP supportsTls: true supportsH2c: false supportsTlsClientCerts: true diff --git a/packages/connect/test/conformance/io_test.dart b/packages/connect/test/conformance/io_test.dart index 257cf77..13c2235 100644 --- a/packages/connect/test/conformance/io_test.dart +++ b/packages/connect/test/conformance/io_test.dart @@ -25,6 +25,7 @@ import 'package:connectrpc/protobuf.dart'; import 'package:connectrpc/protocol/connect.dart' as connect; import 'package:connectrpc/protocol/grpc.dart' as grpc; import 'package:connectrpc/protocol/grpc_web.dart' as grpc_web; +import 'package:connectrpc/src/gzip.dart'; import 'package:protobuf/protobuf.dart'; import 'package:test/test.dart'; @@ -69,24 +70,36 @@ void main() { HTTPVersion.HTTP_VERSION_2 => http2.createHttpClient(context: context), _ => throw 'Unsupported Http version', }; + final compression = switch (req.compression) { + Compression.COMPRESSION_GZIP => GzipCompression(), + Compression.COMPRESSION_UNSPECIFIED => null, + Compression.COMPRESSION_IDENTITY => null, + _ => throw 'Unsupported compression ${req.compression.name}', + }; return switch (req.protocol) { Protocol.PROTOCOL_CONNECT => connect.Transport( baseUrl: baseUrl, codec: codec, httpClient: httpClient, useHttpGet: req.useGetHttpMethod, + sendCompression: compression, + acceptCompressions: compression != null ? [compression] : [], ), Protocol.PROTOCOL_GRPC => grpc.Transport( baseUrl: baseUrl, codec: codec, httpClient: httpClient, statusParser: StatusParser(), + sendCompression: compression, + acceptCompressions: compression != null ? [compression] : [], ), Protocol.PROTOCOL_GRPC_WEB => grpc_web.Transport( baseUrl: baseUrl, codec: codec, httpClient: httpClient, statusParser: StatusParser(), + sendCompression: compression, + acceptCompressions: compression != null ? [compression] : [], ), _ => throw "Unknown protocol", }; diff --git a/packages/connect/test/protocol/transport_test.dart b/packages/connect/test/protocol/transport_test.dart index b59c99e..6354b6d 100644 --- a/packages/connect/test/protocol/transport_test.dart +++ b/packages/connect/test/protocol/transport_test.dart @@ -196,6 +196,8 @@ class DelegatingProtocol implements Protocol { Headers requestHeaders( Spec spec, Codec codec, + Compression? sendCompression, + List acceptCompressions, CallOptions? options, ) { final onRequestHeaders = this.onRequestHeaders; @@ -214,6 +216,8 @@ class DelegatingProtocol implements Protocol { UnaryRequest req, Codec codec, HttpClient client, + Compression? sendCompression, + List acceptCompressions, ) async { final onUnary = this.onUnary; if (onUnary != null) { @@ -233,6 +237,8 @@ class DelegatingProtocol implements Protocol { StreamRequest req, Codec codec, HttpClient client, + Compression? sendCompression, + List acceptCompressions, ) async { final onStream = this.onStream; if (onStream != null) { @@ -258,6 +264,8 @@ final class TestTransport extends ProtocolTransport { protocol, (req) => throw UnimplementedError(), interceptors ?? [], + null, + [], ); }