Skip to content

Commit

Permalink
Implement compression
Browse files Browse the repository at this point in the history
Signed-off-by: Sri Krishna <[email protected]>
  • Loading branch information
srikrsna-buf committed Jan 7, 2025
1 parent aba0e3e commit e5f599c
Show file tree
Hide file tree
Showing 24 changed files with 393 additions and 62 deletions.
1 change: 1 addition & 0 deletions packages/connect/lib/connect.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export 'src/abort.dart';
export 'src/client.dart';
export 'src/code.dart';
export 'src/codec.dart';
export 'src/compression.dart';
export 'src/exception.dart';
export 'src/headers.dart';
export 'src/http.dart';
Expand Down
1 change: 1 addition & 0 deletions packages/connect/lib/io.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.

export 'src/gzip.dart';
export 'src/io.dart';
22 changes: 22 additions & 0 deletions packages/connect/lib/src/compression.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2024 The Connect Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import 'dart:convert';
import 'dart:typed_data';

/// Compression is specialized [Codec] for a certain compression algorithm.
abstract interface class Compression implements Codec<Uint8List, Uint8List> {
/// Name of the compression algorithm.
abstract final String name;
}
18 changes: 18 additions & 0 deletions packages/connect/lib/src/connect/header.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import '../abort.dart';
import '../codec.dart';
import '../compression.dart';
import '../headers.dart';
import '../spec.dart';
import 'version.dart';
Expand All @@ -22,12 +23,18 @@ const headerContentType = "content-type";
const headerContentLength = "content-length";
const headerTimeout = "connect-timeout-ms";
const headerProtocolVersion = "connect-protocol-version";
const headerUnaryEncoding = "content-encoding";
const headerStreamEncoding = "connect-content-encoding";
const headerUnaryAcceptEncoding = "accept-encoding";
const headerStreamAcceptEncoding = "connect-accept-encoding";

Headers requestHeader(
Codec codec,
StreamType streamType,
Headers? userProvidedHeaders,
AbortSignal? signal,
Compression? sendCompression,
List<Compression> acceptCompressions,
) {
final header = userProvidedHeaders == null
? Headers()
Expand All @@ -41,5 +48,16 @@ Headers requestHeader(
? 'application/${codec.name}'
: 'application/connect+${codec.name}';
// TODO: User agent headers.
if (sendCompression != null) {
header[streamType == StreamType.unary
? headerUnaryEncoding
: headerStreamEncoding] = sendCompression.name;
}
if (acceptCompressions.isNotEmpty) {
header[streamType == StreamType.unary
? headerUnaryAcceptEncoding
: headerStreamAcceptEncoding] =
acceptCompressions.map((c) => c.name).join(",");
}
return header;
}
28 changes: 23 additions & 5 deletions packages/connect/lib/src/connect/protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import '../code.dart';
import '../codec.dart';
import '../compression.dart';
import '../exception.dart';
import '../headers.dart';
import '../http.dart';
Expand Down Expand Up @@ -42,13 +43,17 @@ final class Protocol implements base.Protocol {
Headers requestHeaders<I, O>(
Spec<I, O> spec,
Codec codec,
Compression? sendCompression,
List<Compression> acceptCompressions,
CallOptions? options,
) {
return requestHeader(
codec,
spec.streamType,
options?.headers,
options?.signal,
sendCompression,
acceptCompressions,
);
}

Expand All @@ -57,6 +62,8 @@ final class Protocol implements base.Protocol {
UnaryRequest<I, O> req,
Codec codec,
HttpClient httpClient,
Compression? sendCompression,
List<Compression> acceptCompressions,
) async {
final HttpRequest hReq;
if (useHttpGet &&
Expand All @@ -73,20 +80,28 @@ final class Protocol implements base.Protocol {
req.signal,
);
} else {
final body = codec.encode(req.message);
hReq = HttpRequest(
req.url,
"POST",
req.headers,
Stream.fromIterable([codec.encode(req.message)]),
Stream.fromIterable([sendCompression?.encode(body) ?? body]),
req.signal,
);
}
final res = await httpClient(hReq);
final unaryError = res.validate(req.spec.streamType, codec);
final (:compression, :unaryError) = res.validate(
req.spec.streamType,
codec,
acceptCompressions,
);
final (:headers, :trailers) = res.header.demux();
final body = await res.body.toBytes(
var body = await res.body.toBytes(
int.tryParse(headers[headerContentLength] ?? ''),
);
if (compression != null) {
body = compression.decode(body);
}
if (unaryError != null) {
throw errorFromJsonBytes(
body,
Expand All @@ -107,6 +122,8 @@ final class Protocol implements base.Protocol {
StreamRequest<I, O> req,
Codec codec,
HttpClient httpClient,
Compression? sendCompression,
List<Compression> acceptCompressions,
) async {
final res = await httpClient(
HttpRequest(
Expand All @@ -117,14 +134,15 @@ final class Protocol implements base.Protocol {
req.signal,
),
);
res.validate(req.spec.streamType, codec);
final (:compression, unaryError: _) =
res.validate(req.spec.streamType, codec, acceptCompressions);
final trailer = Headers();
return StreamResponse(
req.spec,
res.header,
res.body
.splitEnvelope()
.decompress()
.decompress(compression)
.parse(
codec,
req.spec.outputFactory,
Expand Down
15 changes: 12 additions & 3 deletions packages/connect/lib/src/connect/response.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

import '../code.dart';
import '../codec.dart';
import '../compression.dart';
import '../exception.dart';
import '../headers.dart';
import '../http.dart';
import '../protocol/compression.dart';
import '../spec.dart';
import 'header.dart';
import 'http_status.dart';
Expand All @@ -28,10 +30,17 @@ extension ResponseValidation on HttpResponse {
/// For unary RPCs with an HTTP error status, this returns an error
/// derived from the HTTP status instead of throwing it, giving an
/// implementation a chance to parse a Connect error from the wire.
ConnectException? validate(
({ConnectException? unaryError, Compression? compression}) validate(
StreamType streamType,
Codec codec,
List<Compression> acceptCompressions,
) {
final compression = findCompression(
acceptCompressions,
streamType == StreamType.unary
? headerUnaryEncoding
: headerStreamEncoding,
);
final contentType = header[headerContentType] ?? '';
if (status != 200) {
final statusErr = ConnectException(
Expand All @@ -41,7 +50,7 @@ extension ResponseValidation on HttpResponse {
if (streamType == StreamType.unary &&
contentType.startsWith("application/json")) {
// Unary JSON response
return statusErr;
return (unaryError: statusErr, compression: compression);
}
throw statusErr;
}
Expand All @@ -58,6 +67,6 @@ extension ResponseValidation on HttpResponse {
metadata: header,
);
}
return null;
return (unaryError: null, compression: compression);
}
}
5 changes: 5 additions & 0 deletions packages/connect/lib/src/connect/transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

import "../codec.dart";
import "../compression.dart";
import "../http.dart";
import "../interceptor.dart";
import "../protocol/transport.dart";
Expand All @@ -26,11 +27,15 @@ final class Transport extends ProtocolTransport {
required HttpClient httpClient,
List<Interceptor>? interceptors,
bool useHttpGet = false,
Compression? sendCompression,
List<Compression>? acceptCompressions,
}) : super(
baseUrl,
codec,
Protocol(useHttpGet),
httpClient,
interceptors ?? [],
sendCompression,
acceptCompressions ?? [],
);
}
9 changes: 9 additions & 0 deletions packages/connect/lib/src/fake.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import 'dart:typed_data';

import 'abort.dart';
import 'codec.dart';
import 'compression.dart';
import 'headers.dart';
import 'http.dart';
import 'interceptor.dart';
Expand Down Expand Up @@ -155,6 +156,8 @@ final class _FakeTransport extends ProtocolTransport {
_FakeProtocol(handlers),
(req) => throw UnimplementedError(),
interceptors ?? [],
null,
[],
);
}

Expand All @@ -167,6 +170,8 @@ final class _FakeProtocol implements Protocol {
Headers requestHeaders<I, O>(
Spec<I, O> spec,
Codec codec,
Compression? sendCompression,
List<Compression> acceptCompressions,
CallOptions? options,
) {
final headers = Headers();
Expand All @@ -181,6 +186,8 @@ final class _FakeProtocol implements Protocol {
UnaryRequest<I, O> req,
Codec codec,
HttpClient client,
Compression? sendCompression,
List<Compression> acceptCompressions,
) async {
final handler = handlers[req.spec];
if (handler == null) {
Expand Down Expand Up @@ -211,6 +218,8 @@ final class _FakeProtocol implements Protocol {
StreamRequest<I, O> req,
Codec codec,
HttpClient client,
Compression? sendCompression,
List<Compression> acceptCompressions,
) async {
final handler = handlers[req.spec];
if (handler == null) {
Expand Down
39 changes: 31 additions & 8 deletions packages/connect/lib/src/grpc/protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import '../code.dart';
import '../codec.dart';
import '../compression.dart';
import '../exception.dart';
import '../headers.dart';
import '../http.dart';
Expand All @@ -32,37 +33,54 @@ import 'trailer.dart';

final class Protocol implements base.Protocol {
final StatusParser statusParser;
final Compression? sendCompression;

const Protocol(this.statusParser);
const Protocol(this.statusParser, this.sendCompression);

@override
Headers requestHeaders<I, O>(
Spec<I, O> spec,
Codec codec,
Compression? sendCompression,
List<Compression> acceptCompressions,
CallOptions? options,
) {
return requestHeader(codec, options?.headers, options?.signal);
return requestHeader(
codec,
options?.headers,
options?.signal,
sendCompression,
acceptCompressions,
);
}

@override
Future<UnaryResponse<I, O>> unary<I extends Object, O extends Object>(
UnaryRequest<I, O> req,
Codec codec,
HttpClient httpClient,
Compression? sendCompression,
List<Compression> acceptCompressions,
) async {
final res = await httpClient(
HttpRequest(
req.url,
"POST",
req.headers,
Stream.fromIterable([encodeEnvelope(0, codec.encode(req.message))]),
Stream.fromIterable([req.message])
.serialize(codec)
.compress(sendCompression)
.joinEnvelope(),
req.signal,
),
);
final (foundStatus: _, :headerError) = res.validate(statusParser);
final (foundStatus: _, :headerError, :compression) = res.validate(
statusParser,
acceptCompressions,
);
final message = await res.body
.splitEnvelope()
.decompress()
.decompress(compression)
.parse(codec, req.spec.outputFactory)
.tryReadingSingleMessage();
res.trailer.validateTrailer(res.header, statusParser);
Expand Down Expand Up @@ -98,17 +116,22 @@ final class Protocol implements base.Protocol {
StreamRequest<I, O> req,
Codec codec,
HttpClient httpClient,
Compression? sendCompression,
List<Compression> acceptCompressions,
) async {
final res = await httpClient(
HttpRequest(
req.url,
"POST",
req.headers,
req.message.serialize(codec).joinEnvelope(),
req.message.serialize(codec).compress(sendCompression).joinEnvelope(),
req.signal,
),
);
final (:foundStatus, :headerError) = res.validate(statusParser);
final (:foundStatus, :headerError, :compression) = res.validate(
statusParser,
acceptCompressions,
);
if (headerError != null) {
// Trailers only response.
throw headerError;
Expand All @@ -118,7 +141,7 @@ final class Protocol implements base.Protocol {
res.header,
res.body
.splitEnvelope()
.decompress()
.decompress(compression)
.parse(codec, req.spec.outputFactory)
.onDone(() {
if (!foundStatus) {
Expand Down
Loading

0 comments on commit e5f599c

Please sign in to comment.