Skip to content

Commit

Permalink
WALSnapshotTransaction is Sendable
Browse files Browse the repository at this point in the history
# Conflicts:
#	TODO.md
  • Loading branch information
groue committed Aug 26, 2024
1 parent aff662c commit dd64781
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 34 deletions.
4 changes: 4 additions & 0 deletions GRDB/Core/DatabaseError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,10 @@ extension DatabaseError {
static func connectionIsClosed() -> Self {
DatabaseError(resultCode: .SQLITE_MISUSE, message: "Connection is closed")
}

static func snapshotIsLost() -> Self {
DatabaseError(resultCode: .SQLITE_ABORT, message: "Snapshot is lost.")
}
}

// Support for `catch DatabaseError.SQLITE_XXX`
Expand Down
2 changes: 1 addition & 1 deletion GRDB/Core/DatabaseSnapshotPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ extension DatabaseSnapshotPool: DatabaseSnapshotReader {
return try reader.reentrantSync { db in
let result = try value(db)
if snapshotIsLost(db) {
throw DatabaseError(resultCode: .SQLITE_ABORT, message: "Snapshot is lost.")
throw DatabaseError.snapshotIsLost()
}
return result
}
Expand Down
80 changes: 56 additions & 24 deletions GRDB/Core/WALSnapshotTransaction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,28 @@
///
/// `WALSnapshotTransaction` **takes ownership** of its reader
/// `SerializedDatabase` (TODO: make it a move-only type eventually).
class WALSnapshotTransaction {
private let reader: SerializedDatabase
private let release: (_ isInsideTransaction: Bool) -> Void
final class WALSnapshotTransaction: @unchecked Sendable {
// @unchecked because `databaseAccess` is protected by a mutex.

private struct DatabaseAccess {
let reader: SerializedDatabase
let release: @Sendable (_ isInsideTransaction: Bool) -> Void

// MUST be called only once
func commitAndRelease() {
// WALSnapshotTransaction may be deinitialized in the dispatch
// queue of its reader: allow reentrancy.
let isInsideTransaction = reader.reentrantSync(allowingLongLivedTransaction: false) { db in
try? db.commit()
return db.isInsideTransaction
}
release(isInsideTransaction)
}
}

// TODO: consider using the serialized DispatchQueue of reader instead of a lock.
/// nil when closed
private let databaseAccessMutex: Mutex<DatabaseAccess?>

/// The state of the database at the beginning of the transaction.
let walSnapshot: WALSnapshot
Expand Down Expand Up @@ -36,10 +55,11 @@ class WALSnapshotTransaction {
/// is no longer used.
init(
onReader reader: SerializedDatabase,
release: @escaping (_ isInsideTransaction: Bool) -> Void)
release: @escaping @Sendable (_ isInsideTransaction: Bool) -> Void)
throws
{
assert(reader.configuration.readonly)
let databaseAccess = DatabaseAccess(reader: reader, release: release)

do {
// Open a long-lived transaction, and enter snapshot isolation
Expand All @@ -50,44 +70,56 @@ class WALSnapshotTransaction {
try db.clearSchemaCacheIfNeeded()
return try WALSnapshot(db)
}
self.reader = reader
self.release = release
self.databaseAccessMutex = Mutex(databaseAccess)
} catch {
// self is not initialized, so deinit will not run.
Self.commitAndRelease(reader: reader, release: release)
databaseAccess.commitAndRelease()
throw error
}
}

deinit {
Self.commitAndRelease(reader: reader, release: release)
close()
}

/// Executes database operations in the snapshot transaction, and
/// returns their result after they have finished executing.
func read<T>(_ value: (Database) throws -> T) rethrows -> T {
// We should check the validity of the snapshot, as DatabaseSnapshotPool does.
try reader.sync(value)
func read<T>(_ value: (Database) throws -> T) throws -> T {
try databaseAccessMutex.withLock { databaseAccess in
guard let databaseAccess else {
throw DatabaseError.snapshotIsLost()
}

// We should check the validity of the snapshot, as DatabaseSnapshotPool does.
return try databaseAccess.reader.sync(value)
}
}

/// Schedules database operations for execution, and
/// returns immediately.
func asyncRead(_ value: @escaping (Database) -> Void) {
// We should check the validity of the snapshot, as DatabaseSnapshotPool does.
reader.async(value)
func asyncRead(_ value: @escaping @Sendable (Result<Database, Error>) -> Void) {
databaseAccessMutex.withLock { databaseAccess in
guard let databaseAccess else {
value(.failure(DatabaseError.snapshotIsLost()))
return
}

databaseAccess.reader.async { db in
// We should check the validity of the snapshot, as DatabaseSnapshotPool does.
// At least check if self was closed:
if self.databaseAccessMutex.load() == nil {
value(.failure(DatabaseError.snapshotIsLost()))
}
value(.success(db))
}
}
}

private static func commitAndRelease(
reader: SerializedDatabase,
release: (_ isInsideTransaction: Bool) -> Void)
{
// WALSnapshotTransaction may be deinitialized in the dispatch
// queue of its reader: allow reentrancy.
let isInsideTransaction = reader.reentrantSync(allowingLongLivedTransaction: false) { db in
try? db.commit()
return db.isInsideTransaction
func close() {
databaseAccessMutex.withLock { databaseAccess in
databaseAccess?.commitAndRelease()
databaseAccess = nil
}
release(isInsideTransaction)
}
}
#endif
15 changes: 7 additions & 8 deletions GRDB/ValueObservation/Observers/ValueConcurrentObserver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -352,10 +352,11 @@ extension ValueConcurrentObserver {
let initialFetchTransaction = try result.get()
// Second async jump because that's how
// `DatabasePool.asyncWALSnapshotTransaction` has to be used.
initialFetchTransaction.asyncRead { db in
initialFetchTransaction.asyncRead { dbResult in
do {
let fetchedValue: Reducer.Fetched
let initialRegion: DatabaseRegion
let db = try dbResult.get()

switch self.trackingMode {
case let .constantRegion(regions):
Expand Down Expand Up @@ -429,11 +430,9 @@ extension ValueConcurrentObserver {
// checkpointed. That's why we'll keep `initialFetchTransaction`
// alive until the comparison is done.
//
// However, we want to release `initialFetchTransaction` as soon as
// However, we want to close `initialFetchTransaction` as soon as
// possible, so that the reader connection it holds becomes
// available for other reads. It will be released when this optional
// is set to nil:
var initialFetchTransaction: WALSnapshotTransaction? = initialFetchTransaction
// available for other reads.

databaseAccess.dbPool.asyncWriteWithoutTransaction { writerDB in
let events = self.lock.synchronized { self.notificationCallbacks?.events }
Expand All @@ -446,17 +445,17 @@ extension ValueConcurrentObserver {
// Was the database modified since the initial fetch?
let isModified: Bool
if let currentWALSnapshot = try? WALSnapshot(writerDB) {
let ordering = initialFetchTransaction!.walSnapshot.compare(currentWALSnapshot)
let ordering = initialFetchTransaction.walSnapshot.compare(currentWALSnapshot)
assert(ordering <= 0, "Unexpected snapshot ordering")
isModified = ordering < 0
} else {
// Can't compare: assume the database was modified.
isModified = true
}

// Comparison done: end the WAL snapshot transaction
// Comparison done: close the WAL snapshot transaction
// and release its reader connection.
initialFetchTransaction = nil
initialFetchTransaction.close()

if isModified {
events.databaseDidChange?()
Expand Down
2 changes: 1 addition & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
- [X] GRDB7: Sendable: ReadWriteBox (57a86a0e)
- [X] GRDB7: Sendable: Pool (f13b2d2e)
- [X] GRDB7: Sendable: OnDemandFuture fulfill (2aabc4c1)
- [ ] GRDB7: Sendable: WALSnapshotTransaction (7fd34012)
- [X] GRDB7: Sendable: WALSnapshotTransaction (7fd34012)
- [ ] GRDB7: sending closures for SerializedDatabase
- [ ] GRDB7: sending closures for ValueObservationScheduler
- [ ] GRDB7: Sendable closures for ValueObservation.handleEvents
Expand Down

0 comments on commit dd64781

Please sign in to comment.