diff --git a/packages/amplify/amplify_flutter/lib/amplify_flutter.dart b/packages/amplify/amplify_flutter/lib/amplify_flutter.dart index c8cc4241dc..f197054c2b 100644 --- a/packages/amplify/amplify_flutter/lib/amplify_flutter.dart +++ b/packages/amplify/amplify_flutter/lib/amplify_flutter.dart @@ -6,7 +6,7 @@ library amplify_flutter; import 'package:amplify_core/amplify_core.dart'; import 'package:amplify_flutter/src/amplify_impl.dart'; -export 'package:amplify_core/amplify_core.dart' hide Amplify; +export 'package:amplify_core/amplify_core.dart' hide Amplify, WebSocketOptions; export 'package:amplify_secure_storage/amplify_secure_storage.dart'; /// Top level singleton Amplify object. diff --git a/packages/amplify_core/lib/amplify_core.dart b/packages/amplify_core/lib/amplify_core.dart index 745f3aedf0..1fed515499 100644 --- a/packages/amplify_core/lib/amplify_core.dart +++ b/packages/amplify_core/lib/amplify_core.dart @@ -64,7 +64,9 @@ export 'src/state_machine/transition.dart'; export 'src/types/analytics/analytics_types.dart'; /// API -export 'src/types/api/api_types.dart'; +export 'src/types/api/api_types.dart' hide WebSocketOptions; +// ignore: invalid_export_of_internal_element +export 'src/types/api/api_types.dart' show WebSocketOptions; /// App path provider export 'src/types/app_path_provider/app_path_provider.dart'; diff --git a/packages/amplify_core/lib/src/types/api/api_types.dart b/packages/amplify_core/lib/src/types/api/api_types.dart index 4f6b8a0408..f5fb215a37 100644 --- a/packages/amplify_core/lib/src/types/api/api_types.dart +++ b/packages/amplify_core/lib/src/types/api/api_types.dart @@ -22,6 +22,7 @@ export 'graphql/graphql_response.dart'; export 'graphql/graphql_response_error.dart'; export 'graphql/graphql_subscription_operation.dart'; export 'graphql/graphql_subscription_options.dart'; +export 'graphql/web_socket_options.dart'; export 'hub/api_hub_event.dart'; export 'hub/api_subscription_hub_event.dart'; // Types diff --git a/packages/amplify_core/lib/src/types/api/graphql/web_socket_options.dart b/packages/amplify_core/lib/src/types/api/graphql/web_socket_options.dart new file mode 100644 index 0000000000..eee15bd339 --- /dev/null +++ b/packages/amplify_core/lib/src/types/api/graphql/web_socket_options.dart @@ -0,0 +1,21 @@ +import 'package:meta/meta.dart'; // Importing the 'meta' package to use the @internal annotation + +/// An internal class to control websocket features after API plugin has been initialized. +@internal +class WebSocketOptions { + /// Private constructor to prevent instantiation + WebSocketOptions._(); + + /// Private static boolean field + static bool _autoReconnect = true; + + /// Static getter method for the boolean field + @internal + static bool get autoReconnect => _autoReconnect; + + /// Static setter method for the boolean field + @internal + static set autoReconnect(bool value) { + _autoReconnect = value; + } +} diff --git a/packages/amplify_datastore/android/src/main/kotlin/com/amazonaws/amplify/amplify_datastore/pigeons/NativePluginBindings.kt b/packages/amplify_datastore/android/src/main/kotlin/com/amazonaws/amplify/amplify_datastore/pigeons/NativePluginBindings.kt index 0f82a922a5..6b936216b1 100644 --- a/packages/amplify_datastore/android/src/main/kotlin/com/amazonaws/amplify/amplify_datastore/pigeons/NativePluginBindings.kt +++ b/packages/amplify_datastore/android/src/main/kotlin/com/amazonaws/amplify/amplify_datastore/pigeons/NativePluginBindings.kt @@ -387,6 +387,12 @@ class NativeApiPlugin(private val binaryMessenger: BinaryMessenger) { callback() } } + fun deviceOffline(callback: () -> Unit) { + val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.amplify_datastore.NativeApiPlugin.deviceOffline", codec) + channel.send(null) { + callback() + } + } fun onStop(callback: () -> Unit) { val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.amplify_datastore.NativeApiPlugin.onStop", codec) channel.send(null) { diff --git a/packages/amplify_datastore/example/ios/Runner.xcodeproj/xcshareddata/xcschemes/Runner.xcscheme b/packages/amplify_datastore/example/ios/Runner.xcodeproj/xcshareddata/xcschemes/Runner.xcscheme index ca2087b9e8..39bfae25b4 100644 --- a/packages/amplify_datastore/example/ios/Runner.xcodeproj/xcshareddata/xcschemes/Runner.xcscheme +++ b/packages/amplify_datastore/example/ios/Runner.xcodeproj/xcshareddata/xcschemes/Runner.xcscheme @@ -70,6 +70,13 @@ ReferencedContainer = "container:Runner.xcodeproj"> + + + + + CADisableMinimumFrameDurationOnPhone + CFBundleDevelopmentRegion $(DEVELOPMENT_LANGUAGE) CFBundleDisplayName @@ -24,6 +26,8 @@ $(FLUTTER_BUILD_NUMBER) LSRequiresIPhoneOS + UIApplicationSupportsIndirectInputEvents + UILaunchStoryboardName LaunchScreen UIMainStoryboardFile @@ -43,9 +47,5 @@ UIViewControllerBasedStatusBarAppearance - CADisableMinimumFrameDurationOnPhone - - UIApplicationSupportsIndirectInputEvents - diff --git a/packages/amplify_datastore/example/lib/main.dart b/packages/amplify_datastore/example/lib/main.dart index 08c6487b77..3789a5d1d5 100644 --- a/packages/amplify_datastore/example/lib/main.dart +++ b/packages/amplify_datastore/example/lib/main.dart @@ -162,11 +162,12 @@ class _MyAppState extends State { void listenToHub() { setState(() { hubSubscription = Amplify.Hub.listen(HubChannel.DataStore, (msg) { - if (msg.type case DataStoreHubEventType.networkStatus) { - print('Network status message: $msg'); + final payload = msg.payload; + if (payload is NetworkStatusEvent) { + print('Network status active: ${payload.active}'); return; } - print(msg); + print(msg.type); }); _listeningToHub = true; }); @@ -317,6 +318,9 @@ class _MyAppState extends State { displayQueryButtons( _isAmplifyConfigured, _queriesToView, updateQueriesToView), + // Start/Stop/Clear buttons + displaySyncButtons(), + Padding(padding: EdgeInsets.all(5.0)), Text("Listen to DataStore Hub"), Switch( diff --git a/packages/amplify_datastore/example/lib/queries_display_widgets.dart b/packages/amplify_datastore/example/lib/queries_display_widgets.dart index 1e8013a7e7..792a39857a 100644 --- a/packages/amplify_datastore/example/lib/queries_display_widgets.dart +++ b/packages/amplify_datastore/example/lib/queries_display_widgets.dart @@ -175,3 +175,35 @@ Widget getWidgetToDisplayComment( }), ); } + +Widget displaySyncButtons() { + return Row(mainAxisAlignment: MainAxisAlignment.center, children: [ + VerticalDivider( + color: Colors.white, + width: 5, + ), + ElevatedButton.icon( + onPressed: () { + Amplify.DataStore.start(); + }, + icon: Icon(Icons.play_arrow), + label: const Text("Start"), + ), + divider, + ElevatedButton.icon( + onPressed: () { + Amplify.DataStore.stop(); + }, + icon: Icon(Icons.stop), + label: const Text("Stop"), + ), + divider, + ElevatedButton.icon( + onPressed: () { + Amplify.DataStore.clear(); + }, + icon: Icon(Icons.delete_sweep), + label: const Text("Clear"), + ), + ]); +} diff --git a/packages/amplify_datastore/ios/Classes/FlutterApiPlugin.swift b/packages/amplify_datastore/ios/Classes/FlutterApiPlugin.swift index 4b82bc8051..3443e244ce 100644 --- a/packages/amplify_datastore/ios/Classes/FlutterApiPlugin.swift +++ b/packages/amplify_datastore/ios/Classes/FlutterApiPlugin.swift @@ -8,8 +8,9 @@ public class FlutterApiPlugin: APICategoryPlugin, AWSAPIAuthInformation private let apiAuthFactory: APIAuthProviderFactory private let nativeApiPlugin: NativeApiPlugin private let nativeSubscriptionEvents: PassthroughSubject - private var cancellables = AtomicDictionary() + private var cancellables = AtomicDictionary() private var endpoints: [String: String] + private var networkMonitor: AmplifyNetworkMonitor init( apiAuthProviderFactory: APIAuthProviderFactory, @@ -21,6 +22,23 @@ public class FlutterApiPlugin: APICategoryPlugin, AWSAPIAuthInformation self.nativeApiPlugin = nativeApiPlugin self.nativeSubscriptionEvents = subscriptionEventBus self.endpoints = endpoints + self.networkMonitor = AmplifyNetworkMonitor() + + // Listen to network events and send a notification to Flutter side when disconnected. + // This enables Flutter to clean up the websocket/subscriptions. + do { + let cancellable = try reachabilityPublisher()?.sink(receiveValue: { reachabilityUpdate in + if !reachabilityUpdate.isOnline { + DispatchQueue.main.async { + self.nativeApiPlugin.deviceOffline {} + } + } + }) + cancellables.set(value: (), forKey: cancellable) // the subscription is bind with class instance lifecycle, it should be released when stream is finished or unsubscribed + + } catch { + print("Failed to create reachability publisher: \(error)") + } } public func defaultAuthType() throws -> AWSAuthorizationType { @@ -122,6 +140,11 @@ public class FlutterApiPlugin: APICategoryPlugin, AWSAPIAuthInformation errors.contains(where: self.isUnauthorizedError(graphQLError:)) { return Fail(error: APIError.operationError("Unauthorized", "", nil)).eraseToAnyPublisher() } + if case .data(.failure(let graphQLResponseError)) = event, + case .error(let errors) = graphQLResponseError, + errors.contains(where: self.isFlutterNetworkError(graphQLError:)){ + return Fail(error: APIError.networkError("FlutterNetworkException", nil, URLError(.networkConnectionLost))).eraseToAnyPublisher() + } return Just(event).setFailureType(to: Error.self).eraseToAnyPublisher() } .eraseToAnyPublisher() @@ -182,6 +205,13 @@ public class FlutterApiPlugin: APICategoryPlugin, AWSAPIAuthInformation } return errorTypeValue == "Unauthorized" } + + private func isFlutterNetworkError(graphQLError: GraphQLError) -> Bool { + guard case let .string(errorTypeValue) = graphQLError.extensions?["errorType"] else { + return false + } + return errorTypeValue == "FlutterNetworkException" + } func asyncQuery(nativeRequest: NativeGraphQLRequest) async -> NativeGraphQLResponse { await withCheckedContinuation { continuation in @@ -236,14 +266,23 @@ public class FlutterApiPlugin: APICategoryPlugin, AWSAPIAuthInformation public func patch(request: RESTRequest) async throws -> RESTTask.Success { preconditionFailure("method not supported") } - + public func reachabilityPublisher(for apiName: String?) throws -> AnyPublisher? { - preconditionFailure("method not supported") + return networkMonitor.publisher + .compactMap { event in + switch event { + case (.offline, .online): + return ReachabilityUpdate(isOnline: true) + case (.online, .offline): + return ReachabilityUpdate(isOnline: false) + default: + return nil + } + } + .eraseToAnyPublisher() } public func reachabilityPublisher() throws -> AnyPublisher? { - return nil + return try reachabilityPublisher(for: nil) } - - } diff --git a/packages/amplify_datastore/ios/Classes/SwiftAmplifyDataStorePlugin.swift b/packages/amplify_datastore/ios/Classes/SwiftAmplifyDataStorePlugin.swift index 7ee5db422f..c3775556a2 100644 --- a/packages/amplify_datastore/ios/Classes/SwiftAmplifyDataStorePlugin.swift +++ b/packages/amplify_datastore/ios/Classes/SwiftAmplifyDataStorePlugin.swift @@ -145,8 +145,6 @@ public class SwiftAmplifyDataStorePlugin: NSObject, FlutterPlugin, NativeAmplify nil ) } - // TODO: Migrate to Async Swift v2 - // AmplifyAWSServiceConfiguration.addUserAgentPlatform(.flutter, version: "\(version) /datastore") try Amplify.configure(with : .data(data)) return completion(.success(())) } catch let error as ConfigurationError { diff --git a/packages/amplify_datastore/ios/Classes/api/GraphQLResponse+Decode.swift b/packages/amplify_datastore/ios/Classes/api/GraphQLResponse+Decode.swift index 289fa1d2e6..f5a9c6a8d7 100644 --- a/packages/amplify_datastore/ios/Classes/api/GraphQLResponse+Decode.swift +++ b/packages/amplify_datastore/ios/Classes/api/GraphQLResponse+Decode.swift @@ -141,7 +141,14 @@ extension GraphQLResponse { uniquingKeysWith: { _, a in a } ) } - + + if error.message?.stringValue?.contains("NetworkException") == true { + extensions = extensions.merging( + ["errorType": "FlutterNetworkException"], + uniquingKeysWith: { _, a in a } + ) + } + return (try? jsonEncoder.encode(error)) .flatMap { try? jsonDecoder.decode(GraphQLError.self, from: $0) } .map { diff --git a/packages/amplify_datastore/ios/Classes/pigeons/NativePluginBindings.swift b/packages/amplify_datastore/ios/Classes/pigeons/NativePluginBindings.swift index 9f4386ced4..df1b268a75 100644 --- a/packages/amplify_datastore/ios/Classes/pigeons/NativePluginBindings.swift +++ b/packages/amplify_datastore/ios/Classes/pigeons/NativePluginBindings.swift @@ -401,6 +401,12 @@ class NativeApiPlugin { completion() } } + func deviceOffline(completion: @escaping () -> Void) { + let channel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.amplify_datastore.NativeApiPlugin.deviceOffline", binaryMessenger: binaryMessenger, codec: codec) + channel.sendMessage(nil) { _ in + completion() + } + } func onStop(completion: @escaping () -> Void) { let channel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.amplify_datastore.NativeApiPlugin.onStop", binaryMessenger: binaryMessenger, codec: codec) channel.sendMessage(nil) { _ in diff --git a/packages/amplify_datastore/lib/amplify_datastore.dart b/packages/amplify_datastore/lib/amplify_datastore.dart index 3e4fc8ec4b..b71231d95c 100644 --- a/packages/amplify_datastore/lib/amplify_datastore.dart +++ b/packages/amplify_datastore/lib/amplify_datastore.dart @@ -350,7 +350,9 @@ class NativeAmplifyApi Future subscribe( NativeGraphQLRequest request) async { final flutterRequest = nativeRequestToGraphQLRequest(request); - + // Turn off then default reconnection behavior to allow native side to trigger reconnect + // ignore: invalid_use_of_internal_member + WebSocketOptions.autoReconnect = false; final operation = Amplify.API.subscribe(flutterRequest, onEstablished: () => sendNativeStartAckEvent(flutterRequest.id)); @@ -376,6 +378,28 @@ class NativeAmplifyApi } } + @override + Future deviceOffline() async { + await _notifySubscriptionsDisconnected(); + } + + Future _notifySubscriptionsDisconnected() async { + _subscriptionsCache.forEach((subId, stream) async { + // Send Swift subscriptions an expected error message when network is lost. + // Swift side is expecting this string to transform into the correct error type. + // This will cause the Sync Engine to enter retry mode and in order to recover it + // later we must unsubscribe and close the websocket. + GraphQLResponseError error = GraphQLResponseError( + message: 'FlutterNetworkException - Network disconnected', + ); + sendSubscriptionStreamErrorEvent(subId, error.toJson()); + // Note: the websocket will still be closing after this line. + // There may be a small delay in shutdown. + await unsubscribe(subId); + await stream.cancel(); + }); + } + /// Amplify.DataStore.Stop() callback /// /// Clean up subscriptions on stop. diff --git a/packages/amplify_datastore/lib/src/native_plugin.g.dart b/packages/amplify_datastore/lib/src/native_plugin.g.dart index 49bb3c66b7..c87f9b9994 100644 --- a/packages/amplify_datastore/lib/src/native_plugin.g.dart +++ b/packages/amplify_datastore/lib/src/native_plugin.g.dart @@ -358,6 +358,8 @@ abstract class NativeApiPlugin { Future unsubscribe(String subscriptionId); + Future deviceOffline(); + Future onStop(); static void setup(NativeApiPlugin? api, {BinaryMessenger? binaryMessenger}) { @@ -464,6 +466,21 @@ abstract class NativeApiPlugin { }); } } + { + final BasicMessageChannel channel = BasicMessageChannel( + 'dev.flutter.pigeon.amplify_datastore.NativeApiPlugin.deviceOffline', + codec, + binaryMessenger: binaryMessenger); + if (api == null) { + channel.setMessageHandler(null); + } else { + channel.setMessageHandler((Object? message) async { + // ignore message + await api.deviceOffline(); + return; + }); + } + } { final BasicMessageChannel channel = BasicMessageChannel( 'dev.flutter.pigeon.amplify_datastore.NativeApiPlugin.onStop', codec, diff --git a/packages/amplify_datastore/pigeons/native_plugin.dart b/packages/amplify_datastore/pigeons/native_plugin.dart index 8c49e0d3cb..cf09fd9042 100644 --- a/packages/amplify_datastore/pigeons/native_plugin.dart +++ b/packages/amplify_datastore/pigeons/native_plugin.dart @@ -42,6 +42,9 @@ abstract class NativeApiPlugin { @async void unsubscribe(String subscriptionId); + @async + void deviceOffline(); + @async void onStop(); } diff --git a/packages/api/amplify_api_dart/lib/amplify_api_dart.dart b/packages/api/amplify_api_dart/lib/amplify_api_dart.dart index 44022623ce..e25c15fb25 100644 --- a/packages/api/amplify_api_dart/lib/amplify_api_dart.dart +++ b/packages/api/amplify_api_dart/lib/amplify_api_dart.dart @@ -4,7 +4,8 @@ /// Amplify API for Dart library amplify_api_dart; -export 'package:amplify_core/src/types/api/api_types.dart'; +export 'package:amplify_core/src/types/api/api_types.dart' + hide WebSocketOptions; export 'src/api_plugin_impl.dart'; diff --git a/packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/subscriptions_bloc.dart b/packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/subscriptions_bloc.dart index 027dae236e..3f98fc8a30 100644 --- a/packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/subscriptions_bloc.dart +++ b/packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/subscriptions_bloc.dart @@ -142,11 +142,7 @@ class SubscriptionBloc } Stream> _complete(SubscriptionComplete event) async* { - assert( - _currentState is SubscriptionListeningState, - 'State should always be listening when completed.', - ); - yield (_currentState as SubscriptionListeningState).complete(); + yield _currentState.complete(); await close(); } diff --git a/packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/web_socket_bloc.dart b/packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/web_socket_bloc.dart index 9f679d5076..0903db904b 100644 --- a/packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/web_socket_bloc.dart +++ b/packages/api/amplify_api_dart/lib/src/graphql/web_socket/blocs/web_socket_bloc.dart @@ -53,8 +53,10 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin { add(const InitEvent()); } + final blocId = uuid(); + @override - String get runtimeTypeName => 'WebSocketBloc'; + String get runtimeTypeName => 'WebSocketBloc - $blocId'; /// Default timeout response for polling static const Duration _pollResponseTimeout = Duration(seconds: 5); @@ -221,8 +223,11 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin { () => _timeout(timeoutDuration), ); - final pollTimer = - Timer.periodic(_currentState.options.pollInterval, (_) => _poll()); + final pollTimer = Timer.periodic( + _currentState.options.pollInterval, + // ignore: invalid_use_of_internal_member + (_) => WebSocketOptions.autoReconnect ? _poll() : () {}, + ); final connectedState = (_currentState as ConnectingState).connected( timeoutTimer, @@ -474,13 +479,28 @@ class WebSocketBloc with AWSDebuggable, AmplifyLoggerMixin { /// Connectivity stream monitors network availability on a hardware level. StreamSubscription _getConnectivityStream() { + var prev = ConnectivityStatus.disconnected; return _connectivity.onConnectivityChanged.listen( (status) { - if (status == ConnectivityStatus.connected) { + // ignore: invalid_use_of_internal_member + if (!WebSocketOptions.autoReconnect) { + // shutdown the socket when autoReconnect is turned off + if (status == ConnectivityStatus.disconnected && + prev == ConnectivityStatus.connected) { + _shutdownWithException( + const NetworkException( + 'Unable to recover network connection, web socket will close.', + recoverySuggestion: 'Check internet connection.', + ), + StackTrace.current, + ); + } + } else if (status == ConnectivityStatus.connected) { add(const NetworkFoundEvent()); } else if (status == ConnectivityStatus.disconnected) { add(const NetworkLossEvent()); } + prev = status; }, onError: (Object e, StackTrace st) => logger.error('Error in connectivity stream $e, $st'),