diff --git a/.github/workflows/dart.yml b/.github/workflows/dart.yml index 81c03343..b56c8d07 100644 --- a/.github/workflows/dart.yml +++ b/.github/workflows/dart.yml @@ -93,5 +93,5 @@ jobs: - name: Run tests run: dart test --platform ${{ matrix.platform }} - name: Run vmservice test - if: ${{ matrix.platform != 'chrome' && false }} #Disable until https://github.com/grpc/grpc-dart/issues/697 is resolved + if: ${{ matrix.platform != 'chrome' }} run: dart run --enable-vm-service --timeline-streams=Dart test/timeline_test.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index d93dd2a1..57aae486 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,19 +39,19 @@ ## 3.2.1 -* `package:http` now supports more versions: `>=0.13.0 <2.0.0`. +* `package:http` now supports more versions: `>=0.13.0 <2.0.0`. * `package:protobuf` new supports more versions: `>=2.0.0 <4.0.0`. ## 3.2.0 -* `ChannelOptions` now exposes `connectTimeout`, which is used on the +* `ChannelOptions` now exposes `connectTimeout`, which is used on the socket connect. This is used to specify the maximum allowed time to wait for a connection to be established. If `connectTime` is longer than the system level timeout duration, a timeout may occur sooner than specified in `connectTimeout`. On timeout, a `SocketException` is thrown. * Require Dart 2.17 or greater. * Fix issue [#51](https://github.com/grpc/grpc-dart/issues/51), add support for custom error handling. -* Expose client IP address to server +* Expose client IP address to server * Add a `channelShutdownHandler` argument to `ClientChannel` and the subclasses. This callback can be used to react to channel shutdown or termination. * Export the `Code` protobuf enum from the `grpc.dart` library. diff --git a/README.md b/README.md index 1e0419a4..f85542b2 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ The [Dart](https://www.dart.dev/) implementation of ## Learn more - [Quick Start](https://grpc.io/docs/languages/dart/quickstart) - get an app running in minutes -- [Examples](example) +- [Examples](https://github.com/grpc/grpc-dart/tree/master/example) - [API reference](https://grpc.io/docs/languages/dart/api) For complete documentation, see [Dart gRPC](https://grpc.io/docs/languages/dart). diff --git a/lib/src/client/call.dart b/lib/src/client/call.dart index 8d5918f8..65f0de3b 100644 --- a/lib/src/client/call.dart +++ b/lib/src/client/call.dart @@ -86,9 +86,9 @@ class CallOptions { CallOptions mergedWith(CallOptions? other) { if (other == null) return this; - final mergedMetadata = Map.from(metadata)..addAll(other.metadata); + final mergedMetadata = Map.of(metadata)..addAll(other.metadata); final mergedTimeout = other.timeout ?? timeout; - final mergedProviders = List.from(metadataProviders) + final mergedProviders = List.of(metadataProviders) ..addAll(other.metadataProviders); final mergedCompression = other.compression ?? compression; return CallOptions._( @@ -146,9 +146,9 @@ class WebCallOptions extends CallOptions { CallOptions mergedWith(CallOptions? other) { if (other == null) return this; - final mergedMetadata = Map.from(metadata)..addAll(other.metadata); + final mergedMetadata = Map.of(metadata)..addAll(other.metadata); final mergedTimeout = other.timeout ?? timeout; - final mergedProviders = List.from(metadataProviders) + final mergedProviders = List.of(metadataProviders) ..addAll(other.metadataProviders); if (other is! WebCallOptions) { @@ -241,7 +241,7 @@ class ClientCall implements Response { if (options.metadataProviders.isEmpty) { _sendRequest(connection, _sanitizeMetadata(options.metadata)); } else { - final metadata = Map.from(options.metadata); + final metadata = Map.of(options.metadata); Future.forEach( options.metadataProviders, (MetadataProvider provider) => provider(metadata, @@ -483,6 +483,12 @@ class ClientCall implements Response { if (_responseSubscription != null) { futures.add(_responseSubscription!.cancel()); } + if (!_headers.isCompleted) { + _headers.complete({}); + } + if (!_trailers.isCompleted) { + _trailers.complete({}); + } await Future.wait(futures); } diff --git a/lib/src/client/http2_connection.dart b/lib/src/client/http2_connection.dart index 998410a1..4cce6771 100644 --- a/lib/src/client/http2_connection.dart +++ b/lib/src/client/http2_connection.dart @@ -113,7 +113,7 @@ class Http2ClientConnection implements connection.ClientConnection { transport.ping(); } }, - onPingTimeout: () => shutdown(), + onPingTimeout: () => transport.finish(), ); transport.onFrameReceived .listen((_) => keepAliveManager?.onFrameReceived()); diff --git a/lib/src/client/transport/web_streams.dart b/lib/src/client/transport/web_streams.dart index 8480ab29..d4100add 100644 --- a/lib/src/client/transport/web_streams.dart +++ b/lib/src/client/transport/web_streams.dart @@ -153,6 +153,7 @@ class _GrpcWebConversionSink implements ChunkedConversionSink { case _GrpcWebParseState.message: _parseMessage(chunkData); break; + // ignore: unreachable_switch_default default: // only expected to be hit when hot-restarting, see above break processingLoop; diff --git a/lib/src/shared/status.dart b/lib/src/shared/status.dart index e0474ac1..cb662326 100644 --- a/lib/src/shared/status.dart +++ b/lib/src/shared/status.dart @@ -352,6 +352,7 @@ class GrpcError implements Exception { /// This list comes from `error_details.proto`. If any new error detail types are /// added to the protbuf definition, this function should be updated accordingly to /// support them. +@visibleForTesting GeneratedMessage parseErrorDetailsFromAny(Any any) { switch (any.typeUrl) { case 'type.googleapis.com/google.rpc.RetryInfo': @@ -473,7 +474,7 @@ GrpcError? grpcErrorDetailsFromTrailers(Map trailers) { } Map toCustomTrailers(Map trailers) { - return Map.from(trailers) + return Map.of(trailers) ..remove(':status') ..remove('content-type') ..remove('grpc-status') diff --git a/pubspec.yaml b/pubspec.yaml index c7183bef..ae05bbcc 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: grpc description: Dart implementation of gRPC, a high performance, open-source universal RPC framework. -version: 4.0.1 +version: 4.0.2-wip repository: https://github.com/grpc/grpc-dart diff --git a/test/client_tests/call_test.dart b/test/client_tests/call_test.dart index c519f6e8..9ce832b5 100644 --- a/test/client_tests/call_test.dart +++ b/test/client_tests/call_test.dart @@ -13,10 +13,26 @@ // See the License for the specific language governing permissions and // limitations under the License. +import 'package:grpc/grpc.dart'; import 'package:grpc/src/client/call.dart'; import 'package:test/test.dart'; +import '../src/client_utils.dart'; + void main() { + const dummyValue = 0; + const cancelDurationMillis = 300; + + late ClientHarness harness; + + setUp(() { + harness = ClientHarness()..setUp(); + }); + + tearDown(() { + harness.tearDown(); + }); + test('WebCallOptions mergeWith CallOptions returns WebCallOptions', () { final options = WebCallOptions(bypassCorsPreflight: true, withCredentials: true); @@ -28,4 +44,57 @@ void main() { expect(mergedOptions.bypassCorsPreflight, true); expect(mergedOptions.withCredentials, true); }); + + test( + 'Cancelling a call correctly complete headers future', + () async { + final clientCall = harness.client.unary(dummyValue); + + Future.delayed( + Duration(milliseconds: cancelDurationMillis), + ).then((_) => clientCall.cancel()); + + expect(await clientCall.headers, isEmpty); + + await expectLater( + clientCall, + throwsA( + isA().having( + (e) => e.codeName, + 'Test codename', + contains('CANCELLED'), + ), + ), + ); + }, + ); + + test( + 'Cancelling a call correctly complete trailers futures', + () async { + final clientCall = harness.client.unary(dummyValue); + + Future.delayed( + Duration(milliseconds: cancelDurationMillis), + ).then((_) { + clientCall.cancel(); + }); + + expect( + await clientCall.trailers, + isEmpty, + ); + + await expectLater( + clientCall, + throwsA( + isA().having( + (e) => e.codeName, + 'Test codename', + contains('CANCELLED'), + ), + ), + ); + }, + ); } diff --git a/test/client_tests/client_xhr_transport_test.dart b/test/client_tests/client_xhr_transport_test.dart index ece79432..1c58077f 100644 --- a/test/client_tests/client_xhr_transport_test.dart +++ b/test/client_tests/client_xhr_transport_test.dart @@ -65,7 +65,7 @@ class MockHttpRequest extends Mock implements HttpRequest { class MockXhrClientConnection extends XhrClientConnection { MockXhrClientConnection({int? code}) : _statusCode = code ?? 200, - super(Uri.parse('test:8080')); + super(Uri.parse('test:0')); late MockHttpRequest latestRequest; final int _statusCode; diff --git a/test/client_tests/grpc_or_grpcweb_channel_grpc_test.dart b/test/client_tests/grpc_or_grpcweb_channel_grpc_test.dart index 1db9bdad..a2b6e4a2 100644 --- a/test/client_tests/grpc_or_grpcweb_channel_grpc_test.dart +++ b/test/client_tests/grpc_or_grpcweb_channel_grpc_test.dart @@ -20,7 +20,7 @@ import 'package:grpc/grpc_or_grpcweb.dart'; import 'package:test/test.dart'; const host = 'example.com'; -const port = 8080; +const port = 0; void main() { test('Channel on non-web uses gRPC ClientChannel with correct params', () { diff --git a/test/client_tests/grpc_or_grpcweb_channel_web_test.dart b/test/client_tests/grpc_or_grpcweb_channel_web_test.dart index 3bd9329d..8382219e 100644 --- a/test/client_tests/grpc_or_grpcweb_channel_web_test.dart +++ b/test/client_tests/grpc_or_grpcweb_channel_web_test.dart @@ -20,7 +20,7 @@ import 'package:grpc/grpc_web.dart'; import 'package:test/test.dart'; const host = 'example.com'; -const port = 8080; +const port = 0; void main() { test('Channel on web uses GrpcWebClientChannel with correct URI', () { diff --git a/test/keepalive_test.dart b/test/keepalive_test.dart index 7ffa1156..bf48f9b5 100644 --- a/test/keepalive_test.dart +++ b/test/keepalive_test.dart @@ -32,16 +32,20 @@ void main() { late EchoServiceClient fakeClient; late FakeClientChannel fakeChannel; late EchoServiceClient unresponsiveClient; - late ClientChannel unresponsiveChannel; + late FakeClientChannel unresponsiveChannel; + + final pingInterval = Duration(milliseconds: 10); + final timeout = Duration(milliseconds: 30); + final maxBadPings = 5; setUp(() async { final serverOptions = ServerKeepAliveOptions( - maxBadPings: 5, + maxBadPings: maxBadPings, minIntervalBetweenPingsWithoutData: Duration(milliseconds: 10), ); final clientOptions = ClientKeepAliveOptions( - pingInterval: Duration(milliseconds: 10), - timeout: Duration(milliseconds: 30), + pingInterval: pingInterval, + timeout: timeout, permitWithoutCalls: true, ); @@ -49,7 +53,7 @@ void main() { services: [FakeEchoService()], keepAliveOptions: serverOptions, ); - await server.serve(address: 'localhost', port: 8081); + await server.serve(address: 'localhost', port: 0); fakeChannel = FakeClientChannel( 'localhost', port: server.port!, @@ -79,7 +83,7 @@ void main() { test('Server terminates connection after too many pings without data', () async { await fakeClient.echo(EchoRequest()); - await Future.delayed(Duration(milliseconds: 300)); + await Future.delayed(timeout * maxBadPings * 2); await fakeClient.echo(EchoRequest()); // Check that the server closed the connection, the next request then has // to build a new one. @@ -88,23 +92,27 @@ void main() { test('Server doesnt terminate connection after pings, as data is sent', () async { - final timer = Timer.periodic( - Duration(milliseconds: 10), (timer) => fakeClient.echo(EchoRequest())); - await Future.delayed(Duration(milliseconds: 200), () => timer.cancel()); - - // Wait for last request to be sent - await Future.delayed(Duration(milliseconds: 20)); + for (var i = 0; i < 10; i++) { + await fakeClient.echo(EchoRequest()); + await Future.delayed(timeout * 0.2); + } // Check that the server never closed the connection expect(fakeChannel.newConnectionCounter, 1); }); - test('Server doesnt ack the ping, making the client shutdown the connection', + test('Server doesnt ack the ping, making the client shutdown the transport', () async { + //Send a first request, get a connection + await unresponsiveClient.echo(EchoRequest()); + expect(unresponsiveChannel.newConnectionCounter, 1); + + //Ping is not being acked on time + await Future.delayed(timeout * 2); + + //A second request gets a new connection await unresponsiveClient.echo(EchoRequest()); - await Future.delayed(Duration(milliseconds: 200)); - await expectLater( - unresponsiveClient.echo(EchoRequest()), throwsA(isA())); + expect(unresponsiveChannel.newConnectionCounter, 2); }); } @@ -113,7 +121,7 @@ class FakeClientChannel extends ClientChannel { FakeHttp2ClientConnection? fakeHttp2ClientConnection; FakeClientChannel( super.host, { - super.port = 443, + super.port, super.options = const ChannelOptions(), super.channelShutdownHandler, }); @@ -142,20 +150,23 @@ class FakeHttp2ClientConnection extends Http2ClientConnection { } /// A wrapper around a [FakeHttp2ClientConnection] -class UnresponsiveClientChannel extends ClientChannel { +class UnresponsiveClientChannel extends FakeClientChannel { UnresponsiveClientChannel( super.host, { - super.port = 443, + super.port, super.options = const ChannelOptions(), super.channelShutdownHandler, }); @override - ClientConnection createConnection() => - UnresponsiveHttp2ClientConnection(host, port, options); + ClientConnection createConnection() { + fakeHttp2ClientConnection = + UnresponsiveHttp2ClientConnection(host, port, options); + return fakeHttp2ClientConnection!; + } } -class UnresponsiveHttp2ClientConnection extends Http2ClientConnection { +class UnresponsiveHttp2ClientConnection extends FakeHttp2ClientConnection { UnresponsiveHttp2ClientConnection(super.host, super.port, super.options); @override @@ -189,8 +200,6 @@ class FakeEchoService extends EchoServiceBase { @override Stream serverStreamingEcho( - ServiceCall call, ServerStreamingEchoRequest request) { - // TODO: implement serverStreamingEcho - throw UnimplementedError(); - } + ServiceCall call, ServerStreamingEchoRequest request) => + throw UnsupportedError('Not used in this test'); } diff --git a/test/proxy_secure_test.dart b/test/proxy_secure_test.dart index d4deb252..806913b2 100644 --- a/test/proxy_secure_test.dart +++ b/test/proxy_secure_test.dart @@ -33,13 +33,13 @@ void main() { server = Server.create(services: [FakeEchoService()]); await server.serve( address: 'localhost', - port: 8888, + port: 0, security: ServerTlsCredentials( certificate: File('test/data/localhost.crt').readAsBytesSync(), privateKey: File('test/data/localhost.key').readAsBytesSync(), ), ); - final proxy = Proxy(host: 'localhost', port: 8080); + final proxy = Proxy(host: 'localhost', port: 0); final proxyCAName = '/CN=mitmproxy/O=mitmproxy'; fakeChannel = ClientChannel( diff --git a/test/proxy_test.dart b/test/proxy_test.dart index ec832af4..6fc30720 100644 --- a/test/proxy_test.dart +++ b/test/proxy_test.dart @@ -30,9 +30,9 @@ void main() { setUp(() async { server = Server.create(services: [FakeEchoService()]); - await server.serve(address: 'localhost', port: 8888); + await server.serve(address: 'localhost', port: 0); - final proxy = Proxy(host: 'localhost', port: 8080); + final proxy = Proxy(host: 'localhost', port: 0); fakeChannel = ClientChannel( 'localhost', diff --git a/test/server_cancellation_test.dart b/test/server_cancellation_test.dart index 4de5aa28..ab119bb2 100644 --- a/test/server_cancellation_test.dart +++ b/test/server_cancellation_test.dart @@ -47,7 +47,7 @@ void main() { server = Server.create( services: [EchoService()], ); - await server.serve(address: 'localhost', port: 8081); + await server.serve(address: 'localhost', port: 0); channel = ClientChannel( 'localhost', port: server.port!, diff --git a/test/tools/http2_client.dart b/test/tools/http2_client.dart index d7359cd2..9bd40045 100644 --- a/test/tools/http2_client.dart +++ b/test/tools/http2_client.dart @@ -20,7 +20,7 @@ import 'package:grpc/src/client/http2_connection.dart'; import 'package:http2/http2.dart'; Future main(List args) async { - final serverPort = 5678; + final serverPort = 0; final proxyPort = int.tryParse(args.first); final proxy = @@ -37,7 +37,7 @@ Future main(List args) async { final incoming = proxy == null ? connector.socket : await connector.connectToProxy(proxy); - final uri = Uri.parse('http://localhost:8080'); + final uri = Uri.parse('http://localhost:0'); final transport = ClientTransportConnection.viaStreams(incoming, connector.socket);