From 72fccff901f1a8d216f627484f36d01d5ec8200f Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 4 Nov 2024 14:56:26 -0300 Subject: [PATCH] trigger event --- Sources/Realtime/V2/CallbackManager.swift | 13 ++++++ Sources/Realtime/V2/RealtimeChannelV2.swift | 46 +++++++++++++------ Sources/Realtime/V2/RealtimeMessageV2.swift | 21 ++++++--- .../RealtimeTests/CallbackManagerTests.swift | 38 +++++++++++---- .../RealtimeTests/RealtimeChannelTests.swift | 13 +++++- 5 files changed, 101 insertions(+), 30 deletions(-) diff --git a/Sources/Realtime/V2/CallbackManager.swift b/Sources/Realtime/V2/CallbackManager.swift index 7e21b9b0..1723b9f4 100644 --- a/Sources/Realtime/V2/CallbackManager.swift +++ b/Sources/Realtime/V2/CallbackManager.swift @@ -154,6 +154,19 @@ final class CallbackManager: Sendable { } } + func triggerSystem(message: RealtimeMessageV2) { + let systemCallbacks = mutableState.callbacks.compactMap { + if case .system(let callback) = $0 { + return callback + } + return nil + } + + for systemCallback in systemCallbacks { + systemCallback.callback(message) + } + } + func reset() { mutableState.setValue(MutableState()) } diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index 6a9123d7..8facb88c 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -7,8 +7,8 @@ import ConcurrencyExtras import Foundation -import Helpers import HTTPTypes +import Helpers #if canImport(FoundationNetworking) import FoundationNetworking @@ -59,7 +59,9 @@ extension Socket { addChannel: { [weak client] in client?.addChannel($0) }, removeChannel: { [weak client] in await client?.removeChannel($0) }, push: { [weak client] in await client?.push($0) }, - httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) } + httpSend: { [weak client] in + try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) + } ) } } @@ -185,7 +187,8 @@ public final class RealtimeChannelV2: Sendable { @available( *, deprecated, - message: "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead." + message: + "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead." ) public func updateAuth(jwt: String?) async { logger?.debug("Updating auth token for channel \(topic)") @@ -238,8 +241,8 @@ public final class RealtimeChannelV2: Sendable { event: event, payload: message, private: config.isPrivate - ), - ], + ) + ] ] ) ) @@ -295,20 +298,27 @@ public final class RealtimeChannelV2: Sendable { func onMessage(_ message: RealtimeMessageV2) async { do { - guard let eventType = message.eventType else { + guard let eventType = message._eventType else { logger?.debug("Received message without event type: \(message)") return } switch eventType { case .tokenExpired: - logger?.debug( - "Received token expired event. This should not happen, please report this warning." - ) + // deprecated type + break case .system: - logger?.debug("Subscribed to channel \(message.topic)") - status = .subscribed + if message.status == .ok { + logger?.debug("Subscribed to channel \(message.topic)") + status = .subscribed + } else { + logger?.debug( + "Failed to subscribe to channel \(message.topic): \(message.payload)" + ) + } + + callbackManager.triggerSystem(message: message) case .reply: guard @@ -545,14 +555,24 @@ public final class RealtimeChannelV2: Sendable { } } - public func onSystem(callback: @escaping @Sendable (RealtimeMessageV2) -> Void) -> Subscription { + /// Listen for `system` event. + public func onSystem( + callback: @escaping @Sendable (RealtimeMessageV2) -> Void + ) -> RealtimeSubscription { let id = callbackManager.addSystemCallback(callback: callback) - return Subscription { [weak callbackManager, logger] in + return RealtimeSubscription { [weak callbackManager, logger] in logger?.debug("Removing system callback with id: \(id)") callbackManager?.removeCallback(id: id) } } + /// Listen for `system` event. + public func onSystem( + callback: @escaping @Sendable () -> Void + ) -> RealtimeSubscription { + self.onSystem { _ in callback() } + } + @discardableResult func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus { let push = mutableState.withValue { diff --git a/Sources/Realtime/V2/RealtimeMessageV2.swift b/Sources/Realtime/V2/RealtimeMessageV2.swift index ff45913e..d288aece 100644 --- a/Sources/Realtime/V2/RealtimeMessageV2.swift +++ b/Sources/Realtime/V2/RealtimeMessageV2.swift @@ -23,15 +23,22 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable { self.payload = payload } - var status: PushStatus? { + /// Status for the received message if any. + public var status: PushStatus? { payload["status"] .flatMap(\.stringValue) .flatMap(PushStatus.init(rawValue:)) } - public var eventType: EventType? { + @available( + *, deprecated, + message: "Access to event type will be removed, please inspect raw event value instead." + ) + public var eventType: EventType? { _eventType } + + var _eventType: EventType? { switch event { - case ChannelEvent.system where status == .ok: .system + case ChannelEvent.system: .system case ChannelEvent.postgresChanges: .postgresChanges case ChannelEvent.broadcast: @@ -44,9 +51,6 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable { .presenceDiff case ChannelEvent.presenceState: .presenceState - case ChannelEvent.system - where payload["message"]?.stringValue?.contains("access token has expired") == true: - .tokenExpired case ChannelEvent.reply: .reply default: @@ -62,6 +66,11 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable { case error case presenceDiff case presenceState + @available( + *, deprecated, + message: + "tokenExpired gets returned as system, check payload for verifying if is a token expiration." + ) case tokenExpired case reply } diff --git a/Tests/RealtimeTests/CallbackManagerTests.swift b/Tests/RealtimeTests/CallbackManagerTests.swift index 145db28e..779993b2 100644 --- a/Tests/RealtimeTests/CallbackManagerTests.swift +++ b/Tests/RealtimeTests/CallbackManagerTests.swift @@ -8,9 +8,10 @@ import ConcurrencyExtras import CustomDump import Helpers -@testable import Realtime import XCTest +@testable import Realtime + final class CallbackManagerTests: XCTestCase { func testIntegration() { let callbackManager = CallbackManager() @@ -52,13 +53,15 @@ final class CallbackManagerTests: XCTestCase { let callbackManager = CallbackManager() XCTAssertNoLeak(callbackManager) - let changes = [PostgresJoinConfig( - event: .update, - schema: "public", - table: "users", - filter: nil, - id: 1 - )] + let changes = [ + PostgresJoinConfig( + event: .update, + schema: "public", + table: "users", + filter: nil, + id: 1 + ) + ] callbackManager.setServerChanges(changes: changes) @@ -118,7 +121,8 @@ final class CallbackManagerTests: XCTestCase { receivedActions.withValue { $0.append(action) } } - let deleteSpecificUserId = callbackManager + let deleteSpecificUserId = + callbackManager .addPostgresCallback(filter: deleteSpecificUserFilter) { action in receivedActions.withValue { $0.append(action) } } @@ -215,6 +219,22 @@ final class CallbackManagerTests: XCTestCase { expectNoDifference(receivedAction.value?.joins, joins) expectNoDifference(receivedAction.value?.leaves, leaves) } + + func testTriggerSystem() { + let callbackManager = CallbackManager() + + let receivedMessage = LockIsolated(RealtimeMessageV2?.none) + callbackManager.addSystemCallback { message in + receivedMessage.setValue(message) + } + + callbackManager.triggerSystem( + message: RealtimeMessageV2( + joinRef: nil, ref: nil, topic: "test", event: "system", payload: ["status": "ok"])) + + XCTAssertEqual(receivedMessage.value?._eventType, .system) + XCTAssertEqual(receivedMessage.value?.status, .ok) + } } extension XCTestCase { diff --git a/Tests/RealtimeTests/RealtimeChannelTests.swift b/Tests/RealtimeTests/RealtimeChannelTests.swift index b260f40d..a6403cd3 100644 --- a/Tests/RealtimeTests/RealtimeChannelTests.swift +++ b/Tests/RealtimeTests/RealtimeChannelTests.swift @@ -6,10 +6,11 @@ // import InlineSnapshotTesting -@testable import Realtime import XCTest import XCTestDynamicOverlay +@testable import Realtime + final class RealtimeChannelTests: XCTestCase { let sut = RealtimeChannelV2( topic: "topic", @@ -48,9 +49,13 @@ final class RealtimeChannelTests: XCTestCase { sut.onPresenceChange { _ in }.store(in: &subscriptions) + sut.onSystem { + } + .store(in: &subscriptions) + assertInlineSnapshot(of: sut.callbackManager.callbacks, as: .dump) { """ - ▿ 7 elements + ▿ 8 elements ▿ RealtimeCallback ▿ postgres: PostgresCallback - callback: (Function) @@ -112,6 +117,10 @@ final class RealtimeChannelTests: XCTestCase { ▿ presence: PresenceCallback - callback: (Function) - id: 7 + ▿ RealtimeCallback + ▿ system: SystemCallback + - callback: (Function) + - id: 8 """ }