Skip to content

Commit

Permalink
fix(datastore): change OutgoingMutationQueue use TaskQueue for state …
Browse files Browse the repository at this point in the history
…transitions (#3720)

* fix(datastore): change OutgoingMutationQueue use TaskQueue for state transition

* Update AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift

Co-authored-by: Michael Law <[email protected]>

---------

Co-authored-by: Michael Law <[email protected]>
  • Loading branch information
5d and lawmicha authored May 27, 2024
1 parent 63b4c1b commit e51aee8
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
private let operationQueue: OperationQueue

/// A DispatchQueue for synchronizing state on the mutation queue
private let mutationDispatchQueue = DispatchQueue(
label: "com.amazonaws.OutgoingMutationQueue",
target: DispatchQueue.global()
)
private let mutationDispatchQueue = TaskQueue<Void>()

private weak var api: APICategoryGraphQLBehaviorExtended?
private weak var reconciliationQueue: IncomingEventReconciliationQueue?
Expand All @@ -55,7 +52,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {

let operationQueue = OperationQueue()
operationQueue.name = "com.amazonaws.OutgoingMutationOperationQueue"
operationQueue.underlyingQueue = mutationDispatchQueue
operationQueue.qualityOfService = .default
operationQueue.maxConcurrentOperationCount = 1
operationQueue.isSuspended = true

Expand Down Expand Up @@ -139,6 +136,10 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {

queryMutationEventsFromStorage { [weak self] in
guard let self = self else { return }
guard case .starting = self.stateMachine.state else {
self.log.debug("Unexpected state transition while performing `doStart()` during `.starting` state. Current state: \(self.stateMachine.state).")
return
}

self.operationQueue.isSuspended = false
// State machine notification to ".receivedSubscription" will be handled in `receive(subscription:)`
Expand Down

0 comments on commit e51aee8

Please sign in to comment.