From 202f70ae9e294f89973208924c23f31c19b44b9a Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Thu, 22 Aug 2024 15:15:00 +0100 Subject: [PATCH] Implement the Subscription type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Note that XCTAssertEqual doesn’t allow you to write `await` inside its arguments, hence the indirection to get the result of a couple of `async let`s. Hopefully we’ll be able to migrate to Swift Testing at some point, which will resolve this; see #21. I’ve also implemented MessageSubscription by wrapping Subscription. --- Package.swift | 4 + Package@swift-6.swift | 4 + Sources/AblyChat/BufferingPolicy.swift | 11 ++ Sources/AblyChat/Messages.swift | 34 +++++- Sources/AblyChat/Subscription.swift | 105 ++++++++++++++++-- .../MessageSubscriptionTests.swift | 26 +++++ Tests/AblyChatTests/SubscriptionTests.swift | 54 +++++++++ 7 files changed, 224 insertions(+), 14 deletions(-) create mode 100644 Tests/AblyChatTests/MessageSubscriptionTests.swift create mode 100644 Tests/AblyChatTests/SubscriptionTests.swift diff --git a/Package.swift b/Package.swift index bf4b0de..b4d5955 100644 --- a/Package.swift +++ b/Package.swift @@ -59,6 +59,10 @@ let package = Package( name: "AblyChatTests", dependencies: [ "AblyChat", + .product( + name: "AsyncAlgorithms", + package: "swift-async-algorithms" + ), ] ), .executableTarget( diff --git a/Package@swift-6.swift b/Package@swift-6.swift index 538f334..fc233cd 100644 --- a/Package@swift-6.swift +++ b/Package@swift-6.swift @@ -45,6 +45,10 @@ let package = Package( name: "AblyChatTests", dependencies: [ "AblyChat", + .product( + name: "AsyncAlgorithms", + package: "swift-async-algorithms" + ), ] ), .executableTarget( diff --git a/Sources/AblyChat/BufferingPolicy.swift b/Sources/AblyChat/BufferingPolicy.swift index 110dac3..b944090 100644 --- a/Sources/AblyChat/BufferingPolicy.swift +++ b/Sources/AblyChat/BufferingPolicy.swift @@ -4,4 +4,15 @@ public enum BufferingPolicy: Sendable { case unbounded case bufferingOldest(Int) case bufferingNewest(Int) + + internal func asAsyncStreamBufferingPolicy() -> AsyncStream.Continuation.BufferingPolicy { + switch self { + case let .bufferingNewest(count): + .bufferingNewest(count) + case let .bufferingOldest(count): + .bufferingOldest(count) + case .unbounded: + .unbounded + } + } } diff --git a/Sources/AblyChat/Messages.swift b/Sources/AblyChat/Messages.swift index 9f7a620..de1ba9c 100644 --- a/Sources/AblyChat/Messages.swift +++ b/Sources/AblyChat/Messages.swift @@ -54,21 +54,43 @@ public struct QueryOptionsWithoutDirection: Sendable { public struct MessageSubscription: Sendable, AsyncSequence { public typealias Element = Message - public init(mockAsyncSequence _: T) where T.Element == Element { - fatalError("Not yet implemented") + private var subscription: Subscription + + private typealias MockGetPreviousMessages = @Sendable (QueryOptionsWithoutDirection) async throws -> any PaginatedResult + private var mockGetPreviousMessages: MockGetPreviousMessages? + + internal init(bufferingPolicy: BufferingPolicy) { + subscription = .init(bufferingPolicy: bufferingPolicy) + } + + public init(mockAsyncSequence: T, mockGetPreviousMessages _: (QueryOptionsWithoutDirection) async throws -> any PaginatedResult) where T.Element == Element { + subscription = .init(mockAsyncSequence: mockAsyncSequence) + } + + internal func emit(_ element: Element) { + subscription.emit(element) } - public func getPreviousMessages(params _: QueryOptionsWithoutDirection) async throws -> any PaginatedResult { - fatalError("Not yet implemented") + public func getPreviousMessages(params: QueryOptionsWithoutDirection) async throws -> any PaginatedResult { + guard let mockImplementation = mockGetPreviousMessages else { + fatalError("Not yet implemented") + } + return try await mockImplementation(params) } public struct AsyncIterator: AsyncIteratorProtocol { + private var subscriptionIterator: Subscription.AsyncIterator + + fileprivate init(subscriptionIterator: Subscription.AsyncIterator) { + self.subscriptionIterator = subscriptionIterator + } + public mutating func next() async -> Element? { - fatalError("Not implemented") + await subscriptionIterator.next() } } public func makeAsyncIterator() -> AsyncIterator { - fatalError("Not implemented") + .init(subscriptionIterator: subscription.makeAsyncIterator()) } } diff --git a/Sources/AblyChat/Subscription.swift b/Sources/AblyChat/Subscription.swift index e79b09b..b07f6a3 100644 --- a/Sources/AblyChat/Subscription.swift +++ b/Sources/AblyChat/Subscription.swift @@ -4,22 +4,111 @@ // // At some point we should define how this thing behaves when you iterate over it from multiple loops, or when you pass it around. I’m not yet sufficiently experienced with `AsyncSequence` to know what’s idiomatic. I tried the same thing out with `AsyncStream` (two tasks iterating over a single stream) and it appears that each element is delivered to precisely one consumer. But we can leave that for later. On a similar note consider whether it makes a difference whether this is a struct or a class. // -// TODO: I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so, hence the struct. Try again sometime. We can also revisit our implementation of `AsyncSequence` if we migrate to Swift 6, which adds primary types and typed errors to `AsyncSequence` and should make things easier. -public struct Subscription: Sendable, AsyncSequence { - // This is a workaround for the fact that, as mentioned above, `Subscription` is a struct when I would have liked it to be a protocol. It allows people mocking our SDK to create a `Subscription` so that they can return it from their mocks. The intention of this initializer is that if you use it, then the created `Subscription` will just replay the sequence that you pass it. - public init(mockAsyncSequence _: T) where T.Element == Element { - fatalError("Not implemented") +// TODO: I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so, hence the struct. Try again sometime. We can also revisit our implementation of `AsyncSequence` if we migrate to Swift 6, which adds primary types and typed errors to `AsyncSequence` and should make things easier; see https://github.com/ably-labs/ably-chat-swift/issues/21. +public struct Subscription: Sendable, AsyncSequence { + private enum Mode: Sendable { + case `default`(stream: AsyncStream, continuation: AsyncStream.Continuation) + case mockAsyncSequence(AnyNonThrowingAsyncSequence) } - // (The below is just necessary boilerplate to get this to compile; the key point is that `next()` does not have a `throws` annotation.) + /// A type-erased AsyncSequence that doesn’t throw any errors. + fileprivate struct AnyNonThrowingAsyncSequence: AsyncSequence, Sendable { + private var makeAsyncIteratorImpl: @Sendable () -> AsyncIterator + + init(asyncSequence: T) where T.Element == Element { + makeAsyncIteratorImpl = { + AsyncIterator(asyncIterator: asyncSequence.makeAsyncIterator()) + } + } + + fileprivate struct AsyncIterator: AsyncIteratorProtocol { + private var nextImpl: () async -> Element? + + init(asyncIterator: T) where T.Element == Element { + var iterator = asyncIterator + nextImpl = { () async -> Element? in + do { + return try await iterator.next() + } catch { + fatalError("The AsyncSequence passed to Subscription.init(mockAsyncSequence:) threw an error: \(error). This is not supported.") + } + } + } + + mutating func next() async -> Element? { + await nextImpl() + } + } + + func makeAsyncIterator() -> AsyncIterator { + makeAsyncIteratorImpl() + } + } + + private let mode: Mode + + internal init(bufferingPolicy: BufferingPolicy) { + let (stream, continuation) = AsyncStream.makeStream(of: Element.self, bufferingPolicy: bufferingPolicy.asAsyncStreamBufferingPolicy()) + mode = .default(stream: stream, continuation: continuation) + } + + // This is a workaround for the fact that, as mentioned above, `Subscription` is a struct when I would have liked it to be a protocol. It allows people mocking our SDK to create a `Subscription` so that they can return it from their mocks. The intention of this initializer is that if you use it, then the created `Subscription` will just replay the sequence that you pass it. It is a programmer error to pass a throwing AsyncSequence. + public init(mockAsyncSequence: T) where T.Element == Element { + mode = .mockAsyncSequence(.init(asyncSequence: mockAsyncSequence)) + } + + /** + Causes the subscription to make a new element available on its `AsyncSequence` interface. + + It is a programmer error to call this when the receiver was created using ``init(mockAsyncSequence:)``. + */ + internal func emit(_ element: Element) { + switch mode { + case let .default(_, continuation): + continuation.yield(element) + case .mockAsyncSequence: + fatalError("`emit` cannot be called on a Subscription that was created using init(mockAsyncSequence:)") + } + } public struct AsyncIterator: AsyncIteratorProtocol { + fileprivate enum Mode { + case `default`(iterator: AsyncStream.AsyncIterator) + case mockAsyncSequence(iterator: AnyNonThrowingAsyncSequence.AsyncIterator) + + mutating func next() async -> Element? { + switch self { + case var .default(iterator: iterator): + let next = await iterator.next() + self = .default(iterator: iterator) + return next + case var .mockAsyncSequence(iterator: iterator): + let next = await iterator.next() + self = .mockAsyncSequence(iterator: iterator) + return next + } + } + } + + private var mode: Mode + + fileprivate init(mode: Mode) { + self.mode = mode + } + public mutating func next() async -> Element? { - fatalError("Not implemented") + await mode.next() } } public func makeAsyncIterator() -> AsyncIterator { - fatalError("Not implemented") + let iteratorMode: AsyncIterator.Mode = switch mode { + case let .default(stream: stream, continuation: _): + .default(iterator: stream.makeAsyncIterator()) + case let .mockAsyncSequence(asyncSequence): + .mockAsyncSequence(iterator: asyncSequence.makeAsyncIterator()) + } + + return .init(mode: iteratorMode) } } diff --git a/Tests/AblyChatTests/MessageSubscriptionTests.swift b/Tests/AblyChatTests/MessageSubscriptionTests.swift new file mode 100644 index 0000000..a3a80d5 --- /dev/null +++ b/Tests/AblyChatTests/MessageSubscriptionTests.swift @@ -0,0 +1,26 @@ +@testable import AblyChat +import AsyncAlgorithms +import XCTest + +class SubscriptionTests: XCTestCase { + func testWithMockAsyncSequence() async { + let subscription = Subscription(mockAsyncSequence: ["First", "Second"].async) + + async let emittedElements = Array(subscription.prefix(2)) + + let awaitedEmittedElements = await emittedElements + XCTAssertEqual(awaitedEmittedElements, ["First", "Second"]) + } + + func testEmit() async { + let subscription = Subscription(bufferingPolicy: .unbounded) + + async let emittedElements = Array(subscription.prefix(2)) + + subscription.emit("First") + subscription.emit("Second") + + let awaitedEmittedElements = await emittedElements + XCTAssertEqual(awaitedEmittedElements, ["First", "Second"]) + } +} diff --git a/Tests/AblyChatTests/SubscriptionTests.swift b/Tests/AblyChatTests/SubscriptionTests.swift new file mode 100644 index 0000000..7d77b48 --- /dev/null +++ b/Tests/AblyChatTests/SubscriptionTests.swift @@ -0,0 +1,54 @@ +@testable import AblyChat +import AsyncAlgorithms +import XCTest + +private final class MockPaginatedResult: PaginatedResult { + var items: [T] { fatalError("Not implemented") } + + var hasNext: Bool { fatalError("Not implemented") } + + var isLast: Bool { fatalError("Not implemented") } + + var next: (any AblyChat.PaginatedResult)? { fatalError("Not implemented") } + + var first: any AblyChat.PaginatedResult { fatalError("Not implemented") } + + var current: any AblyChat.PaginatedResult { fatalError("Not implemented") } + + init() {} +} + +class MessageSubscriptionTests: XCTestCase { + let messages = ["First", "Second"].map { + Message(timeserial: "", clientID: "", roomID: "", text: $0, createdAt: .now, metadata: [:], headers: [:]) + } + + func testWithMockAsyncSequence() async { + let subscription = MessageSubscription(mockAsyncSequence: messages.async) { _ in fatalError("Not implemented") } + + async let emittedElements = Array(subscription.prefix(2)) + + let awaitedEmittedElements = await emittedElements + XCTAssertEqual(awaitedEmittedElements.map(\.text), ["First", "Second"]) + } + + func testEmit() async { + let subscription = MessageSubscription(bufferingPolicy: .unbounded) + + async let emittedElements = Array(subscription.prefix(2)) + + subscription.emit(messages[0]) + subscription.emit(messages[1]) + + let awaitedEmittedElements = await emittedElements + XCTAssertEqual(awaitedEmittedElements.map(\.text), ["First", "Second"]) + } + + func funcTestMockGetPreviousMessages() async throws { + let mockPaginatedResult = MockPaginatedResult() + let subscription = MessageSubscription(mockAsyncSequence: [].async) { _ in mockPaginatedResult } + + let result = try await subscription.getPreviousMessages(params: .init()) + XCTAssertIdentical(result, mockPaginatedResult) + } +}