Skip to content

Commit

Permalink
swim instance can be struct :party:
Browse files Browse the repository at this point in the history
  • Loading branch information
ktoso committed Aug 2, 2022
1 parent 012efee commit 161e1f2
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 131 deletions.
2 changes: 1 addition & 1 deletion Sources/SWIM/SWIM.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ extension SWIM {
///
/// The ack may be delivered directly in a request-response fashion between the probing and pinged members,
/// or indirectly, as a result of a `pingRequest` message.
public enum PingResponse<Peer: SWIMPeer, PingRequestOrigin: SWIMPingRequestOriginPeer> {
public enum PingResponse<Peer: SWIMPeer, PingRequestOrigin: SWIMPingRequestOriginPeer>: Sendable {
/// - parameters:
/// - target: the target of the ping;
/// On the remote "pinged" node which is about to send an ack back to the ping origin this should be filled with the `myself` peer.
Expand Down
60 changes: 30 additions & 30 deletions Sources/SWIM/SWIMInstance.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ extension SWIM {
/// **Please refer to `SWIM` for an in-depth discussion of the algorithm and extensions implemented in this package.**
///
/// - SeeAlso: `SWIM` for a complete and in depth discussion of the protocol.
public final class Instance<Peer: SWIMPeer,
public struct Instance<Peer: SWIMPeer,
PingOrigin: SWIMPingOriginPeer,
PingRequestOrigin: SWIMPingRequestOriginPeer>: SWIMProtocol { // TODO: could this be a struct?

Expand Down Expand Up @@ -90,7 +90,7 @@ extension SWIM {
private var _sequenceNumber: SWIM.SequenceNumber = 0
/// Sequence numbers are used to identify messages and pair them up into request/replies.
/// - SeeAlso: `SWIM.SequenceNumber`
public func nextSequenceNumber() -> SWIM.SequenceNumber {
public mutating func nextSequenceNumber() -> SWIM.SequenceNumber {
// TODO: can we make it internal? it does not really hurt having public
// TODO: sequence numbers per-target node? https://github.com/apple/swift-cluster-membership/issues/39
self._sequenceNumber += 1
Expand Down Expand Up @@ -151,7 +151,7 @@ extension SWIM {
}
}

private func nextIncarnation() {
private mutating func nextIncarnation() {
self._incarnation += 1
}

Expand Down Expand Up @@ -184,7 +184,7 @@ extension SWIM {
/// Adjust the Local Health-aware Multiplier based on the event causing it.
///
/// - Parameter event: event which causes the LHM adjustment.
public func adjustLHMultiplier(_ event: LHModifierEvent) {
public mutating func adjustLHMultiplier(_ event: LHModifierEvent) {
defer {
self.settings.logger.trace("Adjusted LHM multiplier", metadata: [
"swim/lhm/event": "\(event)",
Expand Down Expand Up @@ -227,7 +227,7 @@ extension SWIM {
/// initial contact with a node - i.e. one can construct a peer to "there should be a peer on this host/port" to send an initial ping,
/// however in reply a peer in gossip must ALWAYS include it's unique identifier in the node - such that we know it from
/// any new instance of a process on the same host/port pair.
internal func addMember(_ peer: Peer, status: SWIM.Status) -> [AddMemberDirective] {
internal mutating func addMember(_ peer: Peer, status: SWIM.Status) -> [AddMemberDirective] {
var directives: [AddMemberDirective] = []

// Guard 1) protect against adding already known dead members
Expand Down Expand Up @@ -338,7 +338,7 @@ extension SWIM {
/// but in a round-robin fashion. Instead, a newly joining member is inserted in the membership list at
/// a position that is chosen uniformly at random. On completing a traversal of the entire list,
/// rearranges the membership list to a random reordering.
func nextPeerToPing() -> Peer? {
mutating func nextPeerToPing() -> Peer? {
if self.membersToPing.isEmpty {
return nil
}
Expand Down Expand Up @@ -370,7 +370,7 @@ extension SWIM {
}

/// Mark a specific peer/member with the new status.
func mark(_ peer: Peer, as status: SWIM.Status) -> MarkedDirective {
mutating func mark(_ peer: Peer, as status: SWIM.Status) -> MarkedDirective {
let previousStatusOption = self.status(of: peer)

var status = status
Expand Down Expand Up @@ -426,21 +426,21 @@ extension SWIM {
case applied(previousStatus: SWIM.Status?, member: SWIM.Member<Peer>)
}

private func resetGossipPayloads(member: SWIM.Member<Peer>) {
private mutating func resetGossipPayloads(member: SWIM.Member<Peer>) {
// seems we gained a new member, and we need to reset gossip counts in order to ensure it also receive information about all nodes
// TODO: this would be a good place to trigger a full state sync, to speed up convergence; see https://github.com/apple/swift-cluster-membership/issues/37
self.members.forEach { self.addToGossip(member: $0) }
}

func incrementProtocolPeriod() {
mutating func incrementProtocolPeriod() {
self._protocolPeriod += 1
}

func advanceMembersToPingIndex() {
mutating func advanceMembersToPingIndex() {
self._membersToPingIndex = (self._membersToPingIndex + 1) % self.membersToPing.count
}

func removeFromMembersToPing(_ member: SWIM.Member<Peer>) {
mutating func removeFromMembersToPing(_ member: SWIM.Member<Peer>) {
if let index = self.membersToPing.firstIndex(where: { $0.peer.node == member.peer.node }) {
self.membersToPing.remove(at: index)
if index < self.membersToPingIndex {
Expand Down Expand Up @@ -530,7 +530,7 @@ extension SWIM {
/// letting it know that it is being suspected, such that it can refute the suspicion as soon as possible,
/// if if still is alive.
/// - Returns: The gossip payload to be gossiped.
public func makeGossipPayload(to target: SWIMAddressablePeer?) -> SWIM.GossipPayload<Peer> {
public mutating func makeGossipPayload(to target: SWIMAddressablePeer?) -> SWIM.GossipPayload<Peer> {
var membersToGossipAbout: [SWIM.Member<Peer>] = []
// Lifeguard IV. Buddy System
// Always send to a suspect its suspicion.
Expand Down Expand Up @@ -588,7 +588,7 @@ extension SWIM {
}

/// Adds `Member` to gossip messages.
internal func addToGossip(member: SWIM.Member<Peer>) {
internal mutating func addToGossip(member: SWIM.Member<Peer>) {
// we need to remove old state before we add the new gossip, so we don't gossip out stale state
self._messagesToGossip.remove(where: { $0.member.peer.node == member.peer.node })
self._messagesToGossip.append(.init(member: member, numberOfTimesGossiped: 0))
Expand Down Expand Up @@ -730,7 +730,7 @@ extension SWIM.Instance {
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: On Periodic Ping Tick Handler

public func onPeriodicPingTick() -> [PeriodicPingTickDirective] {
public mutating func onPeriodicPingTick() -> [PeriodicPingTickDirective] {
defer {
self.incrementProtocolPeriod()
}
Expand Down Expand Up @@ -776,7 +776,7 @@ extension SWIM.Instance {
/// Check all suspects if any of them have been suspect for long enough that we should promote them to unreachable or dead.
///
/// Suspicion timeouts are calculated taking into account the number of peers suspecting a given member (LHA-Suspicion).
private func checkSuspicionTimeouts() -> [PeriodicPingTickDirective] {
private mutating func checkSuspicionTimeouts() -> [PeriodicPingTickDirective] {
var directives: [PeriodicPingTickDirective] = []

for suspect in self.suspects {
Expand Down Expand Up @@ -817,7 +817,7 @@ extension SWIM.Instance {
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: On Ping Handler

public func onPing(pingOrigin: PingOrigin, payload: SWIM.GossipPayload<Peer>, sequenceNumber: SWIM.SequenceNumber) -> [PingDirective] {
public mutating func onPing(pingOrigin: PingOrigin, payload: SWIM.GossipPayload<Peer>, sequenceNumber: SWIM.SequenceNumber) -> [PingDirective] {
var directives: [PingDirective]

// 1) Process gossip
Expand Down Expand Up @@ -864,7 +864,7 @@ extension SWIM.Instance {
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: On Ping Response Handlers

public func onPingResponse(response: SWIM.PingResponse<Peer, PingRequestOrigin>, pingRequestOrigin: PingRequestOrigin?, pingRequestSequenceNumber: SWIM.SequenceNumber?) -> [PingResponseDirective] {
mutating public func onPingResponse(response: SWIM.PingResponse<Peer, PingRequestOrigin>, pingRequestOrigin: PingRequestOrigin?, pingRequestSequenceNumber: SWIM.SequenceNumber?) -> [PingResponseDirective] {
switch response {
case .ack(let target, let incarnation, let payload, let sequenceNumber):
return self.onPingAckResponse(target: target, incarnation: incarnation, payload: payload, pingRequestOrigin: pingRequestOrigin, pingRequestSequenceNumber: pingRequestSequenceNumber, sequenceNumber: sequenceNumber)
Expand All @@ -875,7 +875,7 @@ extension SWIM.Instance {
}
}

func onPingAckResponse(
mutating func onPingAckResponse(
target pingedNode: Peer,
incarnation: SWIM.Incarnation,
payload: SWIM.GossipPayload<Peer>,
Expand Down Expand Up @@ -917,7 +917,7 @@ extension SWIM.Instance {
return directives
}

func onPingNackResponse(
mutating func onPingNackResponse(
target pingedNode: Peer,
pingRequestOrigin: PingRequestOrigin?,
sequenceNumber: SWIM.SequenceNumber
Expand All @@ -935,7 +935,7 @@ extension SWIM.Instance {
return []
}

func onPingResponseTimeout(
mutating func onPingResponseTimeout(
target: Peer,
timeout: Duration,
pingRequestOrigin: PingRequestOrigin?,
Expand Down Expand Up @@ -983,7 +983,7 @@ extension SWIM.Instance {
}

/// Prepare ping request directives such that the shell can easily fire those messages
func preparePingRequests(target: Peer) -> SendPingRequestDirective? {
mutating func preparePingRequests(target: Peer) -> SendPingRequestDirective? {
guard let lastKnownStatus = self.status(of: target) else {
// context.log.info("Skipping ping requests after failed ping to [\(toPing)] because node has been removed from member list") // FIXME allow logging
return nil
Expand Down Expand Up @@ -1080,7 +1080,7 @@ extension SWIM.Instance {
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: On Ping Request

public func onPingRequest(
public mutating func onPingRequest(
target: Peer,
pingRequestOrigin: PingRequestOrigin,
payload: SWIM.GossipPayload<Peer>,
Expand Down Expand Up @@ -1161,7 +1161,7 @@ extension SWIM.Instance {
// MARK: On Ping Request Response

/// This should be called on first successful (non-nack) pingRequestResponse
public func onPingRequestResponse(_ response: SWIM.PingResponse<Peer, PingRequestOrigin>, pinged pingedPeer: Peer) -> [PingRequestResponseDirective] {
public mutating func onPingRequestResponse(_ response: SWIM.PingResponse<Peer, PingRequestOrigin>, pinged pingedPeer: Peer) -> [PingRequestResponseDirective] {
guard let previousStatus = self.status(of: pingedPeer) else {
// we do not process replies from an unknown member; it likely means we have removed it already for some reason.
return [.unknownMember]
Expand Down Expand Up @@ -1215,7 +1215,7 @@ extension SWIM.Instance {
}
}

public func onEveryPingRequestResponse(_ result: SWIM.PingResponse<Peer, PingRequestOrigin>, pinged peer: Peer) -> [PingRequestResponseDirective] {
public mutating func onEveryPingRequestResponse(_ result: SWIM.PingResponse<Peer, PingRequestOrigin>, pinged peer: Peer) -> [PingRequestResponseDirective] {
switch result {
case .timeout:
// Failed pingRequestResponse indicates a missed nack, we should adjust LHMultiplier
Expand Down Expand Up @@ -1253,7 +1253,7 @@ extension SWIM.Instance {
case ignoredDueToOlderStatus(currentStatus: SWIM.Status)
}

internal func onGossipPayload(_ payload: SWIM.GossipPayload<Peer>) -> [GossipProcessedDirective] {
internal mutating func onGossipPayload(_ payload: SWIM.GossipPayload<Peer>) -> [GossipProcessedDirective] {
switch payload {
case .none:
return []
Expand All @@ -1264,7 +1264,7 @@ extension SWIM.Instance {
}
}

internal func onGossipPayload(about member: SWIM.Member<Peer>) -> [GossipProcessedDirective] {
internal mutating func onGossipPayload(about member: SWIM.Member<Peer>) -> [GossipProcessedDirective] {
if self.isMyself(member) {
return [self.onMyselfGossipPayload(myself: member)]
} else {
Expand All @@ -1275,7 +1275,7 @@ extension SWIM.Instance {
/// ### Unreachability status handling
/// Performs all special handling of `.unreachable` such that if it is disabled members are automatically promoted to `.dead`.
/// See `settings.unreachability` for more details.
private func onMyselfGossipPayload(myself incoming: SWIM.Member<Peer>) -> GossipProcessedDirective {
private mutating func onMyselfGossipPayload(myself incoming: SWIM.Member<Peer>) -> GossipProcessedDirective {
assert(
self.peer.node == incoming.peer.node,
"""
Expand Down Expand Up @@ -1358,7 +1358,7 @@ extension SWIM.Instance {
/// ### Unreachability status handling
/// Performs all special handling of `.unreachable` such that if it is disabled members are automatically promoted to `.dead`.
/// See `settings.unreachability` for more details.
private func onOtherMemberGossipPayload(member: SWIM.Member<Peer>) -> [GossipProcessedDirective] {
private mutating func onOtherMemberGossipPayload(member: SWIM.Member<Peer>) -> [GossipProcessedDirective] {
assert(self.node != member.node, "Attempted to process gossip as-if not-myself, but WAS same peer, was: \(member). Myself: \(self.peer, orElse: "nil")")

guard self.isMember(member.peer) else {
Expand Down Expand Up @@ -1427,7 +1427,7 @@ extension SWIM.Instance {
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Confirm Dead

public func confirmDead(peer: Peer) -> ConfirmDeadDirective {
public mutating func confirmDead(peer: Peer) -> ConfirmDeadDirective {
if self.member(for: peer) == nil,
self._members.first(where: { $0.key == peer.node }) == nil {
return .ignored // this peer is absolutely unknown to us, we should not even emit events about it
Expand Down Expand Up @@ -1463,7 +1463,7 @@ extension SWIM.Instance {
return self.removedDeadMemberTombstones.contains(.init(uid: uid, deadlineProtocolPeriod: anythingAsNotTakenIntoAccountInEquality))
}

private func cleanupTombstones() { // time to cleanup the tombstones
private mutating func cleanupTombstones() { // time to cleanup the tombstones
self.removedDeadMemberTombstones = self.removedDeadMemberTombstones.filter {
// keep the ones where their deadline is still in the future
self.protocolPeriod < $0.deadlineProtocolPeriod
Expand Down
2 changes: 1 addition & 1 deletion Sources/SWIMNIOExample/SWIMNIOShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

import ClusterMembership
@preconcurrency import struct Dispatch.DispatchTime
import struct Dispatch.DispatchTime
import Logging
import NIO
import SWIM
Expand Down
2 changes: 1 addition & 1 deletion Tests/SWIMNIOExampleTests/SWIMNIOEmbeddedTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

import ClusterMembership
@preconcurrency import struct Dispatch.DispatchTime
import struct Dispatch.DispatchTime
import enum Dispatch.DispatchTimeInterval
import NIO
@testable import SWIM
Expand Down
Loading

0 comments on commit 161e1f2

Please sign in to comment.