From e6f20b2889b47c3ba1504d3f582b57a934df6961 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Wed, 30 Oct 2024 11:09:32 -0300 Subject: [PATCH] fix(realtime): losing postgres_changes on resubscribe --- Examples/SlackClone/AppView.swift | 6 +- Examples/SlackClone/Supabase.swift | 5 +- Sources/Realtime/V2/RealtimeChannelV2.swift | 18 ++--- Sources/Realtime/V2/RealtimeClientV2.swift | 10 ++- Sources/Realtime/V2/WebSocketClient.swift | 64 +++++++++++---- Sources/Realtime/V2/WebSocketConnection.swift | 77 ------------------- 6 files changed, 71 insertions(+), 109 deletions(-) delete mode 100644 Sources/Realtime/V2/WebSocketConnection.swift diff --git a/Examples/SlackClone/AppView.swift b/Examples/SlackClone/AppView.swift index e15aaed5..61193a2a 100644 --- a/Examples/SlackClone/AppView.swift +++ b/Examples/SlackClone/AppView.swift @@ -15,13 +15,15 @@ final class AppViewModel { var session: Session? var selectedChannel: Channel? - var realtimeConnectionStatus: RealtimeClientV2.Status? + var realtimeConnectionStatus: RealtimeClientStatus? init() { Task { for await (event, session) in supabase.auth.authStateChanges { Logger.main.debug("AuthStateChange: \(event.rawValue)") - guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else { return } + guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else { + return + } self.session = session if session == nil { diff --git a/Examples/SlackClone/Supabase.swift b/Examples/SlackClone/Supabase.swift index 7ff1c010..e57513e4 100644 --- a/Examples/SlackClone/Supabase.swift +++ b/Examples/SlackClone/Supabase.swift @@ -21,8 +21,9 @@ let decoder: JSONDecoder = { }() let supabase = SupabaseClient( - supabaseURL: URL(string: "https://rkehabxkxxpcbpzsammm.supabase.red")!, - supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InJrZWhhYnhreHhwY2JwenNhbW1tIiwicm9sZSI6ImFub24iLCJpYXQiOjE3Mjk3NTgzODgsImV4cCI6MjA0NTMzNDM4OH0.rTpPEGk9fMjHXXR49drfyF6IkrNYeL_-yGGDa1JaXTY", + supabaseURL: URL(string: "http://127.0.0.1:54321")!, + supabaseKey: + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0", options: SupabaseClientOptions( db: .init(encoder: encoder, decoder: decoder), auth: .init(redirectToURL: URL(string: "com.supabase.slack-clone://login-callback")), diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index f29b144f..dc0a5003 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -7,8 +7,8 @@ import ConcurrencyExtras import Foundation -import Helpers import HTTPTypes +import Helpers #if canImport(FoundationNetworking) import FoundationNetworking @@ -59,7 +59,9 @@ extension Socket { addChannel: { [weak client] in client?.addChannel($0) }, removeChannel: { [weak client] in await client?.removeChannel($0) }, push: { [weak client] in await client?.push($0) }, - httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) } + httpSend: { [weak client] in + try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) + } ) } } @@ -128,11 +130,6 @@ public final class RealtimeChannelV2: Sendable { await socket.connect() } - guard status != .subscribed else { - logger?.warning("Channel \(topic) is already subscribed") - return - } - socket.addChannel(self) status = .subscribing @@ -185,7 +182,8 @@ public final class RealtimeChannelV2: Sendable { @available( *, deprecated, - message: "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead." + message: + "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead." ) public func updateAuth(jwt: String?) async { logger?.debug("Updating auth token for channel \(topic)") @@ -238,8 +236,8 @@ public final class RealtimeChannelV2: Sendable { event: event, payload: message, private: config.isPrivate - ), - ], + ) + ] ] ) ) diff --git a/Sources/Realtime/V2/RealtimeClientV2.swift b/Sources/Realtime/V2/RealtimeClientV2.swift index e9c9a8d9..4911a15c 100644 --- a/Sources/Realtime/V2/RealtimeClientV2.swift +++ b/Sources/Realtime/V2/RealtimeClientV2.swift @@ -8,6 +8,7 @@ import ConcurrencyExtras import Foundation import Helpers +import Network #if canImport(FoundationNetworking) import FoundationNetworking @@ -345,7 +346,10 @@ public final class RealtimeClientV2: Sendable { } /// Disconnects client. - public func disconnect() { + /// - Parameters: + /// - code: A numeric status code to send on disconnect. + /// - reason: A custom reason for the disconnect. + public func disconnect(code: Int? = nil, reason: String? = nil) { options.logger?.debug("Closing WebSocket connection") mutableState.withValue { $0.ref = 0 @@ -353,7 +357,7 @@ public final class RealtimeClientV2: Sendable { $0.heartbeatTask?.cancel() $0.connectionTask?.cancel() } - ws.disconnect() + ws.disconnect(code: code, reason: reason) status = .disconnected } @@ -490,8 +494,6 @@ public final class RealtimeClientV2: Sendable { } } -import Network - final class NetworkMonitor: @unchecked Sendable { static let shared = NetworkMonitor() diff --git a/Sources/Realtime/V2/WebSocketClient.swift b/Sources/Realtime/V2/WebSocketClient.swift index b977a699..0634f774 100644 --- a/Sources/Realtime/V2/WebSocketClient.swift +++ b/Sources/Realtime/V2/WebSocketClient.swift @@ -13,6 +13,10 @@ import Helpers import FoundationNetworking #endif +enum WebSocketClientError: Error { + case unsupportedData +} + enum ConnectionStatus { case connected case disconnected(reason: String, code: URLSessionWebSocketTask.CloseCode) @@ -23,7 +27,7 @@ protocol WebSocketClient: Sendable { func send(_ message: RealtimeMessageV2) async throws func receive() -> AsyncThrowingStream func connect() -> AsyncStream - func disconnect() + func disconnect(code: Int?, reason: String?) } final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @unchecked Sendable { @@ -33,7 +37,7 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @ struct MutableState { var continuation: AsyncStream.Continuation? - var connection: WebSocketConnection? + var task: URLSessionWebSocketTask? } private let mutableState = LockIsolated(MutableState()) @@ -47,11 +51,15 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @ logger = options.logger } + deinit { + mutableState.task?.cancel(with: .goingAway, reason: nil) + } + func connect() -> AsyncStream { mutableState.withValue { state in let session = URLSession(configuration: configuration, delegate: self, delegateQueue: nil) let task = session.webSocketTask(with: realtimeURL) - state.connection = WebSocketConnection(task: task) + state.task = task task.resume() let (stream, continuation) = AsyncStream.makeStream() @@ -60,27 +68,55 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @ } } - func disconnect() { + func disconnect(code: Int?, reason: String?) { mutableState.withValue { state in - state.connection?.close() + if let code { + state.task?.cancel( + with: URLSessionWebSocketTask.CloseCode(rawValue: code) ?? .invalid, + reason: reason?.data(using: .utf8)) + } else { + state.task?.cancel() + } } } func receive() -> AsyncThrowingStream { - guard let connection = mutableState.connection else { - return .finished( - throwing: RealtimeError( - "receive() called before connect(). Make sure to call `connect()` before calling `receive()`." - ) - ) + AsyncThrowingStream { [weak self] in + guard let self else { return nil } + + let task = mutableState.task + + guard + let message = try await task?.receive(), + !Task.isCancelled + else { return nil } + + switch message { + case .data(let data): + let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data) + return message + + case .string(let string): + guard let data = string.data(using: .utf8) else { + throw WebSocketClientError.unsupportedData + } + + let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data) + return message + + @unknown default: + assertionFailure("Unsupported message type.") + task?.cancel(with: .unsupportedData, reason: nil) + throw WebSocketClientError.unsupportedData + } } - - return connection.receive() } func send(_ message: RealtimeMessageV2) async throws { logger?.verbose("Sending message: \(message)") - try await mutableState.connection?.send(message) + + let data = try JSONEncoder().encode(message) + try await mutableState.task?.send(.data(data)) } // MARK: - URLSessionWebSocketDelegate diff --git a/Sources/Realtime/V2/WebSocketConnection.swift b/Sources/Realtime/V2/WebSocketConnection.swift deleted file mode 100644 index 8e30ac53..00000000 --- a/Sources/Realtime/V2/WebSocketConnection.swift +++ /dev/null @@ -1,77 +0,0 @@ -// -// WebSocketConnection.swift -// -// -// Created by Guilherme Souza on 29/03/24. -// - -import Foundation - -#if canImport(FoundationNetworking) - import FoundationNetworking -#endif - -enum WebSocketConnectionError: Error { - case unsupportedData -} - -final class WebSocketConnection: Sendable { - private let task: URLSessionWebSocketTask - private let encoder: JSONEncoder - private let decoder: JSONDecoder - - init( - task: URLSessionWebSocketTask, - encoder: JSONEncoder = JSONEncoder(), - decoder: JSONDecoder = JSONDecoder() - ) { - self.task = task - self.encoder = encoder - self.decoder = decoder - - task.resume() - } - - deinit { - task.cancel(with: .goingAway, reason: nil) - } - - func receiveOnce() async throws -> Incoming { - switch try await task.receive() { - case let .data(data): - let message = try decoder.decode(Incoming.self, from: data) - return message - - case let .string(string): - guard let data = string.data(using: .utf8) else { - throw WebSocketConnectionError.unsupportedData - } - - let message = try decoder.decode(Incoming.self, from: data) - return message - - @unknown default: - assertionFailure("Unsupported message type.") - task.cancel(with: .unsupportedData, reason: nil) - throw WebSocketConnectionError.unsupportedData - } - } - - func send(_ message: Outgoing) async throws { - let data = try encoder.encode(message) - try await task.send(.data(data)) - } - - func receive() -> AsyncThrowingStream { - AsyncThrowingStream { [weak self] in - guard let self else { return nil } - - let message = try await receiveOnce() - return Task.isCancelled ? nil : message - } - } - - func close() { - task.cancel(with: .normalClosure, reason: nil) - } -}