Skip to content

Commit

Permalink
trigger event
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Nov 4, 2024
1 parent 4ef61bb commit 72fccff
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 30 deletions.
13 changes: 13 additions & 0 deletions Sources/Realtime/V2/CallbackManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,19 @@ final class CallbackManager: Sendable {
}
}

func triggerSystem(message: RealtimeMessageV2) {
let systemCallbacks = mutableState.callbacks.compactMap {
if case .system(let callback) = $0 {
return callback
}
return nil
}

for systemCallback in systemCallbacks {
systemCallback.callback(message)
}
}

func reset() {
mutableState.setValue(MutableState())
}
Expand Down
46 changes: 33 additions & 13 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import ConcurrencyExtras
import Foundation
import Helpers
import HTTPTypes
import Helpers

#if canImport(FoundationNetworking)
import FoundationNetworking
Expand Down Expand Up @@ -59,7 +59,9 @@ extension Socket {
addChannel: { [weak client] in client?.addChannel($0) },
removeChannel: { [weak client] in await client?.removeChannel($0) },
push: { [weak client] in await client?.push($0) },
httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) }
httpSend: { [weak client] in
try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse())
}
)
}
}
Expand Down Expand Up @@ -185,7 +187,8 @@ public final class RealtimeChannelV2: Sendable {
@available(
*,
deprecated,
message: "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
message:
"manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
)
public func updateAuth(jwt: String?) async {
logger?.debug("Updating auth token for channel \(topic)")
Expand Down Expand Up @@ -238,8 +241,8 @@ public final class RealtimeChannelV2: Sendable {
event: event,
payload: message,
private: config.isPrivate
),
],
)
]
]
)
)
Expand Down Expand Up @@ -295,20 +298,27 @@ public final class RealtimeChannelV2: Sendable {

func onMessage(_ message: RealtimeMessageV2) async {
do {
guard let eventType = message.eventType else {
guard let eventType = message._eventType else {
logger?.debug("Received message without event type: \(message)")
return
}

switch eventType {
case .tokenExpired:
logger?.debug(
"Received token expired event. This should not happen, please report this warning."
)
// deprecated type
break

case .system:
logger?.debug("Subscribed to channel \(message.topic)")
status = .subscribed
if message.status == .ok {
logger?.debug("Subscribed to channel \(message.topic)")
status = .subscribed
} else {
logger?.debug(
"Failed to subscribe to channel \(message.topic): \(message.payload)"
)
}

callbackManager.triggerSystem(message: message)

case .reply:
guard
Expand Down Expand Up @@ -545,14 +555,24 @@ public final class RealtimeChannelV2: Sendable {
}
}

public func onSystem(callback: @escaping @Sendable (RealtimeMessageV2) -> Void) -> Subscription {
/// Listen for `system` event.
public func onSystem(
callback: @escaping @Sendable (RealtimeMessageV2) -> Void
) -> RealtimeSubscription {
let id = callbackManager.addSystemCallback(callback: callback)
return Subscription { [weak callbackManager, logger] in
return RealtimeSubscription { [weak callbackManager, logger] in
logger?.debug("Removing system callback with id: \(id)")
callbackManager?.removeCallback(id: id)
}
}

/// Listen for `system` event.
public func onSystem(
callback: @escaping @Sendable () -> Void
) -> RealtimeSubscription {
self.onSystem { _ in callback() }
}

@discardableResult
func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus {
let push = mutableState.withValue {
Expand Down
21 changes: 15 additions & 6 deletions Sources/Realtime/V2/RealtimeMessageV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,22 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable {
self.payload = payload
}

var status: PushStatus? {
/// Status for the received message if any.
public var status: PushStatus? {
payload["status"]
.flatMap(\.stringValue)
.flatMap(PushStatus.init(rawValue:))
}

public var eventType: EventType? {
@available(
*, deprecated,
message: "Access to event type will be removed, please inspect raw event value instead."
)
public var eventType: EventType? { _eventType }

var _eventType: EventType? {
switch event {
case ChannelEvent.system where status == .ok: .system
case ChannelEvent.system: .system
case ChannelEvent.postgresChanges:
.postgresChanges
case ChannelEvent.broadcast:
Expand All @@ -44,9 +51,6 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable {
.presenceDiff
case ChannelEvent.presenceState:
.presenceState
case ChannelEvent.system
where payload["message"]?.stringValue?.contains("access token has expired") == true:
.tokenExpired
case ChannelEvent.reply:
.reply
default:
Expand All @@ -62,6 +66,11 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable {
case error
case presenceDiff
case presenceState
@available(
*, deprecated,
message:
"tokenExpired gets returned as system, check payload for verifying if is a token expiration."
)
case tokenExpired
case reply
}
Expand Down
38 changes: 29 additions & 9 deletions Tests/RealtimeTests/CallbackManagerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
import ConcurrencyExtras
import CustomDump
import Helpers
@testable import Realtime
import XCTest

@testable import Realtime

final class CallbackManagerTests: XCTestCase {
func testIntegration() {
let callbackManager = CallbackManager()
Expand Down Expand Up @@ -52,13 +53,15 @@ final class CallbackManagerTests: XCTestCase {
let callbackManager = CallbackManager()
XCTAssertNoLeak(callbackManager)

let changes = [PostgresJoinConfig(
event: .update,
schema: "public",
table: "users",
filter: nil,
id: 1
)]
let changes = [
PostgresJoinConfig(
event: .update,
schema: "public",
table: "users",
filter: nil,
id: 1
)
]

callbackManager.setServerChanges(changes: changes)

Expand Down Expand Up @@ -118,7 +121,8 @@ final class CallbackManagerTests: XCTestCase {
receivedActions.withValue { $0.append(action) }
}

let deleteSpecificUserId = callbackManager
let deleteSpecificUserId =
callbackManager
.addPostgresCallback(filter: deleteSpecificUserFilter) { action in
receivedActions.withValue { $0.append(action) }
}
Expand Down Expand Up @@ -215,6 +219,22 @@ final class CallbackManagerTests: XCTestCase {
expectNoDifference(receivedAction.value?.joins, joins)
expectNoDifference(receivedAction.value?.leaves, leaves)
}

func testTriggerSystem() {
let callbackManager = CallbackManager()

let receivedMessage = LockIsolated(RealtimeMessageV2?.none)
callbackManager.addSystemCallback { message in
receivedMessage.setValue(message)
}

callbackManager.triggerSystem(
message: RealtimeMessageV2(
joinRef: nil, ref: nil, topic: "test", event: "system", payload: ["status": "ok"]))

XCTAssertEqual(receivedMessage.value?._eventType, .system)
XCTAssertEqual(receivedMessage.value?.status, .ok)
}
}

extension XCTestCase {
Expand Down
13 changes: 11 additions & 2 deletions Tests/RealtimeTests/RealtimeChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
//

import InlineSnapshotTesting
@testable import Realtime
import XCTest
import XCTestDynamicOverlay

@testable import Realtime

final class RealtimeChannelTests: XCTestCase {
let sut = RealtimeChannelV2(
topic: "topic",
Expand Down Expand Up @@ -48,9 +49,13 @@ final class RealtimeChannelTests: XCTestCase {

sut.onPresenceChange { _ in }.store(in: &subscriptions)

sut.onSystem {
}
.store(in: &subscriptions)

assertInlineSnapshot(of: sut.callbackManager.callbacks, as: .dump) {
"""
7 elements
8 elements
▿ RealtimeCallback
▿ postgres: PostgresCallback
- callback: (Function)
Expand Down Expand Up @@ -112,6 +117,10 @@ final class RealtimeChannelTests: XCTestCase {
▿ presence: PresenceCallback
- callback: (Function)
- id: 7
▿ RealtimeCallback
▿ system: SystemCallback
- callback: (Function)
- id: 8
"""
}
Expand Down

0 comments on commit 72fccff

Please sign in to comment.