Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(realtime): auto reconnect after calling disconnect, and several refactors #627

Merged
merged 13 commits into from
Jan 8, 2025
46 changes: 23 additions & 23 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,29 +78,29 @@ jobs:
if: matrix.skip_release != '1'
run: make XCODEBUILD_ARGUMENT="${{ matrix.command }}" CONFIG=Release PLATFORM="${{ matrix.platform }}" xcodebuild

linux:
name: linux
strategy:
matrix:
swift-version: ["5.10"]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: swift-actions/setup-swift@v2
with:
swift-version: ${{ matrix.swift-version }}
- name: Cache build
uses: actions/cache@v3
with:
path: |
.build
key: |
build-spm-linux-${{ matrix.swift-version }}-${{ hashFiles('**/Sources/**/*.swift', '**/Tests/**/*.swift', '**/Package.resolved') }}
restore-keys: |
build-spm-linux-${{ matrix.swift-version }}-
- run: make dot-env
- name: Run tests
run: swift test --skip IntegrationTests
# linux:
# name: linux
# strategy:
# matrix:
# swift-version: ["5.10"]
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v4
# - uses: swift-actions/setup-swift@v2
# with:
# swift-version: ${{ matrix.swift-version }}
# - name: Cache build
# uses: actions/cache@v3
# with:
# path: |
# .build
# key: |
# build-spm-linux-${{ matrix.swift-version }}-${{ hashFiles('**/Sources/**/*.swift', '**/Tests/**/*.swift', '**/Package.resolved') }}
# restore-keys: |
# build-spm-linux-${{ matrix.swift-version }}-
# - run: make dot-env
# - name: Run tests
# run: swift test --skip IntegrationTests

# library-evolution:
# name: Library (evolution)
Expand Down
5 changes: 1 addition & 4 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,7 @@ let package = Package(
.product(name: "InlineSnapshotTesting", package: "swift-snapshot-testing"),
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
"Helpers",
"Auth",
"PostgREST",
"Realtime",
"Storage",
"Supabase",
"TestHelpers",
],
resources: [.process("Fixtures")]
Expand Down
38 changes: 26 additions & 12 deletions Sources/Helpers/EventEmitter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import ConcurrencyExtras
import Foundation

/// A token for cancelling observations.
///
/// When this token gets deallocated it cancels the observation it was associated with. Store this token in another object to keep the observation alive.
public final class ObservationToken: @unchecked Sendable, Hashable {
private let _isCancelled = LockIsolated(false)
package var onCancel: @Sendable () -> Void
Expand Down Expand Up @@ -44,9 +47,7 @@ public final class ObservationToken: @unchecked Sendable, Hashable {
public func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self))
}
}

extension ObservationToken {
public func store(in collection: inout some RangeReplaceableCollection<ObservationToken>) {
collection.append(self)
}
Expand All @@ -59,20 +60,29 @@ extension ObservationToken {
package final class EventEmitter<Event: Sendable>: Sendable {
public typealias Listener = @Sendable (Event) -> Void

private let listeners = LockIsolated<[(key: ObjectIdentifier, listener: Listener)]>([])
private let _lastEvent: LockIsolated<Event>
package var lastEvent: Event { _lastEvent.value }
struct MutableState {
var listeners: [(key: ObjectIdentifier, listener: Listener)] = []
var lastEvent: Event
}

let mutableState: LockIsolated<MutableState>

/// The last event emitted by this Emiter, or the initial event.
package var lastEvent: Event { mutableState.lastEvent }

let emitsLastEventWhenAttaching: Bool

package init(
initialEvent event: Event,
emitsLastEventWhenAttaching: Bool = true
) {
_lastEvent = LockIsolated(event)
mutableState = LockIsolated(MutableState(lastEvent: event))
self.emitsLastEventWhenAttaching = emitsLastEventWhenAttaching
}

/// Attaches a new listener for observing event emissions.
///
/// If emitter initialized with `emitsLastEventWhenAttaching = true`, listener gets called right away with last event.
package func attach(_ listener: @escaping Listener) -> ObservationToken {
defer {
if emitsLastEventWhenAttaching {
Expand All @@ -84,21 +94,24 @@ package final class EventEmitter<Event: Sendable>: Sendable {
let key = ObjectIdentifier(token)

token.onCancel = { [weak self] in
self?.listeners.withValue {
$0.removeAll { $0.key == key }
self?.mutableState.withValue {
$0.listeners.removeAll { $0.key == key }
}
}

listeners.withValue {
$0.append((key, listener))
mutableState.withValue {
$0.listeners.append((key, listener))
}

return token
}

/// Trigger a new event on all attached listeners, or a specific listener owned by the `token` provided.
package func emit(_ event: Event, to token: ObservationToken? = nil) {
_lastEvent.setValue(event)
let listeners = listeners.value
let listeners = mutableState.withValue {
$0.lastEvent = event
return $0.listeners
}

if let token {
listeners.first { $0.key == ObjectIdentifier(token) }?.listener(event)
Expand All @@ -109,6 +122,7 @@ package final class EventEmitter<Event: Sendable>: Sendable {
}
}

/// Returns a new ``AsyncStream`` for observing events emitted by this emitter.
package func stream() -> AsyncStream<Event> {
AsyncStream { continuation in
let token = attach { status in
Expand Down
4 changes: 2 additions & 2 deletions Sources/Realtime/V2/PushV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ actor PushV2 {
return .error
}

await channel.socket.push(message)
channel.socket.push(message)

if !channel.config.broadcast.acknowledgeBroadcasts {
// channel was configured with `ack = false`,
Expand All @@ -40,7 +40,7 @@ actor PushV2 {
}

do {
return try await withTimeout(interval: channel.socket.options().timeoutInterval) {
return try await withTimeout(interval: channel.socket.options.timeoutInterval) {
await withCheckedContinuation { continuation in
self.receivedContinuation = continuation
}
Expand Down
71 changes: 15 additions & 56 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,46 +25,6 @@ public struct RealtimeChannelConfig: Sendable {
public var isPrivate: Bool
}

struct Socket: Sendable {
var broadcastURL: @Sendable () -> URL
var status: @Sendable () -> RealtimeClientStatus
var options: @Sendable () -> RealtimeClientOptions
var accessToken: @Sendable () async -> String?
var apiKey: @Sendable () -> String?
var makeRef: @Sendable () -> Int

var connect: @Sendable () async -> Void
var addChannel: @Sendable (_ channel: RealtimeChannelV2) -> Void
var removeChannel: @Sendable (_ channel: RealtimeChannelV2) async -> Void
var push: @Sendable (_ message: RealtimeMessageV2) async -> Void
var httpSend: @Sendable (_ request: Helpers.HTTPRequest) async throws -> Helpers.HTTPResponse
}

extension Socket {
init(client: RealtimeClientV2) {
self.init(
broadcastURL: { [weak client] in client?.broadcastURL ?? URL(string: "http://localhost")! },
status: { [weak client] in client?.status ?? .disconnected },
options: { [weak client] in client?.options ?? .init() },
accessToken: { [weak client] in
if let accessToken = try? await client?.options.accessToken?() {
return accessToken
}
return client?.mutableState.accessToken
},
apiKey: { [weak client] in client?.apikey },
makeRef: { [weak client] in client?.makeRef() ?? 0 },
connect: { [weak client] in await client?.connect() },
addChannel: { [weak client] in client?.addChannel($0) },
removeChannel: { [weak client] in await client?.removeChannel($0) },
push: { [weak client] in await client?.push($0) },
httpSend: { [weak client] in
try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse())
}
)
}
}

public final class RealtimeChannelV2: Sendable {
struct MutableState {
var clientChanges: [PostgresJoinConfig] = []
Expand All @@ -77,7 +37,8 @@ public final class RealtimeChannelV2: Sendable {
let topic: String
let config: RealtimeChannelConfig
let logger: (any SupabaseLogger)?
let socket: Socket
let socket: RealtimeClientV2
var joinRef: String? { mutableState.joinRef }

let callbackManager = CallbackManager()
private let statusEventEmitter = EventEmitter<RealtimeChannelStatus>(initialEvent: .unsubscribed)
Expand Down Expand Up @@ -105,7 +66,7 @@ public final class RealtimeChannelV2: Sendable {
init(
topic: String,
config: RealtimeChannelConfig,
socket: Socket,
socket: RealtimeClientV2,
logger: (any SupabaseLogger)?
) {
self.topic = topic
Expand All @@ -120,8 +81,8 @@ public final class RealtimeChannelV2: Sendable {

/// Subscribes to the channel
public func subscribe() async {
if socket.status() != .connected {
if socket.options().connectOnSubscribe != true {
if socket.status != .connected {
if socket.options.connectOnSubscribe != true {
reportIssue(
"You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?"
)
Expand All @@ -130,8 +91,6 @@ public final class RealtimeChannelV2: Sendable {
await socket.connect()
}

socket.addChannel(self)

status = .subscribing
logger?.debug("Subscribing to channel \(topic)")

Expand All @@ -144,10 +103,10 @@ public final class RealtimeChannelV2: Sendable {

let payload = RealtimeJoinPayload(
config: joinConfig,
accessToken: await socket.accessToken()
accessToken: await socket._getAccessToken()
)

let joinRef = socket.makeRef().description
let joinRef = socket.makeRef()
mutableState.withValue { $0.joinRef = joinRef }

logger?.debug("Subscribing to channel with body: \(joinConfig)")
Expand All @@ -159,7 +118,7 @@ public final class RealtimeChannelV2: Sendable {
)

do {
try await withTimeout(interval: socket.options().timeoutInterval) { [self] in
try await withTimeout(interval: socket.options.timeoutInterval) { [self] in
_ = await statusChange.first { @Sendable in $0 == .subscribed }
}
} catch {
Expand Down Expand Up @@ -215,17 +174,17 @@ public final class RealtimeChannelV2: Sendable {
}

var headers: HTTPFields = [.contentType: "application/json"]
if let apiKey = socket.apiKey() {
if let apiKey = socket.options.apikey {
headers[.apiKey] = apiKey
}
if let accessToken = await socket.accessToken() {
if let accessToken = await socket._getAccessToken() {
headers[.authorization] = "Bearer \(accessToken)"
}

let task = Task { [headers] in
_ = try? await socket.httpSend(
_ = try? await socket.http.send(
HTTPRequest(
url: socket.broadcastURL(),
url: socket.broadcastURL,
method: .post,
headers: headers,
body: JSONEncoder().encode(
Expand All @@ -245,7 +204,7 @@ public final class RealtimeChannelV2: Sendable {
}

if config.broadcast.acknowledgeBroadcasts {
try? await withTimeout(interval: socket.options().timeoutInterval) {
try? await withTimeout(interval: socket.options.timeoutInterval) {
await task.value
}
}
Expand Down Expand Up @@ -406,7 +365,7 @@ public final class RealtimeChannelV2: Sendable {
callbackManager.triggerBroadcast(event: event, json: payload)

case .close:
await socket.removeChannel(self)
socket._remove(self)
logger?.debug("Unsubscribed from channel \(message.topic)")
status = .unsubscribed

Expand Down Expand Up @@ -582,7 +541,7 @@ public final class RealtimeChannelV2: Sendable {
let push = mutableState.withValue {
let message = RealtimeMessageV2(
joinRef: $0.joinRef,
ref: ref ?? socket.makeRef().description,
ref: ref ?? socket.makeRef(),
topic: self.topic,
event: event,
payload: payload
Expand Down
Loading
Loading