From 39dd3b94cb45dafa16367f66b2d4a99bcb3d2ec2 Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Wed, 8 Jan 2025 11:25:48 -0300 Subject: [PATCH 1/3] Fix README example of discontinuities --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 13ae613..dd61c9f 100644 --- a/README.md +++ b/README.md @@ -237,8 +237,8 @@ Taking messages as an example, you can listen for discontinuities like so: ```swift let subscription = room.messages.onDiscontinuity() -for await error in subscription { - print("Recovering from the error: \(error)") +for await discontinuityEvent in subscription { + print("Recovering from the error: \(discontinuityEvent.error)") } ``` From 432fa11dd5e9f13b4a38e23ebbba40e17f417ac7 Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Wed, 8 Jan 2025 09:51:00 -0300 Subject: [PATCH 2/3] Change Subscription types to classes I never really understood how the AsyncStream type, which this wraps, is a struct; it seems to behave like a reference type; for example: - if you start two iterations, one over the original value and one over a copy, then each element is received by precisely one of the iterations; - it seems to call its terminationHandler when the last reference to all of its copies goes away So, change these wrapper types to a reference type so that users can reason about them more easily, and to make it easy for me to add stored properties to them without having to worry about how to fit them into the reference-ish semantics (which I will do in an upcoming commit). --- Example/AblyChatExample/Mocks/MockSubscription.swift | 2 +- Sources/AblyChat/Messages.swift | 4 ++-- Sources/AblyChat/Subscription.swift | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Example/AblyChatExample/Mocks/MockSubscription.swift b/Example/AblyChatExample/Mocks/MockSubscription.swift index eb702e5..e9b978f 100644 --- a/Example/AblyChatExample/Mocks/MockSubscription.swift +++ b/Example/AblyChatExample/Mocks/MockSubscription.swift @@ -2,7 +2,7 @@ import Ably import AblyChat import AsyncAlgorithms -struct MockSubscription: Sendable, AsyncSequence { +final class MockSubscription: Sendable, AsyncSequence { typealias Element = T typealias AsyncTimerMockSequence = AsyncMapSequence, Element> typealias MockMergedSequence = AsyncMerge2Sequence, AsyncTimerMockSequence> diff --git a/Sources/AblyChat/Messages.swift b/Sources/AblyChat/Messages.swift index e333568..0bfd464 100644 --- a/Sources/AblyChat/Messages.swift +++ b/Sources/AblyChat/Messages.swift @@ -192,10 +192,10 @@ internal extension QueryOptions { } // Currently a copy-and-paste of `Subscription`; see notes on that one. For `MessageSubscription`, my intention is that the `BufferingPolicy` passed to `subscribe(bufferingPolicy:)` will also define what the `MessageSubscription` does with messages that are received _before_ the user starts iterating over the sequence (this buffering will allow us to implement the requirement that there be no discontinuity between the the last message returned by `getPreviousMessages` and the first element you get when you iterate). -public struct MessageSubscription: Sendable, AsyncSequence { +public final class MessageSubscription: Sendable, AsyncSequence { public typealias Element = Message - private var subscription: Subscription + private let subscription: Subscription // can be set by either initialiser private let getPreviousMessages: @Sendable (QueryOptions) async throws -> any PaginatedResult diff --git a/Sources/AblyChat/Subscription.swift b/Sources/AblyChat/Subscription.swift index b35ba83..5cb3e45 100644 --- a/Sources/AblyChat/Subscription.swift +++ b/Sources/AblyChat/Subscription.swift @@ -5,7 +5,7 @@ // At some point we should define how this thing behaves when you iterate over it from multiple loops, or when you pass it around. I’m not yet sufficiently experienced with `AsyncSequence` to know what’s idiomatic. I tried the same thing out with `AsyncStream` (two tasks iterating over a single stream) and it appears that each element is delivered to precisely one consumer. But we can leave that for later. On a similar note consider whether it makes a difference whether this is a struct or a class. // // I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so (see https://forums.swift.org/t/struggling-to-create-a-protocol-that-inherits-from-asyncsequence-with-primary-associated-type/73950 where someone suggested it’s a compiler bug), hence the struct. I was also hoping that upon switching to Swift 6 we could use AsyncSequence’s `Failure` associated type to simplify the way in which we show that the subscription is non-throwing, but it turns out this can only be done in macOS 15 etc. So I think that for now we’re stuck with things the way they are. -public struct Subscription: Sendable, AsyncSequence { +public final class Subscription: Sendable, AsyncSequence { private enum Mode: Sendable { case `default`(stream: AsyncStream, continuation: AsyncStream.Continuation) case mockAsyncSequence(AnyNonThrowingAsyncSequence) From 9bf73310e5567f97207b7b95110be76b9a8bf91f Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Wed, 18 Dec 2024 17:23:09 -0300 Subject: [PATCH 3/3] Clean up subscriptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hook in to AsyncStream’s termination notification mechanism, so that when the user discards a subscription or cancels the task in which they’re iterating over a subscription, we: - remove this subscription from our internal data structures - remove any corresponding ably-cocoa listeners that drive this subscription I’m sure that there will turn out to be a bunch of wrong stuff that I’ve done here, due to my still-shaky knowledge of concurrency stuff and AsyncSequence best practices, but it’s a start. Resolves #36. --- .../AblyChatExample/Mocks/MockClients.swift | 100 +++++++----------- .../Mocks/MockSubscription.swift | 6 ++ .../Mocks/MockSubscriptionStorage.swift | 41 +++++++ README.md | 40 ++----- Sources/AblyChat/DefaultConnection.swift | 6 +- Sources/AblyChat/DefaultMessages.swift | 15 ++- Sources/AblyChat/DefaultOccupancy.swift | 11 +- Sources/AblyChat/DefaultPresence.swift | 19 +++- .../DefaultRoomLifecycleContributor.swift | 17 ++- Sources/AblyChat/DefaultRoomReactions.swift | 6 +- Sources/AblyChat/DefaultTyping.swift | 9 +- Sources/AblyChat/Messages.swift | 8 ++ Sources/AblyChat/RoomLifecycleManager.swift | 74 ++++--------- Sources/AblyChat/Rooms.swift | 11 +- Sources/AblyChat/Subscription.swift | 30 +++++- Sources/AblyChat/SubscriptionStorage.swift | 60 +++++++++++ .../Mocks/MockFeatureChannel.swift | 11 +- .../Mocks/MockRealtimeChannel.swift | 4 +- .../MockRoomLifecycleContributorChannel.swift | 10 +- .../Mocks/MockRoomLifecycleManager.swift | 11 +- .../SubscriptionStorageTests.swift | 40 +++++++ Tests/AblyChatTests/SubscriptionTests.swift | 32 ++++++ 22 files changed, 354 insertions(+), 207 deletions(-) create mode 100644 Example/AblyChatExample/Mocks/MockSubscriptionStorage.swift create mode 100644 Sources/AblyChat/SubscriptionStorage.swift create mode 100644 Tests/AblyChatTests/SubscriptionStorageTests.swift diff --git a/Example/AblyChatExample/Mocks/MockClients.swift b/Example/AblyChatExample/Mocks/MockClients.swift index 0a82762..86c8209 100644 --- a/Example/AblyChatExample/Mocks/MockClients.swift +++ b/Example/AblyChatExample/Mocks/MockClients.swift @@ -64,7 +64,7 @@ actor MockRoom: Room { var status: RoomStatus = .initialized - private var mockSubscriptions: [MockSubscription] = [] + private let mockSubscriptions = MockSubscriptionStorage() func attach() async throws { print("Mock client attached to room with roomID: \(roomID)") @@ -75,11 +75,9 @@ actor MockRoom: Room { } private func createSubscription() -> MockSubscription { - let subscription = MockSubscription(randomElement: { + mockSubscriptions.create(randomElement: { RoomStatusChange(current: [.attached, .attached, .attached, .attached, .attaching(error: nil), .attaching(error: nil), .suspended(error: .createUnknownError())].randomElement()!, previous: .attaching(error: nil)) }, interval: 8) - mockSubscriptions.append(subscription) - return subscription } func onStatusChange(bufferingPolicy _: BufferingPolicy) async -> Subscription { @@ -92,7 +90,7 @@ actor MockMessages: Messages { let roomID: String let channel: RealtimeChannelProtocol - private var mockSubscriptions: [MockSubscription] = [] + private let mockSubscriptions = MockSubscriptionStorage() init(clientID: String, roomID: String) { self.clientID = clientID @@ -101,7 +99,7 @@ actor MockMessages: Messages { } private func createSubscription() -> MockSubscription { - let subscription = MockSubscription(randomElement: { + mockSubscriptions.create(randomElement: { Message( serial: "\(Date().timeIntervalSince1970)", action: .create, @@ -113,8 +111,6 @@ actor MockMessages: Messages { headers: [:] ) }, interval: 3) - mockSubscriptions.append(subscription) - return subscription } func subscribe(bufferingPolicy _: BufferingPolicy) async -> MessageSubscription { @@ -138,9 +134,7 @@ actor MockMessages: Messages { metadata: params.metadata ?? [:], headers: params.headers ?? [:] ) - for subscription in mockSubscriptions { - subscription.emit(message) - } + mockSubscriptions.emit(message) return message } @@ -154,7 +148,7 @@ actor MockRoomReactions: RoomReactions { let roomID: String let channel: RealtimeChannelProtocol - private var mockSubscriptions: [MockSubscription] = [] + private let mockSubscriptions = MockSubscriptionStorage() init(clientID: String, roomID: String) { self.clientID = clientID @@ -163,7 +157,7 @@ actor MockRoomReactions: RoomReactions { } private func createSubscription() -> MockSubscription { - let subscription = MockSubscription(randomElement: { + mockSubscriptions.create(randomElement: { Reaction( type: ReactionType.allCases.randomElement()!.emoji, metadata: [:], @@ -173,8 +167,6 @@ actor MockRoomReactions: RoomReactions { isSelf: false ) }, interval: Double.random(in: 0.1 ... 0.5)) - mockSubscriptions.append(subscription) - return subscription } func send(params: SendReactionParams) async throws { @@ -186,9 +178,7 @@ actor MockRoomReactions: RoomReactions { clientID: clientID, isSelf: false ) - for subscription in mockSubscriptions { - subscription.emit(reaction) - } + mockSubscriptions.emit(reaction) } func subscribe(bufferingPolicy _: BufferingPolicy) -> Subscription { @@ -205,7 +195,7 @@ actor MockTyping: Typing { let roomID: String let channel: RealtimeChannelProtocol - private var mockSubscriptions: [MockSubscription] = [] + private let mockSubscriptions = MockSubscriptionStorage() init(clientID: String, roomID: String) { self.clientID = clientID @@ -214,14 +204,12 @@ actor MockTyping: Typing { } private func createSubscription() -> MockSubscription { - let subscription = MockSubscription(randomElement: { + mockSubscriptions.create(randomElement: { TypingEvent(currentlyTyping: [ MockStrings.names.randomElement()!, MockStrings.names.randomElement()!, ]) }, interval: 2) - mockSubscriptions.append(subscription) - return subscription } func subscribe(bufferingPolicy _: BufferingPolicy) -> Subscription { @@ -233,15 +221,11 @@ actor MockTyping: Typing { } func start() async throws { - for subscription in mockSubscriptions { - subscription.emit(TypingEvent(currentlyTyping: [clientID])) - } + mockSubscriptions.emit(TypingEvent(currentlyTyping: [clientID])) } func stop() async throws { - for subscription in mockSubscriptions { - subscription.emit(TypingEvent(currentlyTyping: [])) - } + mockSubscriptions.emit(TypingEvent(currentlyTyping: [])) } func onDiscontinuity(bufferingPolicy _: BufferingPolicy) -> Subscription { @@ -253,7 +237,7 @@ actor MockPresence: Presence { let clientID: String let roomID: String - private var mockSubscriptions: [MockSubscription] = [] + private let mockSubscriptions = MockSubscriptionStorage() init(clientID: String, roomID: String) { self.clientID = clientID @@ -261,7 +245,7 @@ actor MockPresence: Presence { } private func createSubscription() -> MockSubscription { - let subscription = MockSubscription(randomElement: { + mockSubscriptions.create(randomElement: { PresenceEvent( action: [.enter, .leave].randomElement()!, clientID: MockStrings.names.randomElement()!, @@ -269,8 +253,6 @@ actor MockPresence: Presence { data: nil ) }, interval: 5) - mockSubscriptions.append(subscription) - return subscription } func get() async throws -> [PresenceMember] { @@ -310,16 +292,14 @@ actor MockPresence: Presence { } private func enter(dataForEvent: PresenceData?) async throws { - for subscription in mockSubscriptions { - subscription.emit( - PresenceEvent( - action: .enter, - clientID: clientID, - timestamp: Date(), - data: dataForEvent - ) + mockSubscriptions.emit( + PresenceEvent( + action: .enter, + clientID: clientID, + timestamp: Date(), + data: dataForEvent ) - } + ) } func update() async throws { @@ -331,16 +311,14 @@ actor MockPresence: Presence { } private func update(dataForEvent: PresenceData? = nil) async throws { - for subscription in mockSubscriptions { - subscription.emit( - PresenceEvent( - action: .update, - clientID: clientID, - timestamp: Date(), - data: dataForEvent - ) + mockSubscriptions.emit( + PresenceEvent( + action: .update, + clientID: clientID, + timestamp: Date(), + data: dataForEvent ) - } + ) } func leave() async throws { @@ -352,16 +330,14 @@ actor MockPresence: Presence { } func leave(dataForEvent: PresenceData? = nil) async throws { - for subscription in mockSubscriptions { - subscription.emit( - PresenceEvent( - action: .leave, - clientID: clientID, - timestamp: Date(), - data: dataForEvent - ) + mockSubscriptions.emit( + PresenceEvent( + action: .leave, + clientID: clientID, + timestamp: Date(), + data: dataForEvent ) - } + ) } func subscribe(event _: PresenceEventType, bufferingPolicy _: BufferingPolicy) -> Subscription { @@ -382,7 +358,7 @@ actor MockOccupancy: Occupancy { let roomID: String let channel: RealtimeChannelProtocol - private var mockSubscriptions: [MockSubscription] = [] + private let mockSubscriptions = MockSubscriptionStorage() init(clientID: String, roomID: String) { self.clientID = clientID @@ -391,12 +367,10 @@ actor MockOccupancy: Occupancy { } private func createSubscription() -> MockSubscription { - let subscription = MockSubscription(randomElement: { + mockSubscriptions.create(randomElement: { let random = Int.random(in: 1 ... 10) return OccupancyEvent(connections: random, presenceMembers: Int.random(in: 0 ... random)) }, interval: 1) - mockSubscriptions.append(subscription) - return subscription } func subscribe(bufferingPolicy _: BufferingPolicy) async -> Subscription { diff --git a/Example/AblyChatExample/Mocks/MockSubscription.swift b/Example/AblyChatExample/Mocks/MockSubscription.swift index e9b978f..9bd161c 100644 --- a/Example/AblyChatExample/Mocks/MockSubscription.swift +++ b/Example/AblyChatExample/Mocks/MockSubscription.swift @@ -27,4 +27,10 @@ final class MockSubscription: Sendable, AsyncSequence { randomElement() }) } + + func setOnTermination(_ onTermination: @escaping @Sendable () -> Void) { + continuation.onTermination = { _ in + onTermination() + } + } } diff --git a/Example/AblyChatExample/Mocks/MockSubscriptionStorage.swift b/Example/AblyChatExample/Mocks/MockSubscriptionStorage.swift new file mode 100644 index 0000000..5a14ba9 --- /dev/null +++ b/Example/AblyChatExample/Mocks/MockSubscriptionStorage.swift @@ -0,0 +1,41 @@ +import Foundation + +// This is copied from ably-chat’s internal class `SubscriptionStorage`. +class MockSubscriptionStorage: @unchecked Sendable { + // We hold a weak reference to the subscriptions that we create, so that the subscriptions’ termination handlers get called when the user releases their final reference to the subscription. + private struct WeaklyHeldSubscription { + weak var subscription: MockSubscription? + } + + /// Access must be synchronised via ``lock``. + private var subscriptions: [UUID: WeaklyHeldSubscription] = [:] + private let lock = NSLock() + + // You must not call the `setOnTermination` method of a subscription returned by this function, as it will replace the termination handler set by this function. + func create(randomElement: @escaping @Sendable () -> Element, interval: Double) -> MockSubscription { + let subscription = MockSubscription(randomElement: randomElement, interval: interval) + let id = UUID() + + lock.lock() + subscriptions[id] = .init(subscription: subscription) + lock.unlock() + + subscription.setOnTermination { [weak self] in + self?.subscriptionDidTerminate(id: id) + } + + return subscription + } + + private func subscriptionDidTerminate(id: UUID) { + lock.lock() + subscriptions.removeValue(forKey: id) + lock.unlock() + } + + func emit(_ element: Element) { + for subscription in subscriptions.values { + subscription.subscription?.emit(element) + } + } +} diff --git a/README.md b/README.md index dd61c9f..ee9590e 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,7 @@ let error = await chatClient.connection.error ### Subscribing to connection status changes -You can subscribe to connection status changes by registering a listener, like so: +To subscribe to connection status changes, create a subscription with the `onStatusChange` method. You can then iterate over it using its `AsyncSequence` interface: ```swift let subscription = chatClient.connection.onStatusChange() @@ -112,12 +112,6 @@ for await statusChange in subscription { } ``` -To stop listening to changes, call the `unsubscribe` method on the returned subscription instance: - -```swift -subscription.unsubscribe() -``` - ## Chat rooms ### Creating or retrieving a chat room @@ -209,7 +203,7 @@ switch await room.status { ### Listening to room status updates -You can also subscribe to changes in the room status and be notified whenever they happen by registering a listener: +You can also subscribe to changes in the room status and be notified whenever they happen by creating a subscription using the room’s `onStatusChange` method and then iterating over this subscription using its `AsyncSequence` interface: ```swift let statusSubscription = try await room.onStatusChange() @@ -218,19 +212,13 @@ for await status in statusSubscription { } ``` -To stop listening to room status changes, call the `unsubscribe` method on the returned subscription instance: - -```swift -statusSubscription.unsubscribe() -``` - ## Handling discontinuity There may be instances where the connection to Ably is lost for a period of time, for example, when the user enters a tunnel. In many circumstances, the connection will recover and operation will continue with no discontinuity of messages. However, during extended periods of disconnection, continuity cannot be guaranteed and you'll need to take steps to recover messages you might have missed. -Each feature of the Chat SDK provides an `onDiscontinuity` method. Here you can register a listener that will be notified whenever a +Each feature of the Chat SDK provides an `onDiscontinuity` method. Here you can create a subscription that will emit a discontinuity event on its `AsyncSequence` interface whenever a discontinuity in that feature has been observed. Taking messages as an example, you can listen for discontinuities like so: @@ -242,8 +230,6 @@ for await discontinuityEvent in subscription { } ``` -To stop listening to discontinuities, call `unsubscribe` method on returned subscription instance. - ## Chat messages ### Subscribing to incoming messages @@ -257,8 +243,6 @@ for await message in messagesSubscription { } ``` -To stop listening for the new messages, call the `unsubscribe` method on the returned subscription instance. - ### Sending messages To send a message, simply call `send` on the room `messages` property, with the message you want to send: @@ -286,8 +270,8 @@ if paginatedResult.hasNext { ### Retrieving message history for a subscribed listener -In addition to being able to unsubscribe from messages, the return value from `messages.subscribe` also includes the `getPreviousMessages` -method. It can be used to request historical messages in the chat room that were sent up to the point that a particular listener was subscribed. It returns a +The return value from `messages.subscribe` includes the `getPreviousMessages` +method, which can be used to request historical messages in the chat room that were sent up to the point that a particular listener was subscribed. It returns a paginated response that can be used to request for more messages: ```swift @@ -349,7 +333,7 @@ try await room.presence.leave(data: ["status": "Bye!"]) ### Subscribing to presence updates -You can provide a single listener for all presence event types: +You can create a single subscription for all presence event types: ```swift let presenceSubscription = try await room.presence.subscribe(events: [.enter, .leave, .update]) @@ -358,8 +342,6 @@ for await event in presenceSubscription { } ``` -To stop listening for the presence updates, call the `unsubscribe` method on the returned subscription instance. - ## Typing indicators > [!NOTE] @@ -401,7 +383,7 @@ try await room.typing.stop() ### Subscribing to typing updates -To subscribe to typing events, create a subscription with the `subscribe` method: +To subscribe to typing events, create a subscription with the `subscribe` method. You can then iterate over it using its `AsyncSequence` interface: ```swift let typingSubscription = try await room.typing.subscribe() @@ -410,15 +392,13 @@ for await typing in typingSubscription { } ``` -To stop listening for the typing events, call the `unsubscribe` method on the returned subscription instance. - ## Occupancy of a chat room Occupancy tells you how many users are connected to the chat room. ### Subscribing to occupancy updates -To subscribe to occupancy updates, subscribe a listener to the chat room `occupancy` member: +To subscribe to occupancy updates, create a subscription by calling the `subscribe` method on the chat room’s `occupancy` member. You can then iterate over it using its `AsyncSequence` interface: ```swift let occupancySubscription = try await room.occupancy.subscribe() @@ -427,8 +407,6 @@ for await event in occupancySubscription { } ``` -To stop listening for the typing events, call the `unsubscribe` method on the returned subscription instance. - Occupancy updates are delivered in near-real-time, with updates in quick succession batched together for performance. ### Retrieving the occupancy of a chat room @@ -469,8 +447,6 @@ for await reaction in reactionSubscription { } ``` -To stop receiving reactions, call the `unsubscribe` method on the returned subscription instance. - ## Example app This repository contains an example app, written using SwiftUI, which demonstrates how to use the SDK. The code for this app is in the [`Example`](Example) directory. diff --git a/Sources/AblyChat/DefaultConnection.swift b/Sources/AblyChat/DefaultConnection.swift index 014af8b..782c012 100644 --- a/Sources/AblyChat/DefaultConnection.swift +++ b/Sources/AblyChat/DefaultConnection.swift @@ -30,7 +30,7 @@ internal final class DefaultConnection: Connection { let subscription = Subscription(bufferingPolicy: bufferingPolicy) // (CHA-CS5) The chat client must monitor the underlying realtime connection for connection status changes. - realtime.connection.on { [weak self] stateChange in + let eventListener = realtime.connection.on { [weak self] stateChange in guard let self else { return } @@ -95,6 +95,10 @@ internal final class DefaultConnection: Connection { } } + subscription.addTerminationHandler { [weak self] in + self?.realtime.connection.off(eventListener) + } + return subscription } } diff --git a/Sources/AblyChat/DefaultMessages.swift b/Sources/AblyChat/DefaultMessages.swift index 8229d3a..62901ca 100644 --- a/Sources/AblyChat/DefaultMessages.swift +++ b/Sources/AblyChat/DefaultMessages.swift @@ -15,7 +15,6 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities { private let clientID: String private let logger: InternalLogger - // TODO: https://github.com/ably-labs/ably-chat-swift/issues/36 - Handle unsubscribing in line with CHA-M4b // UUID acts as a unique identifier for each listener/subscription. MessageSubscriptionWrapper houses the subscription and the serial of when it was attached or resumed. private var subscriptionPoints: [UUID: MessageSubscriptionWrapper] = [:] @@ -53,7 +52,7 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities { // (CHA-M4c) When a realtime message with name set to message.created is received, it is translated into a message event, which contains a type field with the event type as well as a message field containing the Message Struct. This event is then broadcast to all subscribers. // (CHA-M4d) If a realtime message with an unknown name is received, the SDK shall silently discard the message, though it may log at DEBUG or TRACE level. // (CHA-M5k) Incoming realtime events that are malformed (unknown field should be ignored) shall not be emitted to subscribers. - channel.subscribe(RealtimeMessageName.chatMessage.rawValue) { message in + let eventListener = channel.subscribe(RealtimeMessageName.chatMessage.rawValue) { message in Task { // TODO: Revisit errors thrown as part of https://github.com/ably-labs/ably-chat-swift/issues/32 guard let ablyCocoaData = message.data, @@ -104,6 +103,18 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities { } } + messageSubscription.addTerminationHandler { + Task { + await MainActor.run { [weak self] () in + guard let self else { + return + } + channel.unsubscribe(eventListener) + subscriptionPoints.removeValue(forKey: uuid) + } + } + } + return messageSubscription } diff --git a/Sources/AblyChat/DefaultOccupancy.swift b/Sources/AblyChat/DefaultOccupancy.swift index 5fa7ef7..e67aec5 100644 --- a/Sources/AblyChat/DefaultOccupancy.swift +++ b/Sources/AblyChat/DefaultOccupancy.swift @@ -22,8 +22,10 @@ internal final class DefaultOccupancy: Occupancy, EmitsDiscontinuities { // (CHA-04d) If an invalid occupancy event is received on the channel, it shall be dropped. internal func subscribe(bufferingPolicy: BufferingPolicy) async -> Subscription { logger.log(message: "Subscribing to occupancy events", level: .debug) + let subscription = Subscription(bufferingPolicy: bufferingPolicy) - channel.subscribe(OccupancyEvents.meta.rawValue) { [logger] message in + + let eventListener = channel.subscribe(OccupancyEvents.meta.rawValue) { [logger] message in logger.log(message: "Received occupancy message: \(message)", level: .debug) guard let data = message.data as? [String: Any], let metrics = data["metrics"] as? [String: Any] @@ -40,6 +42,13 @@ internal final class DefaultOccupancy: Occupancy, EmitsDiscontinuities { logger.log(message: "Emitting occupancy event: \(occupancyEvent)", level: .debug) subscription.emit(occupancyEvent) } + + subscription.addTerminationHandler { [weak self] in + if let eventListener { + self?.channel.off(eventListener) + } + } + return subscription } diff --git a/Sources/AblyChat/DefaultPresence.swift b/Sources/AblyChat/DefaultPresence.swift index b6470c0..cc8dd52 100644 --- a/Sources/AblyChat/DefaultPresence.swift +++ b/Sources/AblyChat/DefaultPresence.swift @@ -199,7 +199,7 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities { internal func subscribe(event: PresenceEventType, bufferingPolicy: BufferingPolicy) async -> Subscription { logger.log(message: "Subscribing to presence events", level: .debug) let subscription = Subscription(bufferingPolicy: bufferingPolicy) - channel.presence.subscribe(event.toARTPresenceAction()) { [processPresenceSubscribe, logger] message in + let eventListener = channel.presence.subscribe(event.toARTPresenceAction()) { [processPresenceSubscribe, logger] message in logger.log(message: "Received presence message: \(message)", level: .debug) Task { // processPresenceSubscribe is logging so we don't need to log here @@ -207,13 +207,19 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities { subscription.emit(presenceEvent) } } + subscription.addTerminationHandler { [weak channel] in + if let eventListener { + channel?.presence.unsubscribe(eventListener) + } + } return subscription } internal func subscribe(events: [PresenceEventType], bufferingPolicy: BufferingPolicy) async -> Subscription { logger.log(message: "Subscribing to presence events", level: .debug) let subscription = Subscription(bufferingPolicy: bufferingPolicy) - for event in events { + + let eventListeners = events.map { event in channel.presence.subscribe(event.toARTPresenceAction()) { [processPresenceSubscribe, logger] message in logger.log(message: "Received presence message: \(message)", level: .debug) Task { @@ -222,6 +228,15 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities { } } } + + subscription.addTerminationHandler { [weak self] in + for eventListener in eventListeners { + if let eventListener { + self?.channel.presence.unsubscribe(eventListener) + } + } + } + return subscription } diff --git a/Sources/AblyChat/DefaultRoomLifecycleContributor.swift b/Sources/AblyChat/DefaultRoomLifecycleContributor.swift index c7e38ad..718d500 100644 --- a/Sources/AblyChat/DefaultRoomLifecycleContributor.swift +++ b/Sources/AblyChat/DefaultRoomLifecycleContributor.swift @@ -3,7 +3,7 @@ import Ably internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor, EmitsDiscontinuities, CustomDebugStringConvertible { internal nonisolated let channel: DefaultRoomLifecycleContributorChannel internal nonisolated let feature: RoomFeature - private var discontinuitySubscriptions: [Subscription] = [] + private var discontinuitySubscriptions = SubscriptionStorage() internal init(channel: DefaultRoomLifecycleContributorChannel, feature: RoomFeature) { self.channel = channel @@ -13,16 +13,11 @@ internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor, EmitsD // MARK: - Discontinuities internal func emitDiscontinuity(_ discontinuity: DiscontinuityEvent) { - for subscription in discontinuitySubscriptions { - subscription.emit(discontinuity) - } + discontinuitySubscriptions.emit(discontinuity) } internal func onDiscontinuity(bufferingPolicy: BufferingPolicy) -> Subscription { - let subscription = Subscription(bufferingPolicy: bufferingPolicy) - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) - discontinuitySubscriptions.append(subscription) - return subscription + discontinuitySubscriptions.create(bufferingPolicy: bufferingPolicy) } // MARK: - CustomDebugStringConvertible @@ -56,9 +51,11 @@ internal final class DefaultRoomLifecycleContributorChannel: RoomLifecycleContri } internal func subscribeToState() async -> Subscription { - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) let subscription = Subscription(bufferingPolicy: .unbounded) - underlyingChannel.on { subscription.emit($0) } + let eventListener = underlyingChannel.on { subscription.emit($0) } + subscription.addTerminationHandler { [weak underlyingChannel] in + underlyingChannel?.unsubscribe(eventListener) + } return subscription } diff --git a/Sources/AblyChat/DefaultRoomReactions.swift b/Sources/AblyChat/DefaultRoomReactions.swift index ea0ac89..6918484 100644 --- a/Sources/AblyChat/DefaultRoomReactions.swift +++ b/Sources/AblyChat/DefaultRoomReactions.swift @@ -40,7 +40,7 @@ internal final class DefaultRoomReactions: RoomReactions, EmitsDiscontinuities { let subscription = Subscription(bufferingPolicy: bufferingPolicy) // (CHA-ER4c) Realtime events with an unknown name shall be silently discarded. - channel.subscribe(RoomReactionEvents.reaction.rawValue) { [clientID, logger] message in + let eventListener = channel.subscribe(RoomReactionEvents.reaction.rawValue) { [clientID, logger] message in logger.log(message: "Received roomReaction message: \(message)", level: .debug) Task { do { @@ -82,6 +82,10 @@ internal final class DefaultRoomReactions: RoomReactions, EmitsDiscontinuities { } } + subscription.addTerminationHandler { [weak self] in + self?.channel.unsubscribe(eventListener) + } + return subscription } diff --git a/Sources/AblyChat/DefaultTyping.swift b/Sources/AblyChat/DefaultTyping.swift index 89796ca..5919633 100644 --- a/Sources/AblyChat/DefaultTyping.swift +++ b/Sources/AblyChat/DefaultTyping.swift @@ -25,7 +25,7 @@ internal final class DefaultTyping: Typing { let subscription = Subscription(bufferingPolicy: bufferingPolicy) let eventTracker = EventTracker() - channel.presence.subscribe { [weak self] message in + let eventListener = channel.presence.subscribe { [weak self] message in guard let self else { return } @@ -72,6 +72,13 @@ internal final class DefaultTyping: Typing { logger.log(message: "Failed to fetch presence set after \(maxRetryDuration) seconds. Giving up.", level: .error) } } + + subscription.addTerminationHandler { [weak self] in + if let eventListener { + self?.channel.presence.unsubscribe(eventListener) + } + } + return subscription } diff --git a/Sources/AblyChat/Messages.swift b/Sources/AblyChat/Messages.swift index 0bfd464..5a7114c 100644 --- a/Sources/AblyChat/Messages.swift +++ b/Sources/AblyChat/Messages.swift @@ -192,6 +192,10 @@ internal extension QueryOptions { } // Currently a copy-and-paste of `Subscription`; see notes on that one. For `MessageSubscription`, my intention is that the `BufferingPolicy` passed to `subscribe(bufferingPolicy:)` will also define what the `MessageSubscription` does with messages that are received _before_ the user starts iterating over the sequence (this buffering will allow us to implement the requirement that there be no discontinuity between the the last message returned by `getPreviousMessages` and the first element you get when you iterate). + +/// A non-throwing `AsyncSequence` whose element is ``Message``. The Chat SDK uses this type as the return value of the ``Messages`` methods that allow you to find out about received chat messages. +/// +/// You should only iterate over a given `MessageSubscription` once; the results of iterating more than once are undefined. public final class MessageSubscription: Sendable, AsyncSequence { public typealias Element = Message @@ -219,6 +223,10 @@ public final class MessageSubscription: Sendable, AsyncSequence { subscription.emit(element) } + internal func addTerminationHandler(_ onTermination: @escaping (@Sendable () -> Void)) { + subscription.addTerminationHandler(onTermination) + } + public func getPreviousMessages(params: QueryOptions) async throws -> any PaginatedResult { try await getPreviousMessages(params) } diff --git a/Sources/AblyChat/RoomLifecycleManager.swift b/Sources/AblyChat/RoomLifecycleManager.swift index 90f9ed7..2284447 100644 --- a/Sources/AblyChat/RoomLifecycleManager.swift +++ b/Sources/AblyChat/RoomLifecycleManager.swift @@ -87,8 +87,7 @@ internal actor DefaultRoomLifecycleManager! - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) - private var roomStatusChangeSubscriptions: [Subscription] = [] + private var roomStatusChangeSubscriptions = SubscriptionStorage() private var operationResultContinuations = OperationResultContinuations() // MARK: - Initializers and `deinit` @@ -330,15 +329,12 @@ internal actor DefaultRoomLifecycleManager Subscription { - let subscription: Subscription = .init(bufferingPolicy: bufferingPolicy) - roomStatusChangeSubscriptions.append(subscription) - return subscription + roomStatusChangeSubscriptions.create(bufferingPolicy: bufferingPolicy) } #if DEBUG - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) /// Supports the ``testsOnly_onRoomStatusChange()`` method. - private var statusChangeSubscriptions: [Subscription] = [] + private var statusChangeSubscriptions = SubscriptionStorage() internal struct StatusChange { internal var current: Status @@ -347,15 +343,7 @@ internal actor DefaultRoomLifecycleManager Subscription { - let subscription: Subscription = .init(bufferingPolicy: .unbounded) - statusChangeSubscriptions.append(subscription) - return subscription - } - - internal func emitStatusChange(_ change: StatusChange) { - for subscription in statusChangeSubscriptions { - subscription.emit(change) - } + statusChangeSubscriptions.create(bufferingPolicy: .unbounded) } #endif @@ -368,27 +356,20 @@ internal actor DefaultRoomLifecycleManager] = [] + private var stateChangeHandledSubscriptions = SubscriptionStorage() /// Returns a subscription which emits the contributor state changes that have been handled by the manager. /// @@ -401,9 +382,7 @@ internal actor DefaultRoomLifecycleManager Subscription { - let subscription = Subscription(bufferingPolicy: .unbounded) - stateChangeHandledSubscriptions.append(subscription) - return subscription + stateChangeHandledSubscriptions.create(bufferingPolicy: .unbounded) } internal func testsOnly_pendingDiscontinuityEvent(for contributor: Contributor) -> DiscontinuityEvent? { @@ -422,9 +401,8 @@ internal actor DefaultRoomLifecycleManager] = [] + private var transientDisconnectTimeoutHandledSubscriptions = SubscriptionStorage() /// Returns a subscription which emits the IDs of the transient disconnect timeouts that have been handled by the manager. /// @@ -432,9 +410,7 @@ internal actor DefaultRoomLifecycleManager Subscription { - let subscription = Subscription(bufferingPolicy: .unbounded) - transientDisconnectTimeoutHandledSubscriptions.append(subscription) - return subscription + transientDisconnectTimeoutHandledSubscriptions.create(bufferingPolicy: .unbounded) } #endif @@ -560,18 +536,14 @@ internal actor DefaultRoomLifecycleManager] = [] + private var operationWaitEventSubscriptions = SubscriptionStorage() /// Returns a subscription which emits an event each time one room lifecycle operation is going to wait for another to complete. internal func testsOnly_subscribeToOperationWaitEvents() -> Subscription { - let subscription = Subscription(bufferingPolicy: .unbounded) - operationWaitEventSubscriptions.append(subscription) - return subscription + operationWaitEventSubscriptions.create(bufferingPolicy: .unbounded) } #endif @@ -693,9 +662,7 @@ internal actor DefaultRoomLifecycleManager] = [] + private var statusChangeWaitEventSubscriptions = SubscriptionStorage() /// Returns a subscription which emits an event each time ``waitToBeAbleToPerformPresenceOperations(requestedByFeature:)`` is going to wait for a room status change. internal func testsOnly_subscribeToStatusChangeWaitEvents() -> Subscription { - let subscription = Subscription(bufferingPolicy: .unbounded) - statusChangeWaitEventSubscriptions.append(subscription) - return subscription + statusChangeWaitEventSubscriptions.create(bufferingPolicy: .unbounded) } #endif } diff --git a/Sources/AblyChat/Rooms.swift b/Sources/AblyChat/Rooms.swift index 3a061e2..d922a0a 100644 --- a/Sources/AblyChat/Rooms.swift +++ b/Sources/AblyChat/Rooms.swift @@ -137,22 +137,17 @@ internal actor DefaultRooms: Rooms { internal var waitedOperationType: OperationType } - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) /// Supports the ``testsOnly_subscribeToOperationWaitEvents()`` method. - private var operationWaitEventSubscriptions: [Subscription] = [] + private var operationWaitEventSubscriptions = SubscriptionStorage() /// Returns a subscription which emits an event each time one operation is going to wait for another to complete. internal func testsOnly_subscribeToOperationWaitEvents() -> Subscription { - let subscription = Subscription(bufferingPolicy: .unbounded) - operationWaitEventSubscriptions.append(subscription) - return subscription + operationWaitEventSubscriptions.create(bufferingPolicy: .unbounded) } private func emitOperationWaitEvent(waitingOperationType: OperationType, waitedOperationType: OperationType) { let operationWaitEvent = OperationWaitEvent(waitingOperationType: waitingOperationType, waitedOperationType: waitedOperationType) - for subscription in operationWaitEventSubscriptions { - subscription.emit(operationWaitEvent) - } + operationWaitEventSubscriptions.emit(operationWaitEvent) } #endif diff --git a/Sources/AblyChat/Subscription.swift b/Sources/AblyChat/Subscription.swift index 5cb3e45..1ebb71c 100644 --- a/Sources/AblyChat/Subscription.swift +++ b/Sources/AblyChat/Subscription.swift @@ -1,3 +1,5 @@ +import Foundation + // A non-throwing `AsyncSequence` (means that we can iterate over it without a `try`). // // This should respect the `BufferingPolicy` passed to the `subscribe(bufferingPolicy:)` method. @@ -5,7 +7,11 @@ // At some point we should define how this thing behaves when you iterate over it from multiple loops, or when you pass it around. I’m not yet sufficiently experienced with `AsyncSequence` to know what’s idiomatic. I tried the same thing out with `AsyncStream` (two tasks iterating over a single stream) and it appears that each element is delivered to precisely one consumer. But we can leave that for later. On a similar note consider whether it makes a difference whether this is a struct or a class. // // I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so (see https://forums.swift.org/t/struggling-to-create-a-protocol-that-inherits-from-asyncsequence-with-primary-associated-type/73950 where someone suggested it’s a compiler bug), hence the struct. I was also hoping that upon switching to Swift 6 we could use AsyncSequence’s `Failure` associated type to simplify the way in which we show that the subscription is non-throwing, but it turns out this can only be done in macOS 15 etc. So I think that for now we’re stuck with things the way they are. -public final class Subscription: Sendable, AsyncSequence { + +/// A non-throwing `AsyncSequence`. The Chat SDK uses this type as the return value of the methods that allow you to find out about events such as typing events, connection status changes, discontinuity events etc. +/// +/// You should only iterate over a given `Subscription` once; the results of iterating more than once are undefined. +public final class Subscription: @unchecked Sendable, AsyncSequence { private enum Mode: Sendable { case `default`(stream: AsyncStream, continuation: AsyncStream.Continuation) case mockAsyncSequence(AnyNonThrowingAsyncSequence) @@ -45,6 +51,9 @@ public final class Subscription: Sendable, AsyncSequence { } } + // Access must be synchronised using ``lock``. + private var terminationHandlers: [@Sendable () -> Void] = [] + private let lock = NSLock() private let mode: Mode internal init(bufferingPolicy: BufferingPolicy) { @@ -71,13 +80,24 @@ public final class Subscription: Sendable, AsyncSequence { } } - // TODO: https://github.com/ably-labs/ably-chat-swift/issues/36 Revisit how we want to unsubscribe to fulfil CHA-M4b & CHA-ER4b. I think exposing this publicly for all Subscription types is suitable. - public func unsubscribe() { + internal func addTerminationHandler(_ terminationHandler: @escaping (@Sendable () -> Void)) { + var terminationHandlers: [@Sendable () -> Void] + lock.lock() + terminationHandlers = self.terminationHandlers + terminationHandlers.append(terminationHandler) + self.terminationHandlers = terminationHandlers + lock.unlock() + switch mode { case let .default(_, continuation): - continuation.finish() + let constantTerminationHandlers = terminationHandlers + continuation.onTermination = { _ in + for terminationHandler in constantTerminationHandlers { + terminationHandler() + } + } case .mockAsyncSequence: - fatalError("`finish` cannot be called on a Subscription that was created using init(mockAsyncSequence:)") + fatalError("`addTerminationHandler(_:)` cannot be called on a Subscription that was created using init(mockAsyncSequence:)") } } diff --git a/Sources/AblyChat/SubscriptionStorage.swift b/Sources/AblyChat/SubscriptionStorage.swift new file mode 100644 index 0000000..ceca00f --- /dev/null +++ b/Sources/AblyChat/SubscriptionStorage.swift @@ -0,0 +1,60 @@ +import Foundation + +/// Maintains a list of `Subscription` objects, from which it removes a subscription once the subscription is no longer in use. +/// +/// Offers the ability to create a new subscription (using ``create(bufferingPolicy:)``) or to emit a value on all subscriptions (using ``emit(_:)``). +internal class SubscriptionStorage: @unchecked Sendable { + // A note about the use of `@unchecked Sendable` here: This is a type that updates its own state in response to external events (i.e. subscription termination), and I wasn’t sure how to perform this mutation in the context of some external actor that owns the mutable state held in this type. So instead I made this class own its mutable state and take responsibility for its synchronisation, and I decided to do perform this synchronisation manually instead of introducing _another_ layer of actors for something that really doesn’t seem like it should be an actor; it’s just meant to be a utility type. But we can revisit this decision. + + // We hold a weak reference to the subscriptions that we create, so that the subscriptions’ termination handlers get called when the user releases their final reference to the subscription. + private struct WeaklyHeldSubscription { + internal weak var subscription: Subscription? + } + + /// Access must be synchronised via ``lock``. + private var subscriptions: [UUID: WeaklyHeldSubscription] = [:] + private let lock = NSLock() + + /// Creates a subscription and adds it to the list managed by this `SubscriptionStorage` instance. + /// + /// The `SubscriptionStorage` instance will remove this subscription from its list once the subscription “terminates” (meaning that there are no longer any references to it, or the task in which it was being iterated was cancelled). + internal func create(bufferingPolicy: BufferingPolicy) -> Subscription { + let subscription = Subscription(bufferingPolicy: bufferingPolicy) + let id = UUID() + + lock.lock() + subscriptions[id] = .init(subscription: subscription) + lock.unlock() + + subscription.addTerminationHandler { [weak self] in + self?.subscriptionDidTerminate(id: id) + } + + return subscription + } + + #if DEBUG + internal var testsOnly_subscriptionCount: Int { + let count: Int + lock.lock() + count = subscriptions.count + lock.unlock() + return count + } + #endif + + private func subscriptionDidTerminate(id: UUID) { + lock.lock() + subscriptions.removeValue(forKey: id) + lock.unlock() + } + + /// Emits an element on all of the subscriptions in the reciever’s managed list. + internal func emit(_ element: Element) { + lock.lock() + for subscription in subscriptions.values { + subscription.subscription?.emit(element) + } + lock.unlock() + } +} diff --git a/Tests/AblyChatTests/Mocks/MockFeatureChannel.swift b/Tests/AblyChatTests/Mocks/MockFeatureChannel.swift index 106084f..2b66506 100644 --- a/Tests/AblyChatTests/Mocks/MockFeatureChannel.swift +++ b/Tests/AblyChatTests/Mocks/MockFeatureChannel.swift @@ -3,8 +3,7 @@ import Ably final actor MockFeatureChannel: FeatureChannel { let channel: RealtimeChannelProtocol - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) - private var discontinuitySubscriptions: [Subscription] = [] + private var discontinuitySubscriptions = SubscriptionStorage() private let resultOfWaitToBeAbleToPerformPresenceOperations: Result? init( @@ -16,15 +15,11 @@ final actor MockFeatureChannel: FeatureChannel { } func onDiscontinuity(bufferingPolicy: BufferingPolicy) async -> Subscription { - let subscription = Subscription(bufferingPolicy: bufferingPolicy) - discontinuitySubscriptions.append(subscription) - return subscription + discontinuitySubscriptions.create(bufferingPolicy: bufferingPolicy) } func emitDiscontinuity(_ discontinuity: DiscontinuityEvent) { - for subscription in discontinuitySubscriptions { - subscription.emit(discontinuity) - } + discontinuitySubscriptions.emit(discontinuity) } func waitToBeAbleToPerformPresenceOperations(requestedByFeature _: RoomFeature) async throws(ARTErrorInfo) { diff --git a/Tests/AblyChatTests/Mocks/MockRealtimeChannel.swift b/Tests/AblyChatTests/Mocks/MockRealtimeChannel.swift index f4e7c57..071bc4a 100644 --- a/Tests/AblyChatTests/Mocks/MockRealtimeChannel.swift +++ b/Tests/AblyChatTests/Mocks/MockRealtimeChannel.swift @@ -163,7 +163,7 @@ final class MockRealtimeChannel: NSObject, RealtimeChannelProtocol { } func unsubscribe(_: ARTEventListener?) { - fatalError("Not implemented") + // no-op; revisit if we need to test something that depends on this method actually stopping `subscribe` from emitting more events } func unsubscribe(_: String, listener _: ARTEventListener?) { @@ -199,7 +199,7 @@ final class MockRealtimeChannel: NSObject, RealtimeChannelProtocol { } func off(_: ARTEventListener) { - fatalError("Not implemented") + // no-op; revisit if we need to test something that depends on this method actually stopping `on` from emitting more events } func off() { diff --git a/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift b/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift index a7b13a8..9b25c41 100644 --- a/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift +++ b/Tests/AblyChatTests/Mocks/MockRoomLifecycleContributorChannel.swift @@ -8,8 +8,7 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel var state: ARTRealtimeChannelState var errorReason: ARTErrorInfo? - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) - private var subscriptions: [Subscription] = [] + private var subscriptions = SubscriptionStorage() private(set) var attachCallCount = 0 private(set) var detachCallCount = 0 @@ -108,8 +107,7 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel } func subscribeToState() -> Subscription { - let subscription = Subscription(bufferingPolicy: .unbounded) - subscriptions.append(subscription) + let subscription = subscriptions.create(bufferingPolicy: .unbounded) switch subscribeToStateBehavior { case .justAddSubscription: @@ -122,8 +120,6 @@ final actor MockRoomLifecycleContributorChannel: RoomLifecycleContributorChannel } func emitStateChange(_ stateChange: ARTChannelStateChange) { - for subscription in subscriptions { - subscription.emit(stateChange) - } + subscriptions.emit(stateChange) } } diff --git a/Tests/AblyChatTests/Mocks/MockRoomLifecycleManager.swift b/Tests/AblyChatTests/Mocks/MockRoomLifecycleManager.swift index 061a25f..e1254bd 100644 --- a/Tests/AblyChatTests/Mocks/MockRoomLifecycleManager.swift +++ b/Tests/AblyChatTests/Mocks/MockRoomLifecycleManager.swift @@ -8,8 +8,7 @@ actor MockRoomLifecycleManager: RoomLifecycleManager { private(set) var detachCallCount = 0 private(set) var releaseCallCount = 0 private let _roomStatus: RoomStatus? - // TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36) - private var subscriptions: [Subscription] = [] + private var subscriptions = SubscriptionStorage() init(attachResult: Result? = nil, detachResult: Result? = nil, roomStatus: RoomStatus? = nil) { self.attachResult = attachResult @@ -45,15 +44,11 @@ actor MockRoomLifecycleManager: RoomLifecycleManager { } func onRoomStatusChange(bufferingPolicy: BufferingPolicy) async -> Subscription { - let subscription = Subscription(bufferingPolicy: bufferingPolicy) - subscriptions.append(subscription) - return subscription + subscriptions.create(bufferingPolicy: bufferingPolicy) } func emitStatusChange(_ statusChange: RoomStatusChange) { - for subscription in subscriptions { - subscription.emit(statusChange) - } + subscriptions.emit(statusChange) } func waitToBeAbleToPerformPresenceOperations(requestedByFeature _: RoomFeature) async throws(ARTErrorInfo) { diff --git a/Tests/AblyChatTests/SubscriptionStorageTests.swift b/Tests/AblyChatTests/SubscriptionStorageTests.swift new file mode 100644 index 0000000..e89315b --- /dev/null +++ b/Tests/AblyChatTests/SubscriptionStorageTests.swift @@ -0,0 +1,40 @@ +@testable import AblyChat +import Testing + +struct SubscriptionStorageTests { + @Test + func emit() async throws { + let storage = SubscriptionStorage() + let subscriptions = (0 ..< 10).map { _ in storage.create(bufferingPolicy: .unbounded) } + storage.emit("hello") + + var emittedElements: [String] = [] + for subscription in subscriptions { + try emittedElements.append(#require(await subscription.first { _ in true })) + } + + #expect(emittedElements == Array(repeating: "hello", count: 10)) + } + + @Test + func removesSubscriptionOnTermination() async throws { + let storage = SubscriptionStorage() + let subscriptionTerminatedSignal = AsyncStream.makeStream() + + ({ + let subscription = storage.create(bufferingPolicy: .unbounded) + subscription.addTerminationHandler { + subscriptionTerminatedSignal.continuation.yield() + } + + withExtendedLifetime(subscription) { + #expect(storage.testsOnly_subscriptionCount == 1) + } + + // Now there are no more references to `subscription`. + })() + + await subscriptionTerminatedSignal.stream.first { _ in true } + #expect(storage.testsOnly_subscriptionCount == 0) + } +} diff --git a/Tests/AblyChatTests/SubscriptionTests.swift b/Tests/AblyChatTests/SubscriptionTests.swift index a8f7d43..58f889d 100644 --- a/Tests/AblyChatTests/SubscriptionTests.swift +++ b/Tests/AblyChatTests/SubscriptionTests.swift @@ -21,4 +21,36 @@ struct SubscriptionTests { #expect(await emittedElements == ["First", "Second"]) } + + @Test + func addTerminationHandler_terminationHandlerCalledWhenSubscriptionDiscarded() async throws { + let onTerminationCalled = AsyncStream.makeStream() + + ({ + let subscription = Subscription(bufferingPolicy: .unbounded) + subscription.addTerminationHandler { + onTerminationCalled.continuation.yield() + } + // Now there are no more references to `subscription`. + })() + + await onTerminationCalled.stream.first { _ in true } + } + + @Test + func addTerminationHandler_terminationHandlerCalledWhenIterationTaskCancelled() async throws { + let onTerminationCalled = AsyncStream.makeStream() + + let subscription = Subscription(bufferingPolicy: .unbounded) + subscription.addTerminationHandler { + onTerminationCalled.continuation.yield() + } + + let iterationTask = Task { + for await _ in subscription {} + } + iterationTask.cancel() + + await onTerminationCalled.stream.first { _ in true } + } }