From f1d354c638fca3da417ab07cd595835355d0224f Mon Sep 17 00:00:00 2001 From: Michael Law <1365977+lawmicha@users.noreply.github.com> Date: Tue, 20 Feb 2024 14:50:13 -0500 Subject: [PATCH] fix(DataStore): improve MutationEvent resiliency to interruptions (#3492) * fix(DataStore): save and syncMutation under transaction * fix(DataStore): MutationEvent dequeue include inProcess true events * fix(DataStore): throw on missing syncEngine to rollback save * AWSMutationDatabaseAdapter.getNextMutationEvent doc comment and tests * unit tests for save transaction * Add integration test * address PR comments * fix test --- .../SQLite/StorageEngineAdapter+SQLite.swift | 68 +++-- .../Storage/StorageEngine.swift | 56 +++-- .../Storage/StorageEngineAdapter.swift | 5 + ...nDatabaseAdapter+MutationEventSource.swift | 61 +++-- ...torageEngineTestsPostComment4V2Tests.swift | 42 ++++ .../AWSMutationDatabaseAdapterTests.swift | 167 +++++++++++- .../MockSQLiteStorageEngineAdapter.swift | 8 + ...oreLazyLoadPostComment4V2StressTests.swift | 238 ++++++++++++++++++ .../AWSDataStoreLazyLoadBaseTest.swift | 15 +- .../project.pbxproj | 4 + 10 files changed, 589 insertions(+), 75 deletions(-) create mode 100644 AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginLazyLoadTests/LL1/AWSDataStoreLazyLoadPostComment4V2StressTests.swift diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/SQLite/StorageEngineAdapter+SQLite.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/SQLite/StorageEngineAdapter+SQLite.swift index ae3549bf81..d01c94bacb 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/SQLite/StorageEngineAdapter+SQLite.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/SQLite/StorageEngineAdapter+SQLite.swift @@ -148,9 +148,18 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter { condition: QueryPredicate? = nil, eagerLoad: Bool = true, completion: DataStoreCallback) { + completion(save(model, + modelSchema: modelSchema, + condition: condition, + eagerLoad: eagerLoad)) + } + + func save(_ model: M, + modelSchema: ModelSchema, + condition: QueryPredicate? = nil, + eagerLoad: Bool = true) -> DataStoreResult { guard let connection = connection else { - completion(.failure(DataStoreError.nilSQLiteConnection())) - return + return .failure(DataStoreError.nilSQLiteConnection()) } do { let modelType = type(of: model) @@ -162,8 +171,7 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter { let dataStoreError = DataStoreError.invalidCondition( "Cannot apply a condition on model which does not exist.", "Save the model instance without a condition first.") - completion(.failure(causedBy: dataStoreError)) - return + return .failure(causedBy: dataStoreError) } let statement = InsertStatement(model: model, modelSchema: modelSchema) @@ -179,9 +187,7 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter { let dataStoreError = DataStoreError.invalidCondition( "Save failed due to condition did not match existing model instance.", "The save will continue to fail until the model instance is updated.") - completion(.failure(causedBy: dataStoreError)) - - return + return .failure(causedBy: dataStoreError) } } @@ -192,23 +198,22 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter { } // load the recent saved instance and pass it back to the callback - query(modelType, modelSchema: modelSchema, - predicate: model.identifier(schema: modelSchema).predicate, - eagerLoad: eagerLoad) { - switch $0 { - case .success(let result): - if let saved = result.first { - completion(.success(saved)) - } else { - completion(.failure(.nonUniqueResult(model: modelType.modelName, - count: result.count))) - } - case .failure(let error): - completion(.failure(error)) + let queryResult = query(modelType, modelSchema: modelSchema, + predicate: model.identifier(schema: modelSchema).predicate, + eagerLoad: eagerLoad) + switch queryResult { + case .success(let result): + if let saved = result.first { + return .success(saved) + } else { + return .failure(.nonUniqueResult(model: modelType.modelName, + count: result.count)) } + case .failure(let error): + return .failure(error) } } catch { - completion(.failure(causedBy: error)) + return .failure(causedBy: error) } } @@ -321,9 +326,22 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter { paginationInput: QueryPaginationInput? = nil, eagerLoad: Bool = true, completion: DataStoreCallback<[M]>) { + completion(query(modelType, + modelSchema: modelSchema, + predicate: predicate, + sort: sort, + paginationInput: paginationInput, + eagerLoad: eagerLoad)) + } + + private func query(_ modelType: M.Type, + modelSchema: ModelSchema, + predicate: QueryPredicate? = nil, + sort: [QuerySortDescriptor]? = nil, + paginationInput: QueryPaginationInput? = nil, + eagerLoad: Bool = true) -> DataStoreResult<[M]> { guard let connection = connection else { - completion(.failure(DataStoreError.nilSQLiteConnection())) - return + return .failure(DataStoreError.nilSQLiteConnection()) } do { let statement = SelectStatement(from: modelSchema, @@ -336,9 +354,9 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter { withSchema: modelSchema, using: statement, eagerLoad: eagerLoad) - completion(.success(result)) + return .success(result) } catch { - completion(.failure(causedBy: error)) + return .failure(causedBy: error) } } diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine.swift index 0ea14ad64b..078bf60624 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine.swift @@ -208,31 +208,41 @@ final class StorageEngine: StorageEngineBehavior { completion(.failure(causedBy: dataStoreError)) } - let wrappedCompletion: DataStoreCallback = { result in - guard modelSchema.isSyncable, let syncEngine = self.syncEngine else { - completion(result) - return - } - - guard case .success(let savedModel) = result else { - completion(result) - return + do { + try storageAdapter.transaction { + let result = self.storageAdapter.save(model, + modelSchema: modelSchema, + condition: condition, + eagerLoad: eagerLoad) + guard modelSchema.isSyncable else { + completion(result) + return + } + + guard case .success(let savedModel) = result else { + completion(result) + return + } + + guard let syncEngine else { + let message = "No SyncEngine available to sync mutation event, rollback save." + self.log.verbose("\(#function) \(message) : \(savedModel)") + throw DataStoreError.internalOperation( + message, + "`DataStore.save()` was interrupted. `DataStore.stop()` may have been called.", + nil) + } + self.log.verbose("\(#function) syncing mutation for \(savedModel)") + self.syncMutation(of: savedModel, + modelSchema: modelSchema, + mutationType: mutationType, + predicate: condition, + syncEngine: syncEngine, + completion: completion) } - - self.log.verbose("\(#function) syncing mutation for \(savedModel)") - self.syncMutation(of: savedModel, - modelSchema: modelSchema, - mutationType: mutationType, - predicate: condition, - syncEngine: syncEngine, - completion: completion) + } catch { + completion(.failure(causedBy: error)) } - - storageAdapter.save(model, - modelSchema: modelSchema, - condition: condition, - eagerLoad: eagerLoad, - completion: wrappedCompletion) } func save(_ model: M, diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngineAdapter.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngineAdapter.swift index b911f0a1c3..191df26ebc 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngineAdapter.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngineAdapter.swift @@ -34,6 +34,11 @@ protocol StorageEngineAdapter: AnyObject, ModelStorageBehavior, ModelStorageErro // MARK: - Synchronous APIs + func save(_ model: M, + modelSchema: ModelSchema, + condition: QueryPredicate?, + eagerLoad: Bool) -> DataStoreResult + func exists(_ modelSchema: ModelSchema, withIdentifier id: ModelIdentifierProtocol, predicate: QueryPredicate?) throws -> Bool diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventSource.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventSource.swift index 0ed3b5c0b8..cdd83055a6 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventSource.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/AWSMutationDatabaseAdapter/AWSMutationDatabaseAdapter+MutationEventSource.swift @@ -9,6 +9,24 @@ import Amplify import Combine extension AWSMutationDatabaseAdapter: MutationEventSource { + + /// DataStore implements a FIFO queue of MutationEvents by using the local database + /// and querying for the earliest MutationEvent by its `createdAt` field. + /// + /// **Note**: In a previous revision of this code, this query used to filter on `InProcess` == `false` MutationEvents. + /// This was to skip over already in-flight mutation events and grab the next one. However, it was observed in highly + /// concurrent calls to `DataStore.start()` /`stop()` / `save()` that it will interrupt the + /// **OutgoingMutationQueue** of processing and deleting a **MutationEvent** . `DataStore.start()`, + /// which starts the remote sync engine, should perform a step to move all `InProcess` **MutationEvents** back + /// to false, however there's a timing issue that is difficult to pinpoint. **OutgoingMutationQueue**'s query manages + /// to pick up the second MutationEvent in the queue and sends it off, while the first one that is marked as `inProcess` + /// isn't being processed, likely that process was already cancelled. The query below was updated to always dequeue the + /// first regardless of `InProcess` in the [PR #3492](https://github.com/aws-amplify/amplify-swift/pull/3492). + /// By removing the filter, there is a small chance that the same event may be sent twice. Sending the event twice is idempotent + /// and the second response will be `ConditionalCheckFailed`. The `InProcess` flag is still needed for the + /// handling consecutive update scenarios. + /// + /// - Parameter completion: The first MutationEvent in the FIFO queue. func getNextMutationEvent(completion: @escaping DataStoreCallback) { log.verbose(#function) @@ -16,27 +34,30 @@ extension AWSMutationDatabaseAdapter: MutationEventSource { completion(.failure(DataStoreError.nilStorageAdapter())) return } - - let fields = MutationEvent.keys - let predicate = fields.inProcess == false || fields.inProcess == nil let sort = QuerySortDescriptor(fieldName: MutationEvent.keys.createdAt.stringValue, order: .ascending) - storageAdapter.query(MutationEvent.self, - predicate: predicate, - sort: [sort], - paginationInput: nil, - eagerLoad: true) { result in - switch result { - case .failure(let dataStoreError): - completion(.failure(dataStoreError)) - case .success(let mutationEvents): - guard let notInProcessEvent = mutationEvents.first else { - self.nextEventPromise.set(completion) - return - } - self.markInProcess(mutationEvent: notInProcessEvent, - storageAdapter: storageAdapter, - completion: completion) - } + storageAdapter.query( + MutationEvent.self, + predicate: nil, + sort: [sort], + paginationInput: nil, + eagerLoad: true) { result in + switch result { + case .failure(let dataStoreError): + completion(.failure(dataStoreError)) + case .success(let mutationEvents): + guard let mutationEvent = mutationEvents.first else { + self.nextEventPromise.set(completion) + return + } + if mutationEvent.inProcess { + log.verbose("The head of the MutationEvent queue was already inProcess (most likely interrupted process): \(mutationEvent)") + completion(.success(mutationEvent)) + } else { + self.markInProcess(mutationEvent: mutationEvent, + storageAdapter: storageAdapter, + completion: completion) + } + } } } diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Storage/StorageEngineTestsPostComment4V2Tests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Storage/StorageEngineTestsPostComment4V2Tests.swift index d1d8ffc064..25db881a49 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Storage/StorageEngineTestsPostComment4V2Tests.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Storage/StorageEngineTestsPostComment4V2Tests.swift @@ -50,6 +50,48 @@ final class StorageEngineTestsPostComment4V2Tests: StorageEngineTestsBase, Share } } + func testSavePostAndSyncSuccess() async throws { + let receivedMutationEvent = expectation(description: "Mutation Events submitted to sync engine") + let expectedSuccess = expectation(description: "Simulated success on mutation event submitted to sync engine") + let post = ParentPost4V2( + id: "postId1", + title: "title1") + + syncEngine.setCallbackOnSubmit { submittedMutationEvent, completion in + receivedMutationEvent.fulfill() + if submittedMutationEvent.modelId == post.id { + expectedSuccess.fulfill() + completion(.success(submittedMutationEvent)) + } else { + XCTFail("Unexpected submitted MutationEvent \(submittedMutationEvent)") + completion(.failure(.internalOperation("mockError", "", nil))) + } + } + try await saveAsync(post) + await fulfillment(of: [receivedMutationEvent, expectedSuccess], timeout: 1) + + } + + /// A save should fail if the corresponding MutationEvent could not be submitted to the syncEngine. + func testSavePostFailDueToSyncEngineMissing() async throws { + storageEngine.syncEngine = nil + do { + try await saveAsync( + ParentPost4V2( + id: "postId1", + title: "title1")) + XCTFail("Expected to fail when sync engine is `nil`") + } catch { + guard let dataStoreError = error as? DataStoreError else { + XCTFail("Unexpected type of error \(error)") + return + } + XCTAssertEqual( + dataStoreError.errorDescription, + "No SyncEngine available to sync mutation event, rollback save.") + } + } + func testSaveCommentThenQueryComment() async throws { let comment = ChildComment4V2(content: "content") let savedComment = try await saveAsync(comment) diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/MutationQueue/AWSMutationDatabaseAdapterTests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/MutationQueue/AWSMutationDatabaseAdapterTests.swift index b24b499c3a..a8d8829f3c 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/MutationQueue/AWSMutationDatabaseAdapterTests.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/MutationQueue/AWSMutationDatabaseAdapterTests.swift @@ -6,6 +6,7 @@ // import Foundation +import SQLite import XCTest @testable import Amplify @@ -15,14 +16,14 @@ import AWSPluginsCore class AWSMutationDatabaseAdapterTests: XCTestCase { var databaseAdapter: AWSMutationDatabaseAdapter! - + var storageAdapter: MockSQLiteStorageEngineAdapter! let model1 = Post(title: "model1", content: "content1", createdAt: .now()) let post = Post.keys override func setUp() { do { - let mockStorageAdapter = MockSQLiteStorageEngineAdapter() - databaseAdapter = try AWSMutationDatabaseAdapter(storageAdapter: mockStorageAdapter) + storageAdapter = MockSQLiteStorageEngineAdapter() + databaseAdapter = try AWSMutationDatabaseAdapter(storageAdapter: storageAdapter) } catch { XCTFail("Failed to setup system under test") } @@ -148,6 +149,166 @@ class AWSMutationDatabaseAdapterTests: XCTestCase { XCTAssertEqual(disposition, .dropCandidateWithError(DataStoreError.unknown("", "", nil))) } + + /// Retrieve the first MutationEvent + func test_getNextMutationEvent_AlreadyInProcess() { + let queryExpectation = expectation(description: "query called") + let getMutationEventCompleted = expectation(description: "getNextMutationEvent completed") + var mutationEvent1 = MutationEvent(modelId: "1111-22", + modelName: "Post", + json: "{}", + mutationType: .create) + mutationEvent1.inProcess = true + let mutationEvent2 = MutationEvent(modelId: "1111-22", + modelName: "Post", + json: "{}", + mutationType: .create) + let queryResponder = QueryModelTypePredicateResponder { _, _ in + queryExpectation.fulfill() + return .success([mutationEvent1, mutationEvent2]) + } + storageAdapter.responders[.queryModelTypePredicate] = queryResponder + databaseAdapter.getNextMutationEvent { result in + switch result { + case .success(let mutationEvent): + XCTAssertTrue(mutationEvent.inProcess) + case .failure(let error): + XCTFail("Should have been successful result, error: \(error)") + } + getMutationEventCompleted.fulfill() + } + + waitForExpectations(timeout: 1) + } + + /// Retrieve the first MutationEvent + func test_getNextMutationEvent_MarkInProcess() { + let queryExpectation = expectation(description: "query called") + let getMutationEventCompleted = expectation(description: "getNextMutationEvent completed") + let mutationEvent1 = MutationEvent(modelId: "1111-22", + modelName: "Post", + json: "{}", + mutationType: .create) + XCTAssertFalse(mutationEvent1.inProcess) + let mutationEvent2 = MutationEvent(modelId: "1111-22", + modelName: "Post", + json: "{}", + mutationType: .create) + let queryResponder = QueryModelTypePredicateResponder { _, _ in + queryExpectation.fulfill() + return .success([mutationEvent1, mutationEvent2]) + } + storageAdapter.responders[.queryModelTypePredicate] = queryResponder + databaseAdapter.getNextMutationEvent { result in + switch result { + case .success(let mutationEvent): + XCTAssertTrue(mutationEvent.inProcess) + case .failure(let error): + XCTFail("Should have been successful result, error: \(error)") + } + + getMutationEventCompleted.fulfill() + } + + waitForExpectations(timeout: 1) + } + + /// This tests uses an in-memory SQLite connection to save and query mutation events. + /// + /// 1. First query will return `m2` since `createdAt`is the oldest, and marked `inProcess` true. + /// 2. The second query will also return `m2` since `inProcess` does not impact the results + /// 3. Delete `m2` from the storage + /// 4. The third query will return `m1` since `m2` was dequeued + func testGetNextMutationEvent_WithInMemoryStorage() throws { + let connection = try Connection(.inMemory) + let storageAdapter = try SQLiteStorageEngineAdapter(connection: connection) + try storageAdapter.setUp(modelSchemas: StorageEngine.systemModelSchemas) + let oldestCreatedAt = Temporal.DateTime.now().add(value: -1, to: .second) + let newerCreatedAt = Temporal.DateTime.now().add(value: 1, to: .second) + databaseAdapter.storageAdapter = storageAdapter + let m1 = MutationEvent(modelId: "m1", + modelName: "Post", + json: "{}", + mutationType: .create, + createdAt: newerCreatedAt, + inProcess: false) + let m2 = MutationEvent(modelId: "m2", + modelName: "Post", + json: "{}", + mutationType: .create, + createdAt: oldestCreatedAt, + inProcess: false) + let setUpM1 = storageAdapter.save(m1, modelSchema: MutationEvent.schema) + guard case .success = setUpM1 else { + XCTFail("Could not set up mutation event: \(m1)") + return + } + let setUpM2 = storageAdapter.save(m2, modelSchema: MutationEvent.schema) + guard case .success = setUpM2 else { + XCTFail("Could not set up mutation event: \(m2)") + return + } + + // (1) + let firstQueryCompleted = expectation(description: "getNextMutationEvent completed") + databaseAdapter.getNextMutationEvent { result in + switch result { + case .success(let mutationEvent): + XCTAssertTrue(mutationEvent.inProcess) + XCTAssertEqual(mutationEvent.id, m2.id) + case .failure(let error): + XCTFail("Should have been successful result, error: \(error)") + } + + firstQueryCompleted.fulfill() + } + + waitForExpectations(timeout: 1) + + // (2) + let secondQueryCompleted = expectation(description: "getNextMutationEvent completed") + databaseAdapter.getNextMutationEvent { result in + switch result { + case .success(let mutationEvent): + XCTAssertTrue(mutationEvent.inProcess) + XCTAssertEqual(mutationEvent.id, m2.id) + case .failure(let error): + XCTFail("Should have been successful result, error: \(error)") + } + + secondQueryCompleted.fulfill() + } + + waitForExpectations(timeout: 1) + + // (3) + storageAdapter.delete(MutationEvent.self, + modelSchema: MutationEvent.schema, + withId: m2.id) { result in + switch result { + case .success: + break + case .failure(let error): + XCTFail("Couldn't delete mutation event, error: \(error)") + } + } + + // (4) + let thirdQueryCompleted = expectation(description: "getNextMutationEvent completed") + databaseAdapter.getNextMutationEvent { result in + switch result { + case .success(let mutationEvent): + XCTAssertTrue(mutationEvent.inProcess) + XCTAssertEqual(mutationEvent.id, m1.id) + case .failure(let error): + XCTFail("Should have been successful result, error: \(error)") + } + + thirdQueryCompleted.fulfill() + } + + waitForExpectations(timeout: 1) + } } extension AWSMutationDatabaseAdapter.MutationDisposition: Equatable { diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/Support/MockSQLiteStorageEngineAdapter.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/Support/MockSQLiteStorageEngineAdapter.swift index 5b5fb2d46a..dbdfdbf2a2 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/Support/MockSQLiteStorageEngineAdapter.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/Support/MockSQLiteStorageEngineAdapter.swift @@ -177,6 +177,14 @@ class MockSQLiteStorageEngineAdapter: StorageEngineAdapter { : completion(.success(model)) } + func save(_ model: M, + modelSchema: ModelSchema, + condition: QueryPredicate?, + eagerLoad: Bool) -> DataStoreResult { + XCTFail("Not yet implemented") + return .failure(.internalOperation("", "", nil)) + } + func save(_ model: M, modelSchema: ModelSchema, condition where: QueryPredicate?, diff --git a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginLazyLoadTests/LL1/AWSDataStoreLazyLoadPostComment4V2StressTests.swift b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginLazyLoadTests/LL1/AWSDataStoreLazyLoadPostComment4V2StressTests.swift new file mode 100644 index 0000000000..260534710f --- /dev/null +++ b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginLazyLoadTests/LL1/AWSDataStoreLazyLoadPostComment4V2StressTests.swift @@ -0,0 +1,238 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +import Foundation +import Combine +import XCTest + +import Amplify +import AWSPluginsCore +import AWSDataStorePlugin + +extension AWSDataStoreLazyLoadPostComment4V2Tests { + + static let loggingContext = "multiSaveWithInterruptions" + + /// Test performing save's and stop/start concurrently. + /// + /// This test was validated prior to [PR 3492](https://github.com/aws-amplify/amplify-swift/pull/3492) + /// and will fail. The failure will show up when the test asserts that a queried comment from AppSync should contain the associated + /// post, but comment's post is `nil`. See the PR for changes in adding transactional support for commiting the two writes (saving the model and + /// mutation event) and MutationEvent dequeuing logic. + /// + /// - Given: A set of models (post and comment) created and saved to DataStore. + /// - When: A detached task will interrupt DataStore by calling `DataStore.stop()`, + /// followed by restarting it (`DataStore.start()`), while saving comment and posts. + /// - Then: + /// - DataStore should sync data in the correct order of what was saved/submitted to it + /// - the post should be synced before the comment + /// - it should not skip over an item, ie. a comment saved but post is missing. + /// - The remote store should contain all items synced + /// - comments and post should exist. + /// - the comment should also have the post reference. + /// + func testMultiSaveWithInterruptions() async throws { + await setup(withModels: PostComment4V2Models()) + let amplify = AmplifyTestExecutor() + + Amplify.Logging.info("[\(AWSDataStoreLazyLoadPostComment4V2Tests.loggingContext)] Begin saving data with interruptions") + let savesSyncedExpectation = expectation(description: "Outbox is empty after saving (with interruptions)") + savesSyncedExpectation.assertForOverFulfill = false + try await amplify.multipleSavesWithInterruptions(savesSyncedExpectation) + await fulfillment(of: [savesSyncedExpectation], timeout: 120) + + Amplify.Logging.info("[\(AWSDataStoreLazyLoadPostComment4V2Tests.loggingContext)] Outbox is empty, begin asserting data") + let savedModels = await amplify.savedModels + for savedModel in savedModels { + let savedComment = savedModel.0 + let savedPost = savedModel.1 + + try await assertQueryComment(savedComment, post: savedPost) + try await assertQueryPost(savedPost) + } + Amplify.Logging.info("[\(AWSDataStoreLazyLoadPostComment4V2Tests.loggingContext)] All models match remote store, begin clean up.") + try await cleanUp(savedModels) + } + + actor AmplifyTestExecutor { + var savedModels = [(Comment, Post)]() + + /// The minimum number of iterations, through trial and error, found to reproduce the bug. + private let count = 15 + + /// `isOutboxEmpty` is used to return the flow back to the caller via fulfilling the `savesSyncedExpectation`. + /// By listening to the OutboxEvent after performing the operations, the last outboxEvent to be `true` while `index` + /// is the last index, will be when `savesSyncedExpectation` is fulfilled and returned execution back to the caller. + private var isOutboxEmpty = false + + private var index = 0 + private var subscribeToOutboxEventTask: Task? + private var outboxEventsCount = 0 + + /// Perform saving the comment/post in one detached task while another detached task will + /// perform the interruption (stop/start). Repeat with a bit of delay to allow DataStore some + /// time to kick off its start sequence- this will always be the case since the last operation of + /// each detached task is a `save` (implicit `start`) or a `start`. + func multipleSavesWithInterruptions(_ savesSyncedExpectation: XCTestExpectation) async throws { + subscribeToOutboxEvent(savesSyncedExpectation) + while isOutboxEmpty == false { + try await Task.sleep(seconds: 1) + } + for i in 0..