From dd64781ffe14224ae9c2e9dcc21c0c7c22798257 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gwendal=20Roue=CC=81?= Date: Sun, 31 Mar 2024 08:51:57 +0200 Subject: [PATCH] WALSnapshotTransaction is Sendable # Conflicts: # TODO.md --- GRDB/Core/DatabaseError.swift | 4 + GRDB/Core/DatabaseSnapshotPool.swift | 2 +- GRDB/Core/WALSnapshotTransaction.swift | 80 +++++++++++++------ .../Observers/ValueConcurrentObserver.swift | 15 ++-- TODO.md | 2 +- 5 files changed, 69 insertions(+), 34 deletions(-) diff --git a/GRDB/Core/DatabaseError.swift b/GRDB/Core/DatabaseError.swift index 524c18d63e..bf808dbaa5 100644 --- a/GRDB/Core/DatabaseError.swift +++ b/GRDB/Core/DatabaseError.swift @@ -415,6 +415,10 @@ extension DatabaseError { static func connectionIsClosed() -> Self { DatabaseError(resultCode: .SQLITE_MISUSE, message: "Connection is closed") } + + static func snapshotIsLost() -> Self { + DatabaseError(resultCode: .SQLITE_ABORT, message: "Snapshot is lost.") + } } // Support for `catch DatabaseError.SQLITE_XXX` diff --git a/GRDB/Core/DatabaseSnapshotPool.swift b/GRDB/Core/DatabaseSnapshotPool.swift index 935f3ccf47..715dabf72e 100644 --- a/GRDB/Core/DatabaseSnapshotPool.swift +++ b/GRDB/Core/DatabaseSnapshotPool.swift @@ -318,7 +318,7 @@ extension DatabaseSnapshotPool: DatabaseSnapshotReader { return try reader.reentrantSync { db in let result = try value(db) if snapshotIsLost(db) { - throw DatabaseError(resultCode: .SQLITE_ABORT, message: "Snapshot is lost.") + throw DatabaseError.snapshotIsLost() } return result } diff --git a/GRDB/Core/WALSnapshotTransaction.swift b/GRDB/Core/WALSnapshotTransaction.swift index 4340b67e4e..eb6597fe1a 100644 --- a/GRDB/Core/WALSnapshotTransaction.swift +++ b/GRDB/Core/WALSnapshotTransaction.swift @@ -3,9 +3,28 @@ /// /// `WALSnapshotTransaction` **takes ownership** of its reader /// `SerializedDatabase` (TODO: make it a move-only type eventually). -class WALSnapshotTransaction { - private let reader: SerializedDatabase - private let release: (_ isInsideTransaction: Bool) -> Void +final class WALSnapshotTransaction: @unchecked Sendable { + // @unchecked because `databaseAccess` is protected by a mutex. + + private struct DatabaseAccess { + let reader: SerializedDatabase + let release: @Sendable (_ isInsideTransaction: Bool) -> Void + + // MUST be called only once + func commitAndRelease() { + // WALSnapshotTransaction may be deinitialized in the dispatch + // queue of its reader: allow reentrancy. + let isInsideTransaction = reader.reentrantSync(allowingLongLivedTransaction: false) { db in + try? db.commit() + return db.isInsideTransaction + } + release(isInsideTransaction) + } + } + + // TODO: consider using the serialized DispatchQueue of reader instead of a lock. + /// nil when closed + private let databaseAccessMutex: Mutex /// The state of the database at the beginning of the transaction. let walSnapshot: WALSnapshot @@ -36,10 +55,11 @@ class WALSnapshotTransaction { /// is no longer used. init( onReader reader: SerializedDatabase, - release: @escaping (_ isInsideTransaction: Bool) -> Void) + release: @escaping @Sendable (_ isInsideTransaction: Bool) -> Void) throws { assert(reader.configuration.readonly) + let databaseAccess = DatabaseAccess(reader: reader, release: release) do { // Open a long-lived transaction, and enter snapshot isolation @@ -50,44 +70,56 @@ class WALSnapshotTransaction { try db.clearSchemaCacheIfNeeded() return try WALSnapshot(db) } - self.reader = reader - self.release = release + self.databaseAccessMutex = Mutex(databaseAccess) } catch { // self is not initialized, so deinit will not run. - Self.commitAndRelease(reader: reader, release: release) + databaseAccess.commitAndRelease() throw error } } deinit { - Self.commitAndRelease(reader: reader, release: release) + close() } /// Executes database operations in the snapshot transaction, and /// returns their result after they have finished executing. - func read(_ value: (Database) throws -> T) rethrows -> T { - // We should check the validity of the snapshot, as DatabaseSnapshotPool does. - try reader.sync(value) + func read(_ value: (Database) throws -> T) throws -> T { + try databaseAccessMutex.withLock { databaseAccess in + guard let databaseAccess else { + throw DatabaseError.snapshotIsLost() + } + + // We should check the validity of the snapshot, as DatabaseSnapshotPool does. + return try databaseAccess.reader.sync(value) + } } /// Schedules database operations for execution, and /// returns immediately. - func asyncRead(_ value: @escaping (Database) -> Void) { - // We should check the validity of the snapshot, as DatabaseSnapshotPool does. - reader.async(value) + func asyncRead(_ value: @escaping @Sendable (Result) -> Void) { + databaseAccessMutex.withLock { databaseAccess in + guard let databaseAccess else { + value(.failure(DatabaseError.snapshotIsLost())) + return + } + + databaseAccess.reader.async { db in + // We should check the validity of the snapshot, as DatabaseSnapshotPool does. + // At least check if self was closed: + if self.databaseAccessMutex.load() == nil { + value(.failure(DatabaseError.snapshotIsLost())) + } + value(.success(db)) + } + } } - private static func commitAndRelease( - reader: SerializedDatabase, - release: (_ isInsideTransaction: Bool) -> Void) - { - // WALSnapshotTransaction may be deinitialized in the dispatch - // queue of its reader: allow reentrancy. - let isInsideTransaction = reader.reentrantSync(allowingLongLivedTransaction: false) { db in - try? db.commit() - return db.isInsideTransaction + func close() { + databaseAccessMutex.withLock { databaseAccess in + databaseAccess?.commitAndRelease() + databaseAccess = nil } - release(isInsideTransaction) } } #endif diff --git a/GRDB/ValueObservation/Observers/ValueConcurrentObserver.swift b/GRDB/ValueObservation/Observers/ValueConcurrentObserver.swift index e57aef0983..d4cc4b0304 100644 --- a/GRDB/ValueObservation/Observers/ValueConcurrentObserver.swift +++ b/GRDB/ValueObservation/Observers/ValueConcurrentObserver.swift @@ -352,10 +352,11 @@ extension ValueConcurrentObserver { let initialFetchTransaction = try result.get() // Second async jump because that's how // `DatabasePool.asyncWALSnapshotTransaction` has to be used. - initialFetchTransaction.asyncRead { db in + initialFetchTransaction.asyncRead { dbResult in do { let fetchedValue: Reducer.Fetched let initialRegion: DatabaseRegion + let db = try dbResult.get() switch self.trackingMode { case let .constantRegion(regions): @@ -429,11 +430,9 @@ extension ValueConcurrentObserver { // checkpointed. That's why we'll keep `initialFetchTransaction` // alive until the comparison is done. // - // However, we want to release `initialFetchTransaction` as soon as + // However, we want to close `initialFetchTransaction` as soon as // possible, so that the reader connection it holds becomes - // available for other reads. It will be released when this optional - // is set to nil: - var initialFetchTransaction: WALSnapshotTransaction? = initialFetchTransaction + // available for other reads. databaseAccess.dbPool.asyncWriteWithoutTransaction { writerDB in let events = self.lock.synchronized { self.notificationCallbacks?.events } @@ -446,7 +445,7 @@ extension ValueConcurrentObserver { // Was the database modified since the initial fetch? let isModified: Bool if let currentWALSnapshot = try? WALSnapshot(writerDB) { - let ordering = initialFetchTransaction!.walSnapshot.compare(currentWALSnapshot) + let ordering = initialFetchTransaction.walSnapshot.compare(currentWALSnapshot) assert(ordering <= 0, "Unexpected snapshot ordering") isModified = ordering < 0 } else { @@ -454,9 +453,9 @@ extension ValueConcurrentObserver { isModified = true } - // Comparison done: end the WAL snapshot transaction + // Comparison done: close the WAL snapshot transaction // and release its reader connection. - initialFetchTransaction = nil + initialFetchTransaction.close() if isModified { events.databaseDidChange?() diff --git a/TODO.md b/TODO.md index 1fd95c8d99..b1906820e6 100644 --- a/TODO.md +++ b/TODO.md @@ -113,7 +113,7 @@ - [X] GRDB7: Sendable: ReadWriteBox (57a86a0e) - [X] GRDB7: Sendable: Pool (f13b2d2e) - [X] GRDB7: Sendable: OnDemandFuture fulfill (2aabc4c1) -- [ ] GRDB7: Sendable: WALSnapshotTransaction (7fd34012) +- [X] GRDB7: Sendable: WALSnapshotTransaction (7fd34012) - [ ] GRDB7: sending closures for SerializedDatabase - [ ] GRDB7: sending closures for ValueObservationScheduler - [ ] GRDB7: Sendable closures for ValueObservation.handleEvents