Skip to content

Commit

Permalink
fix(DataStore): improve MutationEvent resiliency to interruptions (#3492
Browse files Browse the repository at this point in the history
)

* fix(DataStore): save and syncMutation under transaction

* fix(DataStore): MutationEvent dequeue include inProcess true events

* fix(DataStore): throw on missing syncEngine to rollback save

* AWSMutationDatabaseAdapter.getNextMutationEvent doc comment and tests

* unit tests for save transaction

* Add integration test

* address PR comments

* fix test
  • Loading branch information
lawmicha authored Feb 20, 2024
1 parent ccbce93 commit f1d354c
Show file tree
Hide file tree
Showing 10 changed files with 589 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,18 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
condition: QueryPredicate? = nil,
eagerLoad: Bool = true,
completion: DataStoreCallback<M>) {
completion(save(model,
modelSchema: modelSchema,
condition: condition,
eagerLoad: eagerLoad))
}

func save<M: Model>(_ model: M,
modelSchema: ModelSchema,
condition: QueryPredicate? = nil,
eagerLoad: Bool = true) -> DataStoreResult<M> {
guard let connection = connection else {
completion(.failure(DataStoreError.nilSQLiteConnection()))
return
return .failure(DataStoreError.nilSQLiteConnection())
}
do {
let modelType = type(of: model)
Expand All @@ -162,8 +171,7 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
let dataStoreError = DataStoreError.invalidCondition(
"Cannot apply a condition on model which does not exist.",
"Save the model instance without a condition first.")
completion(.failure(causedBy: dataStoreError))
return
return .failure(causedBy: dataStoreError)
}

let statement = InsertStatement(model: model, modelSchema: modelSchema)
Expand All @@ -179,9 +187,7 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
let dataStoreError = DataStoreError.invalidCondition(
"Save failed due to condition did not match existing model instance.",
"The save will continue to fail until the model instance is updated.")
completion(.failure(causedBy: dataStoreError))

return
return .failure(causedBy: dataStoreError)
}
}

Expand All @@ -192,23 +198,22 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
}

// load the recent saved instance and pass it back to the callback
query(modelType, modelSchema: modelSchema,
predicate: model.identifier(schema: modelSchema).predicate,
eagerLoad: eagerLoad) {
switch $0 {
case .success(let result):
if let saved = result.first {
completion(.success(saved))
} else {
completion(.failure(.nonUniqueResult(model: modelType.modelName,
count: result.count)))
}
case .failure(let error):
completion(.failure(error))
let queryResult = query(modelType, modelSchema: modelSchema,
predicate: model.identifier(schema: modelSchema).predicate,
eagerLoad: eagerLoad)
switch queryResult {
case .success(let result):
if let saved = result.first {
return .success(saved)
} else {
return .failure(.nonUniqueResult(model: modelType.modelName,
count: result.count))
}
case .failure(let error):
return .failure(error)
}
} catch {
completion(.failure(causedBy: error))
return .failure(causedBy: error)
}
}

Expand Down Expand Up @@ -321,9 +326,22 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
paginationInput: QueryPaginationInput? = nil,
eagerLoad: Bool = true,
completion: DataStoreCallback<[M]>) {
completion(query(modelType,
modelSchema: modelSchema,
predicate: predicate,
sort: sort,
paginationInput: paginationInput,
eagerLoad: eagerLoad))
}

private func query<M: Model>(_ modelType: M.Type,
modelSchema: ModelSchema,
predicate: QueryPredicate? = nil,
sort: [QuerySortDescriptor]? = nil,
paginationInput: QueryPaginationInput? = nil,
eagerLoad: Bool = true) -> DataStoreResult<[M]> {
guard let connection = connection else {
completion(.failure(DataStoreError.nilSQLiteConnection()))
return
return .failure(DataStoreError.nilSQLiteConnection())
}
do {
let statement = SelectStatement(from: modelSchema,
Expand All @@ -336,9 +354,9 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
withSchema: modelSchema,
using: statement,
eagerLoad: eagerLoad)
completion(.success(result))
return .success(result)
} catch {
completion(.failure(causedBy: error))
return .failure(causedBy: error)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,31 +208,41 @@ final class StorageEngine: StorageEngineBehavior {
completion(.failure(causedBy: dataStoreError))
}

let wrappedCompletion: DataStoreCallback<M> = { result in
guard modelSchema.isSyncable, let syncEngine = self.syncEngine else {
completion(result)
return
}

guard case .success(let savedModel) = result else {
completion(result)
return
do {
try storageAdapter.transaction {
let result = self.storageAdapter.save(model,
modelSchema: modelSchema,
condition: condition,
eagerLoad: eagerLoad)
guard modelSchema.isSyncable else {
completion(result)
return
}

guard case .success(let savedModel) = result else {
completion(result)
return
}

guard let syncEngine else {
let message = "No SyncEngine available to sync mutation event, rollback save."
self.log.verbose("\(#function) \(message) : \(savedModel)")
throw DataStoreError.internalOperation(
message,
"`DataStore.save()` was interrupted. `DataStore.stop()` may have been called.",
nil)
}
self.log.verbose("\(#function) syncing mutation for \(savedModel)")
self.syncMutation(of: savedModel,
modelSchema: modelSchema,
mutationType: mutationType,
predicate: condition,
syncEngine: syncEngine,
completion: completion)
}

self.log.verbose("\(#function) syncing mutation for \(savedModel)")
self.syncMutation(of: savedModel,
modelSchema: modelSchema,
mutationType: mutationType,
predicate: condition,
syncEngine: syncEngine,
completion: completion)
} catch {
completion(.failure(causedBy: error))
}

storageAdapter.save(model,
modelSchema: modelSchema,
condition: condition,
eagerLoad: eagerLoad,
completion: wrappedCompletion)
}

func save<M: Model>(_ model: M,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ protocol StorageEngineAdapter: AnyObject, ModelStorageBehavior, ModelStorageErro

// MARK: - Synchronous APIs

func save<M: Model>(_ model: M,
modelSchema: ModelSchema,
condition: QueryPredicate?,
eagerLoad: Bool) -> DataStoreResult<M>

func exists(_ modelSchema: ModelSchema,
withIdentifier id: ModelIdentifierProtocol,
predicate: QueryPredicate?) throws -> Bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,55 @@ import Amplify
import Combine

extension AWSMutationDatabaseAdapter: MutationEventSource {

/// DataStore implements a FIFO queue of MutationEvents by using the local database
/// and querying for the earliest MutationEvent by its `createdAt` field.
///
/// **Note**: In a previous revision of this code, this query used to filter on `InProcess` == `false` MutationEvents.
/// This was to skip over already in-flight mutation events and grab the next one. However, it was observed in highly
/// concurrent calls to `DataStore.start()` /`stop()` / `save()` that it will interrupt the
/// **OutgoingMutationQueue** of processing and deleting a **MutationEvent** . `DataStore.start()`,
/// which starts the remote sync engine, should perform a step to move all `InProcess` **MutationEvents** back
/// to false, however there's a timing issue that is difficult to pinpoint. **OutgoingMutationQueue**'s query manages
/// to pick up the second MutationEvent in the queue and sends it off, while the first one that is marked as `inProcess`
/// isn't being processed, likely that process was already cancelled. The query below was updated to always dequeue the
/// first regardless of `InProcess` in the [PR #3492](https://github.com/aws-amplify/amplify-swift/pull/3492).
/// By removing the filter, there is a small chance that the same event may be sent twice. Sending the event twice is idempotent
/// and the second response will be `ConditionalCheckFailed`. The `InProcess` flag is still needed for the
/// handling consecutive update scenarios.
///
/// - Parameter completion: The first MutationEvent in the FIFO queue.
func getNextMutationEvent(completion: @escaping DataStoreCallback<MutationEvent>) {
log.verbose(#function)

guard let storageAdapter = storageAdapter else {
completion(.failure(DataStoreError.nilStorageAdapter()))
return
}

let fields = MutationEvent.keys
let predicate = fields.inProcess == false || fields.inProcess == nil
let sort = QuerySortDescriptor(fieldName: MutationEvent.keys.createdAt.stringValue, order: .ascending)
storageAdapter.query(MutationEvent.self,
predicate: predicate,
sort: [sort],
paginationInput: nil,
eagerLoad: true) { result in
switch result {
case .failure(let dataStoreError):
completion(.failure(dataStoreError))
case .success(let mutationEvents):
guard let notInProcessEvent = mutationEvents.first else {
self.nextEventPromise.set(completion)
return
}
self.markInProcess(mutationEvent: notInProcessEvent,
storageAdapter: storageAdapter,
completion: completion)
}
storageAdapter.query(
MutationEvent.self,
predicate: nil,
sort: [sort],
paginationInput: nil,
eagerLoad: true) { result in
switch result {
case .failure(let dataStoreError):
completion(.failure(dataStoreError))
case .success(let mutationEvents):
guard let mutationEvent = mutationEvents.first else {
self.nextEventPromise.set(completion)
return
}
if mutationEvent.inProcess {
log.verbose("The head of the MutationEvent queue was already inProcess (most likely interrupted process): \(mutationEvent)")
completion(.success(mutationEvent))
} else {
self.markInProcess(mutationEvent: mutationEvent,
storageAdapter: storageAdapter,
completion: completion)
}
}

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,48 @@ final class StorageEngineTestsPostComment4V2Tests: StorageEngineTestsBase, Share
}
}

func testSavePostAndSyncSuccess() async throws {
let receivedMutationEvent = expectation(description: "Mutation Events submitted to sync engine")
let expectedSuccess = expectation(description: "Simulated success on mutation event submitted to sync engine")
let post = ParentPost4V2(
id: "postId1",
title: "title1")

syncEngine.setCallbackOnSubmit { submittedMutationEvent, completion in
receivedMutationEvent.fulfill()
if submittedMutationEvent.modelId == post.id {
expectedSuccess.fulfill()
completion(.success(submittedMutationEvent))
} else {
XCTFail("Unexpected submitted MutationEvent \(submittedMutationEvent)")
completion(.failure(.internalOperation("mockError", "", nil)))
}
}
try await saveAsync(post)
await fulfillment(of: [receivedMutationEvent, expectedSuccess], timeout: 1)

}

/// A save should fail if the corresponding MutationEvent could not be submitted to the syncEngine.
func testSavePostFailDueToSyncEngineMissing() async throws {
storageEngine.syncEngine = nil
do {
try await saveAsync(
ParentPost4V2(
id: "postId1",
title: "title1"))
XCTFail("Expected to fail when sync engine is `nil`")
} catch {
guard let dataStoreError = error as? DataStoreError else {
XCTFail("Unexpected type of error \(error)")
return
}
XCTAssertEqual(
dataStoreError.errorDescription,
"No SyncEngine available to sync mutation event, rollback save.")
}
}

func testSaveCommentThenQueryComment() async throws {
let comment = ChildComment4V2(content: "content")
let savedComment = try await saveAsync(comment)
Expand Down
Loading

0 comments on commit f1d354c

Please sign in to comment.