
Commit

=cluster handle aggressively rejoining/replacing nodes from same host/port pair (#1083)

Co-authored-by: Yim Lee <[email protected]>
ktoso and yim-lee authored Nov 1, 2022
1 parent d816a10 commit 4934208
Showing 28 changed files with 271 additions and 63 deletions.
@@ -106,7 +106,7 @@ struct SamplePrettyLogHandler: LogHandler {

let file = file.split(separator: "/").last ?? ""
let line = line
print("\(self.timestamp()) [\(file):\(line)] [\(nodeInfo)\(Self.CONSOLE_BOLD)\(label)\(Self.CONSOLE_RESET)] [\(level)] \(message)\(metadataString)")
print("\(self.timestamp()) \(level) [\(nodeInfo)\(Self.CONSOLE_BOLD)\(label)\(Self.CONSOLE_RESET)][\(file):\(line)] \(message)\(metadataString)")
}

internal func prettyPrint(metadata: Logger.MetadataValue) -> String {
13 changes: 8 additions & 5 deletions Sources/DistributedActorsTestKit/LogCapture.swift
@@ -115,10 +115,13 @@ extension LogCapture {
public func printLogs() {
for log in self.logs {
var metadataString: String = ""
var actorPath: String = ""
var actorIdentifier: String = ""
if var metadata = log.metadata {
if let path = metadata.removeValue(forKey: "actor/path") {
actorPath = "[\(path)]"
if let id = metadata.removeValue(forKey: "actor/id") {
actorIdentifier = "[\(id)]"
_ = metadata.removeValue(forKey: "actor/path") // discard it
} else if let path = metadata.removeValue(forKey: "actor/path") {
actorIdentifier = "[\(path)]"
}

metadata.removeValue(forKey: "label")
@@ -148,10 +151,10 @@ extension LogCapture {
metadataString = String(metadataString.dropLast(1))
}
}
let date = ActorOriginLogHandler._createFormatter().string(from: log.date)
let date = ActorOriginLogHandler._createSimpleFormatter().string(from: log.date)
let file = log.file.split(separator: "/").last ?? ""
let line = log.line
print("[captured] [\(self.captureLabel)] [\(date)] [\(file):\(line)]\(actorPath) [\(log.level)] \(log.message)\(metadataString)")
print("[captured] [\(self.captureLabel)] \(date) \(log.level) \(actorIdentifier) [\(file):\(line)] \(log.message)\(metadataString)")
}
}

8 changes: 8 additions & 0 deletions Sources/DistributedCluster/ActorLogging.swift
@@ -103,6 +103,14 @@ struct ActorOriginLogHandler: LogHandler {
return formatter
}

public static func _createSimpleFormatter() -> DateFormatter {
let formatter = DateFormatter()
formatter.dateFormat = "H:m:ss.SSSS"
formatter.locale = Locale(identifier: "en_US")
formatter.calendar = Calendar(identifier: .gregorian)
return formatter
}

private let context: LoggingContext

private var targetLogger: Logger
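
The new `_createSimpleFormatter()` produces a compact, time-only timestamp, which is what `LogCapture` now uses when printing captured log lines. A small sketch of what it yields; the sample output is illustrative, not taken from this patch:

```swift
import Foundation

// Same configuration as the formatter added above: "H" and "m" are not
// zero-padded, "SSSS" keeps four fractional-second digits.
let simple = DateFormatter()
simple.dateFormat = "H:m:ss.SSSS"
simple.locale = Locale(identifier: "en_US")
simple.calendar = Calendar(identifier: .gregorian)

print(simple.string(from: Date())) // e.g. "9:7:02.4810"
```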
2 changes: 1 addition & 1 deletion Sources/DistributedCluster/Cluster/Cluster+Event.swift
@@ -160,7 +160,7 @@ extension Cluster.MembershipChange: CustomStringConvertible {
if let replaced = self.replaced {
base = "[replaced:\(reflecting: replaced)] by \(reflecting: self.node)"
} else {
base = "\(self.node)"
base = "\(reflecting: self.node)"
}
return base +
" :: " +
50 changes: 40 additions & 10 deletions Sources/DistributedCluster/Cluster/Cluster+Membership.swift
@@ -66,14 +66,14 @@ extension Cluster {
///
/// This operation is guaranteed to return a member if it was added to the membership UNLESS the member has been `.removed`
/// and dropped which happens only after an extended period of time. // FIXME: That period of time is not implemented
public func uniqueMember(_ node: Cluster.Node) -> Cluster.Member? {
public func member(_ node: Cluster.Node) -> Cluster.Member? {
self._members[node]
}

/// Picks "first", in terms of least progressed among its lifecycle member in presence of potentially multiple members
/// for a non-unique `Node`. In practice, this happens when an existing node is superseded by a "replacement", and the
/// previous node becomes immediately down.
public func member(_ endpoint: Cluster.Endpoint) -> Cluster.Member? {
public func anyMember(forEndpoint endpoint: Cluster.Endpoint) -> Cluster.Member? {
self._members.values.sorted(by: Cluster.MemberStatus.lifecycleOrdering).first(where: { $0.node.endpoint == endpoint })
}
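
To make the renamed lookups concrete: `member(_:)` (formerly `uniqueMember(_:)`) resolves one exact, unique node, while `anyMember(forEndpoint:)` (formerly `member(_:)`) resolves whichever incarnation currently occupies a host/port pair. Below is a minimal sketch of the distinction using simplified stand-in types; `Endpoint`, `Node`, `Member` and `Membership` here are not the library's real definitions:

```swift
// Simplified stand-ins; the real Cluster.Node also carries a system name, and
// its unique incarnation identifier is what distinguishes two processes that
// reuse the same host/port pair.
struct Endpoint: Hashable {
    var host: String
    var port: Int
}

struct Node: Hashable {
    var endpoint: Endpoint
    var incarnation: UInt64
}

enum MemberStatus: Int, Comparable {
    case joining, up, leaving, down, removed
    static func < (lhs: Self, rhs: Self) -> Bool { lhs.rawValue < rhs.rawValue }
}

struct Member {
    var node: Node
    var status: MemberStatus
}

struct Membership {
    var _members: [Node: Member] = [:]

    /// Exact lookup: succeeds only for the precise incarnation we know about.
    func member(_ node: Node) -> Member? {
        self._members[node]
    }

    /// Endpoint lookup: any incarnation bound to this host/port, preferring the
    /// least progressed lifecycle status (e.g. .joining over .down).
    func anyMember(forEndpoint endpoint: Endpoint) -> Member? {
        self._members.values
            .sorted { $0.status < $1.status }
            .first { $0.node.endpoint == endpoint }
    }
}

// An old incarnation that was downed, and a fresh one joining on the same host/port:
let endpoint = Endpoint(host: "127.0.0.1", port: 7337)
let oldNode = Node(endpoint: endpoint, incarnation: 1)
let newNode = Node(endpoint: endpoint, incarnation: 2)

var membership = Membership()
membership._members[oldNode] = Member(node: oldNode, status: .down)
membership._members[newNode] = Member(node: newNode, status: .joining)

print(membership.member(oldNode)!.status)                            // down (exact incarnation still found)
print(membership.anyMember(forEndpoint: endpoint)!.node.incarnation) // 2 (the joining incarnation wins)
```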

@@ -90,14 +90,22 @@ extension Cluster {
self._members.count
}

/// More efficient than using `members(atLeast:)` followed by a `.count`
/// More efficient than using ``members(atLeast:reachability:)`` followed by a `.count`
public func count(atLeast status: Cluster.MemberStatus) -> Int {
self._members.values
.lazy
.filter { member in status <= member.status }
.count
}

/// More efficient than using ``members(atMost:reachability:)`` followed by a `.count`
public func count(atMost status: Cluster.MemberStatus) -> Int {
self._members.values
.lazy
.filter { member in member.status <= status }
.count
}

/// More efficient than using `members(withStatus:)` followed by a `.count`
public func count(withStatus status: Cluster.MemberStatus) -> Int {
self._members.values
@@ -153,6 +161,13 @@ extension Cluster {
}
}

/// Returns all members that are part of this membership, and have ``Cluster/MemberStatus`` that is *at most*
/// the passed in `status` and `reachability`. See ``Cluster/MemberStatus`` to learn more about the meaning of "at most".
///
/// - Parameters:
/// - status: "at most" status for which to check the members for
/// - reachability: optional reachability that the members will be filtered by
/// - Returns: array of members matching those checks. Can be empty.
public func members(atMost status: Cluster.MemberStatus, reachability: Cluster.MemberReachability? = nil) -> [Cluster.Member] {
if status == .removed, reachability == nil {
return Array(self._members.values)
@@ -189,7 +204,7 @@ extension Cluster {
/// Certain actions can only be performed by the "leader" of a group.
public internal(set) var leader: Cluster.Member? {
get {
self._leaderNode.flatMap { self.uniqueMember($0) }
self._leaderNode.flatMap { self.member($0) }
}
set {
self._leaderNode = newValue?.node
@@ -223,6 +238,21 @@
}
}

extension Cluster.Membership: Sequence {
public struct Iterator: IteratorProtocol {
public typealias Element = Cluster.Member
internal var it: Dictionary<Cluster.Node, Cluster.Member>.Values.Iterator

public mutating func next() -> Cluster.Member? {
self.it.next()
}
}

public func makeIterator() -> Iterator {
.init(it: self._members.values.makeIterator())
}
}
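
The new `Sequence` conformance simply forwards iteration to the backing dictionary's values. In terms of the simplified model from the earlier sketch (not the real `Cluster.Membership`), the equivalent shape and a typical use look like this:

```swift
// Forward iteration to the backing dictionary's values, mirroring the shape of
// the conformance added in this diff (types are the simplified stand-ins from
// the earlier sketch).
extension Membership: Sequence {
    func makeIterator() -> Dictionary<Node, Member>.Values.Iterator {
        self._members.values.makeIterator()
    }
}

// Membership can now be iterated and filtered directly:
for member in membership where member.status <= .up {
    print(member.node, member.status)
}
let notYetDown = membership.filter { $0.status < .down }
print(notYetDown.count) // 1 (only the .joining incarnation)
```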

// Implementation notes: Membership/Member equality
//
// Membership equality is special, as it manually DOES take into account the Member's states (status, reachability),
@@ -300,7 +330,7 @@ extension Cluster.Membership {
return self.removeCompletely(change.node)
}

if let knownUnique = self.uniqueMember(change.node) {
if let knownUnique = self.member(change.node) {
// it is known uniquely, so we just update its status
return self.mark(knownUnique.node, as: change.status)
}
@@ -311,7 +341,7 @@
return nil
}

if let previousMember = self.member(change.node.endpoint) {
if let previousMember = self.anyMember(forEndpoint: change.node.endpoint) {
// we are joining "over" an existing incarnation of a node; causing the existing node to become .down immediately
if previousMember.status < .down {
_ = self.mark(previousMember.node, as: .down)
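
This replacement path is the heart of the change: when a new incarnation joins over an endpoint an existing member still occupies, the previous incarnation is immediately marked `.down`. Roughly, in terms of the simplified model above (the real logic additionally emits a `Cluster.MembershipChange` describing the replacement):

```swift
// Continuing the simplified model: joining a new incarnation on an already
// occupied host/port immediately downs the previous incarnation.
extension Membership {
    mutating func join(_ node: Node) {
        if let previous = self.anyMember(forEndpoint: node.endpoint), previous.node != node {
            if previous.status < .down {
                // The old incarnation can never come back; down it right away.
                self._members[previous.node]?.status = .down
            }
        }
        self._members[node] = Member(node: node, status: .joining)
    }
}

var rejoined = Membership()
rejoined.join(oldNode)  // first incarnation joins
rejoined.join(newNode)  // same endpoint, new incarnation: oldNode becomes .down
print(rejoined.member(oldNode)!.status) // down
print(rejoined.member(newNode)!.status) // joining
```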
@@ -398,7 +428,7 @@ extension Cluster.Membership {
///
/// If the membership not aware of this address the update is treated as a no-op.
public mutating func mark(_ node: Cluster.Node, as status: Cluster.MemberStatus) -> Cluster.MembershipChange? {
if let existingExactMember = self.uniqueMember(node) {
if let existingExactMember = self.member(node) {
guard existingExactMember.status < status else {
// this would be a "move backwards" which we do not do; membership only moves forward
return nil
@@ -412,7 +442,7 @@
self._members[existingExactMember.node] = updatedMember

return Cluster.MembershipChange(member: existingExactMember, toStatus: status)
} else if let beingReplacedMember = self.member(node.endpoint) {
} else if let beingReplacedMember = self.anyMember(forEndpoint: node.endpoint) {
// We did not get a member by exact Cluster.Node match, but we got one by Node match...
// this means this new node that we are trying to mark is a "replacement" and the `beingReplacedNode` must be .downed!

@@ -619,14 +649,14 @@ extension Cluster.Membership {
// TODO: diffing is not super well tested, may lose up numbers
static func _diff(from: Cluster.Membership, to: Cluster.Membership) -> MembershipDiff {
var entries: [Cluster.MembershipChange] = []
entries.reserveCapacity(max(from._members.count, to._members.count))
entries.reserveCapacity(Swift.max(from._members.count, to._members.count))

// TODO: can likely be optimized more
var to = to

// iterate over the original member set, and remove from the `to` set any seen members
for member in from._members.values {
if let toMember = to.uniqueMember(member.node) {
if let toMember = to.member(member.node) {
to._members.removeValue(forKey: member.node)
if member.status != toMember.status {
entries.append(.init(member: member, toStatus: toMember.status))
6 changes: 3 additions & 3 deletions Sources/DistributedCluster/Cluster/ClusterControl.swift
@@ -245,7 +245,7 @@ public struct ClusterControl {
}
}

guard let foundMember = membership.uniqueMember(node) else {
guard let foundMember = membership.member(node) else {
if status == .down || status == .removed {
// so we're seeing an already removed member, this can indeed happen and is okey
return Cluster.Member(node: node, status: .removed).asUnreachable
@@ -273,7 +273,7 @@ public struct ClusterControl {
@discardableResult
public func waitFor(_ endpoint: Cluster.Endpoint, _ status: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member? {
try await self.waitForMembershipEventually(Cluster.Member?.self, within: within) { membership in
guard let foundMember = membership.member(endpoint) else {
guard let foundMember = membership.anyMember(forEndpoint: endpoint) else {
if status == .down || status == .removed {
return nil
}
@@ -306,7 +306,7 @@ public struct ClusterControl {
}
}

guard let foundMember = membership.uniqueMember(node) else {
guard let foundMember = membership.member(node) else {
if atLeastStatus == .down || atLeastStatus == .removed {
// so we're seeing an already removed member, this can indeed happen and is okey
return Cluster.Member(node: node, status: .removed).asUnreachable
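
One detail of the `waitFor` family worth spelling out: if the member has already been dropped from membership and the caller is waiting for `.down` or `.removed`, the wait is considered satisfied with a synthesized removed member rather than spinning until the timeout. A rough sketch of that single polling step, again using the simplified model (the real API works on `Cluster.Member` and also tracks reachability):

```swift
// One polling step of "wait until the node is at least `status`", simplified.
func checkAtLeast(_ membership: Membership, node: Node, atLeast status: MemberStatus) -> Member? {
    guard let found = membership.member(node) else {
        if status == .down || status == .removed {
            // Already gone from membership: waiting for a terminal status is satisfied.
            return Member(node: node, status: .removed)
        }
        return nil // keep waiting
    }
    return status <= found.status ? found : nil
}

print(checkAtLeast(Membership(), node: oldNode, atLeast: .down)!.status) // removed
```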
8 changes: 4 additions & 4 deletions Sources/DistributedCluster/Cluster/ClusterShell.swift
@@ -489,7 +489,7 @@ extension ClusterShell {
return self.retryHandshake(context, state, initiated: initiated)

case .failureDetectorReachabilityChanged(let node, let reachability):
guard let member = state.membership.uniqueMember(node) else {
guard let member = state.membership.member(node) else {
return .same // reachability change of unknown node
}
switch reachability {
@@ -505,8 +505,8 @@
case .shutdown(let receptacle):
return self.onShutdownCommand(context, state: state, signalOnceUnbound: receptacle)

case .downCommand(let node):
if let member = state.membership.member(node) {
case .downCommand(let endpoint):
if let member = state.membership.anyMember(forEndpoint: endpoint) {
return self.ready(state: self.onDownCommand(context, state: state, member: member))
} else {
return self.ready(state: state)
@@ -1173,7 +1173,7 @@ extension ClusterShell {
]
)

guard let myselfMember = state.membership.uniqueMember(myselfNode) else {
guard let myselfMember = state.membership.member(myselfNode) else {
state.log.error("Unable to find Cluster.Member for \(myselfNode) self node! This should not happen, please file an issue.")
return .same
}
4 changes: 2 additions & 2 deletions Sources/DistributedCluster/Cluster/ClusterShellState.swift
@@ -48,7 +48,7 @@ internal struct ClusterShellState: ReadOnlyClusterState {

let selfNode: Cluster.Node
var selfMember: Cluster.Member {
if let member = self.membership.uniqueMember(self.selfNode) {
if let member = self.membership.member(self.selfNode) {
return member
} else {
fatalError("""
@@ -391,7 +391,7 @@ extension ClusterShellState {
}

let change: Cluster.MembershipChange?
if let replacedMember = self.membership.member(handshake.remoteNode.endpoint) {
if let replacedMember = self.membership.anyMember(forEndpoint: handshake.remoteNode.endpoint) {
change = self.membership.applyMembershipChange(Cluster.MembershipChange(replaced: replacedMember, by: Cluster.Member(node: handshake.remoteNode, status: .joining)))
} else {
change = self.membership.applyMembershipChange(Cluster.MembershipChange(member: Cluster.Member(node: handshake.remoteNode, status: .joining)))
@@ -145,8 +145,9 @@ internal distributed actor DowningStrategyShell
func markAsDown(members: Set<Cluster.Member>) {
for member in members {
self.log.info(
"Decision to [.down] member [\(member)]!", metadata: self.metadata([
"Decide to [.down] member [\(member)]!", metadata: self.metadata([
"downing/node": "\(reflecting: member.node)",
"member/status/previous": "\(member.status)",
])
)
self.actorSystem.cluster.down(member: member)
@@ -65,8 +65,8 @@ extension Cluster {
let causalRelation: VersionVector.CausalRelation = self.version.compareTo(incoming.version)

// 1.1) Protect the node from any gossip from a .down node (!), it cannot and MUST NOT be trusted.
let incomingGossipOwnerKnownLocally = self.membership.uniqueMember(incoming.owner)
guard let incomingOwnerMember = incoming.membership.uniqueMember(incoming.owner) else {
let incomingGossipOwnerKnownLocally = self.membership.member(incoming.owner)
guard let incomingOwnerMember = incoming.membership.member(incoming.owner) else {
return .init(causalRelation: causalRelation, effectiveChanges: [])
}
switch incomingGossipOwnerKnownLocally {
@@ -83,7 +83,7 @@
// 1.2) Protect from zombies: Any nodes that we know are dead or down, we should not accept any information from
let incomingConcurrentDownMembers = incoming.membership.members(atLeast: .down)
for pruneFromIncomingBeforeMerge in incomingConcurrentDownMembers
where self.membership.uniqueMember(pruneFromIncomingBeforeMerge.node) == nil
where self.membership.member(pruneFromIncomingBeforeMerge.node) == nil
{
_ = incoming.pruneMember(pruneFromIncomingBeforeMerge)
}
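
The guard above amounts to two protections: ignore gossip whose owner we already consider `.down`, and prune incoming members that are at least `.down` yet entirely unknown to us, so that dead nodes cannot be re-introduced. In the simplified model (the real code operates on versioned `Cluster.MembershipGossip` values):

```swift
extension Membership {
    /// Members whose status is at least the given one (e.g. .down or .removed).
    func members(atLeast status: MemberStatus) -> [Member] {
        self._members.values.filter { status <= $0.status }
    }
}

/// Before merging incoming gossip, drop members the sender already considers
/// at-least-.down that we have never seen; nothing useful can be learned about
/// them, and keeping them could resurrect dead nodes.
func pruneUnknownDownMembers(local: Membership, incoming: inout Membership) {
    for zombie in incoming.members(atLeast: .down) where local.member(zombie.node) == nil {
        incoming._members.removeValue(forKey: zombie.node)
    }
}

var incomingGossip = Membership()
incomingGossip._members[oldNode] = Member(node: oldNode, status: .down) // unknown locally
pruneUnknownDownMembers(local: Membership(), incoming: &incomingGossip)
print(incomingGossip._members.isEmpty) // true
```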
8 changes: 6 additions & 2 deletions Sources/DistributedCluster/Cluster/NodeDeathWatcher.swift
@@ -203,10 +203,14 @@ enum NodeDeathWatcherShell {
}

case .membershipChange(let change) where change.isAtLeast(.down):
context.log.trace("Node down: \(change)!")
context.log.trace("Node down: \(change)!", metadata: [
"node": "\(reflecting: change.node)",
])
instance.handleAddressDown(change)
case .membershipChange(let change):
context.log.trace("Node change: \(change)!")
context.log.trace("Node change: \(change)!", metadata: [
"node": "\(reflecting: change.node)",
])
instance.onMembershipChanged(change)

default:
7 changes: 7 additions & 0 deletions Sources/DistributedCluster/Cluster/SWIM/SWIMActor.swift
@@ -355,6 +355,13 @@ internal distributed actor SWIMActor: SWIMPeer, SWIMAddressablePeer, CustomStrin
)
}

/// If SWIM claims we are dead, ignore this; we should be informed about this in high-level gossip soon enough.
if change.status == .dead,
change.member.node.asClusterNode == self.id.node
{
return
}

let reachability: Cluster.MemberReachability
switch change.status {
case .alive, .suspect:
1 change: 1 addition & 0 deletions Sources/DistributedCluster/ClusterSystem.swift
@@ -1037,6 +1037,7 @@ extension ClusterSystem {

log.trace("Actor ready, well-known as: \(wellKnownName)", metadata: [
"actor/id": "\(actor.id)",
"actor/type": "\(type(of: actor))",
])

self._managedWellKnownDistributedActors[wellKnownName] = actor
2 changes: 1 addition & 1 deletion Sources/DistributedCluster/Docs.docc/Clustering.md
@@ -81,7 +81,7 @@ You can observe ``Cluster/Event``s emitted by `system.cluster.events` (``Cluster

There is also convenience APIs available on ``ClusterControl`` (`system.cluster`):
- ``ClusterControl/joined(endpoint:within:)`` which allows you to suspend until a specific node becomes ``Cluster/MemberStatus/joining`` in the cluster membership, or
- ``ClusterControl/waitFor(_:_:within:)-1xiqo`` which allows you to suspend until a node reaches a specific ``Cluster/MemberStatus``.
- ``ClusterControl/waitFor(_:_:within:)-2aq7r`` which allows you to suspend until a node reaches a specific ``Cluster/MemberStatus``.
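
A hedged usage sketch of the two convenience APIs listed above; the endpoint values are made up, and the `Cluster.Endpoint(systemName:host:port:)` initializer shape is an assumption rather than something this diff shows:

```swift
import DistributedCluster

func waitUntilOtherNodeIsUp(system: ClusterSystem) async throws {
    // Hypothetical endpoint of the node we expect to join us.
    let endpoint = Cluster.Endpoint(systemName: "ExampleSystem", host: "127.0.0.1", port: 7337)

    // Suspend until the node is at least .joining in our membership ...
    _ = try await system.cluster.joined(endpoint: endpoint, within: .seconds(10))

    // ... or until it reaches a specific status, e.g. .up.
    _ = try await system.cluster.waitFor(endpoint, .up, within: .seconds(10))
}
```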

### Automatic Node Discovery

3 changes: 0 additions & 3 deletions Sources/MultiNodeTestKit/MultiNodeTestConductor.swift
@@ -86,11 +86,8 @@ extension MultiNodeTestConductor {
checkPoint: MultiNode.Checkpoint,
waitTime: Duration) async throws
{
self.actorSystem.log.warning("CHECKPOINT FROM [\(node)]")
try await RemoteCall.with(timeout: waitTime) {
self.actorSystem.log.warning("CHECKPOINT FROM [\(node)] INNER (\(__isRemoteActor(self)))")
try await self._enterCheckPoint(node: node, checkPoint: checkPoint)
self.actorSystem.log.warning("CHECKPOINT FROM [\(node)] DONE")
}
}
