From 1ef2a2681b843619a134ea24ecf9e6788758093d Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 28 Oct 2024 06:09:36 -0300 Subject: [PATCH 1/7] wip --- Examples/SlackClone/Supabase.swift | 4 +- Sources/Realtime/V2/RealtimeClientV2.swift | 63 +++++++++++++++++++--- 2 files changed, 57 insertions(+), 10 deletions(-) diff --git a/Examples/SlackClone/Supabase.swift b/Examples/SlackClone/Supabase.swift index 2723557e..7ff1c010 100644 --- a/Examples/SlackClone/Supabase.swift +++ b/Examples/SlackClone/Supabase.swift @@ -21,8 +21,8 @@ let decoder: JSONDecoder = { }() let supabase = SupabaseClient( - supabaseURL: URL(string: "http://localhost:54321")!, - supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0", + supabaseURL: URL(string: "https://rkehabxkxxpcbpzsammm.supabase.red")!, + supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InJrZWhhYnhreHhwY2JwenNhbW1tIiwicm9sZSI6ImFub24iLCJpYXQiOjE3Mjk3NTgzODgsImV4cCI6MjA0NTMzNDM4OH0.rTpPEGk9fMjHXXR49drfyF6IkrNYeL_-yGGDa1JaXTY", options: SupabaseClientOptions( db: .init(encoder: encoder, decoder: decoder), auth: .init(redirectToURL: URL(string: "com.supabase.slack-clone://login-callback")), diff --git a/Sources/Realtime/V2/RealtimeClientV2.swift b/Sources/Realtime/V2/RealtimeClientV2.swift index cd4ead9f..e9c9a8d9 100644 --- a/Sources/Realtime/V2/RealtimeClientV2.swift +++ b/Sources/Realtime/V2/RealtimeClientV2.swift @@ -20,8 +20,13 @@ public final class RealtimeClientV2: Sendable { var accessToken: String? var ref = 0 var pendingHeartbeatRef: Int? + + /// Long-running task that keeps sending heartbeat messages. var heartbeatTask: Task? + + /// Long-running task for listening for incoming messages from WebSocket. var messageTask: Task? + var connectionTask: Task? var channels: [String: RealtimeChannelV2] = [:] var sendBuffer: [@Sendable () async -> Void] = [] @@ -34,13 +39,14 @@ public final class RealtimeClientV2: Sendable { let http: any HTTPClientType let apikey: String? + /// All managed channels indexed by their topics. public var channels: [String: RealtimeChannelV2] { mutableState.channels } private let statusEventEmitter = EventEmitter(initialEvent: .disconnected) - /// AsyncStream that emits when connection status change. + /// Listen for connection status changes. /// /// You can also use ``onStatusChange(_:)`` for a closure based method. public var statusChange: AsyncStream { @@ -198,6 +204,13 @@ public final class RealtimeClientV2: Sendable { await connect(reconnect: true) } + /// Creates a new channel and bind it to this client. + /// - Parameters: + /// - topic: Channel's topic. + /// - options: Configuration options for the channel. + /// - Returns: Channel instance. + /// + /// - Note: This method doesn't subscribe to the channel, call ``RealtimeChannelV2/subscribe()`` on the returned channel instance. public func channel( _ topic: String, options: @Sendable (inout RealtimeChannelConfig) -> Void = { _ in } @@ -223,6 +236,9 @@ public final class RealtimeClientV2: Sendable { } } + /// Unsubscribe and removes channel. + /// + /// If there is no channel left, client is disconnected. public func removeChannel(_ channel: RealtimeChannelV2) async { if channel.status == .subscribed { await channel.unsubscribe() @@ -238,6 +254,7 @@ public final class RealtimeClientV2: Sendable { } } + /// Unsubscribes and removes all channels. public func removeAllChannels() async { await withTaskGroup(of: Void.self) { group in for channel in channels.values { @@ -327,6 +344,7 @@ public final class RealtimeClientV2: Sendable { } } + /// Disconnects client. public func disconnect() { options.logger?.debug("Closing WebSocket connection") mutableState.withValue { @@ -388,13 +406,14 @@ public final class RealtimeClientV2: Sendable { try Task.checkCancellation() try await self?.ws.send(message) } catch { - self?.options.logger?.error(""" - Failed to send message: - \(message) - - Error: - \(error) - """) + self?.options.logger?.error( + """ + Failed to send message: + \(message) + + Error: + \(error) + """) } } @@ -470,3 +489,31 @@ public final class RealtimeClientV2: Sendable { url.appendingPathComponent("api/broadcast") } } + +import Network + +final class NetworkMonitor: @unchecked Sendable { + static let shared = NetworkMonitor() + + private let monitor: NWPathMonitor + private let queue = DispatchQueue(label: "NetworkMonitor") + + private(set) var isConnected: Bool = false + + private init() { + monitor = NWPathMonitor() + } + + func start(_ onChange: (@Sendable () -> Void)? = nil) { + monitor.pathUpdateHandler = { [weak self] path in + self?.isConnected = path.status != .unsatisfied + onChange?() + } + + monitor.start(queue: queue) + } + + func stop() { + monitor.cancel() + } +} From 8d18e0265386324ebff2e843974b25e92227c4fa Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Wed, 30 Oct 2024 11:09:32 -0300 Subject: [PATCH 2/7] fix(realtime): losing postgres_changes on resubscribe --- Examples/SlackClone/AppView.swift | 6 +- Examples/SlackClone/Supabase.swift | 5 +- Sources/Realtime/V2/RealtimeChannelV2.swift | 5 -- Sources/Realtime/V2/RealtimeClientV2.swift | 10 ++- Sources/Realtime/V2/WebSocketClient.swift | 64 +++++++++++---- Sources/Realtime/V2/WebSocketConnection.swift | 77 ------------------- 6 files changed, 63 insertions(+), 104 deletions(-) delete mode 100644 Sources/Realtime/V2/WebSocketConnection.swift diff --git a/Examples/SlackClone/AppView.swift b/Examples/SlackClone/AppView.swift index e15aaed5..61193a2a 100644 --- a/Examples/SlackClone/AppView.swift +++ b/Examples/SlackClone/AppView.swift @@ -15,13 +15,15 @@ final class AppViewModel { var session: Session? var selectedChannel: Channel? - var realtimeConnectionStatus: RealtimeClientV2.Status? + var realtimeConnectionStatus: RealtimeClientStatus? init() { Task { for await (event, session) in supabase.auth.authStateChanges { Logger.main.debug("AuthStateChange: \(event.rawValue)") - guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else { return } + guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else { + return + } self.session = session if session == nil { diff --git a/Examples/SlackClone/Supabase.swift b/Examples/SlackClone/Supabase.swift index 7ff1c010..e57513e4 100644 --- a/Examples/SlackClone/Supabase.swift +++ b/Examples/SlackClone/Supabase.swift @@ -21,8 +21,9 @@ let decoder: JSONDecoder = { }() let supabase = SupabaseClient( - supabaseURL: URL(string: "https://rkehabxkxxpcbpzsammm.supabase.red")!, - supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InJrZWhhYnhreHhwY2JwenNhbW1tIiwicm9sZSI6ImFub24iLCJpYXQiOjE3Mjk3NTgzODgsImV4cCI6MjA0NTMzNDM4OH0.rTpPEGk9fMjHXXR49drfyF6IkrNYeL_-yGGDa1JaXTY", + supabaseURL: URL(string: "http://127.0.0.1:54321")!, + supabaseKey: + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0", options: SupabaseClientOptions( db: .init(encoder: encoder, decoder: decoder), auth: .init(redirectToURL: URL(string: "com.supabase.slack-clone://login-callback")), diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index 8facb88c..3a4f136e 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -130,11 +130,6 @@ public final class RealtimeChannelV2: Sendable { await socket.connect() } - guard status != .subscribed else { - logger?.warning("Channel \(topic) is already subscribed") - return - } - socket.addChannel(self) status = .subscribing diff --git a/Sources/Realtime/V2/RealtimeClientV2.swift b/Sources/Realtime/V2/RealtimeClientV2.swift index e9c9a8d9..4911a15c 100644 --- a/Sources/Realtime/V2/RealtimeClientV2.swift +++ b/Sources/Realtime/V2/RealtimeClientV2.swift @@ -8,6 +8,7 @@ import ConcurrencyExtras import Foundation import Helpers +import Network #if canImport(FoundationNetworking) import FoundationNetworking @@ -345,7 +346,10 @@ public final class RealtimeClientV2: Sendable { } /// Disconnects client. - public func disconnect() { + /// - Parameters: + /// - code: A numeric status code to send on disconnect. + /// - reason: A custom reason for the disconnect. + public func disconnect(code: Int? = nil, reason: String? = nil) { options.logger?.debug("Closing WebSocket connection") mutableState.withValue { $0.ref = 0 @@ -353,7 +357,7 @@ public final class RealtimeClientV2: Sendable { $0.heartbeatTask?.cancel() $0.connectionTask?.cancel() } - ws.disconnect() + ws.disconnect(code: code, reason: reason) status = .disconnected } @@ -490,8 +494,6 @@ public final class RealtimeClientV2: Sendable { } } -import Network - final class NetworkMonitor: @unchecked Sendable { static let shared = NetworkMonitor() diff --git a/Sources/Realtime/V2/WebSocketClient.swift b/Sources/Realtime/V2/WebSocketClient.swift index b977a699..0634f774 100644 --- a/Sources/Realtime/V2/WebSocketClient.swift +++ b/Sources/Realtime/V2/WebSocketClient.swift @@ -13,6 +13,10 @@ import Helpers import FoundationNetworking #endif +enum WebSocketClientError: Error { + case unsupportedData +} + enum ConnectionStatus { case connected case disconnected(reason: String, code: URLSessionWebSocketTask.CloseCode) @@ -23,7 +27,7 @@ protocol WebSocketClient: Sendable { func send(_ message: RealtimeMessageV2) async throws func receive() -> AsyncThrowingStream func connect() -> AsyncStream - func disconnect() + func disconnect(code: Int?, reason: String?) } final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @unchecked Sendable { @@ -33,7 +37,7 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @ struct MutableState { var continuation: AsyncStream.Continuation? - var connection: WebSocketConnection? + var task: URLSessionWebSocketTask? } private let mutableState = LockIsolated(MutableState()) @@ -47,11 +51,15 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @ logger = options.logger } + deinit { + mutableState.task?.cancel(with: .goingAway, reason: nil) + } + func connect() -> AsyncStream { mutableState.withValue { state in let session = URLSession(configuration: configuration, delegate: self, delegateQueue: nil) let task = session.webSocketTask(with: realtimeURL) - state.connection = WebSocketConnection(task: task) + state.task = task task.resume() let (stream, continuation) = AsyncStream.makeStream() @@ -60,27 +68,55 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @ } } - func disconnect() { + func disconnect(code: Int?, reason: String?) { mutableState.withValue { state in - state.connection?.close() + if let code { + state.task?.cancel( + with: URLSessionWebSocketTask.CloseCode(rawValue: code) ?? .invalid, + reason: reason?.data(using: .utf8)) + } else { + state.task?.cancel() + } } } func receive() -> AsyncThrowingStream { - guard let connection = mutableState.connection else { - return .finished( - throwing: RealtimeError( - "receive() called before connect(). Make sure to call `connect()` before calling `receive()`." - ) - ) + AsyncThrowingStream { [weak self] in + guard let self else { return nil } + + let task = mutableState.task + + guard + let message = try await task?.receive(), + !Task.isCancelled + else { return nil } + + switch message { + case .data(let data): + let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data) + return message + + case .string(let string): + guard let data = string.data(using: .utf8) else { + throw WebSocketClientError.unsupportedData + } + + let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data) + return message + + @unknown default: + assertionFailure("Unsupported message type.") + task?.cancel(with: .unsupportedData, reason: nil) + throw WebSocketClientError.unsupportedData + } } - - return connection.receive() } func send(_ message: RealtimeMessageV2) async throws { logger?.verbose("Sending message: \(message)") - try await mutableState.connection?.send(message) + + let data = try JSONEncoder().encode(message) + try await mutableState.task?.send(.data(data)) } // MARK: - URLSessionWebSocketDelegate diff --git a/Sources/Realtime/V2/WebSocketConnection.swift b/Sources/Realtime/V2/WebSocketConnection.swift deleted file mode 100644 index 8e30ac53..00000000 --- a/Sources/Realtime/V2/WebSocketConnection.swift +++ /dev/null @@ -1,77 +0,0 @@ -// -// WebSocketConnection.swift -// -// -// Created by Guilherme Souza on 29/03/24. -// - -import Foundation - -#if canImport(FoundationNetworking) - import FoundationNetworking -#endif - -enum WebSocketConnectionError: Error { - case unsupportedData -} - -final class WebSocketConnection: Sendable { - private let task: URLSessionWebSocketTask - private let encoder: JSONEncoder - private let decoder: JSONDecoder - - init( - task: URLSessionWebSocketTask, - encoder: JSONEncoder = JSONEncoder(), - decoder: JSONDecoder = JSONDecoder() - ) { - self.task = task - self.encoder = encoder - self.decoder = decoder - - task.resume() - } - - deinit { - task.cancel(with: .goingAway, reason: nil) - } - - func receiveOnce() async throws -> Incoming { - switch try await task.receive() { - case let .data(data): - let message = try decoder.decode(Incoming.self, from: data) - return message - - case let .string(string): - guard let data = string.data(using: .utf8) else { - throw WebSocketConnectionError.unsupportedData - } - - let message = try decoder.decode(Incoming.self, from: data) - return message - - @unknown default: - assertionFailure("Unsupported message type.") - task.cancel(with: .unsupportedData, reason: nil) - throw WebSocketConnectionError.unsupportedData - } - } - - func send(_ message: Outgoing) async throws { - let data = try encoder.encode(message) - try await task.send(.data(data)) - } - - func receive() -> AsyncThrowingStream { - AsyncThrowingStream { [weak self] in - guard let self else { return nil } - - let message = try await receiveOnce() - return Task.isCancelled ? nil : message - } - } - - func close() { - task.cancel(with: .normalClosure, reason: nil) - } -} From fb9829f6deac5c295685386951e708d2664935c8 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Wed, 30 Oct 2024 11:18:22 -0300 Subject: [PATCH 3/7] remove unused import --- Sources/Realtime/V2/RealtimeClientV2.swift | 27 ---------------------- 1 file changed, 27 deletions(-) diff --git a/Sources/Realtime/V2/RealtimeClientV2.swift b/Sources/Realtime/V2/RealtimeClientV2.swift index 4911a15c..5c072c4b 100644 --- a/Sources/Realtime/V2/RealtimeClientV2.swift +++ b/Sources/Realtime/V2/RealtimeClientV2.swift @@ -8,7 +8,6 @@ import ConcurrencyExtras import Foundation import Helpers -import Network #if canImport(FoundationNetworking) import FoundationNetworking @@ -493,29 +492,3 @@ public final class RealtimeClientV2: Sendable { url.appendingPathComponent("api/broadcast") } } - -final class NetworkMonitor: @unchecked Sendable { - static let shared = NetworkMonitor() - - private let monitor: NWPathMonitor - private let queue = DispatchQueue(label: "NetworkMonitor") - - private(set) var isConnected: Bool = false - - private init() { - monitor = NWPathMonitor() - } - - func start(_ onChange: (@Sendable () -> Void)? = nil) { - monitor.pathUpdateHandler = { [weak self] path in - self?.isConnected = path.status != .unsatisfied - onChange?() - } - - monitor.start(queue: queue) - } - - func stop() { - monitor.cancel() - } -} From 7c0a19725c9a5c64edf89576ac4a3ba211152432 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Wed, 30 Oct 2024 11:57:39 -0300 Subject: [PATCH 4/7] fix tests --- Tests/RealtimeTests/MockWebSocketClient.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Tests/RealtimeTests/MockWebSocketClient.swift b/Tests/RealtimeTests/MockWebSocketClient.swift index d6e3fc51..bcabc958 100644 --- a/Tests/RealtimeTests/MockWebSocketClient.swift +++ b/Tests/RealtimeTests/MockWebSocketClient.swift @@ -93,5 +93,6 @@ final class MockWebSocketClient: WebSocketClient { return stream } - func disconnect() {} + func disconnect(code: Int?, reason: String?) { + } } From 3b61ae21634151f8443472fecd38160ac46734a9 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 31 Oct 2024 10:48:58 -0300 Subject: [PATCH 5/7] fix parsing of tokenExpired event type --- Package.swift | 1 + Sources/Realtime/V2/CallbackManager.swift | 11 +++--- Sources/Realtime/V2/PushV2.swift | 38 ++++++++++++--------- Sources/Realtime/V2/RealtimeChannelV2.swift | 36 ++++++++++--------- Sources/Realtime/V2/RealtimeMessageV2.swift | 7 ---- 5 files changed, 47 insertions(+), 46 deletions(-) diff --git a/Package.swift b/Package.swift index 1e8278e6..86d770cd 100644 --- a/Package.swift +++ b/Package.swift @@ -120,6 +120,7 @@ let package = Package( name: "Realtime", dependencies: [ .product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"), + .product(name: "IssueReporting", package: "xctest-dynamic-overlay"), "Helpers", ] ), diff --git a/Sources/Realtime/V2/CallbackManager.swift b/Sources/Realtime/V2/CallbackManager.swift index 1723b9f4..3d92e184 100644 --- a/Sources/Realtime/V2/CallbackManager.swift +++ b/Sources/Realtime/V2/CallbackManager.swift @@ -1,10 +1,3 @@ -// -// CallbackManager.swift -// -// -// Created by Guilherme Souza on 24/12/23. -// - import ConcurrencyExtras import Foundation import Helpers @@ -26,6 +19,10 @@ final class CallbackManager: Sendable { mutableState.callbacks } + deinit { + reset() + } + @discardableResult func addBroadcastCallback( event: String, diff --git a/Sources/Realtime/V2/PushV2.swift b/Sources/Realtime/V2/PushV2.swift index 77e8f1be..199e6b74 100644 --- a/Sources/Realtime/V2/PushV2.swift +++ b/Sources/Realtime/V2/PushV2.swift @@ -27,25 +27,31 @@ actor PushV2 { } func send() async -> PushStatus { - await channel?.socket.push(message) - - if channel?.config.broadcast.acknowledgeBroadcasts == true { - do { - return try await withTimeout(interval: channel?.socket.options().timeoutInterval ?? 10) { - await withCheckedContinuation { - self.receivedContinuation = $0 - } + guard let channel = channel else { + return .error + } + + await channel.socket.push(message) + + if !channel.config.broadcast.acknowledgeBroadcasts { + // channel was configured with `ack = false`, + // don't wait for a response and return `ok`. + return .ok + } + + do { + return try await withTimeout(interval: channel.socket.options().timeoutInterval) { + await withCheckedContinuation { continuation in + self.receivedContinuation = continuation } - } catch is TimeoutError { - channel?.logger?.debug("Push timed out.") - return .timeout - } catch { - channel?.logger?.error("Error sending push: \(error)") - return .error } + } catch is TimeoutError { + channel.logger?.debug("Push timed out.") + return .timeout + } catch { + channel.logger?.error("Error sending push: \(error.localizedDescription)") + return .error } - - return .ok } func didReceive(status: PushStatus) { diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index 3a4f136e..e9a72e69 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -1,14 +1,8 @@ -// -// RealtimeChannelV2.swift -// -// -// Created by Guilherme Souza on 26/12/23. -// - import ConcurrencyExtras import Foundation import HTTPTypes import Helpers +import IssueReporting #if canImport(FoundationNetworking) import FoundationNetworking @@ -123,9 +117,10 @@ public final class RealtimeChannelV2: Sendable { public func subscribe() async { if socket.status() != .connected { if socket.options().connectOnSubscribe != true { - fatalError( + reportIssue( "You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?" ) + return } await socket.connect() } @@ -261,15 +256,21 @@ public final class RealtimeChannelV2: Sendable { } } + /// Tracks the given state in the channel. + /// - Parameter state: The state to be tracked, conforming to `Codable`. + /// - Throws: An error if the tracking fails. public func track(_ state: some Codable) async throws { try await track(state: JSONObject(state)) } + /// Tracks the given state in the channel. + /// - Parameter state: The state to be tracked as a `JSONObject`. public func track(state: JSONObject) async { - assert( - status == .subscribed, - "You can only track your presence after subscribing to the channel. Did you forget to call `channel.subscribe()`?" - ) + if status != .subscribed { + reportIssue( + "You can only track your presence after subscribing to the channel. Did you forget to call `channel.subscribe()`?" + ) + } await push( ChannelEvent.presence, @@ -281,6 +282,7 @@ public final class RealtimeChannelV2: Sendable { ) } + /// Stops tracking the current state in the channel. public func untrack() async { await push( ChannelEvent.presence, @@ -515,10 +517,12 @@ public final class RealtimeChannelV2: Sendable { filter: String?, callback: @escaping @Sendable (AnyAction) -> Void ) -> RealtimeSubscription { - precondition( - status != .subscribed, - "You cannot call postgresChange after joining the channel" - ) + guard status == .subscribed else { + reportIssue( + "You cannot call postgresChange after joining the channel, this won't work as expected." + ) + return RealtimeSubscription {} + } let config = PostgresJoinConfig( event: event, diff --git a/Sources/Realtime/V2/RealtimeMessageV2.swift b/Sources/Realtime/V2/RealtimeMessageV2.swift index d288aece..200498cd 100644 --- a/Sources/Realtime/V2/RealtimeMessageV2.swift +++ b/Sources/Realtime/V2/RealtimeMessageV2.swift @@ -1,10 +1,3 @@ -// -// RealtimeMessageV2.swift -// -// -// Created by Guilherme Souza on 11/01/24. -// - import Foundation import Helpers From c306c8695c84bada05c79acf50da1f4e3752f0cc Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 31 Oct 2024 11:13:11 -0300 Subject: [PATCH 6/7] fix tests --- Sources/Realtime/V2/RealtimeChannelV2.swift | 2 +- Tests/RealtimeTests/RealtimeTests.swift | 244 +++++++++++--------- 2 files changed, 137 insertions(+), 109 deletions(-) diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index e9a72e69..5e52455c 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -517,7 +517,7 @@ public final class RealtimeChannelV2: Sendable { filter: String?, callback: @escaping @Sendable (AnyAction) -> Void ) -> RealtimeSubscription { - guard status == .subscribed else { + guard status != .subscribed else { reportIssue( "You cannot call postgresChange after joining the channel, this won't work as expected." ) diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index e8ff0570..95ae970c 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -1,10 +1,12 @@ import ConcurrencyExtras import CustomDump import Helpers -@testable import Realtime +import InlineSnapshotTesting import TestHelpers import XCTest +@testable import Realtime + #if canImport(FoundationNetworking) import FoundationNetworking #endif @@ -91,10 +93,48 @@ final class RealtimeTests: XCTestCase { ws.mockReceive(.messagesSubscribed) await channel.subscribe() - expectNoDifference( - ws.sentMessages, - [.subscribeToMessages(ref: "1", joinRef: "1")] - ) + assertInlineSnapshot(of: ws.sentMessages, as: .json) { + """ + [ + { + "event" : "phx_join", + "join_ref" : "1", + "payload" : { + "access_token" : "anon.api.key", + "config" : { + "broadcast" : { + "ack" : false, + "self" : false + }, + "postgres_changes" : [ + { + "event" : "INSERT", + "schema" : "public", + "table" : "messages" + }, + { + "event" : "UPDATE", + "schema" : "public", + "table" : "messages" + }, + { + "event" : "DELETE", + "schema" : "public", + "table" : "messages" + } + ], + "presence" : { + "key" : "" + }, + "private" : false + } + }, + "ref" : "1", + "topic" : "realtime:public:messages" + } + ] + """ + } } func testSubscribeTimeout() async throws { @@ -132,39 +172,72 @@ final class RealtimeTests: XCTestCase { try? await Task.sleep(nanoseconds: NSEC_PER_SEC * 2) - let joinSentMessages = ws.sentMessages.filter { $0.event == "phx_join" } - - let expectedMessages = try [ - RealtimeMessageV2( - joinRef: "1", - ref: "1", - topic: "realtime:public:messages", - event: "phx_join", - payload: JSONObject( - RealtimeJoinPayload( - config: RealtimeJoinConfig(), - accessToken: apiKey - ) - ) - ), - RealtimeMessageV2( - joinRef: "2", - ref: "2", - topic: "realtime:public:messages", - event: "phx_join", - payload: JSONObject( - RealtimeJoinPayload( - config: RealtimeJoinConfig(), - accessToken: apiKey - ) - ) - ), - ] - - expectNoDifference( - joinSentMessages, - expectedMessages - ) + assertInlineSnapshot(of: ws.sentMessages, as: .json) { + """ + [ + { + "event" : "phx_join", + "join_ref" : "1", + "payload" : { + "access_token" : "anon.api.key", + "config" : { + "broadcast" : { + "ack" : false, + "self" : false + }, + "postgres_changes" : [ + + ], + "presence" : { + "key" : "" + }, + "private" : false + } + }, + "ref" : "1", + "topic" : "realtime:public:messages" + }, + { + "event" : "heartbeat", + "payload" : { + + }, + "ref" : "2", + "topic" : "phoenix" + }, + { + "event" : "phx_join", + "join_ref" : "2", + "payload" : { + "access_token" : "anon.api.key", + "config" : { + "broadcast" : { + "ack" : false, + "self" : false + }, + "postgres_changes" : [ + + ], + "presence" : { + "key" : "" + }, + "private" : false + } + }, + "ref" : "2", + "topic" : "realtime:public:messages" + }, + { + "event" : "heartbeat", + "payload" : { + + }, + "ref" : "3", + "topic" : "phoenix" + } + ] + """ + } } func testHeartbeat() async throws { @@ -262,30 +335,27 @@ final class RealtimeTests: XCTestCase { try await channel.broadcast(event: "test", message: ["value": 42]) let request = await http.receivedRequests.last - expectNoDifference( - request?.headers, - [ - .contentType: "application/json", - .apiKey: "anon.api.key", - .authorization: "Bearer anon.api.key", - ] - ) - - let body = try XCTUnwrap(request?.body) - let json = try JSONDecoder().decode(JSONObject.self, from: body) - expectNoDifference( - json, - [ - "messages": [ - [ - "topic": "realtime:public:messages", - "event": "test", - "payload": ["value": 42], - "private": false, - ], - ], - ] - ) + assertInlineSnapshot(of: request?.urlRequest, as: .raw(pretty: true)) { + """ + POST https://localhost:54321/realtime/v1/api/broadcast + Authorization: Bearer anon.api.key + Content-Type: application/json + apiKey: anon.api.key + + { + "messages" : [ + { + "event" : "test", + "payload" : { + "value" : 42 + }, + "private" : false, + "topic" : "realtime:public:messages" + } + ] + } + """ + } } private func connectSocketAndWait() async { @@ -295,31 +365,6 @@ final class RealtimeTests: XCTestCase { } extension RealtimeMessageV2 { - static func subscribeToMessages(ref: String?, joinRef: String?) -> RealtimeMessageV2 { - Self( - joinRef: joinRef, - ref: ref, - topic: "realtime:public:messages", - event: "phx_join", - payload: [ - "access_token": "anon.api.key", - "config": [ - "broadcast": [ - "self": false, - "ack": false, - ], - "postgres_changes": [ - ["table": "messages", "event": "INSERT", "schema": "public"], - ["table": "messages", "schema": "public", "event": "UPDATE"], - ["schema": "public", "table": "messages", "event": "DELETE"], - ], - "presence": ["key": ""], - "private": false, - ], - ] - ) - } - static let messagesSubscribed = Self( joinRef: nil, ref: "2", @@ -328,29 +373,12 @@ extension RealtimeMessageV2 { payload: [ "response": [ "postgres_changes": [ - ["id": 43783255, "event": "INSERT", "schema": "public", "table": "messages"], - ["id": 124973000, "event": "UPDATE", "schema": "public", "table": "messages"], - ["id": 85243397, "event": "DELETE", "schema": "public", "table": "messages"], - ], + ["id": 43_783_255, "event": "INSERT", "schema": "public", "table": "messages"], + ["id": 124_973_000, "event": "UPDATE", "schema": "public", "table": "messages"], + ["id": 85_243_397, "event": "DELETE", "schema": "public", "table": "messages"], + ] ], "status": "ok", ] ) - - static let heartbeatResponse = Self( - joinRef: nil, - ref: "1", - topic: "phoenix", - event: "phx_reply", - payload: [ - "response": [:], - "status": "ok", - ] - ) -} - -struct TestLogger: SupabaseLogger { - func log(message: SupabaseLogMessage) { - print(message.description) - } } From 676711816db2813f32d956ece21b312a26bf0574 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 31 Oct 2024 14:02:42 -0300 Subject: [PATCH 7/7] fix tests --- Tests/RealtimeTests/RealtimeTests.swift | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index 95ae970c..35a318cc 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -172,7 +172,7 @@ final class RealtimeTests: XCTestCase { try? await Task.sleep(nanoseconds: NSEC_PER_SEC * 2) - assertInlineSnapshot(of: ws.sentMessages, as: .json) { + assertInlineSnapshot(of: ws.sentMessages.filter { $0.event == "phx_join" }, as: .json) { """ [ { @@ -197,14 +197,6 @@ final class RealtimeTests: XCTestCase { "ref" : "1", "topic" : "realtime:public:messages" }, - { - "event" : "heartbeat", - "payload" : { - - }, - "ref" : "2", - "topic" : "phoenix" - }, { "event" : "phx_join", "join_ref" : "2", @@ -226,14 +218,6 @@ final class RealtimeTests: XCTestCase { }, "ref" : "2", "topic" : "realtime:public:messages" - }, - { - "event" : "heartbeat", - "payload" : { - - }, - "ref" : "3", - "topic" : "phoenix" } ] """