Skip to content

Commit

Permalink
feat(apple): realtime heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
loks0n committed Nov 29, 2024
1 parent 3ca2123 commit a037987
Showing 1 changed file with 37 additions and 0 deletions.
37 changes: 37 additions & 0 deletions templates/swift/Sources/Services/Realtime.swift.twig
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ open class Realtime : Service {

private let TYPE_ERROR = "error"
private let TYPE_EVENT = "event"
private let TYPE_PONG = "pong"
private let DEBOUNCE_NANOS = 1_000_000
private let HEARTBEAT_INTERVAL: UInt64 = 20_000_000_000 // 20 seconds in nanoseconds

private var socketClient: WebSocketClient? = nil
private var activeChannels = Set<String>()
private var activeSubscriptions = [Int: RealtimeCallback]()
private var heartbeatTask: Task<Void, Error>? = nil

let connectSync = DispatchQueue(label: "ConnectSync")

Expand All @@ -20,6 +23,33 @@ open class Realtime : Service {
private var subscriptionsCounter = 0
private var reconnect = true

private func startHeartbeat() {
stopHeartbeat()
heartbeatTask = Task {
do {
while !Task.isCancelled {
if let client = socketClient, client.isConnected {
let pingMessage = ["type": "ping"]
if let jsonData = try JSONSerialization.data(withJSONObject: pingMessage),
let jsonString = String(data: jsonData, encoding: .utf8) {
client.send(text: jsonString)
}
}
try await Task.sleep(nanoseconds: HEARTBEAT_INTERVAL)
}
} catch {
if !Task.isCancelled {
print("Heartbeat task failed: \(error.localizedDescription)")
}
}
}
}

private func stopHeartbeat() {
heartbeatTask?.cancel()
heartbeatTask = nil
}

private func createSocket() async throws {
guard activeChannels.count > 0 else {
reconnect = false
Expand Down Expand Up @@ -50,6 +80,8 @@ open class Realtime : Service {
}

private func closeSocket() async throws {
stopHeartbeat()

guard let client = socketClient,
let group = client.threadGroup else {
return
Expand Down Expand Up @@ -163,6 +195,7 @@ extension Realtime: WebSocketClientDelegate {

public func onOpen(channel: Channel) {
self.reconnectAttempts = 0
startHeartbeat()
}

public func onMessage(text: String) {
Expand All @@ -172,13 +205,16 @@ extension Realtime: WebSocketClientDelegate {
switch type {
case TYPE_ERROR: try! handleResponseError(from: json)
case TYPE_EVENT: handleResponseEvent(from: json)
case TYPE_PONG: break // Handle pong response if needed
default: break
}
}
}
}

public func onClose(channel: Channel, data: Data) async throws {
stopHeartbeat()

if (!reconnect) {
reconnect = true
return
Expand All @@ -196,6 +232,7 @@ extension Realtime: WebSocketClientDelegate {
}

public func onError(error: Swift.Error?, status: HTTPResponseStatus?) {
stopHeartbeat()
print(error?.localizedDescription ?? "Unknown error")
}

Expand Down

0 comments on commit a037987

Please sign in to comment.