Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert APIs to async #84

Merged
merged 21 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.2
// swift-tools-version:5.7
// The swift-tools-version declares the minimum version of Swift required to build this package.

import class Foundation.ProcessInfo
Expand Down Expand Up @@ -102,7 +102,7 @@ var targets: [PackageDescription.Target] = [
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Integration Tests - `it_` prefixed

.target(
.executableTarget(
name: "it_Clustered_swim_suspension_reachability",
dependencies: [
"SWIM",
Expand All @@ -116,15 +116,13 @@ var targets: [PackageDescription.Target] = [
]

var dependencies: [Package.Dependency] = [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.19.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.19.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.5.1"),

// ~~~ SSWG APIs ~~~

.package(url: "https://github.com/apple/swift-log.git", from: "1.4.0"),
// swift-metrics 1.x and 2.x are almost API compatible, so most clients should use
.package(url: "https://github.com/apple/swift-metrics.git", "1.0.0" ..< "3.0.0"),
.package(url: "https://github.com/apple/swift-metrics.git", "2.3.2" ..< "3.0.0"), // since latest
]

let products: [PackageDescription.Product] = [
Expand All @@ -144,6 +142,12 @@ let products: [PackageDescription.Product] = [

var package = Package(
name: "swift-cluster-membership",
platforms: [
.macOS(.v13),
.iOS(.v16),
.tvOS(.v16),
.watchOS(.v9),
],
products: products,

dependencies: dependencies,
Expand Down
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ public protocol SWIMPeer: SWIMAddressablePeer {
payload: SWIM.GossipPayload,
from origin: SWIMPingOriginPeer,
timeout: DispatchTimeInterval,
sequenceNumber: SWIM.SequenceNumber,
onResponse: @escaping (Result<SWIM.PingResponse, Error>) -> Void
)
sequenceNumber: SWIM.SequenceNumber
) async throws -> SWIM.PingResponse

// ...
}
Expand All @@ -73,7 +72,7 @@ Which usually means wrapping some connection, channel, or other identity with th

Then, on the receiving end of a peer, one has to implement receiving those messages and invoke all the corresponding `on<SomeMessage>(...)` callbacks defined on the `SWIM.Instance` (grouped under [SWIMProtocol](https://github.com/apple/swift-cluster-membership/blob/main/Sources/SWIM/SWIMInstance.swift#L24-L85)).

A piece of the SWIMProtocol is liste below to give you an idea about it:
A piece of the SWIMProtocol is listed below to give you an idea about it:


```swift
Expand Down
4 changes: 2 additions & 2 deletions Sources/ClusterMembership/Node.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
///
/// Generally the node represents "some node we want to contact" if the `uid` is not set,
/// and if the `uid` is available "the specific instance of a node".
public struct Node: Hashable, Comparable, CustomStringConvertible {
public struct Node: Hashable, Sendable, Comparable, CustomStringConvertible {
/// Protocol that can be used to contact this node;
/// Does not have to be a formal protocol name and may be "swim" or a name which is understood by a membership implementation.
public var `protocol`: String
public var name: String?
public var host: String
public var port: Int

public var uid: UInt64?
public internal(set) var uid: UInt64?

public init(protocol: String, host: String, port: Int, uid: UInt64?) {
self.protocol = `protocol`
Expand Down
8 changes: 3 additions & 5 deletions Sources/SWIM/Events.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@
//===----------------------------------------------------------------------===//

import ClusterMembership
import struct Dispatch.DispatchTime
import enum Dispatch.DispatchTimeInterval

extension SWIM {
/// Emitted whenever a membership change happens.
///
/// Use `isReachabilityChange` to detect whether the is a change from an alive to unreachable/dead state or not,
/// and is worth emitting to user-code or not.
public struct MemberStatusChangedEvent: Equatable {
public struct MemberStatusChangedEvent<Peer: SWIMPeer>: Equatable {
/// The member that this change event is about.
public let member: SWIM.Member
public let member: SWIM.Member<Peer>

/// The resulting ("current") status of the `member`.
public var status: SWIM.Status {
Expand All @@ -38,7 +36,7 @@ extension SWIM {
public let previousStatus: SWIM.Status?

/// Create new event, representing a change of the member's status from a previous state to its current state.
public init(previousStatus: SWIM.Status?, member: SWIM.Member) {
public init(previousStatus: SWIM.Status?, member: SWIM.Member<Peer>) {
if let from = previousStatus, from == .dead {
precondition(member.status == .dead, "Change MUST NOT move status 'backwards' from [.dead] state to anything else, but did so, was: \(member)")
}
Expand Down
10 changes: 5 additions & 5 deletions Sources/SWIM/Member.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

import ClusterMembership
import struct Dispatch.DispatchTime
@preconcurrency import struct Dispatch.DispatchTime

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: SWIM Member
Expand All @@ -22,11 +22,11 @@ extension SWIM {
/// A `SWIM.Member` represents an active participant of the cluster.
///
/// It associates a specific `SWIMAddressablePeer` with its `SWIM.Status` and a number of other SWIM specific state information.
public struct Member {
public struct Member<Peer: SWIMPeer>: Sendable {
/// Peer reference, used to send messages to this cluster member.
///
/// Can represent the "local" member as well, use `swim.isMyself` to verify if a peer is `myself`.
public var peer: SWIMPeer
public var peer: Peer

/// `Node` of the member's `peer`.
public var node: ClusterMembership.Node {
Expand All @@ -46,7 +46,7 @@ extension SWIM {
public let localSuspicionStartedAt: DispatchTime? // could be "status updated at"?

/// Create a new member.
public init(peer: SWIMPeer, status: SWIM.Status, protocolPeriod: UInt64, suspicionStartedAt: DispatchTime? = nil) {
public init(peer: Peer, status: SWIM.Status, protocolPeriod: UInt64, suspicionStartedAt: DispatchTime? = nil) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to change suspicionStartedAt to an Instant type instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I seem to remember instant isn't great for this but maybe I'm wrong...?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ticketified #87

self.peer = peer
self.status = status
self.protocolPeriod = protocolPeriod
Expand Down Expand Up @@ -85,7 +85,7 @@ extension SWIM {

/// Manual Hashable conformance since we omit `suspicionStartedAt` from identity
extension SWIM.Member: Hashable, Equatable {
public static func == (lhs: SWIM.Member, rhs: SWIM.Member) -> Bool {
public static func == (lhs: SWIM.Member<Peer>, rhs: SWIM.Member<Peer>) -> Bool {
lhs.peer.node == rhs.peer.node &&
lhs.protocolPeriod == rhs.protocolPeriod &&
lhs.status == rhs.status
Expand Down
2 changes: 1 addition & 1 deletion Sources/SWIM/Metrics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ extension SWIM {

extension SWIM.Metrics {
/// Update member metrics metrics based on SWIM's membership.
public func updateMembership(_ members: SWIM.Membership) {
public func updateMembership(_ members: SWIM.Membership<some SWIMPeer>) {
var alives = 0
var suspects = 0
var unreachables = 0
Expand Down
71 changes: 41 additions & 30 deletions Sources/SWIM/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Cluster Membership open source project
//
// Copyright (c) 2020 Apple Inc. and the Swift Cluster Membership project authors
// Copyright (c) 2020-2022 Apple Inc. and the Swift Cluster Membership project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
yim-lee marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -13,17 +13,23 @@
//===----------------------------------------------------------------------===//

import ClusterMembership
import struct Dispatch.DispatchTime
import enum Dispatch.DispatchTimeInterval

/// Any peer in the cluster, can be used used to identify a peer using its unique node that it represents.
public protocol SWIMAddressablePeer {
public protocol SWIMAddressablePeer: Sendable {
/// Node that this peer is representing.
var node: ClusterMembership.Node { get }
nonisolated var swimNode: ClusterMembership.Node { get }
}

extension SWIMAddressablePeer {
internal var node: ClusterMembership.Node {
self.swimNode
}
}

/// SWIM A peer which originated a `ping`, should be replied to with an `ack`.
public protocol SWIMPingOriginPeer: SWIMAddressablePeer {
associatedtype Peer: SWIMPeer

/// Acknowledge a `ping`.
///
/// - parameters:
Expand All @@ -35,14 +41,16 @@ public protocol SWIMPingOriginPeer: SWIMAddressablePeer {
/// It is already trimmed to be no larger than configured in `SWIM.Settings`.
func ack(
acknowledging sequenceNumber: SWIM.SequenceNumber,
target: SWIMPeer,
target: Peer,
incarnation: SWIM.Incarnation,
payload: SWIM.GossipPayload
)
payload: SWIM.GossipPayload<Peer>
) async throws
ktoso marked this conversation as resolved.
Show resolved Hide resolved
}

/// A SWIM peer which originated a `pingRequest` and thus can receive either an `ack` or `nack` from the intermediary.
public protocol SWIMPingRequestOriginPeer: SWIMPingOriginPeer {
associatedtype NackTarget: SWIMPeer

/// "Negative acknowledge" a ping.
///
/// This message may ONLY be send in an indirect-ping scenario from the "middle" peer.
Expand All @@ -55,32 +63,36 @@ public protocol SWIMPingRequestOriginPeer: SWIMPingOriginPeer {
/// - target: the target peer which was attempted to be pinged but we didn't get an ack from it yet and are sending a nack back eagerly
func nack(
acknowledging sequenceNumber: SWIM.SequenceNumber,
target: SWIMPeer
)
target: NackTarget
) async throws
}

/// SWIM peer which can be initiated contact with, by sending ping or ping request messages.
public protocol SWIMPeer: SWIMAddressablePeer {
associatedtype Peer: SWIMPeer
associatedtype PingOrigin: SWIMPingOriginPeer
associatedtype PingRequestOrigin: SWIMPingRequestOriginPeer

/// Perform a probe of this peer by sending a `ping` message.
///
/// We expect the reply to be an `ack`, upon which the `onResponse`
/// We expect the reply to be an `ack`.
///
/// - parameters:
/// - payload: additional gossip information to be processed by the recipient
/// - origin: the origin peer that has initiated this ping message (i.e. "myself" of the sender)
/// replies (`ack`s) from to this ping should be send to this peer
/// - timeout: timeout during which we expect the other peer to have replied to us with a `PingResponse` about the pinged node.
/// If we get no response about that peer in that time, this `ping` is considered failed, and the onResponse MUST be invoked with a `.timeout`.
/// - onResponse: must be invoked when the a corresponding reply (`ack`) or `timeout` event for this ping occurs.
/// No guarantees about concurrency or threading are made with regards to where/how this invocation will take place,
/// so implementation shells may want to hop to the right executor or protect their state using some other way when before handling the response.
///
/// - Returns the corresponding reply (`ack`) or `timeout` event for this ping request occurs.
///
/// - Throws if the ping fails or if the reply is `nack`.
func ping(
payload: SWIM.GossipPayload,
from origin: SWIMPingOriginPeer,
timeout: DispatchTimeInterval,
sequenceNumber: SWIM.SequenceNumber,
onResponse: @escaping (Result<SWIM.PingResponse, Error>) -> Void
)
payload: SWIM.GossipPayload<Peer>,
from origin: PingOrigin,
timeout: Duration,
sequenceNumber: SWIM.SequenceNumber
) async throws -> SWIM.PingResponse<Peer, PingRequestOrigin>

/// Send a ping request to this peer, asking it to perform an "indirect ping" of the target on our behalf.
///
Expand All @@ -95,15 +107,14 @@ public protocol SWIMPeer: SWIMAddressablePeer {
/// replies (`ack`s) from this indirect ping should be forwarded to it.
/// - timeout: timeout during which we expect the other peer to have replied to us with a `PingResponse` about the pinged node.
/// If we get no response about that peer in that time, this `pingRequest` is considered failed, and the onResponse MUST be invoked with a `.timeout`.
/// - onResponse: must be invoked when the a corresponding reply (ack, nack) or timeout event for this ping request occurs.
/// No guarantees about concurrency or threading are made with regards to where/how this invocation will take place,
/// so implementation shells may want to hop to the right executor or protect their state using some other way when before handling the response.
///
/// - Returns the corresponding reply (`ack`, `nack`) or `timeout` event for this ping request occurs.
/// - Throws if the ping request fails
func pingRequest(
target: SWIMPeer,
payload: SWIM.GossipPayload,
from origin: SWIMPingRequestOriginPeer,
timeout: DispatchTimeInterval,
sequenceNumber: SWIM.SequenceNumber,
onResponse: @escaping (Result<SWIM.PingResponse, Error>) -> Void
)
target: Peer,
payload: SWIM.GossipPayload<Peer>,
from origin: PingOrigin,
timeout: Duration,
sequenceNumber: SWIM.SequenceNumber
) async throws -> SWIM.PingResponse<Peer, PingRequestOrigin>
}
19 changes: 9 additions & 10 deletions Sources/SWIM/SWIM.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import ClusterMembership
import struct Dispatch.DispatchTime
import enum Dispatch.DispatchTimeInterval

extension SWIM {
/// Incarnation numbers serve as sequence number and used to determine which observation
Expand All @@ -26,23 +25,23 @@ extension SWIM {
public typealias SequenceNumber = UInt32

/// Typealias for the underlying membership representation.
public typealias Membership = Dictionary<Node, SWIM.Member>.Values
public typealias Membership<Peer: SWIMPeer> = Dictionary<Node, SWIM.Member<Peer>>.Values
}

extension SWIM {
/// Message sent in reply to a `.ping`.
///
/// The ack may be delivered directly in a request-response fashion between the probing and pinged members,
/// or indirectly, as a result of a `pingRequest` message.
public enum PingResponse {
public enum PingResponse<Peer: SWIMPeer, PingRequestOrigin: SWIMPingRequestOriginPeer>: Sendable {
/// - parameters:
/// - target: the target of the ping;
/// On the remote "pinged" node which is about to send an ack back to the ping origin this should be filled with the `myself` peer.
/// - incarnation: the incarnation of the peer sent in the `target` field
/// - payload: additional gossip data to be carried with the message.
/// - sequenceNumber: the `sequenceNumber` of the `ping` message this ack is a "reply" for;
/// It is used on the ping origin to co-relate the reply with its handling code.
case ack(target: SWIMPeer, incarnation: Incarnation, payload: GossipPayload, sequenceNumber: SWIM.SequenceNumber)
case ack(target: Peer, incarnation: Incarnation, payload: GossipPayload<Peer>, sequenceNumber: SWIM.SequenceNumber)

/// A `.nack` MAY ONLY be sent by an *intermediary* member which was received a `pingRequest` to perform a `ping` of some `target` member.
/// It SHOULD NOT be sent by a peer that received a `.ping` directly.
Expand All @@ -63,7 +62,7 @@ extension SWIM {
/// - payload: The gossip payload to be carried in this message.
///
/// - SeeAlso: Lifeguard IV.A. Local Health Aware Probe
case nack(target: SWIMPeer, sequenceNumber: SWIM.SequenceNumber)
case nack(target: Peer, sequenceNumber: SWIM.SequenceNumber)

/// This is a "pseudo-message", in the sense that it is not transported over the wire, but should be triggered
/// and fired into an implementation Shell when a ping has timed out.
Expand All @@ -82,7 +81,7 @@ extension SWIM {
/// In case of "cancelled" operations or similar semantics it is allowed to use a placeholder value here.
/// - sequenceNumber: the `sequenceNumber` of the `ping` message this ack is a "reply" for;
/// It is used on the ping origin to co-relate the reply with its handling code.
case timeout(target: SWIMPeer, pingRequestOrigin: SWIMPingRequestOriginPeer?, timeout: DispatchTimeInterval, sequenceNumber: SWIM.SequenceNumber)
case timeout(target: Peer, pingRequestOrigin: PingRequestOrigin?, timeout: Duration, sequenceNumber: SWIM.SequenceNumber)

/// Sequence number of the initial request this is a response to.
/// Used to pair up responses to the requests which initially caused them.
Expand All @@ -108,23 +107,23 @@ extension SWIM {
/// A piece of "gossip" about a specific member of the cluster.
///
/// A gossip will only be spread a limited number of times, as configured by `settings.gossip.gossipedEnoughTimes(_:members:)`.
public struct Gossip: Equatable {
public struct Gossip<Peer: SWIMPeer>: Equatable {
/// The specific member (including status) that this gossip is about.
///
/// A change in member status implies a new gossip must be created and the count for the rumor mongering must be reset.
public let member: SWIM.Member
public let member: SWIM.Member<Peer>
/// The number of times this specific gossip message was gossiped to another peer.
public internal(set) var numberOfTimesGossiped: Int
}

/// A `GossipPayload` is used to spread gossips about members.
public enum GossipPayload {
public enum GossipPayload<Peer: SWIMPeer>: Sendable {
/// Explicit case to signal "no gossip payload"
///
/// Effectively equivalent to an empty `.membership([])` case.
case none
/// Gossip information about a few select members.
case membership([SWIM.Member])
case membership([SWIM.Member<Peer>])
}
}

Expand Down
Loading