From 98f0267800f6fd47186b36ea282820d329f025f9 Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Thu, 22 Aug 2024 15:15:00 +0100 Subject: [PATCH] Implement the Subscription type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Note that XCTAssertEqual doesn’t allow you to write `await` inside its arguments, hence the indirection to get the result of a couple of `async let`s. Hopefully we’ll be able to migrate to Swift Testing at some point, which will resolve this; see #21. --- Package.swift | 4 + Sources/AblyChat/BufferingPolicy.swift | 11 ++ Sources/AblyChat/Subscription.swift | 106 ++++++++++++++++++-- Tests/AblyChatTests/SubscriptionTests.swift | 26 +++++ 4 files changed, 139 insertions(+), 8 deletions(-) create mode 100644 Tests/AblyChatTests/SubscriptionTests.swift diff --git a/Package.swift b/Package.swift index bf4b0de..b4d5955 100644 --- a/Package.swift +++ b/Package.swift @@ -59,6 +59,10 @@ let package = Package( name: "AblyChatTests", dependencies: [ "AblyChat", + .product( + name: "AsyncAlgorithms", + package: "swift-async-algorithms" + ), ] ), .executableTarget( diff --git a/Sources/AblyChat/BufferingPolicy.swift b/Sources/AblyChat/BufferingPolicy.swift index 110dac3..b944090 100644 --- a/Sources/AblyChat/BufferingPolicy.swift +++ b/Sources/AblyChat/BufferingPolicy.swift @@ -4,4 +4,15 @@ public enum BufferingPolicy: Sendable { case unbounded case bufferingOldest(Int) case bufferingNewest(Int) + + internal func asAsyncStreamBufferingPolicy() -> AsyncStream.Continuation.BufferingPolicy { + switch self { + case let .bufferingNewest(count): + .bufferingNewest(count) + case let .bufferingOldest(count): + .bufferingOldest(count) + case .unbounded: + .unbounded + } + } } diff --git a/Sources/AblyChat/Subscription.swift b/Sources/AblyChat/Subscription.swift index e79b09b..70665d3 100644 --- a/Sources/AblyChat/Subscription.swift +++ b/Sources/AblyChat/Subscription.swift @@ -4,22 +4,112 @@ // // At some point we should define how this thing behaves when you iterate over it from multiple loops, or when you pass it around. I’m not yet sufficiently experienced with `AsyncSequence` to know what’s idiomatic. I tried the same thing out with `AsyncStream` (two tasks iterating over a single stream) and it appears that each element is delivered to precisely one consumer. But we can leave that for later. On a similar note consider whether it makes a difference whether this is a struct or a class. // -// TODO: I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so, hence the struct. Try again sometime. We can also revisit our implementation of `AsyncSequence` if we migrate to Swift 6, which adds primary types and typed errors to `AsyncSequence` and should make things easier. -public struct Subscription: Sendable, AsyncSequence { - // This is a workaround for the fact that, as mentioned above, `Subscription` is a struct when I would have liked it to be a protocol. It allows people mocking our SDK to create a `Subscription` so that they can return it from their mocks. The intention of this initializer is that if you use it, then the created `Subscription` will just replay the sequence that you pass it. - public init(mockAsyncSequence _: T) where T.Element == Element { - fatalError("Not implemented") +// TODO: I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so, hence the struct. Try again sometime. We can also revisit our implementation of `AsyncSequence` if we migrate to Swift 6, which adds primary types and typed errors to `AsyncSequence` and should make things easier; see https://github.com/ably-labs/ably-chat-swift/issues/21. +public struct Subscription: Sendable, AsyncSequence { + private enum Mode: Sendable { + case `default`(stream: AsyncStream, continuation: AsyncStream.Continuation) + case mockAsyncSequence(TypeErasedNonThrowingAsyncSequence) } - // (The below is just necessary boilerplate to get this to compile; the key point is that `next()` does not have a `throws` annotation.) + fileprivate struct TypeErasedNonThrowingAsyncSequence: AsyncSequence, Sendable { + private var makeAsyncIteratorImpl: @Sendable () -> AsyncIterator + + init(asyncSequence: T) where T.Element == Element { + makeAsyncIteratorImpl = { + AsyncIterator(asyncIterator: asyncSequence.makeAsyncIterator()) + } + } + + fileprivate struct AsyncIterator: AsyncIteratorProtocol { + private var nextImpl: () async -> Element? + + init(asyncIterator: T) where T.Element == Element { + var iterator = asyncIterator + nextImpl = { () async -> Element? in + let element: Element? + do { + return try await iterator.next() + } catch { + fatalError("The AsyncSequence passed to Subscription.init(mockAsyncSequence:) threw an error: \(error). This is not supported.") + } + return element + } + } + + mutating func next() async -> Element? { + await nextImpl() + } + } + + func makeAsyncIterator() -> AsyncIterator { + makeAsyncIteratorImpl() + } + } + + private let mode: Mode + + internal init(bufferingPolicy: BufferingPolicy) { + let (stream, continuation) = AsyncStream.makeStream(of: Element.self, bufferingPolicy: bufferingPolicy.asAsyncStreamBufferingPolicy()) + mode = .default(stream: stream, continuation: continuation) + } + + // This is a workaround for the fact that, as mentioned above, `Subscription` is a struct when I would have liked it to be a protocol. It allows people mocking our SDK to create a `Subscription` so that they can return it from their mocks. The intention of this initializer is that if you use it, then the created `Subscription` will just replay the sequence that you pass it. It is a programmer error to pass a throwing AsyncSequence. + public init(mockAsyncSequence: T) where T.Element == Element { + mode = .mockAsyncSequence(.init(asyncSequence: mockAsyncSequence)) + } + + /** + Causes the subscription to make a new element available on its `AsyncSequence` interface. + + It is a programmer error to call this when the receiver was created using ``init(mockAsyncSequence:)``. + */ + internal func emit(_ element: Element) { + switch mode { + case let .default(_, continuation): + continuation.yield(element) + case .mockAsyncSequence: + fatalError("`emit` cannot be called on a Subscription that was created using init(mockAsyncSequence:)") + } + } public struct AsyncIterator: AsyncIteratorProtocol { + fileprivate enum Mode { + case `default`(iterator: AsyncStream.AsyncIterator) + case mockAsyncSequence(iterator: TypeErasedNonThrowingAsyncSequence.AsyncIterator) + + mutating func next() async -> Element? { + switch self { + case var .default(iterator: iterator): + let next = await iterator.next() + self = .default(iterator: iterator) + return next + case var .mockAsyncSequence(iterator: iterator): + let next = await iterator.next() + self = .mockAsyncSequence(iterator: iterator) + return next + } + } + } + + private var mode: Mode + + fileprivate init(mode: Mode) { + self.mode = mode + } + public mutating func next() async -> Element? { - fatalError("Not implemented") + await mode.next() } } public func makeAsyncIterator() -> AsyncIterator { - fatalError("Not implemented") + let iteratorMode: AsyncIterator.Mode = switch mode { + case let .default(stream: stream, continuation: _): + .default(iterator: stream.makeAsyncIterator()) + case let .mockAsyncSequence(asyncSequence): + .mockAsyncSequence(iterator: asyncSequence.makeAsyncIterator()) + } + + return .init(mode: iteratorMode) } } diff --git a/Tests/AblyChatTests/SubscriptionTests.swift b/Tests/AblyChatTests/SubscriptionTests.swift new file mode 100644 index 0000000..a3a80d5 --- /dev/null +++ b/Tests/AblyChatTests/SubscriptionTests.swift @@ -0,0 +1,26 @@ +@testable import AblyChat +import AsyncAlgorithms +import XCTest + +class SubscriptionTests: XCTestCase { + func testWithMockAsyncSequence() async { + let subscription = Subscription(mockAsyncSequence: ["First", "Second"].async) + + async let emittedElements = Array(subscription.prefix(2)) + + let awaitedEmittedElements = await emittedElements + XCTAssertEqual(awaitedEmittedElements, ["First", "Second"]) + } + + func testEmit() async { + let subscription = Subscription(bufferingPolicy: .unbounded) + + async let emittedElements = Array(subscription.prefix(2)) + + subscription.emit("First") + subscription.emit("Second") + + let awaitedEmittedElements = await emittedElements + XCTAssertEqual(awaitedEmittedElements, ["First", "Second"]) + } +}