Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4963] Clean up when user finishes with subscription #207

Merged
merged 3 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 37 additions & 63 deletions Example/AblyChatExample/Mocks/MockClients.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ actor MockRoom: Room {

var status: RoomStatus = .initialized

private var mockSubscriptions: [MockSubscription<RoomStatusChange>] = []
private let mockSubscriptions = MockSubscriptionStorage<RoomStatusChange>()

func attach() async throws {
print("Mock client attached to room with roomID: \(roomID)")
Expand All @@ -75,11 +75,9 @@ actor MockRoom: Room {
}

private func createSubscription() -> MockSubscription<RoomStatusChange> {
let subscription = MockSubscription<RoomStatusChange>(randomElement: {
mockSubscriptions.create(randomElement: {
RoomStatusChange(current: [.attached, .attached, .attached, .attached, .attaching(error: nil), .attaching(error: nil), .suspended(error: .createUnknownError())].randomElement()!, previous: .attaching(error: nil))
}, interval: 8)
mockSubscriptions.append(subscription)
return subscription
}

func onStatusChange(bufferingPolicy _: BufferingPolicy) async -> Subscription<RoomStatusChange> {
Expand All @@ -92,7 +90,7 @@ actor MockMessages: Messages {
let roomID: String
let channel: RealtimeChannelProtocol

private var mockSubscriptions: [MockSubscription<Message>] = []
private let mockSubscriptions = MockSubscriptionStorage<Message>()

init(clientID: String, roomID: String) {
self.clientID = clientID
Expand All @@ -101,7 +99,7 @@ actor MockMessages: Messages {
}

private func createSubscription() -> MockSubscription<Message> {
let subscription = MockSubscription<Message>(randomElement: {
mockSubscriptions.create(randomElement: {
Message(
serial: "\(Date().timeIntervalSince1970)",
action: .create,
Expand All @@ -113,8 +111,6 @@ actor MockMessages: Messages {
headers: [:]
)
}, interval: 3)
mockSubscriptions.append(subscription)
return subscription
}

func subscribe(bufferingPolicy _: BufferingPolicy) async -> MessageSubscription {
Expand All @@ -138,9 +134,7 @@ actor MockMessages: Messages {
metadata: params.metadata ?? [:],
headers: params.headers ?? [:]
)
for subscription in mockSubscriptions {
subscription.emit(message)
}
mockSubscriptions.emit(message)
return message
}

Expand All @@ -154,7 +148,7 @@ actor MockRoomReactions: RoomReactions {
let roomID: String
let channel: RealtimeChannelProtocol

private var mockSubscriptions: [MockSubscription<Reaction>] = []
private let mockSubscriptions = MockSubscriptionStorage<Reaction>()

init(clientID: String, roomID: String) {
self.clientID = clientID
Expand All @@ -163,7 +157,7 @@ actor MockRoomReactions: RoomReactions {
}

private func createSubscription() -> MockSubscription<Reaction> {
let subscription = MockSubscription<Reaction>(randomElement: {
mockSubscriptions.create(randomElement: {
Reaction(
type: ReactionType.allCases.randomElement()!.emoji,
metadata: [:],
Expand All @@ -173,8 +167,6 @@ actor MockRoomReactions: RoomReactions {
isSelf: false
)
}, interval: Double.random(in: 0.1 ... 0.5))
mockSubscriptions.append(subscription)
return subscription
}

func send(params: SendReactionParams) async throws {
Expand All @@ -186,9 +178,7 @@ actor MockRoomReactions: RoomReactions {
clientID: clientID,
isSelf: false
)
for subscription in mockSubscriptions {
subscription.emit(reaction)
}
mockSubscriptions.emit(reaction)
}

func subscribe(bufferingPolicy _: BufferingPolicy) -> Subscription<Reaction> {
Expand All @@ -205,7 +195,7 @@ actor MockTyping: Typing {
let roomID: String
let channel: RealtimeChannelProtocol

private var mockSubscriptions: [MockSubscription<TypingEvent>] = []
private let mockSubscriptions = MockSubscriptionStorage<TypingEvent>()

init(clientID: String, roomID: String) {
self.clientID = clientID
Expand All @@ -214,14 +204,12 @@ actor MockTyping: Typing {
}

private func createSubscription() -> MockSubscription<TypingEvent> {
let subscription = MockSubscription<TypingEvent>(randomElement: {
mockSubscriptions.create(randomElement: {
TypingEvent(currentlyTyping: [
MockStrings.names.randomElement()!,
MockStrings.names.randomElement()!,
])
}, interval: 2)
mockSubscriptions.append(subscription)
return subscription
}

func subscribe(bufferingPolicy _: BufferingPolicy) -> Subscription<TypingEvent> {
Expand All @@ -233,15 +221,11 @@ actor MockTyping: Typing {
}

func start() async throws {
for subscription in mockSubscriptions {
subscription.emit(TypingEvent(currentlyTyping: [clientID]))
}
mockSubscriptions.emit(TypingEvent(currentlyTyping: [clientID]))
}

func stop() async throws {
for subscription in mockSubscriptions {
subscription.emit(TypingEvent(currentlyTyping: []))
}
mockSubscriptions.emit(TypingEvent(currentlyTyping: []))
}

func onDiscontinuity(bufferingPolicy _: BufferingPolicy) -> Subscription<DiscontinuityEvent> {
Expand All @@ -253,24 +237,22 @@ actor MockPresence: Presence {
let clientID: String
let roomID: String

private var mockSubscriptions: [MockSubscription<PresenceEvent>] = []
private let mockSubscriptions = MockSubscriptionStorage<PresenceEvent>()

init(clientID: String, roomID: String) {
self.clientID = clientID
self.roomID = roomID
}

private func createSubscription() -> MockSubscription<PresenceEvent> {
let subscription = MockSubscription<PresenceEvent>(randomElement: {
mockSubscriptions.create(randomElement: {
PresenceEvent(
action: [.enter, .leave].randomElement()!,
clientID: MockStrings.names.randomElement()!,
timestamp: Date(),
data: nil
)
}, interval: 5)
mockSubscriptions.append(subscription)
return subscription
}

func get() async throws -> [PresenceMember] {
Expand Down Expand Up @@ -310,16 +292,14 @@ actor MockPresence: Presence {
}

private func enter(dataForEvent: PresenceData?) async throws {
for subscription in mockSubscriptions {
subscription.emit(
PresenceEvent(
action: .enter,
clientID: clientID,
timestamp: Date(),
data: dataForEvent
)
mockSubscriptions.emit(
PresenceEvent(
action: .enter,
clientID: clientID,
timestamp: Date(),
data: dataForEvent
)
}
)
}

func update() async throws {
Expand All @@ -331,16 +311,14 @@ actor MockPresence: Presence {
}

private func update(dataForEvent: PresenceData? = nil) async throws {
for subscription in mockSubscriptions {
subscription.emit(
PresenceEvent(
action: .update,
clientID: clientID,
timestamp: Date(),
data: dataForEvent
)
mockSubscriptions.emit(
PresenceEvent(
action: .update,
clientID: clientID,
timestamp: Date(),
data: dataForEvent
)
}
)
}

func leave() async throws {
Expand All @@ -352,16 +330,14 @@ actor MockPresence: Presence {
}

func leave(dataForEvent: PresenceData? = nil) async throws {
for subscription in mockSubscriptions {
subscription.emit(
PresenceEvent(
action: .leave,
clientID: clientID,
timestamp: Date(),
data: dataForEvent
)
mockSubscriptions.emit(
PresenceEvent(
action: .leave,
clientID: clientID,
timestamp: Date(),
data: dataForEvent
)
}
)
}

func subscribe(event _: PresenceEventType, bufferingPolicy _: BufferingPolicy) -> Subscription<PresenceEvent> {
Expand All @@ -382,7 +358,7 @@ actor MockOccupancy: Occupancy {
let roomID: String
let channel: RealtimeChannelProtocol

private var mockSubscriptions: [MockSubscription<OccupancyEvent>] = []
private let mockSubscriptions = MockSubscriptionStorage<OccupancyEvent>()

init(clientID: String, roomID: String) {
self.clientID = clientID
Expand All @@ -391,12 +367,10 @@ actor MockOccupancy: Occupancy {
}

private func createSubscription() -> MockSubscription<OccupancyEvent> {
let subscription = MockSubscription<OccupancyEvent>(randomElement: {
mockSubscriptions.create(randomElement: {
let random = Int.random(in: 1 ... 10)
return OccupancyEvent(connections: random, presenceMembers: Int.random(in: 0 ... random))
}, interval: 1)
mockSubscriptions.append(subscription)
return subscription
}

func subscribe(bufferingPolicy _: BufferingPolicy) async -> Subscription<OccupancyEvent> {
Expand Down
8 changes: 7 additions & 1 deletion Example/AblyChatExample/Mocks/MockSubscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Ably
import AblyChat
import AsyncAlgorithms

struct MockSubscription<T: Sendable>: Sendable, AsyncSequence {
final class MockSubscription<T: Sendable>: Sendable, AsyncSequence {
typealias Element = T
typealias AsyncTimerMockSequence = AsyncMapSequence<AsyncTimerSequence<ContinuousClock>, Element>
typealias MockMergedSequence = AsyncMerge2Sequence<AsyncStream<Element>, AsyncTimerMockSequence>
Expand All @@ -27,4 +27,10 @@ struct MockSubscription<T: Sendable>: Sendable, AsyncSequence {
randomElement()
})
}

func setOnTermination(_ onTermination: @escaping @Sendable () -> Void) {
continuation.onTermination = { _ in
onTermination()
}
}
}
41 changes: 41 additions & 0 deletions Example/AblyChatExample/Mocks/MockSubscriptionStorage.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import Foundation

// This is copied from ably-chat’s internal class `SubscriptionStorage`.
class MockSubscriptionStorage<Element: Sendable>: @unchecked Sendable {
// We hold a weak reference to the subscriptions that we create, so that the subscriptions’ termination handlers get called when the user releases their final reference to the subscription.
private struct WeaklyHeldSubscription {
weak var subscription: MockSubscription<Element>?
}

/// Access must be synchronised via ``lock``.
private var subscriptions: [UUID: WeaklyHeldSubscription] = [:]
private let lock = NSLock()

// You must not call the `setOnTermination` method of a subscription returned by this function, as it will replace the termination handler set by this function.
func create(randomElement: @escaping @Sendable () -> Element, interval: Double) -> MockSubscription<Element> {
let subscription = MockSubscription<Element>(randomElement: randomElement, interval: interval)
let id = UUID()

lock.lock()
subscriptions[id] = .init(subscription: subscription)
lock.unlock()

subscription.setOnTermination { [weak self] in
self?.subscriptionDidTerminate(id: id)
}

return subscription
}

private func subscriptionDidTerminate(id: UUID) {
lock.lock()
subscriptions.removeValue(forKey: id)
lock.unlock()
}

func emit(_ element: Element) {
for subscription in subscriptions.values {
subscription.subscription?.emit(element)
}
}
}
Loading
Loading