cluster handle aggressively rejoining/replacing nodes from same host/port pair #1083
```diff
@@ -115,10 +115,13 @@ extension LogCapture {
     public func printLogs() {
         for log in self.logs {
             var metadataString: String = ""
-            var actorPath: String = ""
+            var actorIdentifier: String = ""
             if var metadata = log.metadata {
-                if let path = metadata.removeValue(forKey: "actor/path") {
-                    actorPath = "[\(path)]"
+                if let id = metadata.removeValue(forKey: "actor/id") {
+                    actorIdentifier = "[\(id)]"
+                    _ = metadata.removeValue(forKey: "actor/path") // discard it
+                } else if let path = metadata.removeValue(forKey: "actor/path") {
+                    actorIdentifier = "[\(path)]"
                 }

                 metadata.removeValue(forKey: "label")
```

> **Review comment:** revamping this now that we have more
```diff
@@ -148,10 +151,10 @@
                     metadataString = String(metadataString.dropLast(1))
                 }
             }
-            let date = ActorOriginLogHandler._createFormatter().string(from: log.date)
+            let date = ActorOriginLogHandler._createSimpleFormatter().string(from: log.date)
             let file = log.file.split(separator: "/").last ?? ""
             let line = log.line
-            print("[captured] [\(self.captureLabel)] [\(date)] [\(file):\(line)]\(actorPath) [\(log.level)] \(log.message)\(metadataString)")
+            print("[captured] [\(self.captureLabel)] \(date) \(log.level) \(actorIdentifier) [\(file):\(line)] \(log.message)\(metadataString)")
         }
     }
```
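For illustration, a minimal self-contained sketch of the revised capture line layout. All values here are hypothetical, including the assumed shape of the `_createSimpleFormatter()` output:

```swift
// Hypothetical values, for format illustration only.
let captureLabel = "main"
let date = "15:04:05.1230"              // assumed shape of _createSimpleFormatter() output
let level = "info"
let actorIdentifier = "[/user/worker]"  // the now-preferred "actor/id" metadata value
let file = "Example.swift"
let line = 42
print("[captured] [\(captureLabel)] \(date) \(level) \(actorIdentifier) [\(file):\(line)] hello")
// [captured] [main] 15:04:05.1230 info [/user/worker] [Example.swift:42] hello
```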
```diff
@@ -90,14 +90,22 @@ extension Cluster {
             self._members.count
         }

-        /// More efficient than using `members(atLeast:)` followed by a `.count`
+        /// More efficient than using ``members(atLeast:)`` followed by a `.count`
         public func count(atLeast status: Cluster.MemberStatus) -> Int {
             self._members.values
                 .lazy
                 .filter { member in status <= member.status }
                 .count
         }

+        /// More efficient than using ``members(atMost:)`` followed by a `.count`
+        public func count(atMost status: Cluster.MemberStatus) -> Int {
+            self._members.values
+                .lazy
+                .filter { member in member.status <= status }
+                .count
+        }
+
         /// More efficient than using `members(withStatus:)` followed by a `.count`
         public func count(withStatus status: Cluster.MemberStatus) -> Int {
             self._members.values
```

> **Review comment:** used in test but generally useful
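A hedged, self-contained sketch of the "at most" / "at least" semantics the counting helpers rely on, assuming the library's `joining < up < leaving < down < removed` ordering of `Cluster.MemberStatus` (mirrored here by a toy enum):

```swift
// Toy Comparable status mirroring the assumed MemberStatus ordering.
enum Status: Int, Comparable {
    case joining, up, leaving, down, removed
    static func < (lhs: Status, rhs: Status) -> Bool { lhs.rawValue < rhs.rawValue }
}

let memberStatuses: [Status] = [.joining, .joining, .up, .down]

// count(atMost: .up) counts members that have not progressed past .up:
let atMostUp = memberStatuses.lazy.filter { $0 <= .up }.count   // 3 (.joining, .joining, .up)
// count(atLeast: .up) counts members that are .up or further along:
let atLeastUp = memberStatuses.lazy.filter { .up <= $0 }.count  // 2 (.up, .down)
```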
```diff
@@ -153,6 +161,13 @@
         }
     }

+        /// Returns all members that are part of this membership and have a ``Cluster/MemberStatus`` that is *at most*
+        /// the passed in `status`, and match the passed in `reachability` (if provided). See ``Cluster/MemberStatus`` to learn more about the meaning of "at most".
+        ///
+        /// - Parameters:
+        ///   - status: the maximum (inclusive) status to check the members against
+        ///   - reachability: optional reachability to additionally filter the members by
+        /// - Returns: array of members matching those checks. Can be empty.
        public func members(atMost status: Cluster.MemberStatus, reachability: Cluster.MemberReachability? = nil) -> [Cluster.Member] {
            if status == .removed, reachability == nil {
                return Array(self._members.values)
```
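A hedged usage sketch of the documented API, assuming a membership snapshot obtained from a running `ClusterSystem` and the library's `.reachable` reachability case:

```swift
import DistributedCluster

// Members that are still joining or up (i.e. "at most" .up) and currently reachable.
func reachableJoiningOrUpMembers(in membership: Cluster.Membership) -> [Cluster.Member] {
    membership.members(atMost: .up, reachability: .reachable)
}
```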
```diff
@@ -223,6 +238,21 @@
     }
 }

+extension Cluster.Membership: Sequence {
+    public struct Iterator: IteratorProtocol {
+        public typealias Element = Cluster.Member
+        internal var it: Dictionary<Cluster.Node, Cluster.Member>.Values.Iterator
+
+        public mutating func next() -> Cluster.Member? {
+            self.it.next()
+        }
+    }
+
+    public func makeIterator() -> Iterator {
+        .init(it: self._members.values.makeIterator())
+    }
+}
+
 // Implementation notes: Membership/Member equality
 //
 // Membership equality is special, as it manually DOES take into account the Member's states (status, reachability),
```

> **Review comment:** generally useful to
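With the `Sequence` conformance, a membership can be iterated and fed to standard `Sequence` algorithms directly. A hedged usage sketch, assuming `Cluster.Member` exposes its `node` and `status`:

```swift
import DistributedCluster

// Collect the nodes of all members currently marked .up, by iterating the membership directly.
func upNodes(in membership: Cluster.Membership) -> [Cluster.Node] {
    var nodes: [Cluster.Node] = []
    for member in membership where member.status == .up {
        nodes.append(member.node)
    }
    return nodes
}
```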
```diff
@@ -619,7 +649,7 @@ extension Cluster.Membership {
         // TODO: diffing is not super well tested, may lose up numbers
         static func _diff(from: Cluster.Membership, to: Cluster.Membership) -> MembershipDiff {
             var entries: [Cluster.MembershipChange] = []
-            entries.reserveCapacity(max(from._members.count, to._members.count))
+            entries.reserveCapacity(Swift.max(from._members.count, to._members.count))

             // TODO: can likely be optimized more
             var to = to
```
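The `Swift.max` qualification is presumably there to avoid a shadowing ambiguity. A minimal sketch of the kind of collision the qualifier sidesteps (the `Stats` type and its `max` property are hypothetical):

```swift
// A symbol named `max` in scope shadows the standard library's free function.
struct Stats {
    var max: Int = 10

    func capacity(_ a: Int, _ b: Int) -> Int {
        // Bare `max(a, b)` would not compile here ("cannot call value of non-function type 'Int'"),
        // because `max` resolves to the stored property; qualifying with the module name fixes it.
        Swift.max(a, b)
    }
}
```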
A new test exercising the aggressive replacement behavior is added (96 lines):

```swift
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedActorsTestKit
@testable import DistributedCluster
import NIO
import XCTest

final class AggressiveNodeReplacementClusteredTests: ClusteredActorSystemsXCTestCase {
    override func configureLogCapture(settings: inout LogCapture.Settings) {
        settings.excludeActorPaths = [
            "/system/replicator",
            "/system/cluster/swim",
        ]
    }

    func test_join_replacement_repeatedly_shouldConsistentlyReplaceOldNode() async throws {
        let main = await setUpNode("main") { settings in
            settings.bindPort += 100
        }

        let rounds = 5
        for round in 0 ..< rounds {
            main.log.info("Joining second replacement node, round: \(round)")

            // We purposefully make sure the `second` node becomes leader -- it has the lowest port.
            // This is the "worst case" since the leader is the one marking things "down" usually.
            // But here we want to rely on the replacement mechanism triggering the ".down" of the "previous node on
            // the same address".
            let second = await setUpNode("second-\(round)") { settings in
                // always the same host/port (!), this means we'll be continuously replacing the "old" (previous) node
                settings.endpoint.host = main.cluster.endpoint.host
                settings.endpoint.port = main.cluster.endpoint.port - 100 // we want this node to be the leader -- lowest address
            }

            let service = await ServiceActor(actorSystem: second)

            main.log.notice("Joining [\(second.cluster.endpoint)] to stable main: [\(main.cluster.endpoint)]")

            // Join the main node, and replace the existing "second" node which the main perhaps does not even yet realize has become down.
            // This must trigger a down of the old node.
            second.cluster.join(node: main.cluster.node)
            for await actor in await second.receptionist.listing(of: .aggressiveNodeReplacementService).prefix(1) {
                _ = try await actor.randomInt()
                main.log.notice("Roundtrip with second [\(reflecting: second.cluster.node)] - OK")
                break
            }

            try second.shutdown() // shut down and immediately create a new instance on the same host/port to replace it
            // On purpose: do NOT wait for it to shut down completely.
        }

        let membership: Cluster.Membership = await main.cluster.membershipSnapshot

        // 3 can still happen, since we can have the "down" second and the "joining/up" second.
        membership.count(atMost: .up).shouldBeLessThanOrEqual(3)

        // Verify we indeed saw 4 replacements:
        try self.capturedLogs(of: main).shouldContain(grep: "which replaces the previously known: [Member(sact://second-0:")
        try self.capturedLogs(of: main).shouldContain(grep: "which replaces the previously known: [Member(sact://second-1:")
        try self.capturedLogs(of: main).shouldContain(grep: "which replaces the previously known: [Member(sact://second-2:")
        try self.capturedLogs(of: main).shouldContain(grep: "which replaces the previously known: [Member(sact://second-3:")
        // second-4 is not replaced
    }

    distributed actor ServiceActor {
        var hellosSentCount: Int = 0

        init(actorSystem: ActorSystem) async {
            self.actorSystem = actorSystem
            await actorSystem.receptionist.checkIn(self, with: .aggressiveNodeReplacementService)
            actorSystem.log.notice("Registering actor with \(DistributedReception.Key<ServiceActor>.aggressiveNodeReplacementService)!")
        }

        distributed func randomInt(in range: Range<Int> = 0 ..< 10) async throws -> Int {
            return Int.random(in: range)
        }
    }
}

extension DistributedReception.Key {
    static var aggressiveNodeReplacementService: DistributedReception.Key<AggressiveNodeReplacementClusteredTests.ServiceActor> {
        "aggressive-rejoin-service-actors"
    }
}
```
A second new file declares a module-wide default actor system (17 lines):

```swift
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import DistributedCluster

typealias DefaultDistributedActorSystem = ClusterSystem
```
> **Review comment:** same formatting as our example formatters in samples
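The `DefaultDistributedActorSystem` typealias is the standard Swift mechanism that lets `distributed actor` declarations in the module omit their own `typealias ActorSystem` (which is why the test's `ServiceActor` above can refer to `ActorSystem` without declaring it). A minimal hedged sketch; the `Greeter` actor is hypothetical:

```swift
import Distributed
import DistributedCluster

// With this module-wide default in place...
typealias DefaultDistributedActorSystem = ClusterSystem

// ...a distributed actor needs no explicit `typealias ActorSystem = ClusterSystem`.
distributed actor Greeter {
    distributed func greet(name: String) -> String {
        "Hello, \(name)!"
    }
}
```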