From 09c6cf0b4e968204be5eda63614e815078fea970 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sat, 6 Jan 2024 18:06:01 +0100 Subject: [PATCH] Close vfs when wasm database is closed --- drift/CHANGELOG.md | 1 + drift/lib/src/web/channel.dart | 33 ++++++- drift/lib/src/web/wasm_setup.dart | 36 +++++--- .../src/web/wasm_setup/dedicated_worker.dart | 1 + drift/lib/src/web/wasm_setup/protocol.dart | 73 ++++++++++++++++ drift/lib/src/web/wasm_setup/shared.dart | 86 ++++++++++++++++--- .../lib/src/web/wasm_setup/shared_worker.dart | 2 + .../web_wasm/lib/driver.dart | 4 + .../web_wasm/test/drift_wasm_test.dart | 2 +- .../integration_tests/web_wasm/web/main.dart | 3 + 10 files changed, 214 insertions(+), 27 deletions(-) diff --git a/drift/CHANGELOG.md b/drift/CHANGELOG.md index 0ac3ef2cb..9edca9d7f 100644 --- a/drift/CHANGELOG.md +++ b/drift/CHANGELOG.md @@ -1,6 +1,7 @@ ## 2.15.0-dev - Methods in the query builder API now respect custom types. +- Close wasm databases hosted in workers after the last client disconnects. ## 2.14.1 diff --git a/drift/lib/src/web/channel.dart b/drift/lib/src/web/channel.dart index d1cd21ac9..125e6ffd3 100644 --- a/drift/lib/src/web/channel.dart +++ b/drift/lib/src/web/channel.dart @@ -2,6 +2,8 @@ import 'dart:html'; import 'package:stream_channel/stream_channel.dart'; +const _disconnectMessage = '_disconnect'; + /// Extension to transform a raw [MessagePort] from web workers into a Dart /// [StreamChannel]. extension PortToChannel on MessagePort { @@ -10,10 +12,35 @@ extension PortToChannel on MessagePort { /// /// This can be used to implement a remote database connection over service /// workers. - StreamChannel channel() { + /// + /// The [explicitClose] parameter can be used to control whether a close + /// message should be sent through the channel when it is closed. This will + /// cause it to be closed on the other end as well. Note that this is not a + /// reliable way of determining channel closures though, as there is no event + /// for channels being closed due to a tab or worker being closed. + /// Both "ends" of a JS channel calling [channel] on their part must use the + /// value for [explicitClose]. + StreamChannel channel({bool explicitClose = false}) { final controller = StreamChannelController(); - onMessage.map((event) => event.data).pipe(controller.local.sink); - controller.local.stream.listen(postMessage, onDone: close); + onMessage.listen((event) { + final message = event.data; + + if (explicitClose && message == _disconnectMessage) { + // Other end has closed the connection + controller.local.sink.close(); + } else { + controller.local.sink.add(message); + } + }); + + controller.local.stream.listen(postMessage, onDone: () { + // Closed locally, inform the other end. + if (explicitClose) { + postMessage(_disconnectMessage); + } + + close(); + }); return controller.foreign; } diff --git a/drift/lib/src/web/wasm_setup.dart b/drift/lib/src/web/wasm_setup.dart index 682dfa1ba..f3d070387 100644 --- a/drift/lib/src/web/wasm_setup.dart +++ b/drift/lib/src/web/wasm_setup.dart @@ -114,6 +114,7 @@ class WasmDatabaseOpener { as DedicatedWorkerCompatibilityResult; _handleCompatibilityResult(status); + dedicatedWorker.version = status.version; if (status.supportsNestedWorkers && status.canAccessOpfs && @@ -142,6 +143,7 @@ class WasmDatabaseOpener { as SharedWorkerCompatibilityResult; _handleCompatibilityResult(sharedFeatures); + shared.version = sharedFeatures.version; // Prefer to use the shared worker to host the database if it supports the // necessary APIs. @@ -160,6 +162,7 @@ class WasmDatabaseOpener { final class _DriftWorker { final AbstractWorker worker; + ProtocolVersion version = ProtocolVersion.legacy; /// The message port to communicate with the worker, if it's a shared worker. final MessagePort? portForShared; @@ -225,16 +228,8 @@ final class _ProbeResult implements WasmProbeResult { final channel = MessageChannel(); final initializer = initializeDatabase; final initChannel = initializer != null ? MessageChannel() : null; - final local = channel.port1.channel(); - - final message = ServeDriftDatabase( - sqlite3WasmUri: opener.sqlite3WasmUri, - port: channel.port2, - storage: implementation, - databaseName: name, - initializationPort: initChannel?.port2, - ); + ServeDriftDatabase message; final sharedWorker = opener._sharedWorker; final dedicatedWorker = opener._dedicatedWorker; @@ -242,10 +237,28 @@ final class _ProbeResult implements WasmProbeResult { case WasmStorageImplementation.opfsShared: case WasmStorageImplementation.sharedIndexedDb: // Forward connection request to shared worker. - message.sendTo(sharedWorker!.send); + message = ServeDriftDatabase( + sqlite3WasmUri: opener.sqlite3WasmUri, + port: channel.port2, + storage: implementation, + databaseName: name, + initializationPort: initChannel?.port2, + protocolVersion: sharedWorker!.version, + ); + + message.sendTo(sharedWorker.send); case WasmStorageImplementation.opfsLocks: case WasmStorageImplementation.unsafeIndexedDb: if (dedicatedWorker != null) { + message = ServeDriftDatabase( + sqlite3WasmUri: opener.sqlite3WasmUri, + port: channel.port2, + storage: implementation, + databaseName: name, + initializationPort: initChannel?.port2, + protocolVersion: dedicatedWorker.version, + ); + message.sendTo(dedicatedWorker.send); } else { // Workers seem to be broken, but we don't need them with this storage @@ -276,6 +289,9 @@ final class _ProbeResult implements WasmProbeResult { } }); + final local = channel.port1 + .channel(explicitClose: message.protocolVersion >= ProtocolVersion.v1); + var connection = await connectToRemoteAndInitialize(local); if (implementation == WasmStorageImplementation.opfsLocks) { // We want stream queries to update for writes in other tabs. For the diff --git a/drift/lib/src/web/wasm_setup/dedicated_worker.dart b/drift/lib/src/web/wasm_setup/dedicated_worker.dart index 74b364286..522465734 100644 --- a/drift/lib/src/web/wasm_setup/dedicated_worker.dart +++ b/drift/lib/src/web/wasm_setup/dedicated_worker.dart @@ -77,6 +77,7 @@ class DedicatedDriftWorker { opfsExists: opfsExists, indexedDbExists: indexedDbExists, existingDatabases: existingDatabases, + version: ProtocolVersion.current, ).sendToClient(self); case ServeDriftDatabase(): _servers.serve(message); diff --git a/drift/lib/src/web/wasm_setup/protocol.dart b/drift/lib/src/web/wasm_setup/protocol.dart index 9c5c41505..3d681efcb 100644 --- a/drift/lib/src/web/wasm_setup/protocol.dart +++ b/drift/lib/src/web/wasm_setup/protocol.dart @@ -8,6 +8,56 @@ import 'package:sqlite3/wasm.dart'; import 'types.dart'; +/// Due to in-browser caching or users not updating their `drift_worker.dart` +/// file after updating drift, the main web app and the workers may be compiled +/// with different versions of drift. To avoid inconsistencies in the +/// communication channel between them, they compare their versions in a +/// handshake and only use features supported by both. +class ProtocolVersion { + final int versionCode; + + const ProtocolVersion._(this.versionCode); + + void writeToJs(Object object) { + setProperty(object, 'v', versionCode); + } + + bool operator >=(ProtocolVersion other) { + return versionCode >= other.versionCode; + } + + static ProtocolVersion negotiate(int? versionCode) { + return switch (versionCode) { + null => legacy, + <= 0 => legacy, + 1 => v1, + > 1 => current, + _ => throw AssertionError(), + }; + } + + static ProtocolVersion fromJsObject(Object object) { + if (hasProperty(object, 'v')) { + return negotiate(getProperty(object, 'v')); + } else { + return legacy; + } + } + + /// The protocol version used for drift versions up to 2.14 - these don't have + /// a version marker anywhere. + static const legacy = ProtocolVersion._(0); + + /// This version makes workers report their supported protocol version. + /// + /// When both the client and the involved worker support this version, an + /// explicit close notification is sent from clients to workers when closing + /// databases. This allows workers to release resources more effieciently. + static const v1 = ProtocolVersion._(1); + + static const current = v1; +} + typedef PostMessage = void Function(Object? msg, [List? transfer]); /// Sealed superclass for JavaScript objects exchanged between the UI tab and @@ -65,6 +115,12 @@ sealed class CompatibilityResult extends WasmInitializationMessage { /// be used to check whether the database exists. final List existingDatabases; + /// The latest protocol version spoken by the worker. + /// + /// Workers only started to report their version in drift 2.15, we assume + /// [ProtocolVersion.legacy] for workers that don't report their version. + final ProtocolVersion version; + final bool indexedDbExists; final bool opfsExists; @@ -74,6 +130,7 @@ sealed class CompatibilityResult extends WasmInitializationMessage { required this.existingDatabases, required this.indexedDbExists, required this.opfsExists, + required this.version, }); } @@ -96,6 +153,7 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult { required super.indexedDbExists, required super.opfsExists, required super.existingDatabases, + required super.version, }); factory SharedWorkerCompatibilityResult.fromJsPayload(Object payload) { @@ -103,9 +161,15 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult { final asBooleans = asList.cast(); final List existingDatabases; + var version = ProtocolVersion.legacy; + if (asList.length > 5) { existingDatabases = EncodeLocations.readFromJs(asList[5] as List); + + if (asList.length > 6) { + version = ProtocolVersion.negotiate(asList[6] as int); + } } else { existingDatabases = const []; } @@ -117,6 +181,7 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult { indexedDbExists: asBooleans[3], opfsExists: asBooleans[4], existingDatabases: existingDatabases, + version: version, ); } @@ -129,6 +194,7 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult { indexedDbExists, opfsExists, existingDatabases.encodeToJs(), + version.versionCode, ]); } @@ -175,6 +241,7 @@ final class ServeDriftDatabase extends WasmInitializationMessage { final WasmStorageImplementation storage; final String databaseName; final MessagePort? initializationPort; + final ProtocolVersion protocolVersion; ServeDriftDatabase({ required this.sqlite3WasmUri, @@ -182,6 +249,7 @@ final class ServeDriftDatabase extends WasmInitializationMessage { required this.storage, required this.databaseName, required this.initializationPort, + required this.protocolVersion, }); factory ServeDriftDatabase.fromJsPayload(Object payload) { @@ -192,6 +260,7 @@ final class ServeDriftDatabase extends WasmInitializationMessage { .byName(getProperty(payload, 'storage')), databaseName: getProperty(payload, 'database'), initializationPort: getProperty(payload, 'initPort'), + protocolVersion: ProtocolVersion.fromJsObject(payload), ); } @@ -204,6 +273,7 @@ final class ServeDriftDatabase extends WasmInitializationMessage { setProperty(object, 'database', databaseName); final initPort = initializationPort; setProperty(object, 'initPort', initPort); + protocolVersion.writeToJs(object); sender.sendTyped(type, object, [ port, @@ -249,6 +319,7 @@ final class DedicatedWorkerCompatibilityResult extends CompatibilityResult { required super.indexedDbExists, required super.opfsExists, required super.existingDatabases, + required super.version, }); factory DedicatedWorkerCompatibilityResult.fromJsPayload(Object payload) { @@ -268,6 +339,7 @@ final class DedicatedWorkerCompatibilityResult extends CompatibilityResult { indexedDbExists: getProperty(payload, 'indexedDbExists'), opfsExists: getProperty(payload, 'opfsExists'), existingDatabases: existingDatabases, + version: ProtocolVersion.fromJsObject(payload), ); } @@ -283,6 +355,7 @@ final class DedicatedWorkerCompatibilityResult extends CompatibilityResult { setProperty(object, 'indexedDbExists', indexedDbExists); setProperty(object, 'opfsExists', opfsExists); setProperty(object, 'existing', existingDatabases.encodeToJs()); + version.writeToJs(object); sender.sendTyped(type, object); } diff --git a/drift/lib/src/web/wasm_setup/shared.dart b/drift/lib/src/web/wasm_setup/shared.dart index 389b330d6..ab582b8a3 100644 --- a/drift/lib/src/web/wasm_setup/shared.dart +++ b/drift/lib/src/web/wasm_setup/shared.dart @@ -9,6 +9,7 @@ import 'package:js/js_util.dart'; // ignore: implementation_imports import 'package:sqlite3/src/wasm/js_interop/file_system_access.dart'; import 'package:sqlite3/wasm.dart'; +import 'package:stream_channel/stream_channel.dart'; import '../channel.dart'; import 'protocol.dart'; @@ -196,15 +197,21 @@ class DriftServerController { initializer: initializer, ))); - return RunningWasmServer(message.storage, server); + final wasmServer = RunningWasmServer(message.storage, server); + wasmServer.lastClientDisconnected.whenComplete(() { + servers.remove(message.databaseName); + wasmServer.server.shutdown(); + }); + return wasmServer; }); - server.server.serve(message.port.channel()); + server.serve(message.port + .channel(explicitClose: message.protocolVersion >= ProtocolVersion.v1)); } /// Loads a new sqlite3 WASM module, registers an appropriate VFS for [storage] /// and finally opens a database, creating it if it doesn't exist. - Future openConnection({ + Future openConnection({ required Uri sqlite3WasmUri, required String databaseName, required WasmStorageImplementation storage, @@ -212,15 +219,24 @@ class DriftServerController { }) async { final sqlite3 = await WasmSqlite3.loadFromUrl(sqlite3WasmUri); - final vfs = await switch (storage) { - WasmStorageImplementation.opfsShared => - SimpleOpfsFileSystem.loadFromStorage(pathForOpfs(databaseName)), - WasmStorageImplementation.opfsLocks => _loadLockedWasmVfs(databaseName), - WasmStorageImplementation.unsafeIndexedDb || - WasmStorageImplementation.sharedIndexedDb => - IndexedDbFileSystem.open(dbName: databaseName), - WasmStorageImplementation.inMemory => Future.value(InMemoryFileSystem()), - }; + VirtualFileSystem vfs; + void Function()? close; + + switch (storage) { + case WasmStorageImplementation.opfsShared: + final simple = vfs = await SimpleOpfsFileSystem.loadFromStorage( + pathForOpfs(databaseName)); + close = simple.close; + case WasmStorageImplementation.opfsLocks: + final locks = vfs = await _loadLockedWasmVfs(databaseName); + close = locks.close; + case WasmStorageImplementation.unsafeIndexedDb: + case WasmStorageImplementation.sharedIndexedDb: + final idb = vfs = await IndexedDbFileSystem.open(dbName: databaseName); + close = idb.close; + case WasmStorageImplementation.inMemory: + vfs = InMemoryFileSystem(); + } if (initializer != null && vfs.xAccess('/database', 0) == 0) { final response = await initializer(); @@ -234,7 +250,13 @@ class DriftServerController { } sqlite3.registerVirtualFileSystem(vfs, makeDefault: true); - return WasmDatabase(sqlite3: sqlite3, path: '/database', setup: _setup); + var db = WasmDatabase(sqlite3: sqlite3, path: '/database', setup: _setup); + + if (close != null) { + return db.interceptWith(_CloseVfsOnClose(close)); + } else { + return db; + } } Future _loadLockedWasmVfs(String databaseName) async { @@ -253,6 +275,20 @@ class DriftServerController { } } +class _CloseVfsOnClose extends QueryInterceptor { + final FutureOr Function() _close; + + _CloseVfsOnClose(this._close); + + @override + Future close(QueryExecutor inner) async { + await inner.close(); + if (inner is! TransactionExecutor) { + await _close(); + } + } +} + /// Information about a running drift server in a web worker. class RunningWasmServer { /// The storage implementation used by the VFS of this server. @@ -261,8 +297,32 @@ class RunningWasmServer { /// The server hosting the drift database. final DriftServer server; + int _connectedClients = 0; + final Completer _lastClientDisconnected = Completer.sync(); + + /// A future that completes synchronously after all [serve]d connections have + /// closed. + Future get lastClientDisconnected => _lastClientDisconnected.future; + /// Default constructor RunningWasmServer(this.storage, this.server); + + /// Tracks a new connection and serves drift database requests over it. + void serve(StreamChannel channel) { + _connectedClients++; + + server.serve( + channel.transformStream(StreamTransformer.fromHandlers( + handleDone: (sink) { + if (--_connectedClients == 0) { + _lastClientDisconnected.complete(); + } + + sink.close(); + }, + )), + ); + } } /// Reported compatibility results with IndexedDB and OPFS. diff --git a/drift/lib/src/web/wasm_setup/shared_worker.dart b/drift/lib/src/web/wasm_setup/shared_worker.dart index 595b95c46..a5c32e8c4 100644 --- a/drift/lib/src/web/wasm_setup/shared_worker.dart +++ b/drift/lib/src/web/wasm_setup/shared_worker.dart @@ -78,6 +78,7 @@ class SharedDriftWorker { indexedDbExists: indexedDbExists, opfsExists: false, existingDatabases: const [], + version: ProtocolVersion.current, ); } else { final worker = _dedicatedWorker ??= Worker(Uri.base.toString()); @@ -102,6 +103,7 @@ class SharedDriftWorker { indexedDbExists: indexedDbExists, opfsExists: opfsExists, existingDatabases: databases, + version: ProtocolVersion.current, )); messageSubscription?.cancel(); diff --git a/extras/integration_tests/web_wasm/lib/driver.dart b/extras/integration_tests/web_wasm/lib/driver.dart index 7f1793730..6a58ffffa 100644 --- a/extras/integration_tests/web_wasm/lib/driver.dart +++ b/extras/integration_tests/web_wasm/lib/driver.dart @@ -135,6 +135,10 @@ class DriftWebDriver { 'open(arguments[0], arguments[1])', [implementation?.name]); } + Future closeDatabase() async { + await driver.executeAsync("close('', arguments[0])", []); + } + Future insertIntoDatabase() async { await driver.executeAsync('insert("", arguments[0])', []); } diff --git a/extras/integration_tests/web_wasm/test/drift_wasm_test.dart b/extras/integration_tests/web_wasm/test/drift_wasm_test.dart index 2b99d4f72..77c1145fa 100644 --- a/extras/integration_tests/web_wasm/test/drift_wasm_test.dart +++ b/extras/integration_tests/web_wasm/test/drift_wasm_test.dart @@ -144,7 +144,7 @@ void main() { await driver.insertIntoDatabase(); await driver.waitForTableUpdate(); - await driver.driver.refresh(); // Reset JS state + await driver.closeDatabase(); final newImpls = await driver.probeImplementations(); expect(newImpls.existing, hasLength(1)); diff --git a/extras/integration_tests/web_wasm/web/main.dart b/extras/integration_tests/web_wasm/web/main.dart index c43dc99c9..b3742a96a 100644 --- a/extras/integration_tests/web_wasm/web/main.dart +++ b/extras/integration_tests/web_wasm/web/main.dart @@ -22,6 +22,9 @@ InitializationMode initializationMode = InitializationMode.none; void main() { _addCallbackForWebDriver('detectImplementations', _detectImplementations); _addCallbackForWebDriver('open', _open); + _addCallbackForWebDriver('close', (arg) async { + await openedDatabase?.close(); + }); _addCallbackForWebDriver('insert', _insert); _addCallbackForWebDriver('get_rows', _getRows); _addCallbackForWebDriver('wait_for_update', _waitForUpdate);