Skip to content

Commit

Permalink
fix(realtime): losing postgres_changes on resubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Oct 30, 2024
1 parent aa5ba6e commit e6f20b2
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 109 deletions.
6 changes: 4 additions & 2 deletions Examples/SlackClone/AppView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ final class AppViewModel {
var session: Session?
var selectedChannel: Channel?

var realtimeConnectionStatus: RealtimeClientV2.Status?
var realtimeConnectionStatus: RealtimeClientStatus?

init() {
Task {
for await (event, session) in supabase.auth.authStateChanges {
Logger.main.debug("AuthStateChange: \(event.rawValue)")
guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else { return }
guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else {
return
}
self.session = session

if session == nil {
Expand Down
5 changes: 3 additions & 2 deletions Examples/SlackClone/Supabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ let decoder: JSONDecoder = {
}()

let supabase = SupabaseClient(
supabaseURL: URL(string: "https://rkehabxkxxpcbpzsammm.supabase.red")!,
supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InJrZWhhYnhreHhwY2JwenNhbW1tIiwicm9sZSI6ImFub24iLCJpYXQiOjE3Mjk3NTgzODgsImV4cCI6MjA0NTMzNDM4OH0.rTpPEGk9fMjHXXR49drfyF6IkrNYeL_-yGGDa1JaXTY",
supabaseURL: URL(string: "http://127.0.0.1:54321")!,
supabaseKey:
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0",
options: SupabaseClientOptions(
db: .init(encoder: encoder, decoder: decoder),
auth: .init(redirectToURL: URL(string: "com.supabase.slack-clone://login-callback")),
Expand Down
18 changes: 8 additions & 10 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import ConcurrencyExtras
import Foundation
import Helpers
import HTTPTypes
import Helpers

#if canImport(FoundationNetworking)
import FoundationNetworking
Expand Down Expand Up @@ -59,7 +59,9 @@ extension Socket {
addChannel: { [weak client] in client?.addChannel($0) },
removeChannel: { [weak client] in await client?.removeChannel($0) },
push: { [weak client] in await client?.push($0) },
httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) }
httpSend: { [weak client] in
try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse())
}
)
}
}
Expand Down Expand Up @@ -128,11 +130,6 @@ public final class RealtimeChannelV2: Sendable {
await socket.connect()
}

guard status != .subscribed else {
logger?.warning("Channel \(topic) is already subscribed")
return
}

socket.addChannel(self)

status = .subscribing
Expand Down Expand Up @@ -185,7 +182,8 @@ public final class RealtimeChannelV2: Sendable {
@available(
*,
deprecated,
message: "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
message:
"manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
)
public func updateAuth(jwt: String?) async {
logger?.debug("Updating auth token for channel \(topic)")
Expand Down Expand Up @@ -238,8 +236,8 @@ public final class RealtimeChannelV2: Sendable {
event: event,
payload: message,
private: config.isPrivate
),
],
)
]
]
)
)
Expand Down
10 changes: 6 additions & 4 deletions Sources/Realtime/V2/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import ConcurrencyExtras
import Foundation
import Helpers
import Network

#if canImport(FoundationNetworking)
import FoundationNetworking
Expand Down Expand Up @@ -345,15 +346,18 @@ public final class RealtimeClientV2: Sendable {
}

/// Disconnects client.
public func disconnect() {
/// - Parameters:
/// - code: A numeric status code to send on disconnect.
/// - reason: A custom reason for the disconnect.
public func disconnect(code: Int? = nil, reason: String? = nil) {
options.logger?.debug("Closing WebSocket connection")
mutableState.withValue {
$0.ref = 0
$0.messageTask?.cancel()
$0.heartbeatTask?.cancel()
$0.connectionTask?.cancel()
}
ws.disconnect()
ws.disconnect(code: code, reason: reason)
status = .disconnected
}

Expand Down Expand Up @@ -490,8 +494,6 @@ public final class RealtimeClientV2: Sendable {
}
}

import Network

final class NetworkMonitor: @unchecked Sendable {
static let shared = NetworkMonitor()

Expand Down
64 changes: 50 additions & 14 deletions Sources/Realtime/V2/WebSocketClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import Helpers
import FoundationNetworking
#endif

enum WebSocketClientError: Error {
case unsupportedData
}

enum ConnectionStatus {
case connected
case disconnected(reason: String, code: URLSessionWebSocketTask.CloseCode)
Expand All @@ -23,7 +27,7 @@ protocol WebSocketClient: Sendable {
func send(_ message: RealtimeMessageV2) async throws
func receive() -> AsyncThrowingStream<RealtimeMessageV2, any Error>
func connect() -> AsyncStream<ConnectionStatus>
func disconnect()
func disconnect(code: Int?, reason: String?)
}

final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @unchecked Sendable {
Expand All @@ -33,7 +37,7 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @

struct MutableState {
var continuation: AsyncStream<ConnectionStatus>.Continuation?
var connection: WebSocketConnection<RealtimeMessageV2, RealtimeMessageV2>?
var task: URLSessionWebSocketTask?
}

private let mutableState = LockIsolated(MutableState())
Expand All @@ -47,11 +51,15 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
logger = options.logger
}

deinit {
mutableState.task?.cancel(with: .goingAway, reason: nil)
}

func connect() -> AsyncStream<ConnectionStatus> {
mutableState.withValue { state in
let session = URLSession(configuration: configuration, delegate: self, delegateQueue: nil)
let task = session.webSocketTask(with: realtimeURL)
state.connection = WebSocketConnection(task: task)
state.task = task
task.resume()

let (stream, continuation) = AsyncStream<ConnectionStatus>.makeStream()
Expand All @@ -60,27 +68,55 @@ final class WebSocket: NSObject, URLSessionWebSocketDelegate, WebSocketClient, @
}
}

func disconnect() {
func disconnect(code: Int?, reason: String?) {
mutableState.withValue { state in
state.connection?.close()
if let code {
state.task?.cancel(
with: URLSessionWebSocketTask.CloseCode(rawValue: code) ?? .invalid,
reason: reason?.data(using: .utf8))
} else {
state.task?.cancel()
}
}
}

func receive() -> AsyncThrowingStream<RealtimeMessageV2, any Error> {
guard let connection = mutableState.connection else {
return .finished(
throwing: RealtimeError(
"receive() called before connect(). Make sure to call `connect()` before calling `receive()`."
)
)
AsyncThrowingStream { [weak self] in
guard let self else { return nil }

let task = mutableState.task

guard
let message = try await task?.receive(),
!Task.isCancelled
else { return nil }

switch message {
case .data(let data):
let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data)
return message

case .string(let string):
guard let data = string.data(using: .utf8) else {
throw WebSocketClientError.unsupportedData
}

let message = try JSONDecoder().decode(RealtimeMessageV2.self, from: data)
return message

@unknown default:
assertionFailure("Unsupported message type.")
task?.cancel(with: .unsupportedData, reason: nil)
throw WebSocketClientError.unsupportedData
}
}

return connection.receive()
}

func send(_ message: RealtimeMessageV2) async throws {
logger?.verbose("Sending message: \(message)")
try await mutableState.connection?.send(message)

let data = try JSONEncoder().encode(message)
try await mutableState.task?.send(.data(data))
}

// MARK: - URLSessionWebSocketDelegate
Expand Down
77 changes: 0 additions & 77 deletions Sources/Realtime/V2/WebSocketConnection.swift

This file was deleted.

0 comments on commit e6f20b2

Please sign in to comment.