Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: handle aggressively rejoining/replacing nodes from same host/port pair #1083

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ struct SamplePrettyLogHandler: LogHandler {

let file = file.split(separator: "/").last ?? ""
let line = line
print("\(self.timestamp()) [\(file):\(line)] [\(nodeInfo)\(Self.CONSOLE_BOLD)\(label)\(Self.CONSOLE_RESET)] [\(level)] \(message)\(metadataString)")
print("\(self.timestamp()) \(level) [\(nodeInfo)\(Self.CONSOLE_BOLD)\(label)\(Self.CONSOLE_RESET)][\(file):\(line)] \(message)\(metadataString)")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same formatting as our example formatters in samples

}

internal func prettyPrint(metadata: Logger.MetadataValue) -> String {
Expand Down
13 changes: 8 additions & 5 deletions Sources/DistributedActorsTestKit/LogCapture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,13 @@ extension LogCapture {
public func printLogs() {
for log in self.logs {
var metadataString: String = ""
var actorPath: String = ""
var actorIdentifier: String = ""
if var metadata = log.metadata {
if let path = metadata.removeValue(forKey: "actor/path") {
actorPath = "[\(path)]"
if let id = metadata.removeValue(forKey: "actor/id") {
actorIdentifier = "[\(id)]"
_ = metadata.removeValue(forKey: "actor/path") // discard it
} else if let path = metadata.removeValue(forKey: "actor/path") {
actorIdentifier = "[\(path)]"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revamping this now that we have more distributed actor types and less paths 👍

}

metadata.removeValue(forKey: "label")
Expand Down Expand Up @@ -148,10 +151,10 @@ extension LogCapture {
metadataString = String(metadataString.dropLast(1))
}
}
let date = ActorOriginLogHandler._createFormatter().string(from: log.date)
let date = ActorOriginLogHandler._createSimpleFormatter().string(from: log.date)
let file = log.file.split(separator: "/").last ?? ""
let line = log.line
print("[captured] [\(self.captureLabel)] [\(date)] [\(file):\(line)]\(actorPath) [\(log.level)] \(log.message)\(metadataString)")
print("[captured] [\(self.captureLabel)] \(date) \(log.level) \(actorIdentifier) [\(file):\(line)] \(log.message)\(metadataString)")
}
}

Expand Down
8 changes: 8 additions & 0 deletions Sources/DistributedCluster/ActorLogging.swift
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ struct ActorOriginLogHandler: LogHandler {
return formatter
}

/// Creates a compact timestamp formatter (`H:m:ss.SSSS`) used for short log lines.
///
/// Pinned to the `en_US` locale and Gregorian calendar so output is stable
/// regardless of the host machine's settings.
public static func _createSimpleFormatter() -> DateFormatter {
    let timestampFormatter = DateFormatter()
    timestampFormatter.calendar = Calendar(identifier: .gregorian)
    timestampFormatter.locale = Locale(identifier: "en_US")
    timestampFormatter.dateFormat = "H:m:ss.SSSS"
    return timestampFormatter
}

private let context: LoggingContext

private var targetLogger: Logger
Expand Down
34 changes: 32 additions & 2 deletions Sources/DistributedCluster/Cluster/Cluster+Membership.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,22 @@ extension Cluster {
self._members.count
}

/// More efficient than using ``members(atLeast:)`` followed by a `.count`
public func count(atLeast status: Cluster.MemberStatus) -> Int {
    var matching = 0
    for member in self._members.values where status <= member.status {
        matching += 1
    }
    return matching
}

/// More efficient than using ``members(atMost:)`` followed by a `.count`
public func count(atMost status: Cluster.MemberStatus) -> Int {
    var matching = 0
    for member in self._members.values where member.status <= status {
        matching += 1
    }
    return matching
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used in test but generally useful


/// More efficient than using `members(withStatus:)` followed by a `.count`
public func count(withStatus status: Cluster.MemberStatus) -> Int {
self._members.values
Expand Down Expand Up @@ -153,6 +161,13 @@ extension Cluster {
}
}

/// Returns all members that are part of this membership, and have a ``Cluster/MemberStatus`` that is *at most*
/// the passed in `status`, and match the passed in `reachability` status. See ``Cluster/MemberStatus`` to learn more about the meaning of "at most".
///
/// - Parameters:
///   - status: the maximum member status to check the members against
///   - reachability: optional reachability to filter the members by
/// - Returns: array of members matching those checks. Can be empty.
ktoso marked this conversation as resolved.
Show resolved Hide resolved
public func members(atMost status: Cluster.MemberStatus, reachability: Cluster.MemberReachability? = nil) -> [Cluster.Member] {
if status == .removed, reachability == nil {
return Array(self._members.values)
Expand Down Expand Up @@ -223,6 +238,21 @@ extension Cluster {
}
}

/// Enables `for member in membership` iteration over all known members of this membership.
extension Cluster.Membership: Sequence {
    /// Iterator over all ``Cluster/Member`` values stored in the membership.
    public struct Iterator: IteratorProtocol {
        public typealias Element = Cluster.Member
        // Wraps the underlying dictionary-values iterator; iteration order is unspecified.
        internal var it: Dictionary<Cluster.Node, Cluster.Member>.Values.Iterator

        public mutating func next() -> Cluster.Member? {
            self.it.next()
        }
    }

    public func makeIterator() -> Iterator {
        Iterator(it: self._members.values.makeIterator())
    }
}

// Implementation notes: Membership/Member equality
//
// Membership equality is special, as it manually DOES take into account the Member's states (status, reachability),
Expand Down Expand Up @@ -619,7 +649,7 @@ extension Cluster.Membership {
// TODO: diffing is not super well tested, may lose up numbers
static func _diff(from: Cluster.Membership, to: Cluster.Membership) -> MembershipDiff {
var entries: [Cluster.MembershipChange] = []
entries.reserveCapacity(max(from._members.count, to._members.count))
entries.reserveCapacity(Swift.max(from._members.count, to._members.count))

// TODO: can likely be optimized more
var to = to
Expand Down
3 changes: 0 additions & 3 deletions Sources/MultiNodeTestKit/MultiNodeTestConductor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,8 @@ extension MultiNodeTestConductor {
checkPoint: MultiNode.Checkpoint,
waitTime: Duration) async throws
{
self.actorSystem.log.warning("CHECKPOINT FROM [\(node)]")
try await RemoteCall.with(timeout: waitTime) {
self.actorSystem.log.warning("CHECKPOINT FROM [\(node)] INNER (\(__isRemoteActor(self)))")
try await self._enterCheckPoint(node: node, checkPoint: checkPoint)
self.actorSystem.log.warning("CHECKPOINT FROM [\(node)] DONE")
}
}

Expand Down
21 changes: 20 additions & 1 deletion Sources/MultiNodeTestKit/MultiNodeTestKit+Control.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,22 @@ extension MultiNodeTest {
}
}

/// The "current" cluster system on which this code is executing.
public var system: ClusterSystem {
precondition(self._actorSystem != nil, "Actor system already released!")
return self._actorSystem!
}

/// The "current" endpoint on which this code is executing.
public var endpoint: Cluster.Endpoint {
self.system.cluster.endpoint
}

/// The "current" node on which this code is executing.
public var node: Cluster.Node {
self.system.cluster.node
}

/// Logger specific to this concrete node in a multi-node test.
/// Once an ``actorSystem`` is assigned to this multi-node control,
/// this logger is the same as the actor system's default logger.
Expand All @@ -53,10 +64,17 @@ extension MultiNodeTest {
self.system.cluster
}

/// All endpoints known to this multi-node test control, across every test node.
public var allEndpoints: some Collection<Cluster.Endpoint> {
    self._allEndpoints.values
}

/// Looks up the ``Cluster/Endpoint`` assigned to the given logical test `node`.
///
/// Crashes (via `fatalError`) if the node is not known to this cluster control.
public func endpoint(_ node: Nodes) -> Cluster.Endpoint {
    guard let found = self._allEndpoints[node.rawValue] else {
        fatalError("No such node: \(node) known to cluster control. Known nodes: \(self._allEndpoints.keys)")
    }
    return found
}

/// Creates a control for the node with the given name.
// NOTE(review): the actor system appears to be assigned after init (see `_actorSystem` usage) — confirm.
public init(nodeName: String) {
    self.nodeName = nodeName
}
Expand All @@ -75,6 +93,7 @@ extension MultiNodeTest {
// MARK: Run pieces of code on specific node

extension MultiNodeTest.Control {
/// Returns `true` only when this code is executing on the specified `node`,
/// allowing callers to run a piece of code on that node exclusively.
public func on(_ node: Nodes) -> Bool {
    self.system.name == node.rawValue
}
Expand Down
28 changes: 23 additions & 5 deletions Sources/MultiNodeTestKitRunner/OutputGrepper.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,19 @@ internal struct OutputGrepper {
}
}

typealias ProgramOutput = [String]
/// Captured output of a single multi-node test process.
struct ProgramOutput: Sendable {
    /// All captured log lines, in the order they were received.
    let logs: [String]

    /// `true` when the process hit a fatal-looking line that was explicitly marked
    /// as expected (via the `MULTI-NODE-EXPECTED-EXIT` marker), rather than a real failure.
    var expectedExit: Bool = false

    /// Creates a program output record.
    ///
    /// Consolidates the previous pair of initializers into one; `init(logs:)`
    /// call sites keep working via the defaulted `expectedExit` parameter.
    ///
    /// - Parameters:
    ///   - logs: captured log lines
    ///   - expectedExit: whether the process exit was expected (defaults to `false`)
    init(logs: [String], expectedExit: Bool = false) {
        self.logs = logs
        self.expectedExit = expectedExit
    }
}

private final class GrepHandler: ChannelInboundHandler {
typealias InboundIn = String
Expand Down Expand Up @@ -109,15 +121,21 @@ private final class GrepHandler: ChannelInboundHandler {
line.lowercased().contains("precondition failed") ||
line.lowercased().contains("assertion failed")
{
self.promise.fail(MultiNodeProgramError(message: line, completeOutput: self.logs))
self.logs = []
context.close(promise: nil)
if line.contains("MULTI-NODE-EXPECTED-EXIT") {
self.promise.succeed(.init(logs: logs, expectedExit: true))
context.close(promise: nil)
return
} else {
self.promise.fail(MultiNodeProgramError(message: line, completeOutput: self.logs))
self.logs = []
context.close(promise: nil)
}
}
}

/// When the program's output stream closes without a fatal marker having failed the
/// promise earlier, complete successfully with the logs captured so far.
func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
    if case .some(.inputClosed) = event as? ChannelEvent {
        // Succeeding a promise that channelRead already completed is a no-op for NIO promises
        // only if guarded elsewhere — NOTE(review): confirm double-completion cannot occur here.
        self.promise.succeed(.init(logs: self.logs))
        context.close(promise: nil)
    }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ struct MultiNodeTestKitRunnerBoot {
detectedReason = .outputError("\(reasonLines)\n\(line)")
}
}
case .success(let logs):
case .success(let result):
if settings.dumpNodeLogs == .always {
for line in logs {
for line in result.logs {
log("[\(nodeName)](\(multiNodeTest.testName)) \(line)")
}
}
Expand All @@ -181,7 +181,8 @@ struct MultiNodeTestKitRunnerBoot {
return .unexpectedRunResult(runResult)
}

let outputLines = try result.get()
let result = try result.get()
let outputLines = result.logs
let outputJoined = outputLines.joined(separator: "\n")
if outputJoined.range(of: expectedFailureRegex, options: .regularExpression) != nil {
if settings.dumpNodeLogs == .always {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedActorsTestKit
@testable import DistributedCluster
import NIO
import XCTest

/// Exercises cluster node "replacement": a node repeatedly rejoining from the same
/// host/port pair must cause the previously-known node at that address to be downed.
final class AggressiveNodeReplacementClusteredTests: ClusteredActorSystemsXCTestCase {
    override func configureLogCapture(settings: inout LogCapture.Settings) {
        // Silence noisy internal subsystems so grep-based log assertions stay reliable.
        settings.excludeActorPaths = [
            "/system/replicator",
            "/system/cluster/swim",
        ]
    }

    /// Joins a fresh "second" node on the SAME host/port five times in a row, shutting the
    /// previous one down without waiting, and asserts each join replaced the prior member.
    func test_join_replacement_repeatedly_shouldConsistentlyReplaceOldNode() async throws {
        // Main binds +100 so the replacement node (at the base port) has the lowest address.
        let main = await setUpNode("main") { settings in
            settings.bindPort += 100
        }

        let rounds = 5
        for round in 0 ..< rounds {
            main.log.info("Joining second replacement node, round: \(round)")

            // We purposefully make sure the `second` node becomes leader -- it has the lowest port.
            // This is the "worst case" since the leader is the one marking things "down" usually.
            // But here we want to rely on the replacement mechanism triggering the ".down" of the "previous node on
            // the same address".
            let second = await setUpNode("second-\(round)") { settings in
                // always the same host/port (!), this means we'll be continuously replacing the "old" (previous) node
                settings.endpoint.host = main.cluster.endpoint.host
                settings.endpoint.port = main.cluster.endpoint.port - 100 // we want the this node to be the leader -- lowest address
            }

            // Keep a strong reference so the actor is not released before the roundtrip below.
            let service = await ServiceActor(actorSystem: second)

            main.log.notice("Joining [\(second.cluster.endpoint)] to stable main: [\(main.cluster.endpoint)]")

            // Join the main node, and replace the existing "second" node which the main perhaps does not even yet realize has become down.
            // Thus must trigger a down of the old node.
            second.cluster.join(node: main.cluster.node)
            // Await the first receptionist listing and perform one remote call to prove connectivity.
            for await actor in await second.receptionist.listing(of: .aggressiveNodeReplacementService).prefix(1) {
                _ = try await actor.randomInt()
                main.log.notice("Roundtrip with second [\(reflecting: second.cluster.node)] - OK")
                break
            }

            try second.shutdown() // shutdown and immediately create a new instance on the same host-port to replace it
            // On purpose: do NOT wait for it to shut down completely.
        }

        let membership: Cluster.Membership = await main.cluster.membershipSnapshot

        // 3 still can happen, since we can have the "down" second and the "joining/up" second.
        membership.count(atMost: .up).shouldBeLessThanOrEqual(3)

        // Verify we indeed saw 4 replacements:
        try self.capturedLogs(of: main).shouldContain(grep: "which replaces the previously known: [Member(sact://second-0:")
        try self.capturedLogs(of: main).shouldContain(grep: "which replaces the previously known: [Member(sact://second-1:")
        try self.capturedLogs(of: main).shouldContain(grep: "which replaces the previously known: [Member(sact://second-2:")
        try self.capturedLogs(of: main).shouldContain(grep: "which replaces the previously known: [Member(sact://second-3:")
        // 4 is not replaced
    }

    /// Minimal distributed actor that checks itself into the receptionist on creation,
    /// used only to verify remote-call roundtrips with each replacement node.
    distributed actor ServiceActor {
        // NOTE(review): currently unused counter — kept as-is; confirm whether it can be removed.
        var hellosSentCount: Int = 0

        init(actorSystem: ActorSystem) async {
            self.actorSystem = actorSystem
            await actorSystem.receptionist.checkIn(self, with: .aggressiveNodeReplacementService)
            actorSystem.log.notice("Registering actor with \(DistributedReception.Key<ServiceActor>.aggressiveNodeReplacementService)!")
        }

        /// Returns a random integer; exists solely to have a remote call to exercise.
        distributed func randomInt(in range: Range<Int> = 0 ..< 10) async throws -> Int {
            return Int.random(in: range)
        }
    }
}

extension DistributedReception.Key {
    /// Reception key under which the aggressive-node-replacement test's `ServiceActor`s check in.
    static var aggressiveNodeReplacementService: DistributedReception.Key<AggressiveNodeReplacementClusteredTests.ServiceActor> {
        "aggressive-rejoin-service-actors"
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,7 @@ final class DowningClusteredTests: ClusteredActorSystemsXCTestCase {

func test_many_nonLeaders_shouldPropagateToOtherNodes() async throws {
if Int.random(in: 10 ... 100) > 0 {
pinfo("SKIPPING FLAKY TEST, REVISIT IT SOON") // FIXME: https://github.com/apple/swift-distributed-actors/issues/712
return
throw XCTSkip("SKIPPING FLAKY TEST, REVISIT IT SOON") // FIXME: https://github.com/apple/swift-distributed-actors/issues/712
}

var nodes: [ClusterSystem] = []
Expand Down
17 changes: 17 additions & 0 deletions Tests/DistributedClusterTests/DefaultActorSystem.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedCluster

/// Declares `ClusterSystem` as the default actor system for `distributed actor`
/// declarations in this test module, so they need not specify `ActorSystem` explicitly.
typealias DefaultDistributedActorSystem = ClusterSystem