From 26b5dbb8dbbaccea04bca7cc866a4035a1b503d3 Mon Sep 17 00:00:00 2001 From: Steve Browne Date: Fri, 29 Apr 2022 15:12:46 -0400 Subject: [PATCH 01/11] Implemented a basic transport based on the fetch api to replace xhr. --- lib/src/client/transport/fetch_transport.dart | 343 ++++++++++++++++++ lib/src/client/web_channel.dart | 6 +- 2 files changed, 347 insertions(+), 2 deletions(-) create mode 100644 lib/src/client/transport/fetch_transport.dart diff --git a/lib/src/client/transport/fetch_transport.dart b/lib/src/client/transport/fetch_transport.dart new file mode 100644 index 00000000..8dc53637 --- /dev/null +++ b/lib/src/client/transport/fetch_transport.dart @@ -0,0 +1,343 @@ +// Copyright (c) 2022, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; +import 'dart:html'; +import 'dart:js_util' as js_util; +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import 'package:meta/meta.dart'; + +import '../../client/call.dart'; +import '../../shared/message.dart'; +import '../../shared/status.dart'; +import '../connection.dart'; +import 'cors.dart' as cors; +import 'transport.dart'; +import 'web_streams.dart'; + +const _contentTypeKey = 'Content-Type'; + +/// Implementation of Fetch API simulating @HttpRequest for minimal changes +class FetchHttpRequest { + // Request parameters + var method = 'GET'; + var uri = ''; + var referrerPolicy = 'origin'; + var mode = 'cors'; + var credentials = 'omit'; + var cache = 'default'; + var redirect = 'follow'; + var integrity = ''; + var keepAlive = true; + var headers = {}; + var readyState = HttpRequest.UNSENT; + set withCredentials(bool value) => credentials = value ? 'include' : 'omit'; + set responseType(String unused) {} + + // Streams and controllers + final onReadyStateChangeController = StreamController.broadcast(); + Stream get onReadyStateChange => onReadyStateChangeController.stream; + final onProgressController = StreamController.broadcast(); + Stream get onProgress => onProgressController.stream; + final onErrorController = StreamController.broadcast(); + Stream get onError => onErrorController.stream; + + // Response information + CancelableOperation? _cancelable; + dynamic _response; + dynamic get response => _response; + int get status => + response != null ? js_util.getProperty(response, 'status') : 0; + Map get responseHeaders => response != null + ? toDartMap(js_util.getProperty(response, 'headers')) + : {}; + String get responseText => + response != null ? js_util.getProperty(response, 'statusText') : ''; + dynamic get body => + response != null ? js_util.getProperty(response, 'body') : null; + + static Map toDartMap(Headers obj) => + Map.fromIterable(getObjectKeys(obj), + value: (key) => js_util.callMethod(obj, 'get', [key]).toString()); + + static List getObjectKeys(Headers obj) => + List.from(js_util.callMethod(obj, 'keys', [])); + + Future send([List? data]) async { + final wgs = WorkerGlobalScope.instance; + _setReadyState(HttpRequest.LOADING); + final init = { + 'method': method, + 'referrerPolicy': referrerPolicy, + 'mode': mode, + 'credentials': credentials, + 'cache': cache, + 'redirect': redirect, + 'integrity': integrity, + 'keepalive': keepAlive, + if (headers.isNotEmpty) 'headers': headers, + if (data != null) 'body': String.fromCharCodes(data), + }; + final operation = + _cancelable = CancelableOperation.fromFuture(wgs.fetch(uri, init)); + + _response = await operation.value; + _setReadyState(HttpRequest.HEADERS_RECEIVED); + if (status < 200 || status >= 300) { + onErrorController.add(status); + } + + final reader = body?.getReader(); + if (reader == null) { + onErrorController.add(0); + return; + } + + while (true) { + final result = await js_util.promiseToFuture(reader.read()); + final value = js_util.getProperty(result, 'value'); + if (value != null) { + onProgressController.add(value as Uint8List); + } + if (js_util.getProperty(result, 'done')) { + _setReadyState(HttpRequest.DONE); + break; + } + } + } + + void _setReadyState(int state) { + readyState = state; + onReadyStateChangeController.add(state); + if (state == HttpRequest.DONE) {} + } + + void open(String method, String uri) { + this.method = method; + this.uri = uri; + _setReadyState(HttpRequest.OPENED); + } + + void abort() async { + await _cancelable?.cancel(); + close(); + } + + void close() { + onReadyStateChangeController.close(); + onProgressController.close(); + onErrorController.close(); + } + + void setRequestHeader(String name, String value) { + headers[name] = value; + } + + void overrideMimeType(String mimeType) {} +} + +class FetchTransportStream implements GrpcTransportStream { + final FetchHttpRequest _request; + final ErrorHandler _onError; + final Function(FetchTransportStream stream) _onDone; + bool _headersReceived = false; + final StreamController _incomingProcessor = StreamController(); + final StreamController _incomingMessages = StreamController(); + final StreamController> _outgoingMessages = StreamController(); + + @override + Stream get incomingMessages => _incomingMessages.stream; + + @override + StreamSink> get outgoingMessages => _outgoingMessages.sink; + + FetchTransportStream(this._request, + {required ErrorHandler onError, required onDone}) + : _onError = onError, + _onDone = onDone { + _outgoingMessages.stream + .map(frame) + .listen((data) => _request.send(data), cancelOnError: true); + + _request.onReadyStateChange.listen((data) { + if (_incomingProcessor.isClosed) { + return; + } + switch (_request.readyState) { + case HttpRequest.HEADERS_RECEIVED: + _onHeadersReceived(); + break; + case HttpRequest.DONE: + _onRequestDone(); + _close(); + break; + } + }); + + _request.onError.listen((_) { + if (_incomingProcessor.isClosed) { + return; + } + _onError(GrpcError.unavailable('FetchTransportStream connection-error'), + StackTrace.current); + terminate(); + }); + + _request.onProgress.listen((bytes) { + if (_incomingProcessor.isClosed) { + return; + } + _incomingProcessor.add(bytes.buffer); + }); + + _incomingProcessor.stream + .transform(GrpcWebDecoder()) + .transform(grpcDecompressor()) + .listen(_incomingMessages.add, + onError: _onError, onDone: _incomingMessages.close); + } + + bool _validateResponseState() { + try { + validateHttpStatusAndContentType( + _request.status, _request.responseHeaders, + rawResponse: _request.responseText); + return true; + } catch (e, st) { + _onError(e, st); + return false; + } + } + + void _onHeadersReceived() { + _headersReceived = true; + if (!_validateResponseState()) { + return; + } + _incomingMessages.add(GrpcMetadata(_request.responseHeaders)); + } + + void _onRequestDone() { + if (!_headersReceived && !_validateResponseState()) { + return; + } + if (_request.response == null) { + _onError( + GrpcError.unavailable('FetchTransportStream request null response', + null, _request.responseText), + StackTrace.current); + return; + } + } + + void _close() { + _incomingProcessor.close(); + _outgoingMessages.close(); + _onDone(this); + } + + @override + Future terminate() async { + _close(); + _request.abort(); + } +} + +class FetchClientConnection extends ClientConnection { + final Uri uri; + + final _requests = {}; + + FetchClientConnection(this.uri); + + @override + String get authority => uri.authority; + @override + String get scheme => uri.scheme; + + void _initializeRequest( + FetchHttpRequest request, Map metadata) { + for (final header in metadata.keys) { + request.setRequestHeader(header, metadata[header]!); + } + // Overriding the mimetype allows us to stream and parse the data + request.overrideMimeType('text/plain; charset=x-user-defined'); + request.responseType = 'text'; + } + + @visibleForTesting + FetchHttpRequest createRequest() => FetchHttpRequest(); + + @override + GrpcTransportStream makeRequest(String path, Duration? timeout, + Map metadata, ErrorHandler onError, + {CallOptions? callOptions}) { + // gRPC-web headers. + if (_getContentTypeHeader(metadata) == null) { + metadata['Content-Type'] = 'application/grpc-web+proto'; + metadata['X-User-Agent'] = 'grpc-web-dart/0.1'; + metadata['X-Grpc-Web'] = '1'; + } + + var requestUri = uri.resolve(path); + if (callOptions is WebCallOptions && + callOptions.bypassCorsPreflight == true) { + requestUri = cors.moveHttpHeadersToQueryParam(metadata, requestUri); + } + + final request = createRequest(); + request.open('POST', requestUri.toString()); + if (callOptions is WebCallOptions && callOptions.withCredentials == true) { + request.withCredentials = true; + } + // Must set headers after calling open(). + _initializeRequest(request, metadata); + + final transportStream = + FetchTransportStream(request, onError: onError, onDone: _removeStream); + _requests.add(transportStream); + return transportStream; + } + + void _removeStream(FetchTransportStream stream) { + _requests.remove(stream); + } + + @override + Future terminate() async { + for (var request in List.of(_requests)) { + request.terminate(); + } + } + + @override + void dispatchCall(ClientCall call) { + call.onConnectionReady(this); + } + + @override + Future shutdown() async {} +} + +MapEntry? _getContentTypeHeader(Map metadata) { + for (var entry in metadata.entries) { + if (entry.key.toLowerCase() == _contentTypeKey.toLowerCase()) { + return entry; + } + } + return null; +} diff --git a/lib/src/client/web_channel.dart b/lib/src/client/web_channel.dart index f81da14f..1ce9824b 100644 --- a/lib/src/client/web_channel.dart +++ b/lib/src/client/web_channel.dart @@ -15,7 +15,8 @@ import 'channel.dart'; import 'connection.dart'; -import 'transport/xhr_transport.dart'; +import 'transport/fetch_transport.dart'; +//import 'transport/xhr_transport.dart'; /// A channel to a grpc-web endpoint. class GrpcWebClientChannel extends ClientChannelBase { @@ -25,6 +26,7 @@ class GrpcWebClientChannel extends ClientChannelBase { @override ClientConnection createConnection() { - return XhrClientConnection(uri); + //return XhrClientConnection(uri); + return FetchClientConnection(uri); } } From da178b3aff1e268f0fb68abc51165853b92524de Mon Sep 17 00:00:00 2001 From: Steve Browne Date: Sun, 1 May 2022 16:14:02 -0400 Subject: [PATCH 02/11] Implemented a hack using eval to call fetch so we can pass the body as a Uint8Array to support values larger than 127 which string encoding was messing up. --- lib/src/client/transport/fetch_transport.dart | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/lib/src/client/transport/fetch_transport.dart b/lib/src/client/transport/fetch_transport.dart index 8dc53637..8ec070f9 100644 --- a/lib/src/client/transport/fetch_transport.dart +++ b/lib/src/client/transport/fetch_transport.dart @@ -14,6 +14,7 @@ // limitations under the License. import 'dart:async'; +import 'dart:convert'; import 'dart:html'; import 'dart:js_util' as js_util; import 'dart:typed_data'; @@ -80,20 +81,25 @@ class FetchHttpRequest { Future send([List? data]) async { final wgs = WorkerGlobalScope.instance; _setReadyState(HttpRequest.LOADING); - final init = { - 'method': method, - 'referrerPolicy': referrerPolicy, - 'mode': mode, - 'credentials': credentials, - 'cache': cache, - 'redirect': redirect, - 'integrity': integrity, - 'keepalive': keepAlive, - if (headers.isNotEmpty) 'headers': headers, - if (data != null) 'body': String.fromCharCodes(data), - }; - final operation = - _cancelable = CancelableOperation.fromFuture(wgs.fetch(uri, init)); + final headersStr = + headers.isNotEmpty ? '"headers": ${json.encode(headers)},' : ''; + final bodyStr = data != null ? '"body": Uint8Array.from($data),' : ''; + final initStr = '''{ + $headersStr + $bodyStr + "method": "$method", + "referrerPolicy": "$referrerPolicy", + "mode": "$mode", + "credentials": "$credentials", + "cache": "$cache", + "redirect": "$redirect", + "integrity": "$integrity", + "keepalive": $keepAlive + }'''; + + final promise = js_util.promiseToFuture( + js_util.callMethod(wgs, 'eval', ['fetch("$uri", $initStr)'])); + final operation = _cancelable = CancelableOperation.fromFuture(promise); _response = await operation.value; _setReadyState(HttpRequest.HEADERS_RECEIVED); From e627db5cb6d41a4c2dfb16c12e5e93b12d14ac7a Mon Sep 17 00:00:00 2001 From: Steve Browne Date: Sun, 1 May 2022 21:35:42 -0400 Subject: [PATCH 03/11] Created RequestInit js interop type to force body to be a Uint8List and marshal to javascript Uint8Array. --- lib/src/client/transport/fetch_transport.dart | 81 ++++++++++++++----- 1 file changed, 61 insertions(+), 20 deletions(-) diff --git a/lib/src/client/transport/fetch_transport.dart b/lib/src/client/transport/fetch_transport.dart index 8ec070f9..0c03f80e 100644 --- a/lib/src/client/transport/fetch_transport.dart +++ b/lib/src/client/transport/fetch_transport.dart @@ -14,12 +14,12 @@ // limitations under the License. import 'dart:async'; -import 'dart:convert'; import 'dart:html'; import 'dart:js_util' as js_util; import 'dart:typed_data'; import 'package:async/async.dart'; +import 'package:js/js.dart'; import 'package:meta/meta.dart'; import '../../client/call.dart'; @@ -32,6 +32,52 @@ import 'web_streams.dart'; const _contentTypeKey = 'Content-Type'; +@anonymous +@JS() +class RequestInit { + external factory RequestInit( + {required String method, + Object? headers, + List? body, + required String referrerPolicy, + required String mode, + required String credentials, + required String cache, + required String redirect, + required String integrity, + required bool keepalive}); + + external String get method; + external set method(String newValue); + + external Object? get headers; + external set headers(Object? newValue); + + external Uint8List? get body; + external set body(Uint8List? newValue); + + external String get referrerPolicy; + external set referrerPolicy(String newValue); + + external String get mode; + external set mode(String newValue); + + external String get credentials; + external set credentials(String newValue); + + external String get cache; + external set cache(String newValue); + + external String get redirect; + external set redirect(String newValue); + + external String get integrity; + external set integrity(String newValue); + + external bool get keepalive; + external set keepalive(bool newValue); +} + /// Implementation of Fetch API simulating @HttpRequest for minimal changes class FetchHttpRequest { // Request parameters @@ -81,25 +127,20 @@ class FetchHttpRequest { Future send([List? data]) async { final wgs = WorkerGlobalScope.instance; _setReadyState(HttpRequest.LOADING); - final headersStr = - headers.isNotEmpty ? '"headers": ${json.encode(headers)},' : ''; - final bodyStr = data != null ? '"body": Uint8Array.from($data),' : ''; - final initStr = '''{ - $headersStr - $bodyStr - "method": "$method", - "referrerPolicy": "$referrerPolicy", - "mode": "$mode", - "credentials": "$credentials", - "cache": "$cache", - "redirect": "$redirect", - "integrity": "$integrity", - "keepalive": $keepAlive - }'''; - - final promise = js_util.promiseToFuture( - js_util.callMethod(wgs, 'eval', ['fetch("$uri", $initStr)'])); - final operation = _cancelable = CancelableOperation.fromFuture(promise); + + final init = RequestInit( + cache: cache, + credentials: credentials, + integrity: integrity, + keepalive: keepAlive, + method: method, + mode: mode, + redirect: redirect, + referrerPolicy: referrerPolicy, + body: data, + headers: js_util.jsify(headers)); + final operation = _cancelable = CancelableOperation.fromFuture( + js_util.promiseToFuture(js_util.callMethod(wgs, 'fetch', [uri, init]))); _response = await operation.value; _setReadyState(HttpRequest.HEADERS_RECEIVED); From 32ad527340875b616f3a5858b292ba8ec5f9e4fc Mon Sep 17 00:00:00 2001 From: Steve Browne Date: Tue, 3 May 2022 17:10:36 -0400 Subject: [PATCH 04/11] Made some minor changes to better handle when a streaming request is aborted so that we stop reading gracefully. --- lib/src/client/transport/fetch_transport.dart | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/lib/src/client/transport/fetch_transport.dart b/lib/src/client/transport/fetch_transport.dart index 0c03f80e..bfad8051 100644 --- a/lib/src/client/transport/fetch_transport.dart +++ b/lib/src/client/transport/fetch_transport.dart @@ -104,7 +104,8 @@ class FetchHttpRequest { Stream get onError => onErrorController.stream; // Response information - CancelableOperation? _cancelable; + CancelableOperation? _cancelableFetch; + CancelableOperation? _cancelableSend; dynamic _response; dynamic get response => _response; int get status => @@ -125,6 +126,12 @@ class FetchHttpRequest { List.from(js_util.callMethod(obj, 'keys', [])); Future send([List? data]) async { + final doSend = _doSend(data); + _cancelableSend = CancelableOperation.fromFuture(doSend); + await doSend; + } + + Future _doSend([List? data]) async { final wgs = WorkerGlobalScope.instance; _setReadyState(HttpRequest.LOADING); @@ -139,11 +146,15 @@ class FetchHttpRequest { referrerPolicy: referrerPolicy, body: data, headers: js_util.jsify(headers)); - final operation = _cancelable = CancelableOperation.fromFuture( + final operation = _cancelableFetch = CancelableOperation.fromFuture( js_util.promiseToFuture(js_util.callMethod(wgs, 'fetch', [uri, init]))); _response = await operation.value; _setReadyState(HttpRequest.HEADERS_RECEIVED); + if (_cancelableSend?.isCanceled ?? false) { + return; + } + if (status < 200 || status >= 300) { onErrorController.add(status); } @@ -156,6 +167,9 @@ class FetchHttpRequest { while (true) { final result = await js_util.promiseToFuture(reader.read()); + if (_cancelableSend?.isCanceled ?? false) { + return; + } final value = js_util.getProperty(result, 'value'); if (value != null) { onProgressController.add(value as Uint8List); @@ -180,7 +194,8 @@ class FetchHttpRequest { } void abort() async { - await _cancelable?.cancel(); + await _cancelableFetch?.cancel(); + await _cancelableSend?.cancel(); close(); } From 20dfb827646a671091d33468a25f0a3ed408163c Mon Sep 17 00:00:00 2001 From: Steve Browne Date: Wed, 4 May 2022 12:49:46 -0400 Subject: [PATCH 05/11] Added fetch transport tests which are mostly the same as xhr, but with some minor changes because of the events. Also made some minor tweaks to fetch to have a String response for the tests like xhr. --- lib/src/client/transport/fetch_transport.dart | 17 +- .../client_fetch_transport_test.dart | 430 ++++++++++++++++++ 2 files changed, 440 insertions(+), 7 deletions(-) create mode 100644 test/client_tests/client_fetch_transport_test.dart diff --git a/lib/src/client/transport/fetch_transport.dart b/lib/src/client/transport/fetch_transport.dart index bfad8051..95cb44a1 100644 --- a/lib/src/client/transport/fetch_transport.dart +++ b/lib/src/client/transport/fetch_transport.dart @@ -14,6 +14,7 @@ // limitations under the License. import 'dart:async'; +import 'dart:convert'; import 'dart:html'; import 'dart:js_util' as js_util; import 'dart:typed_data'; @@ -107,16 +108,17 @@ class FetchHttpRequest { CancelableOperation? _cancelableFetch; CancelableOperation? _cancelableSend; dynamic _response; - dynamic get response => _response; + Uint8List? _lastResponse; + String? get response => responseText; int get status => - response != null ? js_util.getProperty(response, 'status') : 0; - Map get responseHeaders => response != null - ? toDartMap(js_util.getProperty(response, 'headers')) + _response != null ? js_util.getProperty(_response, 'status') : 0; + Map get responseHeaders => _response != null + ? toDartMap(js_util.getProperty(_response, 'headers')) : {}; - String get responseText => - response != null ? js_util.getProperty(response, 'statusText') : ''; + String? get responseText => + _lastResponse != null ? utf8.decode(_lastResponse!) : null; dynamic get body => - response != null ? js_util.getProperty(response, 'body') : null; + _response != null ? js_util.getProperty(_response, 'body') : null; static Map toDartMap(Headers obj) => Map.fromIterable(getObjectKeys(obj), @@ -172,6 +174,7 @@ class FetchHttpRequest { } final value = js_util.getProperty(result, 'value'); if (value != null) { + _lastResponse = value; onProgressController.add(value as Uint8List); } if (js_util.getProperty(result, 'done')) { diff --git a/test/client_tests/client_fetch_transport_test.dart b/test/client_tests/client_fetch_transport_test.dart new file mode 100644 index 00000000..ccfe907b --- /dev/null +++ b/test/client_tests/client_fetch_transport_test.dart @@ -0,0 +1,430 @@ +// Copyright (c) 2017, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +@TestOn('browser') + +import 'dart:async'; + +import 'dart:html'; +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import 'package:grpc/src/client/call.dart'; +import 'package:grpc/src/client/transport/fetch_transport.dart'; +import 'package:grpc/src/shared/message.dart'; +import 'package:grpc/src/shared/status.dart'; +import 'package:mockito/mockito.dart'; +import 'package:stream_transform/stream_transform.dart'; + +import 'package:test/test.dart'; + +class MockFetchRequest extends Mock implements FetchHttpRequest { + MockFetchRequest({int? code}) : status = code ?? 200; + // ignore: close_sinks + final readyStateChangeController = StreamController(); + // ignore: close_sinks + final progressController = StreamController(); + // ignore: close_sinks + final errorController = StreamController(); + + @override + Stream get onReadyStateChange => readyStateChangeController.stream; + + @override + Stream get onProgress => progressController.stream; + + @override + Stream get onError => errorController.stream; + + @override + final int status; + + @override + int get readyState => + super.noSuchMethod(Invocation.getter(#readyState), returnValue: -1); + + @override + String? get response => + super.noSuchMethod(Invocation.getter(#response), returnValue: null); + + @override + Map get responseHeaders => + super.noSuchMethod(Invocation.getter(#responseHeaders), + returnValue: {}); +} + +class MockFetchClientConnection extends FetchClientConnection { + MockFetchClientConnection({int? code}) + : _statusCode = code ?? 200, + super(Uri.parse('test:8080')); + + late MockFetchRequest latestRequest; + final int _statusCode; + + @override + FetchHttpRequest createRequest() { + final request = MockFetchRequest(code: _statusCode); + latestRequest = request; + return request; + } +} + +void main() { + test('Make request sends correct headers', () async { + final metadata = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2' + }; + + final connection = MockFetchClientConnection(); + + connection.makeRequest('path', Duration(seconds: 10), metadata, + (error, _) => fail(error.toString())); + + verify(connection.latestRequest + .setRequestHeader('Content-Type', 'application/grpc-web+proto')); + verify(connection.latestRequest + .setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1')); + verify(connection.latestRequest.setRequestHeader('X-Grpc-Web', '1')); + verify(connection.latestRequest + .overrideMimeType('text/plain; charset=x-user-defined')); + verify(connection.latestRequest.responseType = 'text'); + }); + + test( + 'Make request sends correct headers and path if bypassCorsPreflight=true', + () async { + final metadata = {'header_1': 'value_1', 'header_2': 'value_2'}; + final connection = MockFetchClientConnection(); + + connection.makeRequest('path', Duration(seconds: 10), metadata, + (error, _) => fail(error.toString()), + callOptions: WebCallOptions(bypassCorsPreflight: true)); + + expect(metadata, isEmpty); + verify(connection.latestRequest.open('POST', + 'test:path?%24httpHeaders=header_1%3Avalue_1%0D%0Aheader_2%3Avalue_2%0D%0AContent-Type%3Aapplication%2Fgrpc-web%2Bproto%0D%0AX-User-Agent%3Agrpc-web-dart%2F0.1%0D%0AX-Grpc-Web%3A1%0D%0A')); + verify(connection.latestRequest + .overrideMimeType('text/plain; charset=x-user-defined')); + verify(connection.latestRequest.responseType = 'text'); + }); + + test( + 'Make request sends correct headers if call options already have ' + 'Content-Type header', () async { + final metadata = { + 'header_1': 'value_1', + 'header_2': 'value_2', + 'Content-Type': 'application/json+protobuf' + }; + final connection = MockFetchClientConnection(); + + connection.makeRequest('/path', Duration(seconds: 10), metadata, + (error, _) => fail(error.toString())); + + expect(metadata, { + 'header_1': 'value_1', + 'header_2': 'value_2', + 'Content-Type': 'application/json+protobuf', + }); + }); + + test('Content-Type header case insensitivity', () async { + final metadata = { + 'header_1': 'value_1', + 'CONTENT-TYPE': 'application/json+protobuf' + }; + final connection = MockFetchClientConnection(); + + connection.makeRequest('/path', Duration(seconds: 10), metadata, + (error, _) => fail(error.toString())); + expect(metadata, { + 'header_1': 'value_1', + 'CONTENT-TYPE': 'application/json+protobuf', + }); + + final lowerMetadata = { + 'header_1': 'value_1', + 'content-type': 'application/json+protobuf' + }; + connection.makeRequest('/path', Duration(seconds: 10), lowerMetadata, + (error, _) => fail(error.toString())); + expect(lowerMetadata, { + 'header_1': 'value_1', + 'content-type': 'application/json+protobuf', + }); + }); + + test('Make request sends correct headers path if only withCredentials=true', + () async { + final metadata = {'header_1': 'value_1', 'header_2': 'value_2'}; + final connection = MockFetchClientConnection(); + + connection.makeRequest('path', Duration(seconds: 10), metadata, + (error, _) => fail(error.toString()), + callOptions: WebCallOptions(withCredentials: true)); + + expect(metadata, { + 'header_1': 'value_1', + 'header_2': 'value_2', + 'Content-Type': 'application/grpc-web+proto', + 'X-User-Agent': 'grpc-web-dart/0.1', + 'X-Grpc-Web': '1' + }); + verify(connection.latestRequest + .setRequestHeader('Content-Type', 'application/grpc-web+proto')); + verify(connection.latestRequest + .setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1')); + verify(connection.latestRequest.setRequestHeader('X-Grpc-Web', '1')); + verify(connection.latestRequest.open('POST', 'test:path')); + verify(connection.latestRequest.withCredentials = true); + verify(connection.latestRequest + .overrideMimeType('text/plain; charset=x-user-defined')); + verify(connection.latestRequest.responseType = 'text'); + }); + + test('Sent data converted to stream properly', () async { + final metadata = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2' + }; + + final connection = MockFetchClientConnection(); + + final stream = connection.makeRequest('path', Duration(seconds: 10), + metadata, (error, _) => fail(error.toString())); + + final data = List.filled(10, 0); + stream.outgoingMessages.add(data); + await stream.terminate(); + + final expectedData = frame(data); + expect(verify(connection.latestRequest.send(captureAny)).captured.single, + expectedData); + }); + + test('Stream handles headers properly', () async { + final responseHeaders = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2', + 'content-type': 'application/grpc+proto', + }; + + final transport = MockFetchClientConnection(); + + final stream = transport.makeRequest('test_path', Duration(seconds: 10), {}, + (error, _) => fail(error.toString())); + + when(transport.latestRequest.responseHeaders).thenReturn(responseHeaders); + when(transport.latestRequest.response) + .thenReturn(String.fromCharCodes(frame([]))); + + // Set expectation for request readyState and generate two readyStateChange + // events, so that incomingMessages stream completes. + final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + when(transport.latestRequest.readyState) + .thenAnswer((_) => readyStates.removeAt(0)); + transport.latestRequest.readyStateChangeController.add(readyStates.first); + transport.latestRequest.readyStateChangeController.add(readyStates.first); + + // Should be only one metadata message with headers augmented with :status + // field. + final message = await stream.incomingMessages.single as GrpcMetadata; + expect(message.metadata, responseHeaders); + }); + + test('Stream handles trailers properly', () async { + final requestHeaders = { + 'parameter_1': 'value_1', + 'content-type': 'application/grpc+proto', + }; + final responseTrailers = { + 'trailer_1': 'value_1', + 'trailer_2': 'value_2', + }; + + final connection = MockFetchClientConnection(); + + final stream = connection.makeRequest('test_path', Duration(seconds: 10), + requestHeaders, (error, _) => fail(error.toString())); + + final encodedTrailers = frame(responseTrailers.entries + .map((e) => '${e.key}:${e.value}') + .join('\r\n') + .codeUnits); + encodedTrailers[0] = 0x80; // Mark this frame as trailers. + final encodedString = String.fromCharCodes(encodedTrailers); + + when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders); + when(connection.latestRequest.response).thenReturn(encodedString); + + // Set expectation for request readyState and generate events so that + // incomingMessages stream completes. + final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + when(connection.latestRequest.readyState) + .thenAnswer((_) => readyStates.removeAt(0)); + connection.latestRequest.readyStateChangeController.add(readyStates.first); + connection.latestRequest.progressController + .add(Uint8List.fromList(encodedTrailers)); + connection.latestRequest.readyStateChangeController.add(readyStates.first); + + // Should be two metadata messages: headers and trailers. + final messages = + await stream.incomingMessages.whereType().toList(); + expect(messages.length, 2); + expect(messages.first.metadata, requestHeaders); + expect(messages.last.metadata, responseTrailers); + }); + + test('Stream handles empty trailers properly', () async { + final requestHeaders = { + 'content-type': 'application/grpc+proto', + }; + + final connection = MockFetchClientConnection(); + + final stream = connection.makeRequest('test_path', Duration(seconds: 10), + {}, (error, _) => fail(error.toString())); + + final encoded = frame(''.codeUnits); + encoded[0] = 0x80; // Mark this frame as trailers. + final encodedString = String.fromCharCodes(encoded); + + when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders); + when(connection.latestRequest.response).thenReturn(encodedString); + + // Set expectation for request readyState and generate events so that + // incomingMessages stream completes. + final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + when(connection.latestRequest.readyState) + .thenAnswer((_) => readyStates.removeAt(0)); + connection.latestRequest.readyStateChangeController.add(readyStates.first); + connection.latestRequest.progressController + .add(Uint8List.fromList(encoded)); + connection.latestRequest.readyStateChangeController.add(readyStates.first); + + // Should be two metadata messages: headers and trailers. + final messages = + await stream.incomingMessages.whereType().toList(); + expect(messages.length, 2); + expect(messages.first.metadata, requestHeaders); + expect(messages.last.metadata, isEmpty); + }); + + test('Stream deserializes data properly', () async { + final requestHeaders = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2', + 'content-type': 'application/grpc+proto', + }; + + final connection = MockFetchClientConnection(); + + final stream = connection.makeRequest('test_path', Duration(seconds: 10), + requestHeaders, (error, _) => fail(error.toString())); + final data = List.filled(10, 224); + when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders); + when(connection.latestRequest.response) + .thenReturn(String.fromCharCodes(frame(data))); + + // Set expectation for request readyState and generate events, so that + // incomingMessages stream completes. + final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + when(connection.latestRequest.readyState) + .thenAnswer((_) => readyStates.removeAt(0)); + connection.latestRequest.readyStateChangeController.add(readyStates.first); + connection.latestRequest.progressController + .add(Uint8List.fromList(frame(data))); + connection.latestRequest.readyStateChangeController.add(readyStates.first); + + // Expect a single data message. + final message = await stream.incomingMessages.whereType().single; + expect(message.data, data); + }); + + test('GrpcError with error details in response', () async { + final connection = MockFetchClientConnection(code: 400); + final errors = []; + // The incoming messages stream never completes when there's an error, so + // using completer. + final errorReceived = Completer(); + connection.makeRequest('test_path', Duration(seconds: 10), {}, (e, _) { + errorReceived.complete(); + errors.add(e as GrpcError); + }); + const errorDetails = 'error details'; + when(connection.latestRequest.responseHeaders) + .thenReturn({'content-type': 'application/grpc+proto'}); + when(connection.latestRequest.readyState).thenReturn(HttpRequest.DONE); + when(connection.latestRequest.responseText).thenReturn(errorDetails); + connection.latestRequest.readyStateChangeController.add(HttpRequest.DONE); + await errorReceived; + expect(errors.single.rawResponse, errorDetails); + }); + + test('Stream receives multiple messages', () async { + final metadata = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2', + 'content-type': 'application/grpc+proto', + }; + + final connection = MockFetchClientConnection(); + + final stream = connection.makeRequest('test_path', Duration(seconds: 10), + metadata, (error, _) => fail(error.toString())); + + final data = >[ + List.filled(10, 224), + List.filled(5, 124) + ]; + final encodedStrings = + data.map((d) => String.fromCharCodes(frame(d))).toList(); + + when(connection.latestRequest.responseHeaders).thenReturn(metadata); + when(connection.latestRequest.readyState) + .thenReturn(HttpRequest.HEADERS_RECEIVED); + + // At first invocation the response should be the the first message, after + // that first + last messages. + var first = true; + when(connection.latestRequest.response).thenAnswer((_) { + if (first) { + first = false; + return encodedStrings[0]; + } + return encodedStrings[0] + encodedStrings[1]; + }); + + final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + when(connection.latestRequest.readyState) + .thenAnswer((_) => readyStates.removeAt(0)); + + final queue = StreamQueue(stream.incomingMessages); + // Headers. + connection.latestRequest.readyStateChangeController.add(readyStates.first); + expect(((await queue.next) as GrpcMetadata).metadata, metadata); + // Data 1. + connection.latestRequest.progressController + .add(Uint8List.fromList(frame(data[0]))); + expect(((await queue.next) as GrpcData).data, data[0]); + // Data 2. + connection.latestRequest.progressController + .add(Uint8List.fromList(frame(data[1]))); + expect(((await queue.next) as GrpcData).data, data[1]); + // Done. + connection.latestRequest.readyStateChangeController.add(readyStates.first); + expect(await queue.hasNext, isFalse); + }); +} From 824ef8525b39f7de48e49161f0e0fed155a37001 Mon Sep 17 00:00:00 2001 From: Steve Browne Date: Thu, 5 May 2022 09:31:36 -0400 Subject: [PATCH 06/11] Ignore missing_js_lib_annotation. This shows up with dart SDK 2.12, but not with 2.16 which is what I was using during development. --- lib/src/client/transport/fetch_transport.dart | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/src/client/transport/fetch_transport.dart b/lib/src/client/transport/fetch_transport.dart index 95cb44a1..c8085fea 100644 --- a/lib/src/client/transport/fetch_transport.dart +++ b/lib/src/client/transport/fetch_transport.dart @@ -34,6 +34,7 @@ import 'web_streams.dart'; const _contentTypeKey = 'Content-Type'; @anonymous +// ignore: missing_js_lib_annotation @JS() class RequestInit { external factory RequestInit( From 5f0e9ed1c0735494e6a7d34bdf68c5cf6aa2e1c0 Mon Sep 17 00:00:00 2001 From: Steve Browne Date: Wed, 11 May 2022 11:58:44 -0400 Subject: [PATCH 07/11] Allow malformed in utf8 decode because the responseText may not always be valid utf8. --- lib/src/client/transport/fetch_transport.dart | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/src/client/transport/fetch_transport.dart b/lib/src/client/transport/fetch_transport.dart index c8085fea..96daded8 100644 --- a/lib/src/client/transport/fetch_transport.dart +++ b/lib/src/client/transport/fetch_transport.dart @@ -116,8 +116,9 @@ class FetchHttpRequest { Map get responseHeaders => _response != null ? toDartMap(js_util.getProperty(_response, 'headers')) : {}; - String? get responseText => - _lastResponse != null ? utf8.decode(_lastResponse!) : null; + String? get responseText => _lastResponse != null + ? utf8.decode(_lastResponse!, allowMalformed: true) + : null; dynamic get body => _response != null ? js_util.getProperty(_response, 'body') : null; From 5fa142b1493d57c455caf0624cade42347b90d25 Mon Sep 17 00:00:00 2001 From: Steve Browne Date: Thu, 12 May 2022 09:09:02 -0400 Subject: [PATCH 08/11] Prevent empty responses from triggering a grpc error for null responses. --- lib/src/client/transport/fetch_transport.dart | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/src/client/transport/fetch_transport.dart b/lib/src/client/transport/fetch_transport.dart index 96daded8..3cc4f5ba 100644 --- a/lib/src/client/transport/fetch_transport.dart +++ b/lib/src/client/transport/fetch_transport.dart @@ -180,6 +180,7 @@ class FetchHttpRequest { onProgressController.add(value as Uint8List); } if (js_util.getProperty(result, 'done')) { + _lastResponse ??= Uint8List(0); _setReadyState(HttpRequest.DONE); break; } From 9384f07cdf525bc72dd2502a6f6991e736ab1434 Mon Sep 17 00:00:00 2001 From: Steve Browne Date: Thu, 16 Jun 2022 09:43:44 -0400 Subject: [PATCH 09/11] Iterate through the javascript iterator manually if the keys are not an iterable type. This apparently changed with flutter 3.0. --- lib/src/client/transport/fetch_transport.dart | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/lib/src/client/transport/fetch_transport.dart b/lib/src/client/transport/fetch_transport.dart index 3cc4f5ba..0f029b3d 100644 --- a/lib/src/client/transport/fetch_transport.dart +++ b/lib/src/client/transport/fetch_transport.dart @@ -126,8 +126,24 @@ class FetchHttpRequest { Map.fromIterable(getObjectKeys(obj), value: (key) => js_util.callMethod(obj, 'get', [key]).toString()); - static List getObjectKeys(Headers obj) => - List.from(js_util.callMethod(obj, 'keys', [])); + static List getObjectKeys(Headers obj) { + final keys = js_util.callMethod(obj, 'keys', []); + // This used to work prior to flutter 3.0 now we type check to see if supported + if (keys is Iterable) { + return List.from(keys); + } + + // Otherwise we have to fall back and manually iterate through the javascript iterator + final res = List.empty(growable: true); + while (true) { + final next = js_util.callMethod(keys, 'next', []); + if (js_util.getProperty(next, 'done')) { + break; + } + res.add(js_util.getProperty(next, 'value').toString()); + } + return res; + } Future send([List? data]) async { final doSend = _doSend(data); From 5a8818b5271a5294ce2a059d6c962c3940d5f632 Mon Sep 17 00:00:00 2001 From: Steve Browne Date: Tue, 5 Jul 2022 12:22:15 -0400 Subject: [PATCH 10/11] Use js_util.callMethod rather than calling methods directly on the dynamic javascript types because the resulting code does not work when building web in release mode. --- lib/src/client/transport/fetch_transport.dart | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/src/client/transport/fetch_transport.dart b/lib/src/client/transport/fetch_transport.dart index 0f029b3d..42f57658 100644 --- a/lib/src/client/transport/fetch_transport.dart +++ b/lib/src/client/transport/fetch_transport.dart @@ -179,14 +179,17 @@ class FetchHttpRequest { onErrorController.add(status); } - final reader = body?.getReader(); + final stream = body; + final reader = + stream != null ? js_util.callMethod(stream, 'getReader', []) : null; if (reader == null) { onErrorController.add(0); return; } while (true) { - final result = await js_util.promiseToFuture(reader.read()); + final result = + await js_util.promiseToFuture(js_util.callMethod(reader, 'read', [])); if (_cancelableSend?.isCanceled ?? false) { return; } From 3b409ad3cbede8bdc4aea8265e01d06e42721942 Mon Sep 17 00:00:00 2001 From: Steve Browne Date: Fri, 15 Jul 2022 08:50:07 -0400 Subject: [PATCH 11/11] Use AbortController and AbortSignal to properly cancel fetches. --- lib/src/client/transport/fetch_transport.dart | 43 ++++++++++++++----- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/lib/src/client/transport/fetch_transport.dart b/lib/src/client/transport/fetch_transport.dart index 42f57658..3a2bcf26 100644 --- a/lib/src/client/transport/fetch_transport.dart +++ b/lib/src/client/transport/fetch_transport.dart @@ -33,6 +33,19 @@ import 'web_streams.dart'; const _contentTypeKey = 'Content-Type'; +@JS() +class AbortSignal { + external factory AbortSignal(); + external bool get aborted; +} + +@JS() +class AbortController { + external factory AbortController(); + external void abort([dynamic reason]); + external AbortSignal get signal; +} + @anonymous // ignore: missing_js_lib_annotation @JS() @@ -41,6 +54,7 @@ class RequestInit { {required String method, Object? headers, List? body, + AbortSignal? signal, required String referrerPolicy, required String mode, required String credentials, @@ -58,6 +72,9 @@ class RequestInit { external Uint8List? get body; external set body(Uint8List? newValue); + external AbortSignal? get signal; + external set signal(AbortSignal? newValue); + external String get referrerPolicy; external set referrerPolicy(String newValue); @@ -106,7 +123,7 @@ class FetchHttpRequest { Stream get onError => onErrorController.stream; // Response information - CancelableOperation? _cancelableFetch; + AbortController? _abortController; CancelableOperation? _cancelableSend; dynamic _response; Uint8List? _lastResponse; @@ -155,6 +172,7 @@ class FetchHttpRequest { final wgs = WorkerGlobalScope.instance; _setReadyState(HttpRequest.LOADING); + _abortController = AbortController(); final init = RequestInit( cache: cache, credentials: credentials, @@ -164,17 +182,19 @@ class FetchHttpRequest { mode: mode, redirect: redirect, referrerPolicy: referrerPolicy, + signal: _abortController?.signal, body: data, headers: js_util.jsify(headers)); - final operation = _cancelableFetch = CancelableOperation.fromFuture( - js_util.promiseToFuture(js_util.callMethod(wgs, 'fetch', [uri, init]))); - _response = await operation.value; - _setReadyState(HttpRequest.HEADERS_RECEIVED); - if (_cancelableSend?.isCanceled ?? false) { + _response = await js_util + .promiseToFuture(js_util.callMethod(wgs, 'fetch', [uri, init])) + .onError((error, stackTrace) => null, + test: (error) => _abortController?.signal.aborted ?? false); + if (_response == null || (_cancelableSend?.isCanceled ?? false)) { return; } + _setReadyState(HttpRequest.HEADERS_RECEIVED); if (status < 200 || status >= 300) { onErrorController.add(status); } @@ -188,9 +208,11 @@ class FetchHttpRequest { } while (true) { - final result = - await js_util.promiseToFuture(js_util.callMethod(reader, 'read', [])); - if (_cancelableSend?.isCanceled ?? false) { + final result = await js_util + .promiseToFuture(js_util.callMethod(reader, 'read', [])) + .onError((error, stackTrace) => null, + test: (error) => _abortController?.signal.aborted ?? false); + if (result == null || (_cancelableSend?.isCanceled ?? false)) { return; } final value = js_util.getProperty(result, 'value'); @@ -219,7 +241,7 @@ class FetchHttpRequest { } void abort() async { - await _cancelableFetch?.cancel(); + _abortController?.abort(); await _cancelableSend?.cancel(); close(); } @@ -228,6 +250,7 @@ class FetchHttpRequest { onReadyStateChangeController.close(); onProgressController.close(); onErrorController.close(); + _response = null; } void setRequestHeader(String name, String value) {