[docc] Soundness, docs additions and minor test cleanups for beta.2
* fix docc warnings

* [docc] bring back @comment { fishy docs } enabling

* [docc] validate we don't have any warnings in docc, e.g. bad links

* fix validate_docc script

* Showcase: Incorrect number of arguments passed to called function!
  musttail call swifttailcc void %4(%swift.context* swiftasync %1, %swift.error* %2), !dbg !1538
  in function $s17DistributedActors10WorkerPoolCyxGAA14LifecycleWatchA2aEP10terminated5actoryAA13ClusterSystemC7ActorIDV_tYaFTW.0.61
  <unknown>:0: error: fatal error encountered during compilation; please submit a bug report (https://swift.org/contributing/#reporting-bugs) and include the project
  <unknown>:0: note: Broken function found, compilation aborted!

* fix conformance crasher

* remove superfluous prints and logs

* =receptionist rdar://97586760 lookup() trying to cast stub actor

* work on WeakActorDictionary

* Update Sources/DistributedActors/DistributedActors.docc/Receptionist.md

Co-authored-by: Yim Lee <[email protected]>

* +test reenable testkit tests

* !system cleanup shutdown() and terminated methods on actor system

* formatting

* rename Docs directory, add Security page

* test fixes

* +docs receptionist checkout docs

* +docs some docs for leadership

Co-authored-by: Yim Lee <[email protected]>
ktoso and yim-lee authored Jul 28, 2022
1 parent ec23c41 commit 811d2e8
Showing 52 changed files with 574 additions and 383 deletions.
@@ -4,7 +4,7 @@ import PackagePlugin
@main struct FishyDocsBuildPlugin: BuildToolPlugin {
func createBuildCommands(context: PluginContext, target: Target) async throws -> [Command] {
let genSourcesDir = context.pluginWorkDirectory
let doccBasePath = "\(context.package.directory)/Sources/DistributedActors/DistributedActors.docc"
let doccBasePath = "\(context.package.directory)/Sources/DistributedActors/Docs.docc"

let mdFiles = try FileManager.default
.contentsOfDirectory(atPath: doccBasePath)
63 changes: 55 additions & 8 deletions InternalPlugins/FishyDocs/Sources/FishyDocs/FishyDocs.swift
@@ -6,6 +6,13 @@ func log(_ log: String, file: String = #fileID, line: UInt = #line) {
print("[fishy-docs] \(log)")
}

struct Commands {
static let Namespace = "fishy-docs"
static let Enable = "\(Namespace):enable"
static let Disable = "\(Namespace):disable"
static let Skip = "\(Namespace):skip-next"
}

@main
struct FishyDocs: ParsableCommand {
@Option(help: "Folder containing the docc documentation to scan for fishy-docs")
@@ -41,16 +48,23 @@ struct FishyDocs: ParsableCommand {
}

func usesFishyDocs(document: Document, url: URL) -> Bool {
return (try? String(contentsOf: url).contains("fishy-docs:enable")) ?? false

// FIXME(docc): docc breaks when @Comment is used in symbol documentation: https://github.com/apple/swift-docc/issues/343
// var detectFishyDocs = DetectFishyDocs()
// detectFishyDocs.visit(document)
// return detectFishyDocs.detected
var detectFishyDocs = DetectFishyDocs()
detectFishyDocs.visit(document)
return detectFishyDocs.detected
}

func makeDocsTestCode(document: Document, doccFileName: String) throws -> String? {
var concatCodeBlocks = ConcatCodeBlocks()
assert(!doccFileName.isEmpty)

guard var name = doccFileName.split(separator: "/").last else {
return nil
}
guard let simpleName = name.split(separator: ".").first else {
return nil
}
name = simpleName

var concatCodeBlocks = ConcatCodeBlocks(name: String(name))
concatCodeBlocks.visit(document)

guard !concatCodeBlocks.code.isEmpty else {
Expand Down Expand Up @@ -79,6 +93,20 @@ struct DetectFishyDocs: MarkupWalker {
struct ConcatCodeBlocks: MarkupWalker {
var importBlocks: [CodeBlock] = []
var codeBlocks: [CodeBlock] = []

/// True if capturing code blocks is enabled
var includeCodeBlock: Bool = true

/// Skips a single code block: the one immediately following a fishy-docs:skip-next directive
var skipNextCodeBlock: Bool = false

/// Name of the file we're compile-testing
let name: String

init(name: String) {
self.name = name
}

var code: String {
var allBlocks = importBlocks
allBlocks.append(contentsOf: codeBlocks)
@@ -88,7 +116,7 @@ struct ConcatCodeBlocks: MarkupWalker {
codeBlockStrings.append(block.code)
}

codeBlockStrings.append("func __test() async throws {")
codeBlockStrings.append("func __compileTest_\(self.name)() async throws {")
for block in codeBlocks {
var s = "// ==== "

@@ -115,7 +143,26 @@
return codeBlockStrings.joined(separator: "\n")
}

mutating func visitParagraph(_ paragraph: Paragraph) {
if paragraph.plainText.contains(Commands.Enable) {
self.includeCodeBlock = true
} else if paragraph.plainText.contains(Commands.Disable) {
self.includeCodeBlock = false
} else if paragraph.plainText.contains(Commands.Skip) {
self.skipNextCodeBlock = true
self.includeCodeBlock = false
}
}

mutating func visitCodeBlock(_ codeBlock: CodeBlock) {
guard !self.skipNextCodeBlock else {
self.skipNextCodeBlock = false
return
}
guard self.includeCodeBlock else {
return
}

if codeBlock.code.contains("import ") {
self.importBlocks.append(codeBlock)
} else {
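To illustrate how the new directive handling fits together, here is a hedged sketch (not part of this commit) of driving the walker over a single docc page. The file path and page name are assumptions for illustration; ConcatCodeBlocks(name:), visit(_:), code, and the __compileTest_<name>() wrapper come from the diff above.

import Foundation
import Markdown

// Hedged sketch: drive the updated walker over one docc page.
// The path and the "Receptionist" page name are assumed for illustration only.
let pageURL = URL(fileURLWithPath: "Sources/DistributedActors/Docs.docc/Receptionist.md")
let document = Document(parsing: try String(contentsOf: pageURL))

var walker = ConcatCodeBlocks(name: "Receptionist")
walker.visit(document) // visitParagraph toggles the enable/disable/skip-next state,
                       // visitCodeBlock collects only the blocks left enabled
print(walker.code)     // wraps non-import blocks in `func __compileTest_Receptionist() async throws { ... }`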
16 changes: 8 additions & 8 deletions Package.swift
@@ -59,14 +59,14 @@ var targets: [PackageDescription.Target] = [
]
),

// .testTarget(
// name: "DistributedActorsTestKitTests",
// dependencies: [
// "DistributedActors",
// "DistributedActorsTestKit"
// ]
// ),
//
.testTarget(
name: "DistributedActorsTestKitTests",
dependencies: [
"DistributedActors",
"DistributedActorsTestKit",
]
),

// .testTarget(
// name: "CDistributedActorsMailboxTests",
// dependencies: [
3 changes: 1 addition & 2 deletions Sources/DistributedActors/Cluster/ClusterEventStream.swift
@@ -123,7 +123,6 @@ public struct ClusterEventStream: AsyncSequence {
}
}

// FIXME(distributed): the only reason this actor is distributed is because of LifecycleWatch
internal distributed actor ClusterEventStreamActor: LifecycleWatch {
typealias ActorSystem = ClusterSystem

@@ -210,7 +209,7 @@ internal distributed actor ClusterEventStreamActor: LifecycleWatch {
}
}

distributed func terminated(actor id: ActorID) {
func terminated(actor id: ActorID) {
if self.subscribers.removeValue(forKey: id) != nil {
self.log.trace("Removed subscriber [\(id)], because it terminated")
}
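The signature change above (dropping `distributed` from the `terminated` witness) also sidesteps the musttail/LifecycleWatch crash quoted in the commit message. Below is a minimal hedged sketch of what a conforming actor looks like after this commit; the Watcher type is invented for illustration, and anything beyond the terminated(actor:) requirement is not shown in this diff.

import Distributed
import DistributedActors

// Minimal sketch of a LifecycleWatch conformance after this change:
// the termination callback is a plain (non-distributed) method again.
distributed actor Watcher: LifecycleWatch {
    typealias ActorSystem = ClusterSystem

    func terminated(actor id: ClusterSystem.ActorID) {
        // React to a watched actor's termination, e.g. drop it from local state.
        print("watched actor terminated: \(id)")
    }
}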
10 changes: 7 additions & 3 deletions Sources/DistributedActors/Cluster/ClusterShell.swift
@@ -674,7 +674,7 @@ extension ClusterShell {
}
.receiveSpecificSignal(_Signals.Terminated.self) { context, signal in
context.log.error("Cluster actor \(signal.id) terminated unexpectedly! Will initiate cluster shutdown.")
context.system.shutdown()
try context.system.shutdown()
return .same // the system shutdown will cause downing which we may want to still handle, and then will stop us
}
}
@@ -1305,8 +1305,12 @@ extension ClusterShell {
let onDownAction = context.system.settings.onDownAction.make()
try onDownAction(context.system) // TODO: return a future and run with a timeout
} catch {
context.system.log.error("Failed to executed onDownAction! Shutting down system forcefully! Error: \(error)")
context.system.shutdown()
context.system.log.error("Failed to execute onDownAction! Shutting down system forcefully!", metadata: ["error": "\(error)"])
do {
try context.system.shutdown()
} catch {
context.system.log.error("Failed shutting down actor system!", metadata: ["error": "\(error)"])
}
}

state = self.interpretLeaderActions(context.system, state, state.collectLeaderActions())
@@ -74,7 +74,7 @@ public struct OnDownActionStrategySettings {
guard .milliseconds(0) < shutdownDelay else {
context.log.warning("This node was marked as [.down], delay is immediate. Shutting down the system immediately!")
Task {
system.shutdown()
try system.shutdown()
}
return .stop
}
@@ -85,7 +85,7 @@
return .receiveMessage { _ in
system.log.warning("Shutting down...")
Task {
system.shutdown()
try system.shutdown()
}
return .stop
}
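The hunk above is the behavior behind the "on down" action: once this node is marked [.down], the system schedules its own shutdown (now spelled try system.shutdown()) after a configurable delay. A hedged configuration sketch follows; the .gracefulShutdown(delay:) factory name and the closure-configured initializer are assumptions, not confirmed by this diff.

// Hedged sketch: configure what a node does once it has been marked [.down].
// `.gracefulShutdown(delay:)` is an assumed factory name, for illustration only.
let system = await ClusterSystem("Example") { settings in
    settings.onDownAction = .gracefulShutdown(delay: .seconds(3))
}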
5 changes: 3 additions & 2 deletions Sources/DistributedActors/Cluster/Leadership.swift
@@ -223,9 +223,9 @@ extension Leadership {
/// fulfilling this role whenever the minimum number of nodes exist. This may be useful when operation would
/// potentially be unsafe given less than `minimumNrOfMembers` nodes.
///
// TODO: In situations which need strong guarantees, this leadership election scheme does NOT provide strong enough
/// guarantees, and you should consider using another scheme or consensus based modes.
public struct LowestReachableMember: LeaderElection {
// TODO: In situations which need strong guarantees, this leadership election scheme does NOT provide strong enough
// guarantees, and you should consider using another scheme or consensus based modes.
let minimumNumberOfMembersToDecide: Int
let loseLeadershipIfBelowMinNrOfMembers: Bool

@@ -318,6 +318,7 @@ extension Leadership {
// MARK: Leadership settings

extension ClusterSystemSettings {
/// Configures which leadership election mechanism is used to decide the cluster leader.
public struct LeadershipSelectionSettings {
private enum _LeadershipSelectionSettings {
case none
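As a usage sketch for the election scheme documented above: the snippet below opts a cluster into the lowest-reachable-member election and requires at least 3 members before any leadership decision is made. The settings property and factory names are assumptions for illustration and are not confirmed by this diff.

// Hedged sketch; `autoLeaderElection` / `.lowestReachable(minNumberOfMembers:)`
// are assumed names for the LeadershipSelectionSettings shown above.
let system = await ClusterSystem("Example") { settings in
    settings.autoLeaderElection = .lowestReachable(minNumberOfMembers: 3)
}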
1 change: 0 additions & 1 deletion Sources/DistributedActors/Cluster/NodeDeathWatcher.swift
@@ -26,7 +26,6 @@ import NIO
/// and individually watched actors, the watcher handles subscribing for cluster events on behalf of actors which watch
/// other actors on remote nodes, and messages them `SystemMessage.nodeTerminated(node)` upon node termination (down),
/// which are in turn translated by the actors locally to `SystemMessage.terminated(ref:existenceConfirmed:idTerminated:true)`
///
/// to any actor which watched at least one actor on a node that has been downed.
///
/// Actor which is notified automatically when a remote actor is `context.watch()`-ed.
@@ -812,7 +812,7 @@ extension OpLogDistributedReceptionist {
// MARK: Termination handling

extension OpLogDistributedReceptionist {
public distributed func terminated(actor id: ID) {
public func terminated(actor id: ID) {
if id == ActorID._receptionist(on: id.uniqueNode, for: .distributedActors) {
self.log.debug("Watched receptionist terminated: \(id)")
self.receptionistTerminated(identity: id)
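Several commit bullets touch the receptionist docs and its lookup path ("+docs receptionist checkout docs", "=receptionist rdar://97586760"). For orientation, here is a hedged usage sketch of checking in and listing actors; the key extension, checkIn(_:with:), and listing(of:) spellings are assumptions based on the library's receptionist documentation, not on this diff, and the Worker type is invented for illustration.

import Distributed
import DistributedActors

distributed actor Worker {
    typealias ActorSystem = ClusterSystem

    distributed func work() {
        // ...
    }
}

// Assumed key definition, mirroring the receptionist documentation this commit expands.
extension DistributedReception.Key {
    static var workers: DistributedReception.Key<Worker> { "workers" }
}

func example(system: ClusterSystem, worker: Worker) async {
    // Check the worker in under the key...
    await system.receptionist.checkIn(worker, with: .workers)

    // ...and discover checked-in workers from anywhere in the cluster.
    for await discovered in await system.receptionist.listing(of: .workers) {
        print("discovered worker: \(discovered.id)")
    }
}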
44 changes: 20 additions & 24 deletions Sources/DistributedActors/ClusterSystem.swift
@@ -61,7 +61,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {

// Access MUST be protected with `namingLock`.
private var _managedRefs: [ActorID: _ReceivesSystemMessages] = [:]
private var _managedDistributedActors: WeakActorDictionary = .init()
private var _managedDistributedActors: WeakAnyDistributedActorDictionary = .init()
private var _reservedNames: Set<ActorID> = []

typealias WellKnownName = String
@@ -427,51 +427,57 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
#endif
}

/// Object that can be awaited on until the system has completed shutting down.
public struct Shutdown {
private let receptacle: BlockingReceptacle<Error?>

init(receptacle: BlockingReceptacle<Error?>) {
self.receptacle = receptacle
}

@available(*, deprecated, message: "will be replaced by distributed actor / closure version")
public func wait(atMost timeout: Duration) throws {
if let error = self.receptacle.wait(atMost: timeout).flatMap({ $0 }) {
throw error
}
}

@available(*, deprecated, message: "will be replaced by distributed actor / closure version")
public func wait() throws {
if let error = self.receptacle.wait() {
throw error
}
}
}

/// Suspends until the ``ClusterSystem`` is terminated by a call to ``shutdown``.
var terminated: Void {
get async throws {
/// Suspend until the system has completed its shutdown and is terminated.
public func wait() async throws {
// TODO: implement without blocking the internal task;
try await Task.detached {
try Shutdown(receptacle: self.shutdownReceptacle).wait()
if let error = self.receptacle.wait() {
throw error
}
}.value
}
}

/// Suspends until the ``ClusterSystem`` is terminated by a call to ``shutdown()``.
public var terminated: Void {
get async throws {
try await Shutdown(receptacle: self.shutdownReceptacle).wait()
}
}

/// Forcefully stops this actor system and all actors that live within it.
/// This is an asynchronous operation and will be executed on a separate thread.
///
/// You can use `shutdown().wait()` to synchronously await on the system's termination,
/// or provide a callback to be executed after the system has completed its shutdown.
///
/// - Parameters:
/// - queue: allows configuring on which dispatch queue the shutdown operation will be finalized.
/// - afterShutdownCompleted: optional callback to be invoked when the system has completed shutting down.
/// Will be invoked on the passed in `queue` (which defaults to `DispatchQueue.global()`).
/// - Returns: A `Shutdown` value that can be waited upon until the system has completed the shutdown.
@discardableResult
public func shutdown(queue: DispatchQueue = DispatchQueue.global(), afterShutdownCompleted: @escaping (Error?) -> Void = { _ in () }) -> Shutdown {
public func shutdown() throws -> Shutdown {
guard self.shutdownFlag.loadThenWrappingIncrement(by: 1, ordering: .relaxed) == 0 else {
// shutdown already kicked off by someone else
afterShutdownCompleted(nil)
return Shutdown(receptacle: self.shutdownReceptacle)
}

@@ -510,7 +516,7 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
// self._receptionistRef = self.deadLetters.adapted()
} catch {
self.shutdownReceptacle.offerOnce(error)
afterShutdownCompleted(error)
throw error
}

/// Only once we've shutdown all dispatchers and loops, we clear cycles between the serialization and system,
@@ -523,7 +529,6 @@ public class ClusterSystem: DistributedActorSystem, @unchecked Sendable {
_ = self._clusterStore.storeIfNilThenLoad(Box(nil))

self.shutdownReceptacle.offerOnce(nil)
afterShutdownCompleted(nil)

return Shutdown(receptacle: self.shutdownReceptacle)
}
@@ -1085,16 +1090,13 @@ extension ClusterSystem {
Res: Codable
{
if let interceptor = actor.id.context.remoteCallInterceptor {
self.log.warning("INTERCEPTOR remote call \(actor.id)...")
print("[\(self.cluster.uniqueNode)] INTERCEPTOR remote call \(actor.id)...")
return try await interceptor.interceptRemoteCall(on: actor, target: target, invocation: &invocation, throwing: throwing, returning: returning)
}

guard actor.id.uniqueNode != self.cluster.uniqueNode else {
// It actually is a remote call, so redirect it to local call-path.
// Such calls can happen when we deal with interceptors and proxies;
// To make their lives easier, we centralize the noticing when a call is local and dispatch it from here.
self.log.warning("ACTUALLY LOCAL CALL: \(target) on \(actor.id)")
return try await self.localCall(on: actor, target: target, invocation: &invocation, throwing: throwing, returning: returning)
}

@@ -1116,13 +1118,10 @@ extension ClusterSystem {
arguments: arguments
)

print("[\(self.cluster.uniqueNode)] SEND INVOCATION: \(invocation) TO \(recipient.id.fullDescription)")
log.warning("[\(self.cluster.uniqueNode)] SEND INVOCATION: \(invocation) TO \(recipient.id.fullDescription)")
recipient.sendInvocation(invocation)
}

if let error = reply.thrownError {
print("[\(self.cluster.uniqueNode)] reply error: \(error)")
throw error
}
guard let value = reply.value else {
@@ -1258,13 +1257,10 @@ extension ClusterSystem {
Err: Error,
Res: Codable
{
print("[\(self.cluster.uniqueNode)] ACT: \(actor.id.fullDescription)")
precondition(
self.cluster.uniqueNode == actor.id.uniqueNode,
"Attempted to localCall an actor whose ID was a different node: [\(actor.id)], current node: \(self.cluster.uniqueNode)"
)
// precondition(!__isRemoteActor(actor),
// "Attempted to localCall a remote actor! \(actor.id)")
self.log.trace("Execute local call", metadata: [
"actor/id": "\(actor.id.fullDescription)",
"target": "\(target)",
@@ -1302,7 +1298,7 @@ extension ClusterSystem {
"invocation": "\(invocation)",
])

guard let shell = self._cluster else {
guard self._cluster != nil else {
self.log.error("Cluster has shut down already, yet received message. Message will be dropped: \(invocation)")
return
}
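Taken together, the ClusterSystem changes above replace the queue-and-callback shutdown with a throwing call plus awaitable handles. A short hedged usage sketch: the initializer shape is assumed for illustration, while shutdown(), Shutdown.wait(), and terminated are as shown in the diff.

// Hedged usage sketch of the reworked lifecycle API.
let system = await ClusterSystem("Example") // assumed initializer shape

// ... run distributed actors ...

let shutdown = try system.shutdown() // now throws; returns a Shutdown handle
try await shutdown.wait()            // suspend until shutdown has completed

// Alternatively, any other task can suspend until the system terminates:
// try await system.terminated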