From e51aee86304b6a55b018315a2cd3b2bda0d84e3a Mon Sep 17 00:00:00 2001 From: Di Wu Date: Mon, 27 May 2024 17:50:47 +0000 Subject: [PATCH] fix(datastore): change OutgoingMutationQueue use TaskQueue for state transitions (#3720) * fix(datastore): change OutgoingMutationQueue use TaskQueue for state transition * Update AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift Co-authored-by: Michael Law <1365977+lawmicha@users.noreply.github.com> --------- Co-authored-by: Michael Law <1365977+lawmicha@users.noreply.github.com> --- .../OutgoingMutationQueue/OutgoingMutationQueue.swift | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift index 26cde77852..d517b170af 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift @@ -27,10 +27,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { private let operationQueue: OperationQueue /// A DispatchQueue for synchronizing state on the mutation queue - private let mutationDispatchQueue = DispatchQueue( - label: "com.amazonaws.OutgoingMutationQueue", - target: DispatchQueue.global() - ) + private let mutationDispatchQueue = TaskQueue() private weak var api: APICategoryGraphQLBehaviorExtended? private weak var reconciliationQueue: IncomingEventReconciliationQueue? @@ -55,7 +52,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { let operationQueue = OperationQueue() operationQueue.name = "com.amazonaws.OutgoingMutationOperationQueue" - operationQueue.underlyingQueue = mutationDispatchQueue + operationQueue.qualityOfService = .default operationQueue.maxConcurrentOperationCount = 1 operationQueue.isSuspended = true @@ -139,6 +136,10 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { queryMutationEventsFromStorage { [weak self] in guard let self = self else { return } + guard case .starting = self.stateMachine.state else { + self.log.debug("Unexpected state transition while performing `doStart()` during `.starting` state. Current state: \(self.stateMachine.state).") + return + } self.operationQueue.isSuspended = false // State machine notification to ".receivedSubscription" will be handled in `receive(subscription:)`