Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(realtime): lost postgres_changes on resubscribe #585

Merged
merged 7 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Examples/SlackClone/AppView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ final class AppViewModel {
var session: Session?
var selectedChannel: Channel?

var realtimeConnectionStatus: RealtimeClientV2.Status?
var realtimeConnectionStatus: RealtimeClientStatus?

init() {
Task {
for await (event, session) in supabase.auth.authStateChanges {
Logger.main.debug("AuthStateChange: \(event.rawValue)")
guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else { return }
guard [.signedIn, .signedOut, .initialSession, .tokenRefreshed].contains(event) else {
return
}
self.session = session

if session == nil {
Expand Down
5 changes: 3 additions & 2 deletions Examples/SlackClone/Supabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ let decoder: JSONDecoder = {
}()

let supabase = SupabaseClient(
supabaseURL: URL(string: "http://localhost:54321")!,
supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0",
supabaseURL: URL(string: "http://127.0.0.1:54321")!,
supabaseKey:
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0",
options: SupabaseClientOptions(
db: .init(encoder: encoder, decoder: decoder),
auth: .init(redirectToURL: URL(string: "com.supabase.slack-clone://login-callback")),
Expand Down
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ let package = Package(
name: "Realtime",
dependencies: [
.product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"),
.product(name: "IssueReporting", package: "xctest-dynamic-overlay"),
"Helpers",
]
),
Expand Down
11 changes: 4 additions & 7 deletions Sources/Realtime/V2/CallbackManager.swift
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
//
// CallbackManager.swift
//
//
// Created by Guilherme Souza on 24/12/23.
//

import ConcurrencyExtras
import Foundation
import Helpers
Expand All @@ -26,6 +19,10 @@ final class CallbackManager: Sendable {
mutableState.callbacks
}

deinit {
reset()
}

@discardableResult
func addBroadcastCallback(
event: String,
Expand Down
38 changes: 22 additions & 16 deletions Sources/Realtime/V2/PushV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,31 @@ actor PushV2 {
}

func send() async -> PushStatus {
await channel?.socket.push(message)

if channel?.config.broadcast.acknowledgeBroadcasts == true {
do {
return try await withTimeout(interval: channel?.socket.options().timeoutInterval ?? 10) {
await withCheckedContinuation {
self.receivedContinuation = $0
}
guard let channel = channel else {
return .error
}

await channel.socket.push(message)

if !channel.config.broadcast.acknowledgeBroadcasts {
// channel was configured with `ack = false`,
// don't wait for a response and return `ok`.
return .ok
}

do {
return try await withTimeout(interval: channel.socket.options().timeoutInterval) {
await withCheckedContinuation { continuation in
self.receivedContinuation = continuation
}
} catch is TimeoutError {
channel?.logger?.debug("Push timed out.")
return .timeout
} catch {
channel?.logger?.error("Error sending push: \(error)")
return .error
}
} catch is TimeoutError {
channel.logger?.debug("Push timed out.")
return .timeout
} catch {
channel.logger?.error("Error sending push: \(error.localizedDescription)")
return .error
}

return .ok
}

func didReceive(status: PushStatus) {
Expand Down
41 changes: 20 additions & 21 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
//
// RealtimeChannelV2.swift
//
//
// Created by Guilherme Souza on 26/12/23.
//

import ConcurrencyExtras
import Foundation
import HTTPTypes
import Helpers
import IssueReporting

#if canImport(FoundationNetworking)
import FoundationNetworking
Expand Down Expand Up @@ -123,18 +117,14 @@ public final class RealtimeChannelV2: Sendable {
public func subscribe() async {
if socket.status() != .connected {
if socket.options().connectOnSubscribe != true {
fatalError(
reportIssue(
"You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?"
)
return
}
await socket.connect()
}

guard status != .subscribed else {
logger?.warning("Channel \(topic) is already subscribed")
return
}

socket.addChannel(self)

status = .subscribing
Expand Down Expand Up @@ -266,15 +256,21 @@ public final class RealtimeChannelV2: Sendable {
}
}

/// Tracks the given state in the channel.
/// - Parameter state: The state to be tracked, conforming to `Codable`.
/// - Throws: An error if the tracking fails.
public func track(_ state: some Codable) async throws {
try await track(state: JSONObject(state))
}

/// Tracks the given state in the channel.
/// - Parameter state: The state to be tracked as a `JSONObject`.
public func track(state: JSONObject) async {
assert(
status == .subscribed,
"You can only track your presence after subscribing to the channel. Did you forget to call `channel.subscribe()`?"
)
if status != .subscribed {
reportIssue(
"You can only track your presence after subscribing to the channel. Did you forget to call `channel.subscribe()`?"
)
}

await push(
ChannelEvent.presence,
Expand All @@ -286,6 +282,7 @@ public final class RealtimeChannelV2: Sendable {
)
}

/// Stops tracking the current state in the channel.
public func untrack() async {
await push(
ChannelEvent.presence,
Expand Down Expand Up @@ -520,10 +517,12 @@ public final class RealtimeChannelV2: Sendable {
filter: String?,
callback: @escaping @Sendable (AnyAction) -> Void
) -> RealtimeSubscription {
precondition(
status != .subscribed,
"You cannot call postgresChange after joining the channel"
)
guard status != .subscribed else {
reportIssue(
"You cannot call postgresChange after joining the channel, this won't work as expected."
)
return RealtimeSubscription {}
}

let config = PostgresJoinConfig(
event: event,
Expand Down
42 changes: 32 additions & 10 deletions Sources/Realtime/V2/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ public final class RealtimeClientV2: Sendable {
var accessToken: String?
var ref = 0
var pendingHeartbeatRef: Int?

/// Long-running task that keeps sending heartbeat messages.
var heartbeatTask: Task<Void, Never>?

/// Long-running task for listening for incoming messages from WebSocket.
var messageTask: Task<Void, Never>?

var connectionTask: Task<Void, Never>?
var channels: [String: RealtimeChannelV2] = [:]
var sendBuffer: [@Sendable () async -> Void] = []
Expand All @@ -34,13 +39,14 @@ public final class RealtimeClientV2: Sendable {
let http: any HTTPClientType
let apikey: String?

/// All managed channels indexed by their topics.
public var channels: [String: RealtimeChannelV2] {
mutableState.channels
}

private let statusEventEmitter = EventEmitter<RealtimeClientStatus>(initialEvent: .disconnected)

/// AsyncStream that emits when connection status change.
/// Listen for connection status changes.
///
/// You can also use ``onStatusChange(_:)`` for a closure based method.
public var statusChange: AsyncStream<RealtimeClientStatus> {
Expand Down Expand Up @@ -198,6 +204,13 @@ public final class RealtimeClientV2: Sendable {
await connect(reconnect: true)
}

/// Creates a new channel and bind it to this client.
/// - Parameters:
/// - topic: Channel's topic.
/// - options: Configuration options for the channel.
/// - Returns: Channel instance.
///
/// - Note: This method doesn't subscribe to the channel, call ``RealtimeChannelV2/subscribe()`` on the returned channel instance.
public func channel(
_ topic: String,
options: @Sendable (inout RealtimeChannelConfig) -> Void = { _ in }
Expand All @@ -223,6 +236,9 @@ public final class RealtimeClientV2: Sendable {
}
}

/// Unsubscribe and removes channel.
///
/// If there is no channel left, client is disconnected.
public func removeChannel(_ channel: RealtimeChannelV2) async {
if channel.status == .subscribed {
await channel.unsubscribe()
Expand All @@ -238,6 +254,7 @@ public final class RealtimeClientV2: Sendable {
}
}

/// Unsubscribes and removes all channels.
public func removeAllChannels() async {
await withTaskGroup(of: Void.self) { group in
for channel in channels.values {
Expand Down Expand Up @@ -327,15 +344,19 @@ public final class RealtimeClientV2: Sendable {
}
}

public func disconnect() {
/// Disconnects client.
/// - Parameters:
/// - code: A numeric status code to send on disconnect.
/// - reason: A custom reason for the disconnect.
public func disconnect(code: Int? = nil, reason: String? = nil) {
options.logger?.debug("Closing WebSocket connection")
mutableState.withValue {
$0.ref = 0
$0.messageTask?.cancel()
$0.heartbeatTask?.cancel()
$0.connectionTask?.cancel()
}
ws.disconnect()
ws.disconnect(code: code, reason: reason)
status = .disconnected
}

Expand Down Expand Up @@ -388,13 +409,14 @@ public final class RealtimeClientV2: Sendable {
try Task.checkCancellation()
try await self?.ws.send(message)
} catch {
self?.options.logger?.error("""
Failed to send message:
\(message)

Error:
\(error)
""")
self?.options.logger?.error(
"""
Failed to send message:
\(message)

Error:
\(error)
""")
}
}

Expand Down
7 changes: 0 additions & 7 deletions Sources/Realtime/V2/RealtimeMessageV2.swift
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
//
// RealtimeMessageV2.swift
//
//
// Created by Guilherme Souza on 11/01/24.
//

import Foundation
import Helpers

Expand Down
Loading
Loading