From 0901c984a987e54ba0b67e227adbfdcf3c08eeb4 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 2 Jan 2025 23:45:15 +0100 Subject: [PATCH] Fix cancelling transactions When the initial (unawaited) `synchronized` call when opening a transaction threw a cancellation exception, there would be no surrounding try/catch, causing an unhandled exception. Found through https://github.com/simolus3/drift/issues/2818#issuecomment-2567702603 --- drift/CHANGELOG.md | 2 ++ .../lib/src/runtime/api/connection_user.dart | 4 +-- .../src/runtime/executor/helpers/engines.dart | 20 ++++++++------ .../src/runtime/executor/stream_queries.dart | 4 +-- .../test/integration_tests/database_test.dart | 27 ++++++++++++++----- 5 files changed, 39 insertions(+), 18 deletions(-) diff --git a/drift/CHANGELOG.md b/drift/CHANGELOG.md index 132a7ce22..03ad272ec 100644 --- a/drift/CHANGELOG.md +++ b/drift/CHANGELOG.md @@ -2,6 +2,8 @@ - Fix `TableStatements.insertAll` to only apply a database-specific pragma for SQLite databases. +- Don't attempt to roll-back transactions that failed to begin. +- Fix unhandled exception when cancelling transactions. ## 2.23.0 diff --git a/drift/lib/src/runtime/api/connection_user.dart b/drift/lib/src/runtime/api/connection_user.dart index 822d22c43..00c299200 100644 --- a/drift/lib/src/runtime/api/connection_user.dart +++ b/drift/lib/src/runtime/api/connection_user.dart @@ -494,9 +494,9 @@ abstract class DatabaseConnectionUser { return _runConnectionZoned(transaction, () async { var success = false; - try { - await transactionExecutor.ensureOpen(attachedDatabase); + await transactionExecutor.ensureOpen(attachedDatabase); + try { final result = await action(); success = true; return result; diff --git a/drift/lib/src/runtime/executor/helpers/engines.dart b/drift/lib/src/runtime/executor/helpers/engines.dart index 6a8cce67e..f9e404781 100644 --- a/drift/lib/src/runtime/executor/helpers/engines.dart +++ b/drift/lib/src/runtime/executor/helpers/engines.dart @@ -50,11 +50,12 @@ without awaiting every statement in it.'''); return true; } - Future _synchronized(Future Function() action) { + Future _synchronized(Future Function() action, + {bool abortIfCancelled = true}) { if (isSequential || _waitingChildExecutors > 0) { - return _lock.synchronized(() { - checkIfCancelled(); - return action(); + return _lock.synchronized(() async { + if (abortIfCancelled) checkIfCancelled(); + return await action(); }); } else { // support multiple operations in parallel, so just run right away @@ -221,13 +222,16 @@ class _StatementBasedTransactionExecutor extends _TransactionExecutor { final parent = _parent; parent._waitingChildExecutors++; - unawaited(parent._synchronized(() async { + unawaited(parent._synchronized(abortIfCancelled: false, () async { try { + checkIfCancelled(); await runCustom(_startCommand); _db.delegate.isInTransaction = true; _opened!.complete(true); } catch (e, s) { _opened!.completeError(e, s); + + _release(); } // release the database lock after the transaction completes @@ -256,7 +260,7 @@ class _StatementBasedTransactionExecutor extends _TransactionExecutor { if (!_ensureOpenCalled) return; await runCustom(_commitCommand, const []); - _afterCommitOrRollback(); + _release(); } @override @@ -272,11 +276,11 @@ class _StatementBasedTransactionExecutor extends _TransactionExecutor { // When aborting fails too, something is seriously wrong already. Let's // at least make sure that we don't block the rest of the db by pretending // the transaction is still open. - _afterCommitOrRollback(); + _release(); } } - void _afterCommitOrRollback() { + void _release() { if (depth == 0) { _db.delegate.isInTransaction = false; } diff --git a/drift/lib/src/runtime/executor/stream_queries.dart b/drift/lib/src/runtime/executor/stream_queries.dart index 4a368bed9..ee7eb826b 100644 --- a/drift/lib/src/runtime/executor/stream_queries.dart +++ b/drift/lib/src/runtime/executor/stream_queries.dart @@ -193,7 +193,7 @@ class StreamQueryStore { } class QueryStream { - final QueryStreamFetcher _fetcher; + final QueryStreamFetcher _fetcher; final StreamQueryStore _store; final DatabaseConnectionUser _database; @@ -337,7 +337,7 @@ class QueryStream { } if (operation.isCancelled) return; - runCancellable(_fetcher.fetchData, token: operation); + runCancellable(_fetcher.fetchData, token: operation); final data = await operation.resultOrNullIfCancelled; if (data == null) return; diff --git a/drift/test/integration_tests/database_test.dart b/drift/test/integration_tests/database_test.dart index 3a5d2f39d..a5fc19c06 100644 --- a/drift/test/integration_tests/database_test.dart +++ b/drift/test/integration_tests/database_test.dart @@ -4,6 +4,7 @@ library; import 'package:drift/drift.dart'; import 'package:drift/native.dart'; +import 'package:drift/src/runtime/cancellation_zone.dart'; import 'package:sqlite3/sqlite3.dart'; import 'package:test/test.dart'; @@ -32,12 +33,10 @@ void main() { return driftDb.select(driftDb.categories).get(); }), throwsA( - isA().having( - (e) => e.cause, - 'cause', - isA().having((e) => e.causingStatement, - 'causingStatement', 'BEGIN TRANSACTION'), - ), + isA() + .having((e) => e.causingStatement, 'causingStatement', + 'BEGIN TRANSACTION') + .having((e) => e.extendedResultCode, 'resultCode', 262), ), ); @@ -153,4 +152,20 @@ void main() { final results = await Future.wait([fut1, fut2]); expect(results, [7, 5]); }); + + test('can cancel opening transactions', () async { + final db = TodoDb(NativeDatabase.memory()); + addTearDown(db.close); + + final token = CancellationToken()..cancel(); + + runCancellable(() async { + await db.transaction(() async { + throw 'should not be reached'; + }); + }, token: token); + expect(token.result, throwsA(isA())); + + await db.customSelect('SELECT 1').get(); + }); }