From 58d98d395e53ea974cf0b4f2dc98cd94af050110 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 15 Nov 2022 17:40:14 +0900 Subject: [PATCH] +DistributedProgress - PoC implementation of a distributed progress reporter --- .gitignore | 1 + .../DistributedProgress.swift | 225 ++++++++++++++++++ .../DistributedProgressTests.swift | 165 +++++++++++++ 3 files changed, 391 insertions(+) create mode 100644 Sources/DistributedCluster/DistributedProgress/DistributedProgress.swift create mode 100644 Tests/DistributedClusterTests/DistributedProgressTests.swift diff --git a/.gitignore b/.gitignore index b791028c2..eaee00a37 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ *.orig *.app +.history Instruments/ActorInstruments/ActorInstruments.xcodeproj/xcuserdata Instruments/ActorInstruments/build/ diff --git a/Sources/DistributedCluster/DistributedProgress/DistributedProgress.swift b/Sources/DistributedCluster/DistributedProgress/DistributedProgress.swift new file mode 100644 index 000000000..29ecaa7ec --- /dev/null +++ b/Sources/DistributedCluster/DistributedProgress/DistributedProgress.swift @@ -0,0 +1,225 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Distributed +import DistributedActorsConcurrencyHelpers +import Logging + +public distributed actor DistributedProgress { + public typealias ActorSystem = ClusterSystem + lazy var log = Logger(actor: self) + + var step: Steps? + var subscribers: Set = [] + + public init(actorSystem: ActorSystem, steps: Steps.Type = Steps.self) { + self.actorSystem = actorSystem + } + + func to(step: Steps) async throws { + // TODO: checks that we don't move backwards... + log.notice("Move to step: \(step)") + self.step = step + + for sub in subscribers { + try await sub.currentStep(step) + } + + if step == Steps.allCases.reversed().first { + self.log.notice("Progress completed, clear subscribers.") + self.subscribers = [] + return + } + } + + distributed func subscribe(subscriber: Subscriber) async throws { + self.log.notice("Subscribed \(subscriber.id)...") + self.subscribers.insert(subscriber) + + if let step { + try await subscriber.currentStep(step) + } + } + + distributed actor ProgressSubscriber { + typealias ActorSystem = ClusterSystem + + /// Mutable box that we update as the progress proceeds remotely... + let box: Box + + init(box: Box, actorSystem: ActorSystem) { + self.actorSystem = actorSystem + self.box = box + } + + distributed func currentStep(_ step: Steps) { + self.box.updateStep(step) + } + } + + public final class Box: Codable { + public typealias Element = Steps + + let lock: Lock + private var currentStep: Steps? + + let source: DistributedProgress + let actorSystem: ClusterSystem + private var _sub: ProgressSubscriber? + + private var _nextCC: CheckedContinuation? + private var _completedCC: CheckedContinuation? + + public // FIXME: not public + init(source: DistributedProgress) { + self.source = source + self.actorSystem = source.actorSystem + self.lock = Lock() + self.currentStep = nil + } + + public init(from decoder: Decoder) throws { + let container = try decoder.singleValueContainer() + self.lock = Lock() + self.currentStep = nil + self.actorSystem = decoder.userInfo[.actorSystemKey] as! ClusterSystem + self.source = try container.decode(DistributedProgress.self) + } + + public func encode(to encoder: Encoder) throws { + var container = encoder.singleValueContainer() + try container.encode(self.source) + } + + /// Suspend until this ``DistributedProgress`` has reached its last, and final, "step". + public func completed() async throws { + if self.currentStep == Steps.last { + return + } + + try await ensureSubscription() + + await withCheckedContinuation { (cc: CheckedContinuation) in + self._completedCC = cc + } + } + + /// Suspend until this ``DistributedProgress`` receives a next "step". + public func nextStep() async throws -> Steps? { + if self.currentStep == Steps.last { + return nil // last step was already emitted + } + + try await ensureSubscription() + + return await withCheckedContinuation { (cc: CheckedContinuation) in + self._nextCC = cc + } + } + + func updateStep(_ step: Steps) { + self.lock.lock() + defer { self.lock.unlock() } + + self.currentStep = step + + if let onNext = _nextCC { + onNext.resume(returning: step) + } + + if step == Steps.last { + if let completed = _completedCC { + completed.resume() + } + } + } + + @discardableResult + private func ensureSubscription() async throws -> ProgressSubscriber { + self.lock.lock() + + if let sub = self._sub { + self.lock.unlock() + return sub + } else { + let sub = ProgressSubscriber(box: self, actorSystem: self.actorSystem) + self._sub = sub + self.lock.unlock() + + try await self.source.subscribe(subscriber: sub) + return sub + } + } + } +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Progress AsyncSequence + +extension DistributedProgress.Box { + + public func steps(file: String = #file, line: UInt = #line) async throws -> DistributedProgressAsyncSequence { + try await self.ensureSubscription() + + return DistributedProgressAsyncSequence(box: self) + } +} + +public struct DistributedProgressAsyncSequence: AsyncSequence { + public typealias Element = Steps + + private let box: DistributedProgress.Box + + public init(box: DistributedProgress.Box) { + self.box = box + } + + public func makeAsyncIterator() -> AsyncIterator { + return AsyncIterator(box: self.box) + } + + public struct AsyncIterator: AsyncIteratorProtocol { + public typealias Element = Steps + let box: DistributedProgress.Box + + init(box: DistributedProgress.Box) { + self.box = box + } + + public func next() async throws -> Steps? { + try await box.nextStep() + } + } +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Progress Steps protocol + +public protocol DistributedProgressSteps: Codable, Sendable, Equatable, CaseIterable { + static var count: Int { get } + static var last: Self { get } +} +extension DistributedProgressSteps { + public static var count: Int { + precondition(count > 0, "\(Self.self) cannot have zero steps (cases)!") + return Self.allCases.count + } + + public static var last: Self { + guard let last = Self.allCases.reversed().first else { + fatalError("\(Self.self) cannot have zero steps (cases)!") + } + return last + } +} \ No newline at end of file diff --git a/Tests/DistributedClusterTests/DistributedProgressTests.swift b/Tests/DistributedClusterTests/DistributedProgressTests.swift new file mode 100644 index 000000000..a3c8c3462 --- /dev/null +++ b/Tests/DistributedClusterTests/DistributedProgressTests.swift @@ -0,0 +1,165 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Distributed +import DistributedActorsTestKit +@testable import DistributedCluster +import Foundation +import Logging +import XCTest + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Builder + +distributed actor Builder: CustomStringConvertible { + let probe: ActorTestProbe + lazy var log = Logger(actor: self) + + init(probe: ActorTestProbe, actorSystem: ActorSystem) { + self.actorSystem = actorSystem + self.probe = probe + probe.tell("Romeo init") + } + + public distributed func build() -> DistributedProgress.Box { + let progress = DistributedProgress(actorSystem: actorSystem, steps: BuildSteps.self) + + Task { + try await progress.whenLocal { progress in + try await progress.to(step: .prepare) + try await Task.sleep(until: .now + .milliseconds(100), clock: .continuous) + try await progress.to(step: .compile) + try await Task.sleep(until: .now + .milliseconds(200), clock: .continuous) + try await progress.to(step: .test) + try await Task.sleep(until: .now + .milliseconds(200), clock: .continuous) + try await progress.to(step: .complete) + } + } + + return DistributedProgress.Box(source: progress) + } + + nonisolated var description: String { + "\(Self.self)(\(id))" + } +} + +enum BuildSteps: String, DistributedProgressSteps { + case prepare + case compile + case test + case complete +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Juliet + +distributed actor BuildWatcher: CustomStringConvertible { + lazy var log = Logger(actor: self) + + let probe: ActorTestProbe + let builder: Builder + + init(probe: ActorTestProbe, builder: Builder, actorSystem: ActorSystem) { + self.actorSystem = actorSystem + self.probe = probe + self.builder = builder + } + + distributed func runBuild_waitCompleted() async throws { + let progress = try await self.builder.build() + + try await progress.completed() + probe.tell("completed") + } + + distributed func runBuild_streamSteps() async throws { + let progress = try await self.builder.build() + + for try await step in try await progress.steps() { + probe.tell("received-step:\(step)") + } + } + + nonisolated var description: String { + "\(Self.self)(\(id))" + } +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Tests + +final class DistributedProgressTests: ClusteredActorSystemsXCTestCase, @unchecked Sendable { + override var captureLogs: Bool { + false + } + + override func configureLogCapture(settings: inout LogCapture.Settings) { + settings.excludeActorPaths = [ + "/system/cluster", + "/system/gossip", + "/system/cluster/gossip", + "/system/receptionist", + "/system/receptionist-ref", + "/system/cluster/swim", + "/system/clusterEvents", + ] + } + + func test_progress_happyPath() async throws { + let system = await self.setUpNode("single") + + let pb = self.testKit(system).makeTestProbe(expecting: String.self) + let pw = self.testKit(system).makeTestProbe(expecting: String.self) + let p = self.testKit(system).makeTestProbe(expecting: String.self) + + let builder = Builder(probe: pb, actorSystem: system) + let watcher = BuildWatcher(probe: pw, builder: builder, actorSystem: system) + + Task { + try await watcher.runBuild_waitCompleted() + p.tell("done") + } + + try pw.expectMessage("completed") + try p.expectMessage("done") + } + + func test_progress_stream() async throws { + let system = await self.setUpNode("single") + + let pb = self.testKit(system).makeTestProbe(expecting: String.self) + let pw = self.testKit(system).makeTestProbe(expecting: String.self) + let p = self.testKit(system).makeTestProbe(expecting: String.self) + + let builder = Builder(probe: pb, actorSystem: system) + let watcher = BuildWatcher(probe: pw, builder: builder, actorSystem: system) + + Task { + try await watcher.runBuild_streamSteps() + p.tell("done") + } + + let messages = try pw.fishForMessages(within: .seconds(5)) { message in + if message == "received-step:\(BuildSteps.last)" { + return .catchComplete + } else { + return .catchContinue + } + } + pinfo("Received \(messages.count) progress updates: \(messages)") + messages.shouldEqual(BuildSteps.allCases.suffix(messages.count).map { "received-step:\($0)" }) + try p.expectMessage("done") + } +}