Skip to content

Commit

Permalink
Convert APIs to async
Browse files Browse the repository at this point in the history
Change `ping` and `pingRequest` to async and remove `onResponse` callback parameter.

This change requires Swift 5.5.
  • Loading branch information
yim-lee committed Jun 19, 2022
1 parent 91efaf0 commit eaca089
Show file tree
Hide file tree
Showing 10 changed files with 465 additions and 237 deletions.
14 changes: 10 additions & 4 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.2
// swift-tools-version:5.5
// The swift-tools-version declares the minimum version of Swift required to build this package.

import class Foundation.ProcessInfo
Expand Down Expand Up @@ -102,7 +102,7 @@ var targets: [PackageDescription.Target] = [
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Integration Tests - `it_` prefixed

.target(
.executableTarget(
name: "it_Clustered_swim_suspension_reachability",
dependencies: [
"SWIM",
Expand All @@ -116,8 +116,8 @@ var targets: [PackageDescription.Target] = [
]

var dependencies: [Package.Dependency] = [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.19.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.19.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.5.1"),

// ~~~ SSWG APIs ~~~
Expand All @@ -144,6 +144,12 @@ let products: [PackageDescription.Product] = [

var package = Package(
name: "swift-cluster-membership",
platforms: [
.macOS(.v10_15),
.iOS(.v13),
.tvOS(.v13),
.watchOS(.v6),
],
products: products,

dependencies: dependencies,
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,3 @@ We look forward to hear from you.
Please refer to [CONTRIBUTING](CONTRIBUTING.md) guide to learn about the process of submitting pull requests,
and refer to the [HANDBOOK](HANDBOOK.md) for terminology and other useful tips for working with this library.


33 changes: 16 additions & 17 deletions Sources/SWIM/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Cluster Membership open source project
//
// Copyright (c) 2020 Apple Inc. and the Swift Cluster Membership project authors
// Copyright (c) 2020-2022 Apple Inc. and the Swift Cluster Membership project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -19,7 +19,7 @@ import enum Dispatch.DispatchTimeInterval
/// Any peer in the cluster, can be used used to identify a peer using its unique node that it represents.
public protocol SWIMAddressablePeer {
/// Node that this peer is representing.
var node: ClusterMembership.Node { get }
nonisolated var node: ClusterMembership.Node { get }
}

/// SWIM A peer which originated a `ping`, should be replied to with an `ack`.
Expand All @@ -38,7 +38,7 @@ public protocol SWIMPingOriginPeer: SWIMAddressablePeer {
target: SWIMPeer,
incarnation: SWIM.Incarnation,
payload: SWIM.GossipPayload
)
) async throws
}

/// A SWIM peer which originated a `pingRequest` and thus can receive either an `ack` or `nack` from the intermediary.
Expand All @@ -56,31 +56,31 @@ public protocol SWIMPingRequestOriginPeer: SWIMPingOriginPeer {
func nack(
acknowledging sequenceNumber: SWIM.SequenceNumber,
target: SWIMPeer
)
) async throws
}

/// SWIM peer which can be initiated contact with, by sending ping or ping request messages.
public protocol SWIMPeer: SWIMAddressablePeer {
/// Perform a probe of this peer by sending a `ping` message.
///
/// We expect the reply to be an `ack`, upon which the `onResponse`
/// We expect the reply to be an `ack`.
///
/// - parameters:
/// - payload: additional gossip information to be processed by the recipient
/// - origin: the origin peer that has initiated this ping message (i.e. "myself" of the sender)
/// replies (`ack`s) from to this ping should be send to this peer
/// - timeout: timeout during which we expect the other peer to have replied to us with a `PingResponse` about the pinged node.
/// If we get no response about that peer in that time, this `ping` is considered failed, and the onResponse MUST be invoked with a `.timeout`.
/// - onResponse: must be invoked when the a corresponding reply (`ack`) or `timeout` event for this ping occurs.
/// No guarantees about concurrency or threading are made with regards to where/how this invocation will take place,
/// so implementation shells may want to hop to the right executor or protect their state using some other way when before handling the response.
///
/// - Returns the corresponding reply (`ack`) or `timeout` event for this ping request occurs.
///
/// - Throws if the ping fails or if the reply is `nack`.
func ping(
payload: SWIM.GossipPayload,
from origin: SWIMPingOriginPeer,
timeout: DispatchTimeInterval,
sequenceNumber: SWIM.SequenceNumber,
onResponse: @escaping (Result<SWIM.PingResponse, Error>) -> Void
)
sequenceNumber: SWIM.SequenceNumber
) async throws -> SWIM.PingResponse

/// Send a ping request to this peer, asking it to perform an "indirect ping" of the target on our behalf.
///
Expand All @@ -95,15 +95,14 @@ public protocol SWIMPeer: SWIMAddressablePeer {
/// replies (`ack`s) from this indirect ping should be forwarded to it.
/// - timeout: timeout during which we expect the other peer to have replied to us with a `PingResponse` about the pinged node.
/// If we get no response about that peer in that time, this `pingRequest` is considered failed, and the onResponse MUST be invoked with a `.timeout`.
/// - onResponse: must be invoked when the a corresponding reply (ack, nack) or timeout event for this ping request occurs.
/// No guarantees about concurrency or threading are made with regards to where/how this invocation will take place,
/// so implementation shells may want to hop to the right executor or protect their state using some other way when before handling the response.
///
/// - Returns the corresponding reply (`ack`, `nack`) or `timeout` event for this ping request occurs.
/// - Throws if the ping request fails
func pingRequest(
target: SWIMPeer,
payload: SWIM.GossipPayload,
from origin: SWIMPingRequestOriginPeer,
timeout: DispatchTimeInterval,
sequenceNumber: SWIM.SequenceNumber,
onResponse: @escaping (Result<SWIM.PingResponse, Error>) -> Void
)
sequenceNumber: SWIM.SequenceNumber
) async throws -> SWIM.PingResponse
}
15 changes: 7 additions & 8 deletions Sources/SWIMNIOExample/Coding.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Cluster Membership open source project
//
// Copyright (c) 2020 Apple Inc. and the Swift Cluster Membership project authors
// Copyright (c) 2020-2022 Apple Inc. and the Swift Cluster Membership project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down Expand Up @@ -106,20 +106,20 @@ extension SWIM.Message: Codable {
}

extension CodingUserInfoKey {
static let channelUserInfoKey: CodingUserInfoKey = CodingUserInfoKey(rawValue: "nio_peer_channel")!
static let channelUserInfoKey = CodingUserInfoKey(rawValue: "nio_peer_channel")!
}

extension SWIM.NIOPeer: Codable {
public init(from decoder: Decoder) throws {
public convenience init(from decoder: Decoder) throws {
let container = try decoder.singleValueContainer()
self.node = try container.decode(Node.self)
let node = try container.decode(Node.self)
guard let channel = decoder.userInfo[.channelUserInfoKey] as? Channel else {
fatalError("Expected channelUserInfoKey to be present in userInfo, unable to decode SWIM.NIOPeer!")
}
self.channel = channel
self.init(node: node, channel: channel)
}

public func encode(to encoder: Encoder) throws {
public nonisolated func encode(to encoder: Encoder) throws {
var container = encoder.singleValueContainer()
try container.encode(self.node)
}
Expand Down Expand Up @@ -176,8 +176,7 @@ extension ClusterMembership.Node: Codable {
atIndex = repr.index(after: atIndex)

let name: String?
if let nameEndIndex = repr[atIndex...].firstIndex(of: "@"),
nameEndIndex < repr.endIndex {
if let nameEndIndex = repr[atIndex...].firstIndex(of: "@"), nameEndIndex < repr.endIndex {
name = String(repr[atIndex ..< nameEndIndex])
atIndex = repr.index(after: nameEndIndex)
} else {
Expand Down
94 changes: 55 additions & 39 deletions Sources/SWIMNIOExample/NIOPeer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Swift Cluster Membership open source project
//
// Copyright (c) 2020 Apple Inc. and the Swift Cluster Membership project authors
// Copyright (c) 2020-2022 Apple Inc. and the Swift Cluster Membership project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -19,12 +19,12 @@ import NIO
import NIOConcurrencyHelpers
import SWIM

extension SWIM {
public extension SWIM {
/// SWIMPeer designed to deliver messages over UDP in collaboration with the SWIMNIOHandler.
public struct NIOPeer: SWIMPeer, SWIMPingOriginPeer, SWIMPingRequestOriginPeer, CustomStringConvertible {
public var node: Node
actor NIOPeer: SWIMPeer, SWIMPingOriginPeer, SWIMPingRequestOriginPeer, CustomStringConvertible {
public let node: Node

internal var channel: Channel
internal let channel: Channel

public init(node: Node, channel: Channel) {
self.node = node
Expand All @@ -35,60 +35,64 @@ extension SWIM {
payload: GossipPayload,
from origin: SWIMPingOriginPeer,
timeout: DispatchTimeInterval,
sequenceNumber: SWIM.SequenceNumber,
onResponse: @escaping (Result<PingResponse, Error>) -> Void
) {
sequenceNumber: SWIM.SequenceNumber
) async throws -> PingResponse {
guard let originPeer = origin as? SWIM.NIOPeer else {
fatalError("Peers MUST be of type SWIM.NIOPeer, yet was: \(origin)")
}
let message = SWIM.Message.ping(replyTo: originPeer, payload: payload, sequenceNumber: sequenceNumber)

let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: timeout.toNIO, replyCallback: { reply in
switch reply {
case .success(.response(let pingResponse)):
assert(sequenceNumber == pingResponse.sequenceNumber, "callback invoked with not matching sequence number! Submitted with \(sequenceNumber) but invoked with \(pingResponse.sequenceNumber)!")
onResponse(.success(pingResponse))
case .failure(let error):
onResponse(.failure(error))
return try await withCheckedThrowingContinuation { continuation in
let message = SWIM.Message.ping(replyTo: originPeer, payload: payload, sequenceNumber: sequenceNumber)
let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: timeout.toNIO, replyCallback: { reply in
switch reply {
case .success(.response(let pingResponse)):
assert(sequenceNumber == pingResponse.sequenceNumber, "callback invoked with not matching sequence number! Submitted with \(sequenceNumber) but invoked with \(pingResponse.sequenceNumber)!")
continuation.resume(returning: pingResponse)

case .success(let other):
fatalError("Unexpected message, got: [\(other)]:\(reflecting: type(of: other)) while expected \(PingResponse.self)")
}
})
case .failure(let error):
continuation.resume(throwing: error)

self.channel.writeAndFlush(command, promise: nil)
case .success(let other):
continuation.resume(throwing: SWIMNIOIllegalMessageTypeError("Unexpected message, got: [\(other)]:\(reflecting: type(of: other)) while expected \(PingResponse.self)"))
}
})

self.channel.writeAndFlush(command, promise: nil)
}
}

public func pingRequest(
target: SWIMPeer,
payload: GossipPayload,
from origin: SWIMPingRequestOriginPeer,
timeout: DispatchTimeInterval,
sequenceNumber: SWIM.SequenceNumber,
onResponse: @escaping (Result<PingResponse, Error>) -> Void
) {
sequenceNumber: SWIM.SequenceNumber
) async throws -> PingResponse {
guard let targetPeer = target as? SWIM.NIOPeer else {
fatalError("Peers MUST be of type SWIM.NIOPeer, yet was: \(target)")
}
guard let originPeer = origin as? SWIM.NIOPeer else {
fatalError("Peers MUST be of type SWIM.NIOPeer, yet was: \(origin)")
}
let message = SWIM.Message.pingRequest(target: targetPeer, replyTo: originPeer, payload: payload, sequenceNumber: sequenceNumber)

let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: timeout.toNIO, replyCallback: { reply in
switch reply {
case .success(.response(let pingResponse)):
assert(sequenceNumber == pingResponse.sequenceNumber, "callback invoked with not matching sequence number! Submitted with \(sequenceNumber) but invoked with \(pingResponse.sequenceNumber)!")
onResponse(.success(pingResponse))
case .failure(let error):
onResponse(.failure(error))
return try await withCheckedThrowingContinuation { continuation in
let message = SWIM.Message.pingRequest(target: targetPeer, replyTo: originPeer, payload: payload, sequenceNumber: sequenceNumber)
let command = SWIMNIOWriteCommand(message: message, to: self.node, replyTimeout: timeout.toNIO, replyCallback: { reply in
switch reply {
case .success(.response(let pingResponse)):
assert(sequenceNumber == pingResponse.sequenceNumber, "callback invoked with not matching sequence number! Submitted with \(sequenceNumber) but invoked with \(pingResponse.sequenceNumber)!")
continuation.resume(returning: pingResponse)

case .success(let other):
fatalError("Unexpected message, got: \(other) while expected \(PingResponse.self)")
}
})
case .failure(let error):
continuation.resume(throwing: error)

self.channel.writeAndFlush(command, promise: nil)
case .success(let other):
continuation.resume(throwing: SWIMNIOIllegalMessageTypeError("Unexpected message, got: \(other) while expected \(PingResponse.self)"))
}
})

self.channel.writeAndFlush(command, promise: nil)
}
}

public func ack(
Expand All @@ -113,14 +117,14 @@ extension SWIM {
self.channel.writeAndFlush(command, promise: nil)
}

public var description: String {
public nonisolated var description: String {
"NIOPeer(\(self.node))"
}
}
}

extension SWIM.NIOPeer: Hashable {
public func hash(into hasher: inout Hasher) {
public nonisolated func hash(into hasher: inout Hasher) {
self.node.hash(into: &hasher)
}

Expand All @@ -147,3 +151,15 @@ public struct SWIMNIOTimeoutError: Error, CustomStringConvertible {
"SWIMNIOTimeoutError(timeout: \(self.timeout.prettyDescription), \(self.message))"
}
}

public struct SWIMNIOIllegalMessageTypeError: Error, CustomStringConvertible {
let message: String

init(_ message: String) {
self.message = message
}

public var description: String {
"SWIMNIOIllegalMessageTypeError(\(self.message))"
}
}
Loading

0 comments on commit eaca089

Please sign in to comment.