Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Oct 30, 2024
1 parent 0c6c495 commit aa5ba6e
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 10 deletions.
4 changes: 2 additions & 2 deletions Examples/SlackClone/Supabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ let decoder: JSONDecoder = {
}()

let supabase = SupabaseClient(
supabaseURL: URL(string: "http://localhost:54321")!,
supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0",
supabaseURL: URL(string: "https://rkehabxkxxpcbpzsammm.supabase.red")!,
supabaseKey: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InJrZWhhYnhreHhwY2JwenNhbW1tIiwicm9sZSI6ImFub24iLCJpYXQiOjE3Mjk3NTgzODgsImV4cCI6MjA0NTMzNDM4OH0.rTpPEGk9fMjHXXR49drfyF6IkrNYeL_-yGGDa1JaXTY",
options: SupabaseClientOptions(
db: .init(encoder: encoder, decoder: decoder),
auth: .init(redirectToURL: URL(string: "com.supabase.slack-clone://login-callback")),
Expand Down
63 changes: 55 additions & 8 deletions Sources/Realtime/V2/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ public final class RealtimeClientV2: Sendable {
var accessToken: String?
var ref = 0
var pendingHeartbeatRef: Int?

/// Long-running task that keeps sending heartbeat messages.
var heartbeatTask: Task<Void, Never>?

/// Long-running task for listening for incoming messages from WebSocket.
var messageTask: Task<Void, Never>?

var connectionTask: Task<Void, Never>?
var channels: [String: RealtimeChannelV2] = [:]
var sendBuffer: [@Sendable () async -> Void] = []
Expand All @@ -34,13 +39,14 @@ public final class RealtimeClientV2: Sendable {
let http: any HTTPClientType
let apikey: String?

/// All managed channels indexed by their topics.
public var channels: [String: RealtimeChannelV2] {
mutableState.channels
}

private let statusEventEmitter = EventEmitter<RealtimeClientStatus>(initialEvent: .disconnected)

/// AsyncStream that emits when connection status change.
/// Listen for connection status changes.
///
/// You can also use ``onStatusChange(_:)`` for a closure based method.
public var statusChange: AsyncStream<RealtimeClientStatus> {
Expand Down Expand Up @@ -198,6 +204,13 @@ public final class RealtimeClientV2: Sendable {
await connect(reconnect: true)
}

/// Creates a new channel and bind it to this client.
/// - Parameters:
/// - topic: Channel's topic.
/// - options: Configuration options for the channel.
/// - Returns: Channel instance.
///
/// - Note: This method doesn't subscribe to the channel, call ``RealtimeChannelV2/subscribe()`` on the returned channel instance.
public func channel(
_ topic: String,
options: @Sendable (inout RealtimeChannelConfig) -> Void = { _ in }
Expand All @@ -223,6 +236,9 @@ public final class RealtimeClientV2: Sendable {
}
}

/// Unsubscribe and removes channel.
///
/// If there is no channel left, client is disconnected.
public func removeChannel(_ channel: RealtimeChannelV2) async {
if channel.status == .subscribed {
await channel.unsubscribe()
Expand All @@ -238,6 +254,7 @@ public final class RealtimeClientV2: Sendable {
}
}

/// Unsubscribes and removes all channels.
public func removeAllChannels() async {
await withTaskGroup(of: Void.self) { group in
for channel in channels.values {
Expand Down Expand Up @@ -327,6 +344,7 @@ public final class RealtimeClientV2: Sendable {
}
}

/// Disconnects client.
public func disconnect() {
options.logger?.debug("Closing WebSocket connection")
mutableState.withValue {
Expand Down Expand Up @@ -388,13 +406,14 @@ public final class RealtimeClientV2: Sendable {
try Task.checkCancellation()
try await self?.ws.send(message)
} catch {
self?.options.logger?.error("""
Failed to send message:
\(message)
Error:
\(error)
""")
self?.options.logger?.error(
"""
Failed to send message:
\(message)
Error:
\(error)
""")
}
}

Expand Down Expand Up @@ -470,3 +489,31 @@ public final class RealtimeClientV2: Sendable {
url.appendingPathComponent("api/broadcast")
}
}

import Network

final class NetworkMonitor: @unchecked Sendable {
static let shared = NetworkMonitor()

private let monitor: NWPathMonitor
private let queue = DispatchQueue(label: "NetworkMonitor")

private(set) var isConnected: Bool = false

private init() {
monitor = NWPathMonitor()
}

func start(_ onChange: (@Sendable () -> Void)? = nil) {
monitor.pathUpdateHandler = { [weak self] path in
self?.isConnected = path.status != .unsatisfied
onChange?()
}

monitor.start(queue: queue)
}

func stop() {
monitor.cancel()
}
}

0 comments on commit aa5ba6e

Please sign in to comment.