diff --git a/CHANGELOG.md b/CHANGELOG.md index bc006fd0f..12c81f68a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ * Dart ^3.0.2 and Flutter ^3.10.2 ### Internal +* Synced realms will use async open to prevent overloading the server with schema updates. [#1369](https://github.com/realm/realm-dart/pull/1369)) * Using Core 13.15.1 ## 1.2.0 (2023-06-08) diff --git a/lib/src/native/realm_bindings.dart b/lib/src/native/realm_bindings.dart index a98b66260..62847e566 100644 --- a/lib/src/native/realm_bindings.dart +++ b/lib/src/native/realm_bindings.dart @@ -3129,6 +3129,32 @@ class RealmLibrary { ffi.Pointer Function( ffi.Pointer)>(); + void realm_dart_async_open_task_callback( + ffi.Pointer userdata, + ffi.Pointer realm, + ffi.Pointer error, + ) { + return _realm_dart_async_open_task_callback( + userdata, + realm, + error, + ); + } + + late final _realm_dart_async_open_task_callbackPtr = _lookup< + ffi.NativeFunction< + ffi.Void Function( + ffi.Pointer, + ffi.Pointer, + ffi.Pointer)>>( + 'realm_dart_async_open_task_callback'); + late final _realm_dart_async_open_task_callback = + _realm_dart_async_open_task_callbackPtr.asFunction< + void Function( + ffi.Pointer, + ffi.Pointer, + ffi.Pointer)>(); + ffi.Pointer realm_dart_create_scheduler( int isolateId, int port, @@ -10819,6 +10845,14 @@ class RealmLibrary { class _SymbolAddresses { final RealmLibrary _library; _SymbolAddresses(this._library); + ffi.Pointer< + ffi.NativeFunction< + ffi.Void Function( + ffi.Pointer, + ffi.Pointer, + ffi.Pointer)>> + get realm_dart_async_open_task_callback => + _library._realm_dart_async_open_task_callbackPtr; ffi.Pointer< ffi.NativeFunction< ffi.Pointer Function(ffi.Uint64, Dart_Port)>> diff --git a/lib/src/native/realm_core.dart b/lib/src/native/realm_core.dart index d8d924f1e..2081e40a2 100644 --- a/lib/src/native/realm_core.dart +++ b/lib/src/native/realm_core.dart @@ -143,14 +143,7 @@ class _RealmCore { return null; } - final message = error.ref.message.cast().toRealmDartString(); - Object? userError; - if (error.ref.usercode_error != nullptr) { - userError = error.ref.usercode_error.toObject(isPersistent: true); - _realmLib.realm_dart_delete_persistent_handle(error.ref.usercode_error); - } - - return LastError(error.ref.error, message, userError); + return error.ref.toLastError(); } void throwLastError([String? errorMessage]) { @@ -626,8 +619,8 @@ class _RealmCore { } final beforeRealm = RealmInternal.getUnowned(syncConfig, RealmHandle._unowned(beforeHandle)); - final afterRealm = - RealmInternal.getUnowned(syncConfig, RealmHandle._unowned(_realmLib.realm_from_thread_safe_reference(afterReference, scheduler.handle._pointer))); + final realmPtr = _realmLib.invokeGetPointer(() => _realmLib.realm_from_thread_safe_reference(afterReference, scheduler.handle._pointer)); + final afterRealm = RealmInternal.getUnowned(syncConfig, RealmHandle._unowned(realmPtr)); try { return await afterResetCallback(beforeRealm, afterRealm); @@ -664,6 +657,61 @@ class _RealmCore { return RealmHandle._(realmPtr); } + RealmAsyncOpenTaskHandle createRealmAsyncOpenTask(FlexibleSyncConfiguration config) { + final configHandle = _createConfig(config); + final asyncOpenTaskPtr = + _realmLib.invokeGetPointer(() => _realmLib.realm_open_synchronized(configHandle._pointer), "Error opening realm at path ${config.path}"); + return RealmAsyncOpenTaskHandle._(asyncOpenTaskPtr); + } + + Future openRealmAsync(RealmAsyncOpenTaskHandle handle, CancellationToken? cancellationToken) { + final completer = CancellableCompleter(cancellationToken); + final callback = + Pointer.fromFunction realm, Pointer error)>(_openRealmAsyncCallback); + final userData = _realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle._pointer); + _realmLib.realm_async_open_task_start( + handle._pointer, + _realmLib.addresses.realm_dart_async_open_task_callback, + userData.cast(), + _realmLib.addresses.realm_dart_userdata_async_free, + ); + + return completer.future; + } + + static void _openRealmAsyncCallback(Object userData, Pointer realmSafePtr, Pointer error) { + return using((Arena arena) { + final completer = userData as Completer; + + if (error != nullptr) { + final err = arena(); + _realmLib.realm_get_async_error(error, err); + completer.completeError(RealmException("Failed to open realm ${err.ref.toLastError().toString()}")); + return; + } + + final realmPtr = _realmLib.invokeGetPointer(() => _realmLib.realm_from_thread_safe_reference(realmSafePtr, scheduler.handle._pointer)); + completer.complete(RealmHandle._(realmPtr)); + }); + } + + void cancelOpenRealmAsync(RealmAsyncOpenTaskHandle handle) { + _realmLib.realm_async_open_task_cancel(handle._pointer); + } + + RealmAsyncOpenTaskProgressNotificationTokenHandle realmAsyncOpenRegisterAsyncOpenProgressNotifier( + RealmAsyncOpenTaskHandle handle, RealmAsyncOpenProgressNotificationsController controller) { + final callback = Pointer.fromFunction(_syncProgressCallback); + final userdata = _realmLib.realm_dart_userdata_async_new(controller, callback.cast(), scheduler.handle._pointer); + final tokenPtr = _realmLib.invokeGetPointer(() => _realmLib.realm_async_open_task_register_download_progress_notifier( + handle._pointer, + _realmLib.addresses.realm_dart_sync_progress_callback, + userdata.cast(), + _realmLib.addresses.realm_dart_userdata_async_free, + )); + return RealmAsyncOpenTaskProgressNotificationTokenHandle._(tokenPtr); + } + RealmSchema readSchema(Realm realm) { return using((Arena arena) { return _readSchema(realm, arena); @@ -1777,10 +1825,6 @@ class _RealmCore { return using((arena) { final handle = SyncClientConfigHandle._(_realmLib.realm_sync_client_config_new()); - // TODO: Remove later - // Disable multiplexing for now due to: https://github.com/realm/realm-core/issues/6656 - _realmLib.realm_sync_client_config_set_multiplex_sessions(handle._pointer, false); - // <-- end _realmLib.realm_sync_client_config_set_base_file_path(handle._pointer, configuration.baseFilePath.path.toCharPtr(arena)); _realmLib.realm_sync_client_config_set_metadata_mode(handle._pointer, configuration.metadataPersistenceMode.index); _realmLib.realm_sync_client_config_set_connect_timeout(handle._pointer, configuration.maxConnectionTimeout.inMilliseconds); @@ -2228,20 +2272,20 @@ class _RealmCore { RealmSyncSessionConnectionStateNotificationTokenHandle sessionRegisterProgressNotifier( Session session, ProgressDirection direction, ProgressMode mode, SessionProgressNotificationsController controller) { final isStreaming = mode == ProgressMode.reportIndefinitely; - final callback = Pointer.fromFunction(_progressCallback); + final callback = Pointer.fromFunction(_syncProgressCallback); final userdata = _realmLib.realm_dart_userdata_async_new(controller, callback.cast(), scheduler.handle._pointer); - final notification_token = _realmLib.realm_sync_session_register_progress_notifier( + final tokenPtr = _realmLib.invokeGetPointer(() => _realmLib.realm_sync_session_register_progress_notifier( session.handle._pointer, _realmLib.addresses.realm_dart_sync_progress_callback, direction.index, isStreaming, userdata.cast(), - _realmLib.addresses.realm_dart_userdata_async_free); - return RealmSyncSessionConnectionStateNotificationTokenHandle._(notification_token); + _realmLib.addresses.realm_dart_userdata_async_free)); + return RealmSyncSessionConnectionStateNotificationTokenHandle._(tokenPtr); } - static void _progressCallback(Object userdata, int transferred, int transferable) { - final controller = userdata as SessionProgressNotificationsController; + static void _syncProgressCallback(Object userdata, int transferred, int transferable) { + final controller = userdata as ProgressNotificationsController; controller.onProgress(transferred, transferable); } @@ -2845,6 +2889,14 @@ class SubscriptionHandle extends HandleBase { SubscriptionHandle._(Pointer pointer) : super(pointer, 184); } +class RealmAsyncOpenTaskHandle extends HandleBase { + RealmAsyncOpenTaskHandle._(Pointer pointer) : super(pointer, 32); +} + +class RealmAsyncOpenTaskProgressNotificationTokenHandle extends HandleBase { + RealmAsyncOpenTaskProgressNotificationTokenHandle._(Pointer pointer) : super(pointer, 40); +} + class SubscriptionSetHandle extends RootedHandleBase { @override bool get shouldRoot => true; @@ -3350,3 +3402,16 @@ class SyncErrorDetails { this.compensatingWrites, }); } + +extension on realm_error { + LastError toLastError() { + final message = this.message.cast().toRealmDartString(); + Object? userError; + if (error == realm_errno.RLM_ERR_CALLBACK && usercode_error != nullptr) { + userError = usercode_error.toObject(isPersistent: true); + _realmLib.realm_dart_delete_persistent_handle(usercode_error); + } + + return LastError(error, message, userError); + } +} diff --git a/lib/src/realm_class.dart b/lib/src/realm_class.dart index c016da5c0..675e2007e 100644 --- a/lib/src/realm_class.dart +++ b/lib/src/realm_class.dart @@ -173,31 +173,47 @@ class Realm implements Finalizable { if (cancellationToken != null && cancellationToken.isCancelled) { throw cancellationToken.exception!; } - final realm = Realm(config); - StreamSubscription? subscription; - try { - if (config is FlexibleSyncConfiguration) { - final session = realm.syncSession; - if (onProgressCallback != null) { - subscription = session.getProgressStream(ProgressDirection.download, ProgressMode.forCurrentlyOutstandingWork).listen(onProgressCallback); - } - await session.waitForDownload(cancellationToken); - await subscription?.cancel(); - } - } catch (_) { - await subscription?.cancel(); - realm.close(); - rethrow; + + if (config is! FlexibleSyncConfiguration) { + final realm = Realm(config); + return await CancellableFuture.value(realm, cancellationToken); } - return await CancellableFuture.value(realm, cancellationToken); + + _ensureDirectory(config); + + final asyncOpenHandle = realmCore.createRealmAsyncOpenTask(config); + return await CancellableFuture.from(() async { + if (cancellationToken != null && cancellationToken.isCancelled) { + throw cancellationToken.exception!; + } + + StreamSubscription? progressSubscription; + if (onProgressCallback != null) { + final progressController = RealmAsyncOpenProgressNotificationsController._(asyncOpenHandle); + final progressStream = progressController.createStream(); + progressSubscription = progressStream.listen(onProgressCallback); + } + + late final RealmHandle realmHandle; + try { + realmHandle = await realmCore.openRealmAsync(asyncOpenHandle, cancellationToken); + return Realm._(config, realmHandle); + } finally { + await progressSubscription?.cancel(); + } + }, cancellationToken, onCancel: () => realmCore.cancelOpenRealmAsync(asyncOpenHandle)); } static RealmHandle _openRealm(Configuration config) { + _ensureDirectory(config); + return realmCore.openRealm(config); + } + + static void _ensureDirectory(Configuration config) { var dir = File(config.path).parent; if (!dir.existsSync()) { dir.createSync(recursive: true); } - return realmCore.openRealm(config); } void _populateMetadata() { @@ -972,3 +988,35 @@ class MigrationRealm extends DynamicRealm { /// * syncProgress - an object of [SyncProgress] that contains `transferredBytes` and `transferableBytes`. /// {@category Realm} typedef ProgressCallback = void Function(SyncProgress syncProgress); + +/// @nodoc +class RealmAsyncOpenProgressNotificationsController implements ProgressNotificationsController { + final RealmAsyncOpenTaskHandle _handle; + RealmAsyncOpenTaskProgressNotificationTokenHandle? _tokenHandle; + late final StreamController _streamController; + + RealmAsyncOpenProgressNotificationsController._(this._handle); + + Stream createStream() { + _streamController = StreamController(onListen: _start, onCancel: _stop); + return _streamController.stream; + } + + @override + void onProgress(int transferredBytes, int transferableBytes) { + _streamController.add(SessionInternal.createSyncProgress(transferredBytes, transferableBytes)); + } + + void _start() { + if (_tokenHandle != null) { + throw RealmStateError("Progress subscription already started."); + } + + _tokenHandle = realmCore.realmAsyncOpenRegisterAsyncOpenProgressNotifier(_handle, this); + } + + void _stop() { + _tokenHandle?.release(); + _tokenHandle = null; + } +} diff --git a/lib/src/session.dart b/lib/src/session.dart index e8e123e9d..84adaddf7 100644 --- a/lib/src/session.dart +++ b/lib/src/session.dart @@ -91,7 +91,7 @@ class SyncProgress { /// successfully transferred. final int transferableBytes; - SyncProgress._(this.transferredBytes, this.transferableBytes); + const SyncProgress._(this.transferredBytes, this.transferableBytes); } /// A type containing information about the transition of a connection state from one value to another. @@ -124,15 +124,21 @@ extension SessionInternal on Session { void raiseError(SyncErrorCategory category, int errorCode, bool isFatal) { realmCore.raiseError(this, category, errorCode, isFatal); } + + static SyncProgress createSyncProgress(int transferredBytes, int transferableBytes) => SyncProgress._(transferredBytes, transferableBytes); +} + +abstract interface class ProgressNotificationsController { + void onProgress(int transferredBytes, int transferableBytes); } /// @nodoc -class SessionProgressNotificationsController { +class SessionProgressNotificationsController implements ProgressNotificationsController { final Session _session; final ProgressDirection _direction; final ProgressMode _mode; - RealmSyncSessionConnectionStateNotificationTokenHandle? _token; + RealmSyncSessionConnectionStateNotificationTokenHandle? _tokenHandle; late final StreamController _streamController; SessionProgressNotificationsController(this._session, this._direction, this._mode); @@ -142,6 +148,7 @@ class SessionProgressNotificationsController { return _streamController.stream; } + @override void onProgress(int transferredBytes, int transferableBytes) { _streamController.add(SyncProgress._(transferredBytes, transferableBytes)); @@ -151,15 +158,15 @@ class SessionProgressNotificationsController { } void _start() { - if (_token != null) { - throw RealmStateError("Session progress subscription already started"); + if (_tokenHandle != null) { + throw RealmStateError("Session progress subscription already started."); } - _token = realmCore.sessionRegisterProgressNotifier(_session, _direction, _mode, this); + _tokenHandle = realmCore.sessionRegisterProgressNotifier(_session, _direction, _mode, this); } void _stop() { - _token?.release(); - _token = null; + _tokenHandle?.release(); + _tokenHandle = null; } } @@ -591,7 +598,6 @@ enum SyncResolveErrorCode { /// /// These errors will be reported via the error handlers of the affected sessions. enum SyncWebSocketErrorCode { - /// Web socket resolution failed websocketResolveFailed(4400), diff --git a/src/realm_dart_sync.cpp b/src/realm_dart_sync.cpp index d2867cf9d..8846e3aae 100644 --- a/src/realm_dart_sync.cpp +++ b/src/realm_dart_sync.cpp @@ -218,3 +218,11 @@ RLM_API bool realm_dart_sync_after_reset_handler_callback(realm_userdata_t userd }; return invoke_dart_and_await_result(&userCallback); } + +RLM_API void realm_dart_async_open_task_callback(realm_userdata_t userdata, realm_thread_safe_reference_t* realm, const realm_async_error_t* error) +{ + auto ud = reinterpret_cast(userdata); + ud->scheduler->invoke([ud, realm, error]() { + (reinterpret_cast(ud->dart_callback))(ud->handle, realm, error); + }); +} diff --git a/src/realm_dart_sync.h b/src/realm_dart_sync.h index ba354b89b..8d1f9f412 100644 --- a/src/realm_dart_sync.h +++ b/src/realm_dart_sync.h @@ -41,4 +41,6 @@ RLM_API void realm_dart_sync_on_subscription_state_changed_callback(realm_userda RLM_API bool realm_dart_sync_before_reset_handler_callback(realm_userdata_t userdata, realm_t* realm); -RLM_API bool realm_dart_sync_after_reset_handler_callback(realm_userdata_t userdata, realm_t* before_realm, realm_thread_safe_reference_t* after_realm, bool did_recover); \ No newline at end of file +RLM_API bool realm_dart_sync_after_reset_handler_callback(realm_userdata_t userdata, realm_t* before_realm, realm_thread_safe_reference_t* after_realm, bool did_recover); + +RLM_API void realm_dart_async_open_task_callback(realm_userdata_t userdata, realm_thread_safe_reference_t* realm, const realm_async_error_t* error); diff --git a/test/test.dart b/test/test.dart index bbe16b44f..ea05fbedb 100644 --- a/test/test.dart +++ b/test/test.dart @@ -401,7 +401,7 @@ Future setupTests(List? args) async { addTearDown(() async { final paths = HashSet(); paths.add(path); - + realmCore.clearCachedApps(); while (_openRealms.isNotEmpty) {