Skip to content

Commit

Permalink
Fix cancelling transactions
Browse files Browse the repository at this point in the history
When the initial (unawaited) `synchronized` call when opening a
transaction threw a cancellation exception, there would be no
surrounding try/catch, causing an unhandled exception.

Found through #2818 (comment)
  • Loading branch information
simolus3 committed Jan 2, 2025
1 parent 4dd6bc8 commit 0901c98
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 18 deletions.
2 changes: 2 additions & 0 deletions drift/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

- Fix `TableStatements.insertAll` to only apply a database-specific pragma for
SQLite databases.
- Don't attempt to roll-back transactions that failed to begin.
- Fix unhandled exception when cancelling transactions.

## 2.23.0

Expand Down
4 changes: 2 additions & 2 deletions drift/lib/src/runtime/api/connection_user.dart
Original file line number Diff line number Diff line change
Expand Up @@ -494,9 +494,9 @@ abstract class DatabaseConnectionUser {

return _runConnectionZoned(transaction, () async {
var success = false;
try {
await transactionExecutor.ensureOpen(attachedDatabase);

await transactionExecutor.ensureOpen(attachedDatabase);
try {
final result = await action();
success = true;
return result;
Expand Down
20 changes: 12 additions & 8 deletions drift/lib/src/runtime/executor/helpers/engines.dart
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ without awaiting every statement in it.''');
return true;
}

Future<T> _synchronized<T>(Future<T> Function() action) {
Future<T> _synchronized<T>(Future<T> Function() action,
{bool abortIfCancelled = true}) {
if (isSequential || _waitingChildExecutors > 0) {
return _lock.synchronized(() {
checkIfCancelled();
return action();
return _lock.synchronized(() async {
if (abortIfCancelled) checkIfCancelled();
return await action();
});
} else {
// support multiple operations in parallel, so just run right away
Expand Down Expand Up @@ -221,13 +222,16 @@ class _StatementBasedTransactionExecutor extends _TransactionExecutor {
final parent = _parent;
parent._waitingChildExecutors++;

unawaited(parent._synchronized(() async {
unawaited(parent._synchronized(abortIfCancelled: false, () async {
try {
checkIfCancelled();
await runCustom(_startCommand);
_db.delegate.isInTransaction = true;
_opened!.complete(true);
} catch (e, s) {
_opened!.completeError(e, s);

_release();
}

// release the database lock after the transaction completes
Expand Down Expand Up @@ -256,7 +260,7 @@ class _StatementBasedTransactionExecutor extends _TransactionExecutor {
if (!_ensureOpenCalled) return;

await runCustom(_commitCommand, const []);
_afterCommitOrRollback();
_release();
}

@override
Expand All @@ -272,11 +276,11 @@ class _StatementBasedTransactionExecutor extends _TransactionExecutor {
// When aborting fails too, something is seriously wrong already. Let's
// at least make sure that we don't block the rest of the db by pretending
// the transaction is still open.
_afterCommitOrRollback();
_release();
}
}

void _afterCommitOrRollback() {
void _release() {
if (depth == 0) {
_db.delegate.isInTransaction = false;
}
Expand Down
4 changes: 2 additions & 2 deletions drift/lib/src/runtime/executor/stream_queries.dart
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class StreamQueryStore {
}

class QueryStream<Rows extends Object> {
final QueryStreamFetcher _fetcher;
final QueryStreamFetcher<Rows> _fetcher;
final StreamQueryStore _store;
final DatabaseConnectionUser _database;

Expand Down Expand Up @@ -337,7 +337,7 @@ class QueryStream<Rows extends Object> {
}

if (operation.isCancelled) return;
runCancellable(_fetcher.fetchData, token: operation);
runCancellable<Rows>(_fetcher.fetchData, token: operation);
final data = await operation.resultOrNullIfCancelled;
if (data == null) return;

Expand Down
27 changes: 21 additions & 6 deletions drift/test/integration_tests/database_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ library;

import 'package:drift/drift.dart';
import 'package:drift/native.dart';
import 'package:drift/src/runtime/cancellation_zone.dart';
import 'package:sqlite3/sqlite3.dart';
import 'package:test/test.dart';

Expand Down Expand Up @@ -32,12 +33,10 @@ void main() {
return driftDb.select(driftDb.categories).get();
}),
throwsA(
isA<CouldNotRollBackException>().having(
(e) => e.cause,
'cause',
isA<SqliteException>().having((e) => e.causingStatement,
'causingStatement', 'BEGIN TRANSACTION'),
),
isA<SqliteException>()
.having((e) => e.causingStatement, 'causingStatement',
'BEGIN TRANSACTION')
.having((e) => e.extendedResultCode, 'resultCode', 262),
),
);

Expand Down Expand Up @@ -153,4 +152,20 @@ void main() {
final results = await Future.wait<void>([fut1, fut2]);
expect(results, [7, 5]);
});

test('can cancel opening transactions', () async {
final db = TodoDb(NativeDatabase.memory());
addTearDown(db.close);

final token = CancellationToken<void>()..cancel();

runCancellable(() async {
await db.transaction(() async {
throw 'should not be reached';
});
}, token: token);
expect(token.result, throwsA(isA<CancellationException>()));

await db.customSelect('SELECT 1').get();
});
}

0 comments on commit 0901c98

Please sign in to comment.