Skip to content

Commit

Permalink
fix(Core): TaskQueue async execution (#3611)
Browse files Browse the repository at this point in the history
* fix(Core): TaskQueue<Success> async execution is not serial #3556 (#3557)

* fix(Core): make TaskQueue take async task throwable

* fix(core): resolve swift lint warnings

---------

Co-authored-by: Tomasz Trela <[email protected]>
  • Loading branch information
5d and MuniekMg authored Apr 16, 2024
1 parent dc32a9d commit 4eecd94
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 11 deletions.
51 changes: 40 additions & 11 deletions Amplify/Core/Support/TaskQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,25 @@
import Foundation

/// A helper for executing asynchronous work serially.
public actor TaskQueue<Success> {
private var previousTask: Task<Success, Error>?
public class TaskQueue<Success> {
typealias Block = @Sendable () async -> Void
private var streamContinuation: AsyncStream<Block>.Continuation!

public init() {}
public init() {
let stream = AsyncStream<Block>.init { continuation in
streamContinuation = continuation
}

Task {
for await block in stream {
_ = await block()
}
}
}

deinit {
streamContinuation.finish()
}

/// Serializes asynchronous requests made from an async context
///
Expand All @@ -25,17 +40,31 @@ public actor TaskQueue<Success> {
/// TaskQueue serializes this work so that `doAsync1` is performed before `doAsync2`,
/// which is performed before `doAsync3`.
public func sync(block: @Sendable @escaping () async throws -> Success) async throws -> Success {
let currentTask: Task<Success, Error> = Task { [previousTask] in
_ = await previousTask?.result
return try await block()
try await withCheckedThrowingContinuation { continuation in
streamContinuation.yield {
do {
let value = try await block()
continuation.resume(returning: value)
} catch {
continuation.resume(throwing: error)
}
}
}
previousTask = currentTask
return try await currentTask.value
}

public nonisolated func async(block: @Sendable @escaping () async throws -> Success) rethrows {
Task {
try await sync(block: block)
public func async(block: @Sendable @escaping () async throws -> Success) {
streamContinuation.yield {
do {
_ = try await block()
} catch {
Self.log.warn("Failed to handle async task in TaskQueue<\(Success.self)> with error: \(error)")
}
}
}
}

extension TaskQueue {
public static var log: Logger {
Amplify.Logging.logger(forNamespace: String(describing: self))
}
}
16 changes: 16 additions & 0 deletions AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,20 @@ final class AmplifyTaskQueueTests: XCTestCase {
await fulfillment(of: [expectation1, expectation2, expectation3], enforceOrder: true)
}

func testAsync() async throws {
let taskCount = 1_000
let expectations: [XCTestExpectation] = (0..<taskCount).map {
expectation(description: "Expected execution of a task number \($0)")
}

let taskQueue = TaskQueue<Void>()

for i in 0..<taskCount {
taskQueue.async {
expectations[i].fulfill()
}
}

await fulfillment(of: expectations, enforceOrder: true)
}
}

0 comments on commit 4eecd94

Please sign in to comment.