Skip to content

Commit

Permalink
Refactor ValueReducer for Swift concurrency
Browse files Browse the repository at this point in the history
The fetching facet of a ValueReducer is Sendable.
  • Loading branch information
groue committed Aug 27, 2024
1 parent aac8720 commit 849bba2
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 88 deletions.
2 changes: 1 addition & 1 deletion GRDB/Documentation.docc/Extension/ValueObservation.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,6 @@ When needed, you can help GRDB optimize observations and reduce database content
- ``handleEvents(willStart:willFetch:willTrackRegion:databaseDidChange:didReceiveValue:didFail:didCancel:)``
- ``print(_:to:)``

### Support
### Supporting Types

- ``ValueReducer``
38 changes: 21 additions & 17 deletions GRDB/ValueObservation/Observers/ValueConcurrentObserver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,25 @@ final class ValueConcurrentObserver<Reducer: ValueReducer, Scheduler: ValueObser
/// The observed DatabasePool.
let dbPool: DatabasePool

/// A reducer that fetches database values.
private let reducer: Reducer
/// The fetcher that fetches database values.
private let fetcher: Reducer.Fetcher

init(dbPool: DatabasePool, reducer: Reducer) {
init(dbPool: DatabasePool, fetcher: Reducer.Fetcher) {
self.dbPool = dbPool
self.reducer = reducer
self.fetcher = fetcher
}

func fetch(_ db: Database) throws -> Reducer.Fetched {
func fetch(_ db: Database) throws -> Reducer.Fetcher.Value {
try db.isolated(readOnly: true) {
try reducer._fetch(db)
try fetcher.fetch(db)
}
}

func fetchRecordingObservedRegion(_ db: Database) throws -> (Reducer.Fetched, DatabaseRegion) {
func fetchRecordingObservedRegion(_ db: Database) throws -> (Reducer.Fetcher.Value, DatabaseRegion) {
var region = DatabaseRegion()
let fetchedValue = try db.isolated(readOnly: true) {
try db.recordingSelection(&region) {
try reducer._fetch(db)
try fetcher.fetch(db)
}
}
return try (fetchedValue, region.observableRegion(db))
Expand Down Expand Up @@ -170,9 +170,9 @@ final class ValueConcurrentObserver<Reducer: ValueReducer, Scheduler: ValueObser
// State
self.databaseAccess = DatabaseAccess(
dbPool: dbPool,
// ValueReducer semantics guarantees that reducer._fetch
// ValueReducer semantics guarantees that the fetcher
// is independent from the reducer state
reducer: reducer)
fetcher: reducer._makeFetcher())
self.notificationCallbacks = NotificationCallbacks(events: events, onChange: onChange)
self.reducer = reducer
self.reduceQueue = DispatchQueue(
Expand Down Expand Up @@ -298,7 +298,9 @@ extension ValueConcurrentObserver {
return try syncStartWithoutWALSnapshot(from: databaseAccess)
}

let (fetchedValue, initialRegion): (Reducer.Fetched, DatabaseRegion) = try initialFetchTransaction.read { db in
let fetchedValue: Reducer.Fetcher.Value
let initialRegion: DatabaseRegion
(fetchedValue, initialRegion) = try initialFetchTransaction.read { db in
switch trackingMode {
case let .constantRegion(regions):
let fetchedValue = try databaseAccess.fetch(db)
Expand Down Expand Up @@ -354,7 +356,7 @@ extension ValueConcurrentObserver {
// `DatabasePool.asyncWALSnapshotTransaction` has to be used.
initialFetchTransaction.asyncRead { dbResult in
do {
let fetchedValue: Reducer.Fetched
let fetchedValue: Reducer.Fetcher.Value
let initialRegion: DatabaseRegion
let db = try dbResult.get()

Expand Down Expand Up @@ -461,7 +463,7 @@ extension ValueConcurrentObserver {
events.databaseDidChange?()

// Fetch
let fetchedValue: Reducer.Fetched
let fetchedValue: Reducer.Fetcher.Value

switch self.trackingMode {
case .constantRegion:
Expand Down Expand Up @@ -538,7 +540,9 @@ extension ValueConcurrentObserver {
// for observing the database is to be able to fetch the initial value
// without having to wait for an eventual long-running write
// transaction to complete.
let (fetchedValue, initialRegion) = try databaseAccess.dbPool.read { db -> (Reducer.Fetched, DatabaseRegion) in
let fetchedValue: Reducer.Fetcher.Value
let initialRegion: DatabaseRegion
(fetchedValue, initialRegion) = try databaseAccess.dbPool.read { db in
switch trackingMode {
case let .constantRegion(regions):
let fetchedValue = try databaseAccess.fetch(db)
Expand Down Expand Up @@ -583,7 +587,7 @@ extension ValueConcurrentObserver {

do {
// Fetch
let fetchedValue: Reducer.Fetched
let fetchedValue: Reducer.Fetcher.Value
let initialRegion: DatabaseRegion
let db = try dbResult.get()
switch self.trackingMode {
Expand Down Expand Up @@ -643,7 +647,7 @@ extension ValueConcurrentObserver {
do {
try writerDB.isolated(readOnly: true) {
// Fetch
let fetchedValue: Reducer.Fetched
let fetchedValue: Reducer.Fetcher.Value
let observedRegion: DatabaseRegion
switch self.trackingMode {
case .constantRegion:
Expand Down Expand Up @@ -822,7 +826,7 @@ extension ValueConcurrentObserver: TransactionObserver {
}
}

private func reduce(_ fetchResult: Result<Reducer.Fetched, Error>) {
private func reduce(_ fetchResult: Result<Reducer.Fetcher.Value, Error>) {
reduceQueue.async {
do {
let fetchedValue = try fetchResult.get()
Expand Down
24 changes: 12 additions & 12 deletions GRDB/ValueObservation/Observers/ValueWriteOnlyObserver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -83,26 +83,26 @@ final class ValueWriteOnlyObserver<
/// If true, database values are fetched from a read-only access.
private let readOnly: Bool

/// A reducer that fetches database values.
private let reducer: Reducer
/// The fetcher that fetches database values.
private let fetcher: Reducer.Fetcher

init(writer: Writer, readOnly: Bool, reducer: Reducer) {
init(writer: Writer, readOnly: Bool, fetcher: Reducer.Fetcher) {
self.writer = writer
self.readOnly = readOnly
self.reducer = reducer
self.fetcher = fetcher
}

func fetch(_ db: Database) throws -> Reducer.Fetched {
func fetch(_ db: Database) throws -> Reducer.Fetcher.Value {
try db.isolated(readOnly: readOnly) {
try reducer._fetch(db)
try fetcher.fetch(db)
}
}

func fetchRecordingObservedRegion(_ db: Database) throws -> (Reducer.Fetched, DatabaseRegion) {
func fetchRecordingObservedRegion(_ db: Database) throws -> (Reducer.Fetcher.Value, DatabaseRegion) {
var region = DatabaseRegion()
let fetchedValue = try db.isolated(readOnly: readOnly) {
try db.recordingSelection(&region) {
try reducer._fetch(db)
try fetcher.fetch(db)
}
}
return try (fetchedValue, region.observableRegion(db))
Expand Down Expand Up @@ -160,9 +160,9 @@ final class ValueWriteOnlyObserver<
self.databaseAccess = DatabaseAccess(
writer: writer,
readOnly: readOnly,
// ValueReducer semantics guarantees that reducer._fetch
// ValueReducer semantics guarantees that the fetcher
// is independent from the reducer state
reducer: reducer)
fetcher: reducer._makeFetcher())
self.notificationCallbacks = NotificationCallbacks(events: events, onChange: onChange)
self.reducer = reducer
self.reduceQueue = DispatchQueue(
Expand Down Expand Up @@ -301,7 +301,7 @@ extension ValueWriteOnlyObserver {
/// By grouping the initial fetch and the beginning of observation in a
/// single database access, we are sure that no concurrent write can happen
/// during the initial fetch, and that we won't miss any future change.
private func fetchAndStartObservation(_ db: Database) throws -> Reducer.Fetched? {
private func fetchAndStartObservation(_ db: Database) throws -> Reducer.Fetcher.Value? {
let (events, databaseAccess) = lock.synchronized {
(notificationCallbacks?.events, self.databaseAccess)
}
Expand Down Expand Up @@ -380,7 +380,7 @@ extension ValueWriteOnlyObserver: TransactionObserver {

do {
// Fetch
let fetchedValue: Reducer.Fetched
let fetchedValue: Reducer.Fetcher.Value

switch trackingMode {
case .constantRegion, .constantRegionRecordedFromSelection:
Expand Down
20 changes: 14 additions & 6 deletions GRDB/ValueObservation/Reducers/Fetch.swift
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
extension ValueReducers {
/// A `ValueReducer` that perform database fetches.
public struct Fetch<Value>: ValueReducer {
private let __fetch: (Database) throws -> Value
public struct _Fetcher: _ValueReducerFetcher {
let _fetch: @Sendable (Database) throws -> Value

public func fetch(_ db: Database) throws -> Value {
assert(db.isInsideTransaction, "Fetching in a non-isolated way is illegal")
return try _fetch(db)
}
}

private let _fetch: @Sendable (Database) throws -> Value

/// Creates a reducer which passes raw fetched values through.
init(fetch: @escaping (Database) throws -> Value) {
self.__fetch = fetch
init(fetch: @escaping @Sendable (Database) throws -> Value) {
self._fetch = fetch
}

public func _fetch(_ db: Database) throws -> Value {
assert(db.isInsideTransaction, "Fetching in a non-isolated way is illegal")
return try __fetch(db)
public func _makeFetcher() -> _Fetcher {
_Fetcher(_fetch: _fetch)
}

public func _value(_ fetched: Value) -> Value? {
Expand Down
16 changes: 7 additions & 9 deletions GRDB/ValueObservation/Reducers/Map.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ extension ValueObservation {
}

extension ValueReducers {
/// A `ValueReducer` whose values consist of those in a `Base` reduced
/// A `ValueReducer` whose values consist of those in a `Base` reducer
/// passed through a transform function.
///
/// See ``ValueObservation/map(_:)``.
public struct Map<Base: _ValueReducer, Value>: _ValueReducer {
public struct Map<Base: _ValueReducer, Value>: ValueReducer {
private var base: Base
private let transform: (Base.Value) throws -> Value

Expand All @@ -39,15 +39,13 @@ extension ValueReducers {
self.transform = transform
}

public mutating func _value(_ fetched: Base.Fetched) throws -> Value? {
public func _makeFetcher() -> Base.Fetcher {
base._makeFetcher()
}

public mutating func _value(_ fetched: Base.Fetcher.Value) throws -> Value? {
guard let value = try base._value(fetched) else { return nil }
return try transform(value)
}
}
}

extension ValueReducers.Map: ValueReducer where Base: ValueReducer {
public func _fetch(_ db: Database) throws -> Base.Fetched {
try base._fetch(db)
}
}
14 changes: 6 additions & 8 deletions GRDB/ValueObservation/Reducers/RemoveDuplicates.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ extension ValueReducers {
/// previously observed value.
///
/// See ``ValueObservation/removeDuplicates()``.
public struct RemoveDuplicates<Base: _ValueReducer>: _ValueReducer {
public struct RemoveDuplicates<Base: ValueReducer>: ValueReducer {
private var base: Base
private var previousValue: Base.Value?
private var predicate: (Base.Value, Base.Value) -> Bool
Expand All @@ -77,7 +77,11 @@ extension ValueReducers {
self.predicate = predicate
}

public mutating func _value(_ fetched: Base.Fetched) throws -> Base.Value? {
public func _makeFetcher() -> Base.Fetcher {
base._makeFetcher()
}

public mutating func _value(_ fetched: Base.Fetcher.Value) throws -> Base.Value? {
guard let value = try base._value(fetched) else {
return nil
}
Expand All @@ -90,9 +94,3 @@ extension ValueReducers {
}
}
}

extension ValueReducers.RemoveDuplicates: ValueReducer where Base: ValueReducer {
public func _fetch(_ db: Database) throws -> Base.Fetched {
try base._fetch(db)
}
}
27 changes: 17 additions & 10 deletions GRDB/ValueObservation/Reducers/Trace.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,26 @@ extension ValueReducers {
///
/// See ``ValueObservation/handleEvents(willStart:willFetch:willTrackRegion:databaseDidChange:didReceiveValue:didFail:didCancel:)``
/// and ``ValueObservation/print(_:to:)``.
public struct Trace<Base: _ValueReducer>: _ValueReducer {
public struct Trace<Base: ValueReducer>: ValueReducer {
public struct _Fetcher: _ValueReducerFetcher {
let base: Base.Fetcher
let willFetch: @Sendable () -> Void

public func fetch(_ db: Database) throws -> Base.Fetcher.Value {
willFetch()
return try base.fetch(db)
}
}

var base: Base
let willFetch: () -> Void
let willFetch: @Sendable () -> Void
let didReceiveValue: (Base.Value) -> Void

public mutating func _value(_ fetched: Base.Fetched) throws -> Base.Value? {
public func _makeFetcher() -> _Fetcher {
_Fetcher(base: base._makeFetcher(), willFetch: willFetch)
}

public mutating func _value(_ fetched: Base.Fetcher.Value) throws -> Base.Value? {
guard let value = try base._value(fetched) else {
return nil
}
Expand All @@ -19,10 +33,3 @@ extension ValueReducers {
}
// swiftlint:enable line_length
}

extension ValueReducers.Trace: ValueReducer where Base: ValueReducer {
public func _fetch(_ db: Database) throws -> Base.Fetched {
willFetch()
return try base._fetch(db)
}
}
42 changes: 32 additions & 10 deletions GRDB/ValueObservation/Reducers/ValueReducer.swift
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
// A `ValueReducer` fetches and transforms the database values
// observed by a ``ValueObservation``.
//
// It is NOT Sendable, because we need `ValueReducers.RemoveDuplicates` to
// be able to call `Equatable.==`, which IS not a Sendable function.
// Thread-safety will be assured by `ValueObservation`, which will make sure
// it does not invoke the reducer concurrently.
//
// However, we need to be able to fetch from any database dispatch queue,
// and maybe concurrently. That's why a `ValueReducer` has a Sendable facet,
// which is its `Fetcher`.

/// Implementation details of `ValueReducer`.
public protocol _ValueReducer {
/// The type of fetched database values
associatedtype Fetched
/// The Sendable type that fetches database values
associatedtype Fetcher: _ValueReducerFetcher

/// The type of observed values
associatedtype Value

/// Returns a value that fetches database values upon changes in an
/// observed database region. The returned value method must not depend
/// on the state of the reducer.
func _makeFetcher() -> Fetcher

/// Transforms a fetched value into an eventual observed value. Returns nil
/// when observer should not be notified.
///
Expand All @@ -18,25 +35,30 @@ public protocol _ValueReducer {
/// reducer._value(...) // MUST NOT be nil
/// reducer._value(...) // MAY be nil
/// reducer._value(...) // MAY be nil
mutating func _value(_ fetched: Fetched) throws -> Value?
mutating func _value(_ fetched: Fetcher.Value) throws -> Value?
}

public protocol _ValueReducerFetcher: Sendable {
/// The type of fetched database values
associatedtype Value

func fetch(_ db: Database) throws -> Value
}

/// `ValueReducer` supports ``ValueObservation``.
///
/// A `ValueReducer` fetches and transforms the database values
/// observed by a ``ValueObservation``.
///
/// Do not declare new conformances to `ValueReducer`. Only the built-in
/// conforming types are valid.
///
/// ## Topics
///
/// ### Support
/// ### Supporting Types
///
/// - ``ValueReducers``
public protocol ValueReducer: _ValueReducer {
/// Fetches database values upon changes in an observed database region.
///
/// This method must does not depend on the state of the reducer.
func _fetch(_ db: Database) throws -> Fetched
}
public protocol ValueReducer: _ValueReducer { }

/// A namespace for concrete types that adopt the ``ValueReducer`` protocol.
public enum ValueReducers { }
Loading

0 comments on commit 849bba2

Please sign in to comment.