diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ec6acb0..42e749f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,16 +1,14 @@ -name: Test +name: CI -on: - push: +on: push jobs: - test: - name: Test + library: runs-on: macos-latest steps: - - uses: actions/checkout@v2 - - name: Build - run: swift build -v + - uses: actions/checkout@v3 + - name: Select Xcode 14 + run: sudo xcode-select -s /Applications/Xcode_14.3.app - name: Test - run: swift test -v + run: swift test diff --git a/.gitignore b/.gitignore index 9e70373..c4560e4 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,8 @@ .swiftpm +/.vscode + # Xcode # # gitignore contributors: remember to update Global/Xcode.gitignore, Objective-C.gitignore & Swift.gitignore @@ -62,4 +64,3 @@ Carthage/Build fastlane/report.xml fastlane/screenshots - diff --git a/.swift-version b/.swift-version index c6e8a5a..82544bb 100644 --- a/.swift-version +++ b/.swift-version @@ -1,2 +1,2 @@ -5.3.0 +5.8.0 diff --git a/.swiftformat b/.swiftformat index 35d6950..74fe761 100644 --- a/.swiftformat +++ b/.swiftformat @@ -1,3 +1,8 @@ +--disable \ + hoistAwait, \ + hoistTry + +--decimalgrouping 3,5 --funcattributes prev-line --minversion 0.47.2 --maxwidth 96 @@ -5,4 +10,3 @@ --wraparguments before-first --wrapparameters before-first --wrapcollections before-first ---xcodeindentation enabled diff --git a/Package.resolved b/Package.resolved new file mode 100644 index 0000000..b4c00c3 --- /dev/null +++ b/Package.resolved @@ -0,0 +1,86 @@ +{ + "pins" : [ + { + "identity" : "async-extensions", + "kind" : "remoteSourceControl", + "location" : "https://github.com/shareup/async-extensions.git", + "state" : { + "revision" : "59504194f84b8c66a27503b5fd0640ac9b01f42a", + "version" : "4.1.0" + } + }, + { + "identity" : "dispatch-timer", + "kind" : "remoteSourceControl", + "location" : "https://github.com/shareup/dispatch-timer.git", + "state" : { + "revision" : "2d8c304aa6f382a7a362cd5a814884f3930c5662", + "version" : "3.0.1" + } + }, + { + "identity" : "swift-atomics", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-atomics.git", + "state" : { + "revision" : "6c89474e62719ddcc1e9614989fff2f68208fe10", + "version" : "1.1.0" + } + }, + { + "identity" : "swift-collections", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-collections.git", + "state" : { + "revision" : "937e904258d22af6e447a0b72c0bc67583ef64a2", + "version" : "1.0.4" + } + }, + { + "identity" : "swift-nio", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio.git", + "state" : { + "revision" : "9b2848d76f5caad08b97e71a04345aa5bdb23a06", + "version" : "2.49.0" + } + }, + { + "identity" : "swift-nio-ssl", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-ssl.git", + "state" : { + "revision" : "4fb7ead803e38949eb1d6fabb849206a72c580f3", + "version" : "2.23.0" + } + }, + { + "identity" : "swift-nio-transport-services", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-transport-services.git", + "state" : { + "revision" : "c0d9a144cfaec8d3d596aadde3039286a266c15c", + "version" : "1.15.0" + } + }, + { + "identity" : "synchronized", + "kind" : "remoteSourceControl", + "location" : "https://github.com/shareup/synchronized.git", + "state" : { + "revision" : "85653e23270ec88ae19f8d494157769487e34aed", + "version" : "4.0.1" + } + }, + { + "identity" : "websocket-kit", + "kind" : "remoteSourceControl", + "location" : "https://github.com/vapor/websocket-kit.git", + "state" : { + "revision" : "2b8885974e8d9f522e787805000553f4f7cce8a0", + "version" : "2.7.0" + } + } + ], + "version" : 2 +} diff --git a/Package.swift b/Package.swift index 823ab7a..b76d59f 100644 --- a/Package.swift +++ b/Package.swift @@ -1,10 +1,11 @@ -// swift-tools-version:5.3 +// swift-tools-version:5.8 + import PackageDescription let package = Package( name: "WebSocket", platforms: [ - .macOS(.v11), .iOS(.v14), .tvOS(.v14), .watchOS(.v7), + .macOS(.v12), .iOS(.v15), .tvOS(.v15), .watchOS(.v8), ], products: [ .library( @@ -12,15 +13,47 @@ let package = Package( targets: ["WebSocket"] ), ], - dependencies: [], + dependencies: [ + .package( + url: "https://github.com/shareup/async-extensions.git", + from: "4.1.0" + ), + .package( + url: "https://github.com/shareup/dispatch-timer.git", + from: "3.0.0" + ), + .package( + url: "https://github.com/vapor/websocket-kit.git", + from: "2.6.1" + ), + .package( + url: "https://github.com/apple/swift-nio.git", + from: "2.0.0" + ), + ], targets: [ .target( name: "WebSocket", - dependencies: [] + dependencies: [ + .product(name: "AsyncExtensions", package: "async-extensions"), + .product(name: "DispatchTimer", package: "dispatch-timer"), + ], + swiftSettings: [ + .unsafeFlags([ + "-Xfrontend", "-warn-concurrency", + "-Xfrontend", "-enable-actor-data-race-checks", + ]), + ] ), .testTarget( name: "WebSocketTests", - dependencies: ["WebSocket"] + dependencies: [ + .product(name: "NIO", package: "swift-nio"), + .product(name: "NIOHTTP1", package: "swift-nio"), + .product(name: "NIOWebSocket", package: "swift-nio"), + "WebSocket", + .product(name: "WebSocketKit", package: "websocket-kit"), + ] ), ] ) diff --git a/README.md b/README.md index 78bf3ae..0a84416 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,26 @@ A concrete implementation of a WebSocket client implemented by wrapping Apple's [`NWConnection`](https://developer.apple.com/documentation/network/nwconnection). -The public "interface" of `WebSocket` is a simple struct whose public "methods" are exposed as closures. The reason for this design is to make it easy to inject fake `WebSocket`s into your code for testing purposes. +The public interface of `WebSocket` is a simple struct whose public methods are exposed as closures. The reason for this design is to make it easy to inject fake WebSockets into your code for testing purposes. The actual implementation is `SystemWebSocket`, but this type is not publicly accessible. Instead, you can access it via `WebSocket.system(url:)`. `SystemWebSocket` tries its best to mirror the documented behavior of web browsers' [`WebSocket`](http://developer.mozilla.org/en-US/docs/Web/API/WebSocket). Please report any deviations as bugs. -`WebSocket` exposes a simple API, makes heavy use of [Swift Concurrency](https://developer.apple.com/documentation/swift/swift_standard_library/concurrency), and conforms to Apple's Combine [`Publisher`](https://developer.apple.com/documentation/combine/publisher). +`WebSocket` exposes a simple API and makes heavy use of [Swift Concurrency](https://developer.apple.com/documentation/swift/swift_standard_library/concurrency). + +## Installation + +To use WebSocket, add a dependency to your Package.swift file: + +```swift +let package = Package( + dependencies: [ + .package( + url: "https://github.com/shareup/websocket-apple.git", + from: "4.0.0" + ) + ] +) +``` ## Usage diff --git a/Sources/WebSocket/OSLog+WebSocket.swift b/Sources/WebSocket/OSLog+WebSocket.swift index 4194a98..dc82cc5 100644 --- a/Sources/WebSocket/OSLog+WebSocket.swift +++ b/Sources/WebSocket/OSLog+WebSocket.swift @@ -2,8 +2,8 @@ import Foundation import os.log extension OSLog { - private static var subsystem = - Bundle.main.bundleIdentifier ?? "app.shareup.websocket-apple" - static let webSocket = OSLog(subsystem: subsystem, category: "websocket") } + +private let subsystem = + Bundle.main.bundleIdentifier ?? "app.shareup.websocket-apple" diff --git a/Sources/WebSocket/SystemURLSession.swift b/Sources/WebSocket/SystemURLSession.swift new file mode 100644 index 0000000..e5d5262 --- /dev/null +++ b/Sources/WebSocket/SystemURLSession.swift @@ -0,0 +1,120 @@ +import Foundation +import Synchronized + +func webSocketTask( + for url: URL, + options: WebSocketOptions, + onOpen: @escaping @Sendable () async -> Void, + onClose: @escaping @Sendable (WebSocketCloseCode, Data?) async -> Void +) -> URLSessionWebSocketTask { + let session = session(for: options) + + let task = session.webSocketTask(with: url) + task.maximumMessageSize = options.maximumMessageSize + + let delegate = session.delegate as! Delegate + delegate.set(onOpen: onOpen, onClose: onClose, for: ObjectIdentifier(task)) + + return task +} + +func cancelAndInvalidateAllTasks() { + sessions.access { sessions in + sessions.forEach { $0.value.invalidateAndCancel() } + sessions.removeAll() + } +} + +private let sessions = Locked<[WebSocketOptions: URLSession]>([:]) + +private func session(for options: WebSocketOptions) -> URLSession { + sessions.access { sessions in + if let session = sessions[options] { + return session + } else { + let session = URLSession( + configuration: configuration(with: options), + delegate: Delegate(), + delegateQueue: nil + ) + + sessions[options] = session + + return session + } + } +} + +private func configuration(with options: WebSocketOptions) -> URLSessionConfiguration { + let config = URLSessionConfiguration.default + config.waitsForConnectivity = false + config.timeoutIntervalForRequest = options.timeoutIntervalForRequest + config.timeoutIntervalForResource = options.timeoutIntervalForResource + return config +} + +private final class Delegate: NSObject, URLSessionWebSocketDelegate, Sendable { + private struct Callbacks: Sendable { + let onOpen: @Sendable () async -> Void + let onClose: @Sendable (WebSocketCloseCode, Data?) async -> Void + } + + // `Dictionary` + private let state: Locked<[ObjectIdentifier: Callbacks]> = .init([:]) + + func set( + onOpen: @escaping @Sendable () async -> Void, + onClose: @escaping @Sendable (WebSocketCloseCode, Data?) async -> Void, + for taskID: ObjectIdentifier + ) { + state.access { $0[taskID] = .init(onOpen: onOpen, onClose: onClose) } + } + + func urlSession( + _: URLSession, + webSocketTask: URLSessionWebSocketTask, + didOpenWithProtocol _: String? + ) { + let taskID = ObjectIdentifier(webSocketTask) + + if let onOpen = state.access({ $0[taskID]?.onOpen }) { + Task { await onOpen() } + } + } + + func urlSession( + _: URLSession, + webSocketTask: URLSessionWebSocketTask, + didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, + reason: Data? + ) { + let taskID = ObjectIdentifier(webSocketTask) + + if let onClose = state.access({ $0[taskID]?.onClose }) { + Task { await onClose(WebSocketCloseCode(closeCode), reason) } + } + } + + func urlSession( + _: URLSession, + task: URLSessionTask, + didCompleteWithError error: Error? + ) { + let taskID = ObjectIdentifier(task) + + if let onClose = state.access({ $0[taskID]?.onClose }) { + Task { [weak self] in + if let error { + await onClose( + .abnormalClosure, + Data(error.localizedDescription.utf8) + ) + } else { + await onClose(.normalClosure, nil) + } + + self?.state.access { _ = $0.removeValue(forKey: taskID) } + } + } + } +} diff --git a/Sources/WebSocket/SystemWebSocket.swift b/Sources/WebSocket/SystemWebSocket.swift index 305a53a..8b5dcf9 100644 --- a/Sources/WebSocket/SystemWebSocket.swift +++ b/Sources/WebSocket/SystemWebSocket.swift @@ -1,7 +1,13 @@ -import Combine -import Foundation -import Network +import AsyncExtensions +@preconcurrency import Combine +@preconcurrency import Foundation import os.log +import Synchronized + +private typealias OpenFuture = AsyncExtensions.Future + +private typealias CloseFuture = + AsyncExtensions.Future<(code: WebSocketCloseCode, reason: Data?)> final actor SystemWebSocket: Publisher { typealias Output = WebSocketMessage @@ -17,22 +23,19 @@ final actor SystemWebSocket: Publisher { return true } } - private let url: URL - private let options: WebSocketOptions - private var _onOpen: WebSocketOnOpen - private var _onClose: WebSocketOnClose - private var state: WebSocketState = .unopened + nonisolated let url: URL + nonisolated let options: WebSocketOptions + nonisolated let onOpen: WebSocketOnOpen + nonisolated let onClose: WebSocketOnClose - private var messageIndex = 0 // Used to identify sent messages + private var state: State = .unopened - private let subject = PassthroughSubject() + private var didOpen: OpenFuture + private var didClose: CloseFuture? - private let webSocketQueue: DispatchQueue = .init( - label: "app.shareup.websocket.websocketqueue", - attributes: [], - autoreleaseFrequency: .workItem, - target: .global(qos: .default) - ) + private var messageIndex = 0 // Used to identify sent messages + + private nonisolated let subject = PassthroughSubject() // Deliver messages to the subscribers on a separate queue because it's a bad idea // to let the subscribers, who could potentially be doing long-running tasks with the @@ -40,6 +43,7 @@ final actor SystemWebSocket: Publisher { private let subscriberQueue = DispatchQueue( label: "app.shareup.websocket.subjectqueue", attributes: [], + autoreleaseFrequency: .workItem, target: DispatchQueue.global(qos: .default) ) @@ -51,63 +55,55 @@ final actor SystemWebSocket: Publisher { ) async throws { self.url = url self.options = options - _onOpen = onOpen - _onClose = onClose + self.onOpen = onOpen + self.onClose = onClose + + didOpen = .init(timeout: options.timeoutIntervalForRequest) + try connect() } deinit { - switch state { - case let .connecting(connection), let .open(connection): - connection.forceCancel() - default: - break - } + didOpen.fail(CancellationError()) + didClose?.fail(CancellationError()) + state.ws?.cancel() + subject.send(completion: .finished) } - nonisolated func receive(subscriber: S) - where S.Input == WebSocketMessage, S.Failure == Never - { + nonisolated func receive( + subscriber: S + ) where S.Input == WebSocketMessage, S.Failure == Never { subject .receive(on: subscriberQueue) .receive(subscriber: subscriber) } - func open(timeout: TimeInterval? = nil) async throws { + func open() async throws { switch state { - case .open: - return - - case .closing, .closed: - throw WebSocketError.openAfterConnectionClosed - case .unopened, .connecting: do { - try await withThrowingTaskGroup( - of: Void - .self - ) { (group: inout ThrowingTaskGroup) in - _ = group.addTaskUnlessCancelled { [weak self] in - guard let self = self else { return } - let _timeout = UInt64(timeout ?? self.options.timeoutIntervalForRequest) - try await Task.sleep(nanoseconds: _timeout * NSEC_PER_SEC) - throw CancellationError() - } - - _ = group.addTaskUnlessCancelled { [weak self] in - guard let self = self else { return } - while await !self.isOpen { - try await Task.sleep(nanoseconds: 10 * NSEC_PER_MSEC) - } - } - - _ = try await group.next() - group.cancelAll() - } - } catch { - doClose() + try await didOpen.value + } catch is CancellationError { + doClose(closeCode: .cancelled, reason: Data("cancelled".utf8)) + } catch is TimeoutError { + doClose(closeCode: .timeout, reason: Data("timeout".utf8)) + throw TimeoutError() + } catch let error as WebSocketError { + doClose( + closeCode: error.closeCode ?? .unknown, + reason: error.reason + ) + throw error + } catch { + preconditionFailure("Invalid error: \(String(reflecting: error))") } + + case .open: + return + + case .closed: + throw WebSocketError(.alreadyClosed, nil) } } @@ -115,7 +111,7 @@ final actor SystemWebSocket: Publisher { // Mirrors the document behavior of JavaScript's `WebSocket` // http://developer.mozilla.org/en-US/docs/Web/API/WebSocket/send switch state { - case let .open(connection): + case let .open(ws): messageIndex += 1 os_log( @@ -123,525 +119,201 @@ final actor SystemWebSocket: Publisher { log: .webSocket, type: .debug, messageIndex, - message.debugDescription - ) - - let context = NWConnection.ContentContext( - identifier: String(messageIndex), - metadata: [message.metadata] + message.description ) - try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in - connection.send( - content: message.contentAsData, - contentContext: context, - isComplete: true, - completion: .contentProcessed { (error: NWError?) in - if let error = error { - cont.resume(throwing: error) - } else { - cont.resume() - } - } - ) - } + try await ws.send(message.wsMessage) case .unopened, .connecting: os_log( "send message while connecting: %s", log: .webSocket, type: .error, - message.debugDescription + message.description ) throw WebSocketError.sendMessageWhileConnecting - case .closing, .closed: + case .closed: os_log( "send message while closed: %s", log: .webSocket, type: .debug, - message.debugDescription + message.description ) } } - func close(_ closeCode: WebSocketCloseCode = .normalClosure) async throws { + func close( + code: WebSocketCloseCode = .normalClosure, + reason: Data? = nil, + timeout: TimeInterval? = nil + ) async throws { switch state { - case let .connecting(conn), let .open(conn): - os_log( - "close connection: code=%d state=%{public}s", - log: .webSocket, - type: .debug, - closeCode.rawValue, - state.description - ) - - try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in - conn.send( - content: nil, - contentContext: .finalMessage, - isComplete: true, - completion: .contentProcessed { (error: Error?) in - if let error = error { - cont.resume(throwing: error) - } else { - cont.resume() - } - } + case .unopened: + doClose(closeCode: code, reason: reason) + + case .connecting, .open: + if let didClose { + _ = try await didClose.value + } else { + let didClose = CloseFuture( + timeout: timeout ?? options.timeoutIntervalForRequest ) + self.didClose = didClose + doClose(closeCode: code, reason: reason) + _ = try await didClose.value } - startClosing(connection: conn, error: closeCode.error) - case .unopened, .closing, .closed: - doClose() + case .closed: + doClose(closeCode: code, reason: reason) } } - - func forceClose(_ closeCode: WebSocketCloseCode) { - os_log( - "force close connection: code=%d state=%{public}s", - log: .webSocket, - type: .debug, - closeCode.rawValue, - state.description - ) - - doClose() - } - - func onOpen(_ block: @escaping WebSocketOnOpen) { - _onOpen = block - } - - func onClose(_ block: @escaping WebSocketOnClose) { - _onClose = block - } } private extension SystemWebSocket { var isUnopened: Bool { - switch state { - case .unopened: return true - default: return false - } - } - - func setState(_ state: WebSocketState) async { - self.state = state + guard case .unopened = state else { return false } + return true } func connect() throws { precondition(isUnopened) - - guard let components = URLComponents(url: url, resolvingAgainstBaseURL: false) else { - throw WebSocketError.invalidURL(url) - } - - let parameters = try self.parameters(with: components) - let connection = NWConnection(to: .url(url), using: parameters) - state = .connecting(connection) - connection.stateUpdateHandler = connectionStateUpdateHandler - connection.start(queue: webSocketQueue) - } - - func openReadyConnection(_ connection: NWConnection) { - os_log( - "open connection: connection_state=%{public}s", - log: .webSocket, - type: .debug, - connection.state.debugDescription - ) - - state = .open(connection) - _onOpen() - connection.receiveMessage(completion: onReceiveMessage) - } - - func startClosing(connection: NWConnection, error: NWError? = nil) { - state = .closing(error) - subject.send(completion: .finished) - connection.cancel() - } - - func doClose() { - // TODO: Switch to using `state.description` - os_log( - "do close connection: state=%{public}s", - log: .webSocket, - type: .debug, - state.debugDescription + let task = webSocketTask( + for: url, + options: options, + onOpen: { [weak self] in await self?.doOpen() }, + onClose: { [weak self] closeCode, reason async in + await self?.doClose(closeCode: closeCode, reason: reason) + } ) - - switch state { - case .closing(nil): - state = .closed(nil) - subject.send(completion: .finished) - _onClose(normalClosure) - - case let .closing(.some(err)): - state = .closed(.connectionError(err)) - subject.send(completion: .finished) - _onClose(closureWithError(err)) - - case .unopened: - state = .closed(nil) - subject.send(completion: .finished) - _onClose(abnormalClosure) - - case let .connecting(conn), let .open(conn): - state = .closed(nil) - subject.send(completion: .finished) - _onClose(abnormalClosure) - conn.forceCancel() - - case .closed: - // `PassthroughSubject` only sends completions once. - subject.send(completion: .finished) - } + state = .connecting(task) + task.resume() } - func doCloseWithError(_ error: WebSocketError) { - // TODO: Switch to using `state.description` - os_log( - "do close connection: state=%{public}s error=%{public}s", - log: .webSocket, - type: .debug, - state.debugDescription, - String(describing: error) - ) - + func doOpen() { switch state { - case let .closing(.some(err)): - state = .closed(.connectionError(err)) - subject.send(completion: .finished) - _onClose(closureWithError(err)) - - case .closing(nil): - state = .closed(error) - subject.send(completion: .finished) - _onClose(closureWithError(error)) + case let .connecting(ws): + os_log("open", log: .webSocket, type: .debug) + state = .open(ws) + onOpen() + didOpen.resolve() + doReceiveMessage(ws) case .unopened: - state = .closed(error) - subject.send(completion: .finished) - _onClose(closureWithError(error)) + os_log("received open before connecting", log: .webSocket, type: .error) + preconditionFailure("Cannot receive open before trying to connect") - case let .connecting(conn), let .open(conn): - state = .closed(nil) - subject.send(completion: .finished) - _onClose(closureWithError(error)) - conn.forceCancel() + case .open: + // Ignore this because there might be multiple consumers + // waiting on `.open(timeout:)` to return. + break case .closed: - // `PassthroughSubject` only sends completions once. - subject.send(completion: .finished) - } - } -} - -private extension SystemWebSocket { - func host(with urlComponents: URLComponents) throws -> NWEndpoint.Host { - guard let host = urlComponents.host else { - throw WebSocketError.invalidURLComponents(urlComponents) - } - return NWEndpoint.Host(host) - } - - func port(with urlComponents: URLComponents) throws -> NWEndpoint.Port { - if let raw = urlComponents.port, let port = NWEndpoint.Port(rawValue: UInt16(raw)) { - return port - } else if urlComponents.scheme == "ws" { - return NWEndpoint.Port.http - } else if urlComponents.scheme == "wss" { - return NWEndpoint.Port.https - } else { - throw WebSocketError.invalidURLComponents(urlComponents) + os_log( + "trying to open already-closed connection", + log: .webSocket, + type: .error + ) + doClose(closeCode: .alreadyClosed, reason: nil) } } - func parameters(with urlComponents: URLComponents) throws -> NWParameters { - let parameters: NWParameters - switch urlComponents.scheme { - case "ws": - parameters = .tcp - case "wss": - parameters = .tls - default: - throw WebSocketError.invalidURLComponents(urlComponents) - } - - let webSocketOptions = NWProtocolWebSocket.Options() - webSocketOptions.maximumMessageSize = options.maximumMessageSize - webSocketOptions.autoReplyPing = true + func doReceiveMessage(_ ws: URLSessionWebSocketTask) { + guard ws.closeCode == .invalid, !Task.isCancelled else { return } - parameters.defaultProtocolStack.applicationProtocols.insert(webSocketOptions, at: 0) + ws.receive { [weak self] (result: Result) in + guard let self, ws.closeCode == .invalid, !Task.isCancelled else { return } - return parameters - } -} - -private extension SystemWebSocket { - var connectionStateUpdateHandler: (NWConnection.State) -> Void { - { [weak self] (connectionState: NWConnection.State) in - Task { [weak self] in - guard let self = self else { return } - - let state = await self.state - - // TODO: Switch to using `state.description` + switch result { + case let .success(msg): + let message = WebSocketMessage(msg) os_log( - "connection state update: connection_state=%{public}s state=%{public}s", + "receive: message=%s", log: .webSocket, type: .debug, - connectionState.debugDescription, - state.debugDescription + message.description ) - - switch connectionState { - case .setup: - break - - case let .waiting(error): - await self.doCloseWithError(.connectionError(error)) - - case .preparing: - break - - case .ready: - switch state { - case let .connecting(conn): - await self.openReadyConnection(conn) - - case .open: - // TODO: Handle betterPathUpdate here? - break - - case .unopened, .closing, .closed: - // TODO: Switch to using `state.description` - os_log( - "unexpected connection ready: state=%{public}s", - log: .webSocket, - type: .error, - state.debugDescription - ) - } - - case let .failed(error): - switch state { - case let .connecting(conn), let .open(conn): - await self.startClosing(connection: conn, error: error) - - case .unopened, .closing, .closed: - break - } - - case .cancelled: - switch state { - case let .connecting(conn), let .open(conn): - await self.startClosing(connection: conn) - - case .unopened, .closing: - await self.doClose() - - case .closed: - break - } - - @unknown default: - assertionFailure("Unknown state '\(state)'") + subject.send(message) + Task { [weak self] in await self?.doReceiveMessage(ws) } + + case let .failure(error): + Task { [weak self] in + await self?.doClose( + closeCode: .abnormalClosure, + reason: Data(error.localizedDescription.utf8) + ) } } } } - var onReceiveMessage: (Data?, NWConnection.ContentContext?, Bool, NWError?) -> Void { - { [weak self] data, context, isMessageComplete, error in - guard let self = self else { return } - guard isMessageComplete else { return } - - Task { - switch (data, context, error) { - case let (.some(data), .some(context), .none): - await self.handleSuccessfulMessage(data: data, context: context) - case let (.none, _, .some(error)): - await self.handleMessageWithError(error) - default: - await self.handleUnknownMessage(data: data, context: context, error: error) - } - } - } - } - - func handleSuccessfulMessage(data: Data, context: NWConnection.ContentContext) { - guard case let .open(connection) = state else { return } + func doClose(closeCode: WebSocketCloseCode, reason: Data?) { + switch state { + case .unopened: + state = .closed(.init(closeCode, reason)) - switch context.websocketMessageType { - case .binary: + case let .connecting(ws), let .open(ws): os_log( - "receive binary: size=%d", + "close: code=%{public}s", log: .webSocket, type: .debug, - data.count + closeCode.description ) - subject.send(.data(data)) - case .text: - guard let text = String(data: data, encoding: .utf8) else { - startClosing(connection: connection, error: .posix(.EBADMSG)) - return + // When the task is not yet closed, this value is `.invalid`. + if ws.closeCode == .invalid { + if let code = closeCode.wsCloseCode { + ws.cancel(with: code, reason: reason) + } else { + ws.cancel() + } } - os_log( - "receive text: content=%s", - log: .webSocket, - type: .debug, - text - ) - subject.send(.text(text)) - - case .close: - doClose() - case .pong: - // TODO: Handle pongs at some point - break - - default: - let messageType = String(describing: context.websocketMessageType) - assertionFailure("Unexpected message type: \(messageType)") - } - - connection.receiveMessage(completion: onReceiveMessage) - } - - func handleMessageWithError(_ error: NWError) { - switch state { - case let .connecting(conn), let .open(conn): - - startClosing(connection: conn, error: error) + let close = WebSocketClose(closeCode, nil) + state = .closed(close) + onClose(close) + didClose?.resolve((code: closeCode, reason: reason)) + subject.send(completion: .finished) - case .unopened, .closing, .closed: - // TODO: Should we call `doClose()` here, instead? + case .closed: break } } - - func handleUnknownMessage( - data: Data?, - context: NWConnection.ContentContext?, - error: NWError? - ) { - func describeInputs() -> String { - String(describing: String(data: data ?? Data(), encoding: .utf8)) + " " + - String(describing: context) + " " + String(describing: error) - } - - // TODO: Switch to using `state.description` - os_log( - "unknown message: state=%{public}s message=%s", - log: .webSocket, - type: .error, - state.debugDescription, - describeInputs() - ) - - doCloseWithError(WebSocketError.receiveUnknownMessageType) - } -} - -private extension WebSocketMessage { - var metadata: NWProtocolWebSocket.Metadata { - switch self { - case .data: return .init(opcode: .binary) - case .text: return .init(opcode: .text) - } - } - - var contentAsData: Data { - switch self { - case let .data(data): return data - case let .text(text): return Data(text.utf8) - } - } -} - -private enum WebSocketState: CustomStringConvertible, CustomDebugStringConvertible { - case unopened - case connecting(NWConnection) - case open(NWConnection) - case closing(NWError?) - case closed(WebSocketError?) - - var description: String { - switch self { - case .unopened: return "unopened" - case .connecting: return "connecting" - case .open: return "open" - case .closing: return "closing" - case .closed: return "closed" - } - } - - var debugDescription: String { - switch self { - case .unopened: return "unopened" - case let .connecting(conn): return "connecting(\(String(reflecting: conn)))" - case let .open(conn): return "open(\(String(reflecting: conn)))" - case let .closing(error): return "closing(\(error.debugDescription))" - case let .closed(error): return "closed(\(error.debugDescription))" - } - } -} - -private extension NWConnection.ContentContext { - var webSocketMetadata: NWProtocolWebSocket.Metadata? { - let definition = NWProtocolWebSocket.definition - return protocolMetadata(definition: definition) as? NWProtocolWebSocket.Metadata - } - - var websocketMessageType: NWProtocolWebSocket.Opcode? { - webSocketMetadata?.opcode - } } -private extension NWError { - var shouldCloseConnectionWhileConnectingOrOpen: Bool { - switch self { - case .posix(.ECANCELED), .posix(.ENOTCONN): - return false - default: - print("Unhandled error in '\(#function)': \(debugDescription)") - return true +private extension SystemWebSocket { + enum State: CustomStringConvertible, CustomDebugStringConvertible { + case unopened + case connecting(URLSessionWebSocketTask) + case open(URLSessionWebSocketTask) + case closed(WebSocketClose) + + var ws: URLSessionWebSocketTask? { + switch self { + case let .connecting(ws), let .open(ws): + return ws + + case .unopened, .closed: + return nil + } } - } - var closeCode: WebSocketCloseCode { - switch self { - case .posix(.ECANCELED): - return .normalClosure - default: - print("Unhandled error in '\(#function)': \(debugDescription)") - return .normalClosure + var description: String { + switch self { + case .unopened: return "unopened" + case .connecting: return "connecting" + case .open: return "open" + case .closed: return "closed" + } } - } -} -private extension NWConnection.State { - var debugDescription: String { - switch self { - case .setup: return "setup" - case let .waiting(error): return "waiting(\(String(reflecting: error)))" - case .preparing: return "preparing" - case .ready: return "ready" - case let .failed(error): return "failed(\(String(reflecting: error)))" - case .cancelled: return "cancelled" - @unknown default: return "unknown" + var debugDescription: String { + switch self { + case .unopened: return "unopened" + case let .connecting(ws): return "connecting(\(String(reflecting: ws)))" + case let .open(ws): return "open(\(String(reflecting: ws)))" + case let .closed(error): return "closed(\(error.description))" + } } } } - -private extension Optional where Wrapped == NWError { - var debugDescription: String { - guard case let .some(error) = self else { return "" } - return String(reflecting: error) - } -} diff --git a/Sources/WebSocket/URLSessionWebSocketTaskCloseCode+WebSocketCloseCode.swift b/Sources/WebSocket/URLSessionWebSocketTaskCloseCode+WebSocketCloseCode.swift deleted file mode 100644 index 875c542..0000000 --- a/Sources/WebSocket/URLSessionWebSocketTaskCloseCode+WebSocketCloseCode.swift +++ /dev/null @@ -1,32 +0,0 @@ -import Foundation - -extension URLSessionWebSocketTask.CloseCode { - init?(_ closeCode: WebSocketCloseCode) { - self.init(rawValue: closeCode.rawValue) - } -} - -extension WebSocketCloseCode { - init?(_ closeCode: URLSessionWebSocketTask.CloseCode?) { - guard let closeCode = closeCode else { return nil } - self.init(rawValue: closeCode.rawValue) - } - - var urlSessionCloseCode: URLSessionWebSocketTask.CloseCode { - switch self { - case .invalid: return .invalid - case .normalClosure: return .normalClosure - case .goingAway: return .goingAway - case .protocolError: return .protocolError - case .unsupportedData: return .unsupportedData - case .noStatusReceived: return .noStatusReceived - case .abnormalClosure: return .abnormalClosure - case .invalidFramePayloadData: return .invalidFramePayloadData - case .policyViolation: return .policyViolation - case .messageTooBig: return .messageTooBig - case .mandatoryExtensionMissing: return .mandatoryExtensionMissing - case .internalServerError: return .internalServerError - case .tlsHandshakeFailure: return .tlsHandshakeFailure - } - } -} diff --git a/Sources/WebSocket/URLSessionWebSocketTaskMessage+WebSocket.swift b/Sources/WebSocket/URLSessionWebSocketTaskMessage+WebSocket.swift deleted file mode 100644 index ccb58b8..0000000 --- a/Sources/WebSocket/URLSessionWebSocketTaskMessage+WebSocket.swift +++ /dev/null @@ -1,40 +0,0 @@ -import Foundation - -extension URLSessionWebSocketTask.Message: CustomDebugStringConvertible { - public var debugDescription: String { - switch self { - case let .string(text): - return text - case let .data(data): - return "<\(data.count) bytes>" - @unknown default: - assertionFailure("Unsupported message: \(self)") - return "" - } - } -} - -extension WebSocketMessage { - init(_ message: URLSessionWebSocketTask.Message) { - switch message { - case let .data(data): - self = .data(data) - case let .string(string): - self = .text(string) - @unknown default: - assertionFailure("Unknown WebSocket Message type") - self = .text("") - } - } -} - -extension Result: CustomDebugStringConvertible where Success == WebSocketMessage { - public var debugDescription: String { - switch self { - case let .success(message): - return message.debugDescription - case let .failure(error): - return error.localizedDescription - } - } -} diff --git a/Sources/WebSocket/WebSocket.swift b/Sources/WebSocket/WebSocket.swift index e0547eb..65d6471 100644 --- a/Sources/WebSocket/WebSocket.swift +++ b/Sources/WebSocket/WebSocket.swift @@ -1,70 +1,87 @@ import Combine import Foundation +import Synchronized -public typealias WebSocketOnOpen = () -> Void -public typealias WebSocketOnClose = (WebSocketCloseResult) -> Void +public typealias WebSocketOnOpen = @Sendable () -> Void +public typealias WebSocketOnClose = @Sendable (WebSocketClose) + -> Void + +public struct WebSocket: Identifiable, Sendable { + public var id: Int -public struct WebSocket { /// Sets a closure to be called when the WebSocket connects successfully. - public var onOpen: (@escaping WebSocketOnOpen) async -> Void + public var onOpen: WebSocketOnOpen /// Sets a closure to be called when the WebSocket closes. - public var onClose: (@escaping WebSocketOnClose) async -> Void + public var onClose: WebSocketOnClose - /// Opens the WebSocket connect with an optional timeout. After this function - /// is awaited, the WebSocket connection is open ready to be used. If the + /// Opens the WebSocket connection. After this function returns, + /// the WebSocket connection is open ready to be used. If the /// connection fails or times out, an error is thrown. - public var open: (TimeInterval?) async throws -> Void + public var open: @Sendable () async throws -> Void /// Sends a close frame to the server with the given close code. - public var close: (WebSocketCloseCode) async throws -> Void + public var close: @Sendable (WebSocketCloseCode, TimeInterval?) async throws -> Void + + /// Invalidates **all** WebSocket connections. It should only be used + /// when all WebSocket connections in the current process need to be + /// cancelled. + public var invalidateAll: @Sendable () -> Void /// Sends a text or binary message. - public var send: (WebSocketMessage) async throws -> Void + public var send: @Sendable (WebSocketMessage) async throws -> Void /// Publishes messages received from WebSocket. Finishes when the /// WebSocket connection closes. - public var messagesPublisher: () -> AnyPublisher + public var messagesPublisher: @Sendable () + -> AnyPublisher public init( - onOpen: @escaping (@escaping WebSocketOnOpen) async -> Void = { _ in }, - onClose: @escaping (@escaping WebSocketOnClose) async -> Void = { _ in }, - open: @escaping (TimeInterval?) async throws -> Void = { _ in }, - close: @escaping (WebSocketCloseCode) async throws -> Void = { _ in }, - send: @escaping (WebSocketMessage) async throws -> Void = { _ in }, - messagesPublisher: @escaping () -> AnyPublisher = { + id: Int, + onOpen: @escaping WebSocketOnOpen = {}, + onClose: @escaping WebSocketOnClose = { _ in }, + open: @escaping @Sendable () async throws -> Void = {}, + close: @escaping @Sendable (WebSocketCloseCode, TimeInterval?) async throws + -> Void = { _, _ in }, + invalidateAll: @escaping @Sendable () -> Void = {}, + send: @escaping @Sendable (WebSocketMessage) async throws -> Void = { _ in }, + messagesPublisher: @escaping @Sendable () -> AnyPublisher = { Empty(completeImmediately: false).eraseToAnyPublisher() } ) { + self.id = id self.onOpen = onOpen self.onClose = onClose self.open = open self.close = close + self.invalidateAll = invalidateAll self.send = send self.messagesPublisher = messagesPublisher } } public extension WebSocket { - /// Calls `WebSocket.open(nil)`. - func open() async throws { - try await open(nil) + /// Calls `WebSocket.close(.normalClosure, nil)`. + func close() async throws { + try await close(.normalClosure, nil) } - /// Calls `WebSocket.close(closeCode: .goingAway)`. - func close() async throws { - try await close(.goingAway) + /// Calls `WebSocket.close(.normalClosure, timeout)`. + func close(timeout: TimeInterval) async throws { + try await close(.normalClosure, timeout) } /// The WebSocket's received messages as an asynchronous stream. var messages: AsyncStream { - var cancellable: AnyCancellable? + let cancellable = Locked(nil) return AsyncStream { cont in func finish() { - if cancellable != nil { - cont.finish() - cancellable = nil + cancellable.access { cancellable in + if cancellable != nil { + cont.finish() + cancellable = nil + } } } @@ -75,13 +92,13 @@ public extension WebSocket { receiveValue: { cont.yield($0) } ) - cancellable = _cancellable + cancellable.access { $0 = _cancellable } } } } public extension WebSocket { - /// System WebSocket implementation powered by the Network Framework. + /// System WebSocket implementation powered by `URLSessionWebSocketTask`. static func system( url: URL, options: WebSocketOptions = .init(), @@ -100,10 +117,12 @@ public extension WebSocket { // This is only intended for use in tests. internal static func system(_ ws: SystemWebSocket) async throws -> Self { Self( - onOpen: { onOpen in await ws.onOpen(onOpen) }, - onClose: { onClose in await ws.onClose(onClose) }, - open: { timeout in try await ws.open(timeout: timeout) }, - close: { code in try await ws.close(code) }, + id: Int(bitPattern: ObjectIdentifier(ws)), + onOpen: ws.onOpen, + onClose: ws.onClose, + open: { try await ws.open() }, + close: { code, timeout in try await ws.close(code: code, timeout: timeout) }, + invalidateAll: { cancelAndInvalidateAllTasks() }, send: { message in try await ws.send(message) }, messagesPublisher: { ws.eraseToAnyPublisher() } ) diff --git a/Sources/WebSocket/WebSocketClose.swift b/Sources/WebSocket/WebSocketClose.swift new file mode 100644 index 0000000..5214600 --- /dev/null +++ b/Sources/WebSocket/WebSocketClose.swift @@ -0,0 +1,29 @@ +import Foundation + +public struct WebSocketClose: Hashable, CustomStringConvertible, Sendable { + public let code: WebSocketCloseCode + public let reason: Data? + + public init(_ code: WebSocketCloseCode, _ reason: Data?) { + self.code = code + self.reason = reason + } + + public var description: String { "\(code.description)" } +} + +public extension WebSocketClose { + var isNormal: Bool { + switch code { + case .normalClosure: return true + default: return false + } + } + + var isCancelled: Bool { + switch code { + case .cancelled: return true + default: return false + } + } +} diff --git a/Sources/WebSocket/WebSocketCloseCode.swift b/Sources/WebSocket/WebSocketCloseCode.swift index c33552a..8faeff8 100644 --- a/Sources/WebSocket/WebSocketCloseCode.swift +++ b/Sources/WebSocket/WebSocketCloseCode.swift @@ -4,7 +4,7 @@ import Network /// A code indicating why a WebSocket connection closed. /// /// Mirrors [URLSessionWebSocketTask](https://developer.apple.com/documentation/foundation/urlsessionwebsockettask/closecode). -public enum WebSocketCloseCode: Int, CaseIterable { +public enum WebSocketCloseCode: Int, CaseIterable, Sendable { /// A code that indicates the connection is still open. case invalid = 0 @@ -14,66 +14,145 @@ public enum WebSocketCloseCode: Int, CaseIterable { /// A code that indicates an endpoint is going away. case goingAway = 1001 - /// A code that indicates an endpoint terminated the connection due to a protocol error. + /// A code that indicates an endpoint terminated the connection due to a + /// protocol error. case protocolError = 1002 - /// A code that indicates an endpoint terminated the connection after receiving a type of data it can’t accept. + /// A code that indicates an endpoint terminated the connection after + /// receiving a type of data it can’t accept. case unsupportedData = 1003 - /// A reserved code that indicates an endpoint expected a status code and didn’t receive one. + /// A reserved code that indicates an endpoint expected a status code and + /// didn’t receive one. case noStatusReceived = 1005 - /// A reserved code that indicates the connection closed without a close control frame. + /// A reserved code that indicates the connection closed without a close + /// control frame. case abnormalClosure = 1006 - /// A code that indicates the server terminated the connection because it received data inconsistent with the message’s type. + /// A code that indicates the server terminated the connection because it + /// received data inconsistent with the message’s type. case invalidFramePayloadData = 1007 - /// A code that indicates an endpoint terminated the connection because it received a message that violates its policy. + /// A code that indicates an endpoint terminated the connection because it + /// received a message that violates its policy. case policyViolation = 1008 - /// A code that indicates an endpoint is terminating the connection because it received a message too big for it to process. + /// A code that indicates an endpoint is terminating the connection because + /// it received a message too big for it to process. case messageTooBig = 1009 - /// A code that indicates the client terminated the connection because the server didn’t negotiate a required extension. + /// A code that indicates the client terminated the connection because the + /// server didn’t negotiate a required extension. case mandatoryExtensionMissing = 1010 - /// A code that indicates the server terminated the connection because it encountered an unexpected condition. + /// A code that indicates the server terminated the connection because it + /// encountered an unexpected condition. case internalServerError = 1011 - /// A reserved code that indicates the connection closed due to the failure to perform a TLS handshake. + /// A reserved code that indicates the connection closed due to the failure + /// to perform a TLS handshake. case tlsHandshakeFailure = 1015 + + // NOTE: Status codes in the range 4000-4999 are reserved for private use + // and thus can't be registered. Such codes can be used by prior + // agreements between WebSocket applications. The interpretation of + // these codes is undefined by this protocol. + // + // https://www.rfc-editor.org/rfc/rfc6455#section-7.4.1 + + /// A code that indicates the connection closed because it was cancelled by + /// the client. + case cancelled = 4000 + + /// A code that indicates the connection failed to open because it had + /// already been closed. + case alreadyClosed = 4001 + + /// A code that indicates the connection timed out while opening. + case timeout = 4002 + + /// A code that indicates the connection closed because of an unknown reason. + case unknown = 4999 } -extension WebSocketCloseCode { - var error: NWError? { +extension WebSocketCloseCode: CustomStringConvertible { + public var description: String { switch self { + case .invalid: return "invalid" + case .normalClosure: return "normalClosure" + case .goingAway: return "goingAway" + case .protocolError: return "protocolError" + case .unsupportedData: return "unsupportedData" + case .noStatusReceived: return "noStatusReceived" + case .abnormalClosure: return "abnormalClosure" + case .invalidFramePayloadData: return "invalidFramePayloadData" + case .policyViolation: return "policyViolation" + case .messageTooBig: return "messageTooBig" + case .mandatoryExtensionMissing: return "mandatoryExtensionMissing" + case .internalServerError: return "internalServerError" + case .tlsHandshakeFailure: return "tlsHandshakeFailure" + case .cancelled: return "cancelled" + case .alreadyClosed: return "alreadyClosed" + case .timeout: return "timeout" + case .unknown: return "unknown" + } + } +} + +extension WebSocketCloseCode { + init(_ code: URLSessionWebSocketTask.CloseCode) { + switch code { case .invalid: - return nil + self = .invalid case .normalClosure: - return nil + self = .normalClosure case .goingAway: - return nil + self = .goingAway case .protocolError: - return .posix(.EPROTO) + self = .protocolError case .unsupportedData: - return .posix(.EBADMSG) + self = .unsupportedData case .noStatusReceived: - return nil + self = .noStatusReceived case .abnormalClosure: - return nil + self = .abnormalClosure case .invalidFramePayloadData: - return nil + self = .invalidFramePayloadData case .policyViolation: - return nil + self = .policyViolation case .messageTooBig: - return .posix(.EMSGSIZE) + self = .messageTooBig case .mandatoryExtensionMissing: - return nil + self = .mandatoryExtensionMissing case .internalServerError: - return nil + self = .internalServerError case .tlsHandshakeFailure: - return .tls(errSSLHandshakeFail) + self = .tlsHandshakeFailure + @unknown default: + self = .unknown + } + } + + var wsCloseCode: URLSessionWebSocketTask.CloseCode? { + switch self { + case .invalid: return .invalid + case .normalClosure: return .normalClosure + case .goingAway: return .goingAway + case .protocolError: return .protocolError + case .unsupportedData: return .unsupportedData + case .noStatusReceived: return .noStatusReceived + case .abnormalClosure: return .abnormalClosure + case .invalidFramePayloadData: return .invalidFramePayloadData + case .policyViolation: return .policyViolation + case .messageTooBig: return .messageTooBig + case .mandatoryExtensionMissing: return .mandatoryExtensionMissing + case .internalServerError: return .internalServerError + case .tlsHandshakeFailure: return .tlsHandshakeFailure + case .cancelled: return nil + case .alreadyClosed: return nil + case .timeout: return nil + case .unknown: return nil } } } diff --git a/Sources/WebSocket/WebSocketCloseResult.swift b/Sources/WebSocket/WebSocketCloseResult.swift deleted file mode 100644 index 048421d..0000000 --- a/Sources/WebSocket/WebSocketCloseResult.swift +++ /dev/null @@ -1,7 +0,0 @@ -import Foundation - -public typealias WebSocketCloseResult = Result<(code: WebSocketCloseCode, reason: Data?), Error> - -internal let normalClosure: WebSocketCloseResult = .success((.normalClosure, nil)) -internal let abnormalClosure: WebSocketCloseResult = .success((.abnormalClosure, nil)) -internal let closureWithError: (Error) -> WebSocketCloseResult = { e in .failure(e) } diff --git a/Sources/WebSocket/WebSocketError.swift b/Sources/WebSocket/WebSocketError.swift index a7a7cc0..2f1f2ea 100644 --- a/Sources/WebSocket/WebSocketError.swift +++ b/Sources/WebSocket/WebSocketError.swift @@ -2,18 +2,26 @@ import Foundation import Network public enum WebSocketError: Error, Equatable { + case closeCodeAndReason(WebSocketCloseCode, Data?) case invalidURL(URL) - case invalidURLComponents(URLComponents) - case openAfterConnectionClosed case sendMessageWhileConnecting - case receiveMessageWhenNotOpen - case receiveUnknownMessageType - case connectionError(NWError) -} -extension Optional where Wrapped == WebSocketError { - var debugDescription: String { - guard case let .some(error) = self else { return "" } - return String(reflecting: error) + init(_ closeCode: WebSocketCloseCode?, _ reason: Data?) { + self = .closeCodeAndReason( + closeCode ?? .unknown, + reason + ) + } + + var closeCode: WebSocketCloseCode? { + guard case let .closeCodeAndReason(code, _) = self + else { return nil } + return code + } + + var reason: Data? { + guard case let .closeCodeAndReason(_, reason) = self + else { return nil } + return reason } } diff --git a/Sources/WebSocket/WebSocketMessage.swift b/Sources/WebSocket/WebSocketMessage.swift index 0cca3e7..9f6d7d2 100644 --- a/Sources/WebSocket/WebSocketMessage.swift +++ b/Sources/WebSocket/WebSocketMessage.swift @@ -2,7 +2,7 @@ import Foundation import Network /// An enumeration of the types of messages that can be sent or received. -public enum WebSocketMessage: CustomStringConvertible, CustomDebugStringConvertible, Hashable { +public enum WebSocketMessage: CustomStringConvertible, Hashable, Sendable { /// A WebSocket message that contains a block of data. case data(Data) @@ -11,10 +11,42 @@ public enum WebSocketMessage: CustomStringConvertible, CustomDebugStringConverti public var description: String { switch self { - case let .data(data): return "\(data.count) bytes" + case let .data(data): return String(decoding: data.prefix(100), as: UTF8.self) case let .text(text): return text } } +} + +public extension WebSocketMessage { + var stringValue: String? { + switch self { + case let .data(data): + return String(data: data, encoding: .utf8) + + case let .text(text): + return text + } + } +} + +extension WebSocketMessage { + init(_ message: URLSessionWebSocketTask.Message) { + switch message { + case let .data(data): + self = .data(data) - public var debugDescription: String { description } + case let .string(text): + self = .text(text) + + @unknown default: + fatalError("Unhandled message: \(message)") + } + } + + var wsMessage: URLSessionWebSocketTask.Message { + switch self { + case let .data(data): return .data(data) + case let .text(text): return .string(text) + } + } } diff --git a/Sources/WebSocket/WebSocketOptions.swift b/Sources/WebSocket/WebSocketOptions.swift index d88d0e2..eb190a7 100644 --- a/Sources/WebSocket/WebSocketOptions.swift +++ b/Sources/WebSocket/WebSocketOptions.swift @@ -1,6 +1,6 @@ import Foundation -public struct WebSocketOptions: Hashable { +public struct WebSocketOptions: Hashable, Sendable { public var maximumMessageSize: Int public var timeoutIntervalForRequest: TimeInterval public var timeoutIntervalForResource: TimeInterval diff --git a/Tests/WebSocketTests/Server/WebSocketServer.swift b/Tests/WebSocketTests/Server/WebSocketServer.swift index 735eb1e..c2bcebb 100644 --- a/Tests/WebSocketTests/Server/WebSocketServer.swift +++ b/Tests/WebSocketTests/Server/WebSocketServer.swift @@ -1,18 +1,18 @@ import Combine import Foundation -import Network +import NIO +import NIOHTTP1 +import NIOSSL +import NIOWebSocket import WebSocket - -enum WebSocketServerError: Error { - case couldNotCreatePort(UInt16) -} +import WebSocketKit enum WebSocketServerOutput: Hashable { - case die case message(WebSocketMessage) + case remoteClose } -private typealias E = WebSocketServerError +private typealias WS = WebSocketKit.WebSocket final class WebSocketServer { let port: UInt16 @@ -26,202 +26,111 @@ final class WebSocketServer { // Publisher the repeats everything sent to it by clients. private let inputSubject = PassthroughSubject() - private var listener: NWListener - private var connections: [NWConnection] = [] - - private let queue = DispatchQueue( - label: "app.shareup.websocketserverqueue", - qos: .default, - autoreleaseFrequency: .workItem, - target: .global() - ) + private let eventLoopGroup: EventLoopGroup + private var channel: Channel? init( port: UInt16, outputPublisher: P, - usesTLS: Bool = false, + usesTLS _: Bool = false, maximumMessageSize: Int = 1024 * 1024 ) throws where P.Output == WebSocketServerOutput, P.Failure == Error { self.port = port self.outputPublisher = outputPublisher.eraseToAnyPublisher() self.maximumMessageSize = maximumMessageSize - let parameters = NWParameters(tls: usesTLS ? .init() : nil) - parameters.allowLocalEndpointReuse = true - parameters.includePeerToPeer = true - parameters.acceptLocalOnly = true - - let options = NWProtocolWebSocket.Options() - options.autoReplyPing = true - options.maximumMessageSize = maximumMessageSize - - parameters.defaultProtocolStack.applicationProtocols.insert(options, at: 0) - - guard let port = NWEndpoint.Port(rawValue: port) - else { throw E.couldNotCreatePort(port) } - - listener = try NWListener(using: parameters, on: port) - - start() - } - - func forceClose() { - queue.sync { - connections.forEach { connection in - connection.forceCancel() - } - connections.removeAll() - listener.cancel() - } - } + eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) - var inputPublisher: AnyPublisher { - inputSubject.eraseToAnyPublisher() + channel = try makeWebSocket( + on: eventLoopGroup, + onUpgrade: onWebSocketUpgrade + ) + .bind(host: "127.0.0.1", port: Int(port)) + .wait() } -} -private extension WebSocketServer { - func start() { - listener.newConnectionHandler = onNewConnection - - listener.stateUpdateHandler = { [weak self] state in - guard let self = self else { return } - switch state { - case .failed: - self.close() - - default: - break + private func makeWebSocket( + on eventLoopGroup: EventLoopGroup, + onUpgrade: @escaping (HTTPRequestHead, WS) -> Void + ) -> ServerBootstrap { + ServerBootstrap(group: eventLoopGroup) + .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) + .childChannelInitializer { (channel: Channel) in + let ws = NIOWebSocketServerUpgrader( + shouldUpgrade: { channel, _ in + channel.eventLoop.makeSucceededFuture([:]) + }, + upgradePipelineHandler: { channel, req in + WS.server(on: channel) { onUpgrade(req, $0) } + } + ) + + return channel.pipeline.configureHTTPServerPipeline( + withServerUpgrade: ( + upgraders: [ws], + completionHandler: { _ in } + ) + ) } - } - - listener.start(queue: queue) } - func broadcastMessage(_ message: WebSocketMessage) { - let context: NWConnection.ContentContext - let content: Data + private var onWebSocketUpgrade: (HTTPRequestHead, WS) -> Void { + { [weak self] (_: HTTPRequestHead, ws: WS) in + guard let self else { return } - switch message { - case let .data(data): - let metadata: NWProtocolWebSocket.Metadata = .init(opcode: .binary) - context = .init(identifier: String(message.hashValue), metadata: [metadata]) - content = data + let sub = outputPublisher + .sink( + receiveCompletion: { completion in + switch completion { + case .finished: + _ = ws.close(code:) - case let .text(string): - let metadata: NWProtocolWebSocket.Metadata = .init(opcode: .text) - context = .init(identifier: String(message.hashValue), metadata: [metadata]) - content = Data(string.utf8) - } + case .failure: + _ = ws.close(code: .unexpectedServerError) + } + }, + receiveValue: { output in + switch output { + case .remoteClose: + do { try ws.close(code: .goingAway).wait() } + catch {} + + case let .message(message): + switch message { + case let .data(data): + ws.send(raw: data, opcode: .binary) + + case let .text(text): + ws.send(text) + } + } + } + ) - connections.forEach { connection in - connection.send( - content: content, - contentContext: context, - isComplete: true, - completion: .contentProcessed { [weak self] error in - guard let _ = error else { return } - self?.closeConnection(connection) - } - ) - } - } + outputPublisherSubscription = sub - func close() { - connections.forEach { closeConnection($0) } - connections.removeAll() - listener.cancel() - } + ws.onText { [weak self] _, text in + self?.inputSubject.send(.text(text)) + } - func closeConnection(_ connection: NWConnection) { - connection.send( - content: nil, - contentContext: .finalMessage, - isComplete: true, - completion: .contentProcessed { _ in - connection.cancel() + ws.onBinary { [weak self] _, buffer in + guard let self, + let data = buffer.getData( + at: buffer.readerIndex, + length: buffer.readableBytes + ) + else { return } + inputSubject.send(.data(data)) } - ) + } } - func cancelConnection(_ connection: NWConnection) { - connection.forceCancel() - connections.removeAll(where: { $0 === connection }) + func shutDown() { + try? channel?.close(mode: .all).wait() + try? eventLoopGroup.syncShutdownGracefully() } - var onNewConnection: (NWConnection) -> Void { - { [weak self] (newConnection: NWConnection) in - guard let self = self else { return } - - self.connections.append(newConnection) - - func receive() { - newConnection.receiveMessage { [weak self] data, context, _, error in - guard let self = self else { return } - guard error == nil else { return self.closeConnection(newConnection) } - - guard let data = data, - let context = context, - let _metadata = context.protocolMetadata.first, - let metadata = _metadata as? NWProtocolWebSocket.Metadata - else { return } - - switch metadata.opcode { - case .binary: - self.inputSubject.send(.data(data)) - - case .text: - if let text = String(data: data, encoding: .utf8) { - self.inputSubject.send(.text(text)) - } - - default: - break - } - - receive() - } - } - receive() - - newConnection.stateUpdateHandler = { [weak self] state in - guard let self = self else { return } - - switch state { - case .ready: - guard self.outputPublisherSubscription == nil else { break } - self.outputPublisherSubscription = self.outputPublisher - .receive(on: self.queue) - .sink( - receiveCompletion: { [weak self] completion in - guard let self = self else { return } - guard case .failure = completion else { - self.cancelConnection(newConnection) - return - } - self.close() - }, - receiveValue: { [weak self] (output: WebSocketServerOutput) in - guard let self = self else { return } - switch output { - case .die: - self.cancelConnection(newConnection) - - case let .message(message): - self.broadcastMessage(message) - } - } - ) - - case .failed: - self.cancelConnection(newConnection) - - default: - break - } - } - - newConnection.start(queue: self.queue) - } + var inputPublisher: AnyPublisher { + inputSubject.eraseToAnyPublisher() } } diff --git a/Tests/WebSocketTests/SystemWebSocketTests.swift b/Tests/WebSocketTests/SystemWebSocketTests.swift index 44ff10e..14ef82b 100644 --- a/Tests/WebSocketTests/SystemWebSocketTests.swift +++ b/Tests/WebSocketTests/SystemWebSocketTests.swift @@ -1,8 +1,9 @@ import Combine +import Synchronized @testable import WebSocket import XCTest -private var ports = (50000 ... 52000).map { UInt16($0) } +private var ports = (50_000 ... 52_000).map { UInt16($0) } // NOTE: If `WebSocketTests` is not marked as `@MainActor`, calls to // `wait(for:timeout:)` prevent other asyncronous events from running. @@ -22,121 +23,109 @@ class SystemWebSocketTests: XCTestCase { func testCanConnectToAndDisconnectFromServer() async throws { let openEx = expectation(description: "Should have opened") let closeEx = expectation(description: "Should have closed") - let (server, client) = await makeServerAndClient( + let (server, client) = try await makeServerAndClient( onOpen: { openEx.fulfill() }, - onClose: { result in - switch result { - case let .success(close): - XCTAssertEqual(.normalClosure, close.code) - XCTAssertNil(close.reason) - closeEx.fulfill() - - case let .failure(error): - XCTFail("Should not have received error: \(error)") - } + onClose: { close in + XCTAssertEqual(.normalClosure, close.code) + XCTAssertNil(close.reason) + closeEx.fulfill() } ) - defer { server.forceClose() } + defer { server.shutDown() } - wait(for: [openEx], timeout: 2) + try await client.open() + await _fulfillment(of: [openEx], timeout: 2) let isOpen = await client.isOpen XCTAssertTrue(isOpen) try await client.close() - wait(for: [closeEx], timeout: 2) + await _fulfillment(of: [closeEx], timeout: 2) } func testErrorWhenServerIsUnreachable() async throws { let ex = expectation(description: "Should have errored") - let (server, client) = await makeOfflineServerAndClient( + let (server, client) = try await makeOfflineServerAndClient( onOpen: { XCTFail("Should not have opened") }, - onClose: { result in - switch result { - case let .success(close): - XCTFail("Should not have closed successfully: \(String(reflecting: close))") - - case let .failure(error): - guard let webSocketError = error as? WebSocketError, - case let .connectionError(nwerror) = webSocketError, - case let .posix(posix) = nwerror - else { return XCTFail("Closed with incorrect error: \(error)") } - XCTAssertEqual(.ECONNREFUSED, posix) - ex.fulfill() - } + onClose: { close in + XCTAssertEqual(.abnormalClosure, close.code) + XCTAssertNil(close.reason) + ex.fulfill() } ) - defer { server.forceClose() } + defer { server.shutDown() } - waitForExpectations(timeout: 2) + await _fulfillment(of: [ex], timeout: 2) let isClosed = await client.isClosed XCTAssertTrue(isClosed) } - func testErrorWhenRemoteCloses() async throws { + func _testErrorWhenRemoteCloses() async throws { let errorEx = expectation(description: "Should have closed") - let (server, client) = await makeServerAndClient( - onClose: { result in - switch result { - case let .success(close): - XCTFail("Should not have closed successfully: \(String(reflecting: close))") - - case let .failure(error): - guard let err = error as? WebSocketError, - case .receiveUnknownMessageType = err - else { return XCTFail("Should have received unknown message error") } + let (server, client) = try await makeServerAndClient( + onClose: { close in + DispatchQueue.main.async { + XCTAssertTrue( + close.code == .goingAway || close.code == .cancelled + ) errorEx.fulfill() } } ) - defer { server.forceClose() } + defer { server.shutDown() } - try await client.open() + // When running tests repeatedly (i.e., on the order of 1000s of times), + // sometimes the server fails and causes `.open()` to throw. + do { try await client.open() } + catch {} - subject.send(.die) - wait(for: [errorEx], timeout: 2) + subject.send(.remoteClose) + await _fulfillment(of: [errorEx], timeout: 2) } func testWebSocketCannotBeOpenedTwice() async throws { - var closeCount = 0 + let closeCount = Locked(0) let firstCloseEx = expectation(description: "Should have closed once") let secondCloseEx = expectation(description: "Should not have closed more than once") secondCloseEx.isInverted = true - let (server, client) = await makeServerAndClient( + let (server, client) = try await makeServerAndClient( onClose: { _ in - closeCount += 1 - if closeCount == 1 { + let c = closeCount.access { count -> Int in + count += 1 + return count + } + if c == 1 { firstCloseEx.fulfill() } else { secondCloseEx.fulfill() } } ) - defer { server.forceClose() } + defer { server.shutDown() } try await client.open() try await client.close() - wait(for: [firstCloseEx], timeout: 2) + await _fulfillment(of: [firstCloseEx], timeout: 2) do { try await client.open() XCTFail("Should not have successfully reopened") } catch { guard let wserror = error as? WebSocketError, - case .openAfterConnectionClosed = wserror + case .alreadyClosed = wserror.closeCode else { return XCTFail("Received wrong error: \(error)") } } - wait(for: [secondCloseEx], timeout: 0.1) + await _fulfillment(of: [secondCloseEx], timeout: 0.1) } func testPushAndReceiveText() async throws { - let (server, client) = await makeServerAndClient() - defer { server.forceClose() } + let (server, client) = try await makeServerAndClient() + defer { server.shutDown() } let sentEx = expectation(description: "Server should have received message") let sentSub = server.inputPublisher @@ -160,15 +149,15 @@ class SystemWebSocketTests: XCTestCase { defer { receivedSub.cancel() } try await client.send(.text("hello")) - wait(for: [sentEx], timeout: 2) + await _fulfillment(of: [sentEx], timeout: 2) subject.send(.message(.text("hi, to you too!"))) - wait(for: [receivedEx], timeout: 2) + await _fulfillment(of: [receivedEx], timeout: 2) } @available(iOS 15.0, macOS 12.0, *) func testPushAndReceiveTextWithAsyncPublisher() async throws { - let (server, client) = await makeServerAndClient() - defer { server.forceClose() } + let (server, client) = try await makeServerAndClient() + defer { server.shutDown() } try await client.open() @@ -186,8 +175,8 @@ class SystemWebSocketTests: XCTestCase { } func testPushAndReceiveData() async throws { - let (server, client) = await makeServerAndClient() - defer { server.forceClose() } + let (server, client) = try await makeServerAndClient() + defer { server.shutDown() } let sentEx = expectation(description: "Server should have received message") let sentSub = server.inputPublisher @@ -211,15 +200,15 @@ class SystemWebSocketTests: XCTestCase { defer { receivedSub.cancel() } try await client.send(.data(Data("hello".utf8))) - wait(for: [sentEx], timeout: 2) + await _fulfillment(of: [sentEx], timeout: 2) subject.send(.message(.data(Data("hi, to you too!".utf8)))) - wait(for: [receivedEx], timeout: 2) + await _fulfillment(of: [receivedEx], timeout: 2) } @available(iOS 15.0, macOS 12.0, *) func testPushAndReceiveDataWithAsyncPublisher() async throws { - let (server, client) = await makeServerAndClient() - defer { server.forceClose() } + let (server, client) = try await makeServerAndClient() + defer { server.shutDown() } try await client.open() @@ -236,75 +225,131 @@ class SystemWebSocketTests: XCTestCase { } } + @available(iOS 15.0, macOS 12.0, *) + func testPublisherFinishesOnClose() async throws { + let (server, client) = try await makeServerAndClient() + defer { server.shutDown() } + + try await client.open() + + let task = Task.detached { + var count = 1 + repeat { + await self.subject.send(.message(.text(String(count)))) + count += 1 + try await Task.sleep(nanoseconds: 20 * NSEC_PER_MSEC) + } while !Task.isCancelled + } + + var receivedMessages = 0 + for await message in client.values { + guard let _ = message.stringValue else { return XCTFail() } + receivedMessages += 1 + if receivedMessages == 3 { + try await client.close() + } + } + + XCTAssertEqual(3, receivedMessages) + + task.cancel() + } + + @available(iOS 15.0, macOS 12.0, *) + func testPublisherFinishesOnCloseFromServer() async throws { + let (server, client) = try await makeServerAndClient() + defer { server.shutDown() } + + try await client.open() + + let task = Task.detached { + var count = 1 + repeat { + await self.subject.send(.message(.text(String(count)))) + count += 1 + try await Task.sleep(nanoseconds: 20 * NSEC_PER_MSEC) + } while !Task.isCancelled + } + + var receivedMessages = 0 + for await message in client.values { + guard let _ = message.stringValue else { return XCTFail() } + receivedMessages += 1 + if receivedMessages == 3 { + subject.send(.remoteClose) + } + } + + XCTAssertEqual(3, receivedMessages) + + task.cancel() + } + func testWrappedSystemWebSocket() async throws { let openEx = expectation(description: "Should have opened") let closeEx = expectation(description: "Should have closed") - let (server, client) = await makeServerAndWrappedClient( + let (server, client) = try await makeServerAndWrappedClient( onOpen: { openEx.fulfill() }, - onClose: { result in - switch result { - case let .success((code, reason)): - XCTAssertEqual(.normalClosure, code) - XCTAssertNil(reason) - closeEx.fulfill() - case let .failure(error): - XCTFail("Should not have failed: \(error)") - } + onClose: { close in + XCTAssertEqual(.normalClosure, close.code) + XCTAssertNil(close.reason) + closeEx.fulfill() } ) - defer { server.forceClose() } + defer { server.shutDown() } - var messagesToSend: [WebSocketMessage] = [ - .text("one"), - .data(Data("two".utf8)), - .text("three"), + let messagesToSendToServer: [WebSocketMessage] = [ + .text("client: one"), + .data(Data("client: two".utf8)), + .text("client: three"), ] - var messagesToReceive: [WebSocketMessage] = [ - .text("one"), - .data(Data("two".utf8)), - .text("three"), + let messagesToReceiveFromServer: [WebSocketMessage] = [ + .text("server: one"), + .data(Data("server: two".utf8)), + .text("server: three"), ] + var messagesReceivedByServer = 0 let sentSub = server.inputPublisher .sink(receiveValue: { message in - let expected = messagesToSend.removeFirst() - XCTAssertEqual(expected, message) + let i = messagesReceivedByServer + defer { messagesReceivedByServer += 1 } + XCTAssertEqual(messagesToSendToServer[i], message) }) defer { sentSub.cancel() } // These two lines are redundant, but the goal // is to test everything in `WebSocket`. try await client.open() - wait(for: [openEx], timeout: 2) + await _fulfillment(of: [openEx], timeout: 2) - // These messages have to be sent after the `AsyncStream` is - // subscribed to below. So, we send them asynchronously. - let firstMessageToReceive = try XCTUnwrap(messagesToReceive.first) - let firstMessageToSend = try XCTUnwrap(messagesToSend.first) + // This message has to be sent after the `AsyncStream` is + // subscribed to below. + let messageToReceiveFromServer = messagesToReceiveFromServer[0] Task.detached { - await self.subject.send(.message(firstMessageToReceive)) - try await client.send(firstMessageToSend) + await self.subject.send(.message(messageToReceiveFromServer)) } + var messagesReceivedByClient = 0 for await message in client.messages { - let expected = messagesToReceive.removeFirst() - XCTAssertEqual(expected, message) - - if let messageToSend = messagesToSend.first, - let messageToReceive = messagesToReceive.first - { - try await client.send(messageToSend) - subject.send(.message(messageToReceive)) + let i = messagesReceivedByClient + defer { messagesReceivedByClient += 1 } + + XCTAssertEqual(messagesToReceiveFromServer[i], message) + try await client.send(messagesToSendToServer[i]) + + if i < 2 { + subject.send(.message(messagesToReceiveFromServer[i + 1])) } else { try await client.close() } } - XCTAssertTrue(messagesToSend.isEmpty) - XCTAssertTrue(messagesToReceive.isEmpty) + XCTAssertEqual(3, messagesReceivedByClient) + XCTAssertEqual(3, messagesReceivedByServer) - wait(for: [closeEx], timeout: 2) + await _fulfillment(of: [closeEx], timeout: 2) } } @@ -315,16 +360,17 @@ private let empty: Empty = Empty( ) private extension SystemWebSocketTests { - func url(_ port: UInt16) -> URL { URL(string: "ws://0.0.0.0:\(port)/socket")! } + func url(_ port: UInt16) -> URL { URL(string: "ws://127.0.0.1:\(port)/socket")! } func makeServerAndClient( - onOpen: @escaping () -> Void = {}, - onClose: @escaping (WebSocketCloseResult) -> Void = { _ in } - ) async -> (WebSocketServer, SystemWebSocket) { + onOpen: @escaping @Sendable () -> Void = {}, + onClose: @escaping @Sendable (WebSocketClose) -> Void = { _ in } + ) async throws -> (WebSocketServer, SystemWebSocket) { let port = ports.removeFirst() - let server = try! WebSocketServer(port: port, outputPublisher: subject) + let server = try WebSocketServer(port: port, outputPublisher: subject) let client = try! await SystemWebSocket( url: url(port), + options: .init(timeoutIntervalForRequest: 2), onOpen: onOpen, onClose: onClose ) @@ -332,13 +378,14 @@ private extension SystemWebSocketTests { } func makeOfflineServerAndClient( - onOpen: @escaping () -> Void = {}, - onClose: @escaping (WebSocketCloseResult) -> Void = { _ in } - ) async -> (WebSocketServer, SystemWebSocket) { + onOpen: @escaping @Sendable () -> Void = {}, + onClose: @escaping @Sendable (WebSocketClose) -> Void = { _ in } + ) async throws -> (WebSocketServer, SystemWebSocket) { let port = ports.removeFirst() - let server = try! WebSocketServer(port: 1, outputPublisher: empty) + let server = try WebSocketServer(port: 52_001, outputPublisher: empty) let client = try! await SystemWebSocket( url: url(port), + options: .init(timeoutIntervalForRequest: 2), onOpen: onOpen, onClose: onClose ) @@ -346,16 +393,30 @@ private extension SystemWebSocketTests { } func makeServerAndWrappedClient( - onOpen: @escaping () -> Void = {}, - onClose: @escaping (WebSocketCloseResult) -> Void = { _ in } - ) async -> (WebSocketServer, WebSocket) { + onOpen: @escaping @Sendable () -> Void = {}, + onClose: @escaping @Sendable (WebSocketClose) -> Void = { _ in } + ) async throws -> (WebSocketServer, WebSocket) { let port = ports.removeFirst() - let server = try! WebSocketServer(port: port, outputPublisher: subject) + let server = try WebSocketServer(port: port, outputPublisher: subject) let client = try! await SystemWebSocket( url: url(port), + options: .init(timeoutIntervalForRequest: 2), onOpen: onOpen, onClose: onClose ) return (server, try! await .system(client)) } } + +private extension SystemWebSocketTests { + func _fulfillment( + of expectations: [XCTestExpectation], + timeout seconds: TimeInterval + ) async { + #if compiler(>=5.8) + await fulfillment(of: expectations, timeout: seconds) + #else + wait(for: expectations, timeout: seconds) + #endif + } +} diff --git a/Tests/WebSocketTests/URLSessionWebSocketTaskCloseCodeTests.swift b/Tests/WebSocketTests/URLSessionWebSocketTaskCloseCodeTests.swift index 7ea92f7..13d5c48 100644 --- a/Tests/WebSocketTests/URLSessionWebSocketTaskCloseCodeTests.swift +++ b/Tests/WebSocketTests/URLSessionWebSocketTaskCloseCodeTests.swift @@ -18,12 +18,7 @@ class URLSessionWebSocketTaskCloseCodeTests: XCTestCase { ] zip(urlSessionCloseCodes, closeCodes).forEach { urlSessionCloseCode, closeCode in - XCTAssertEqual(urlSessionCloseCode, URLSessionWebSocketTask.CloseCode(closeCode)) + XCTAssertEqual(urlSessionCloseCode, closeCode.wsCloseCode) } } - - func testAllWebSocketCloseCodesHaveCorrespondingURLSessionCloseCodes() throws { - WebSocketCloseCode.allCases - .forEach { XCTAssertNotNil(URLSessionWebSocketTask.CloseCode($0)) } - } } diff --git a/bin/format.sh b/bin/format.sh new file mode 100755 index 0000000..c29f9b3 --- /dev/null +++ b/bin/format.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash + +set -e + +SELF=`realpath $0` +DIR=`dirname $SELF` +DEV_DIR=`echo ${DIR%/*}` + +pushd "$DEV_DIR" &>/dev/null + +if command -v swiftformat >/dev/null 2>&1; then + swiftformat --quiet --config .swiftformat . +else + echo "warning: Install swiftformat by running 'brew install swiftformat'" +fi + +popd &>/dev/null