Skip to content

Commit

Permalink
Implement the Subscription type
Browse files Browse the repository at this point in the history
Note that XCTAssertEqual doesn’t allow you to write `await` inside its
arguments, hence the indirection to get the result of a couple of `async
let`s. Hopefully we’ll be able to migrate to Swift Testing at some
point, which will resolve this; see #21.

I’ve also implemented MessageSubscription by wrapping Subscription.
  • Loading branch information
lawrence-forooghian committed Aug 24, 2024
1 parent 783865c commit 202f70a
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 14 deletions.
4 changes: 4 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ let package = Package(
name: "AblyChatTests",
dependencies: [
"AblyChat",
.product(
name: "AsyncAlgorithms",
package: "swift-async-algorithms"
),
]
),
.executableTarget(
Expand Down
4 changes: 4 additions & 0 deletions [email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ let package = Package(
name: "AblyChatTests",
dependencies: [
"AblyChat",
.product(
name: "AsyncAlgorithms",
package: "swift-async-algorithms"
),
]
),
.executableTarget(
Expand Down
11 changes: 11 additions & 0 deletions Sources/AblyChat/BufferingPolicy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,15 @@ public enum BufferingPolicy: Sendable {
case unbounded
case bufferingOldest(Int)
case bufferingNewest(Int)

internal func asAsyncStreamBufferingPolicy<T>() -> AsyncStream<T>.Continuation.BufferingPolicy {
switch self {
case let .bufferingNewest(count):
.bufferingNewest(count)
case let .bufferingOldest(count):
.bufferingOldest(count)
case .unbounded:
.unbounded
}
}
}
34 changes: 28 additions & 6 deletions Sources/AblyChat/Messages.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,43 @@ public struct QueryOptionsWithoutDirection: Sendable {
public struct MessageSubscription: Sendable, AsyncSequence {
public typealias Element = Message

public init<T: AsyncSequence>(mockAsyncSequence _: T) where T.Element == Element {
fatalError("Not yet implemented")
private var subscription: Subscription<Element>

private typealias MockGetPreviousMessages = @Sendable (QueryOptionsWithoutDirection) async throws -> any PaginatedResult<Message>
private var mockGetPreviousMessages: MockGetPreviousMessages?

internal init(bufferingPolicy: BufferingPolicy) {
subscription = .init(bufferingPolicy: bufferingPolicy)
}

public init<T: AsyncSequence & Sendable>(mockAsyncSequence: T, mockGetPreviousMessages _: (QueryOptionsWithoutDirection) async throws -> any PaginatedResult<Message>) where T.Element == Element {
subscription = .init(mockAsyncSequence: mockAsyncSequence)
}

internal func emit(_ element: Element) {
subscription.emit(element)
}

public func getPreviousMessages(params _: QueryOptionsWithoutDirection) async throws -> any PaginatedResult<Message> {
fatalError("Not yet implemented")
public func getPreviousMessages(params: QueryOptionsWithoutDirection) async throws -> any PaginatedResult<Message> {
guard let mockImplementation = mockGetPreviousMessages else {
fatalError("Not yet implemented")
}
return try await mockImplementation(params)
}

public struct AsyncIterator: AsyncIteratorProtocol {
private var subscriptionIterator: Subscription<Element>.AsyncIterator

fileprivate init(subscriptionIterator: Subscription<Element>.AsyncIterator) {
self.subscriptionIterator = subscriptionIterator
}

public mutating func next() async -> Element? {
fatalError("Not implemented")
await subscriptionIterator.next()
}
}

public func makeAsyncIterator() -> AsyncIterator {
fatalError("Not implemented")
.init(subscriptionIterator: subscription.makeAsyncIterator())
}
}
105 changes: 97 additions & 8 deletions Sources/AblyChat/Subscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,111 @@
//
// At some point we should define how this thing behaves when you iterate over it from multiple loops, or when you pass it around. I’m not yet sufficiently experienced with `AsyncSequence` to know what’s idiomatic. I tried the same thing out with `AsyncStream` (two tasks iterating over a single stream) and it appears that each element is delivered to precisely one consumer. But we can leave that for later. On a similar note consider whether it makes a difference whether this is a struct or a class.
//
// TODO: I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so, hence the struct. Try again sometime. We can also revisit our implementation of `AsyncSequence` if we migrate to Swift 6, which adds primary types and typed errors to `AsyncSequence` and should make things easier.
public struct Subscription<Element>: Sendable, AsyncSequence {
// This is a workaround for the fact that, as mentioned above, `Subscription` is a struct when I would have liked it to be a protocol. It allows people mocking our SDK to create a `Subscription` so that they can return it from their mocks. The intention of this initializer is that if you use it, then the created `Subscription` will just replay the sequence that you pass it.
public init<T: AsyncSequence>(mockAsyncSequence _: T) where T.Element == Element {
fatalError("Not implemented")
// TODO: I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so, hence the struct. Try again sometime. We can also revisit our implementation of `AsyncSequence` if we migrate to Swift 6, which adds primary types and typed errors to `AsyncSequence` and should make things easier; see https://github.com/ably-labs/ably-chat-swift/issues/21.
public struct Subscription<Element: Sendable>: Sendable, AsyncSequence {
private enum Mode: Sendable {
case `default`(stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation)
case mockAsyncSequence(AnyNonThrowingAsyncSequence)
}

// (The below is just necessary boilerplate to get this to compile; the key point is that `next()` does not have a `throws` annotation.)
/// A type-erased AsyncSequence that doesn’t throw any errors.
fileprivate struct AnyNonThrowingAsyncSequence: AsyncSequence, Sendable {
private var makeAsyncIteratorImpl: @Sendable () -> AsyncIterator

init<T: AsyncSequence & Sendable>(asyncSequence: T) where T.Element == Element {
makeAsyncIteratorImpl = {
AsyncIterator(asyncIterator: asyncSequence.makeAsyncIterator())
}
}

fileprivate struct AsyncIterator: AsyncIteratorProtocol {
private var nextImpl: () async -> Element?

init<T: AsyncIteratorProtocol>(asyncIterator: T) where T.Element == Element {
var iterator = asyncIterator
nextImpl = { () async -> Element? in
do {
return try await iterator.next()
} catch {
fatalError("The AsyncSequence passed to Subscription.init(mockAsyncSequence:) threw an error: \(error). This is not supported.")
}
}
}

mutating func next() async -> Element? {
await nextImpl()
}
}

func makeAsyncIterator() -> AsyncIterator {
makeAsyncIteratorImpl()
}
}

private let mode: Mode

internal init(bufferingPolicy: BufferingPolicy) {
let (stream, continuation) = AsyncStream.makeStream(of: Element.self, bufferingPolicy: bufferingPolicy.asAsyncStreamBufferingPolicy())
mode = .default(stream: stream, continuation: continuation)
}

// This is a workaround for the fact that, as mentioned above, `Subscription` is a struct when I would have liked it to be a protocol. It allows people mocking our SDK to create a `Subscription` so that they can return it from their mocks. The intention of this initializer is that if you use it, then the created `Subscription` will just replay the sequence that you pass it. It is a programmer error to pass a throwing AsyncSequence.
public init<T: AsyncSequence & Sendable>(mockAsyncSequence: T) where T.Element == Element {
mode = .mockAsyncSequence(.init(asyncSequence: mockAsyncSequence))
}

/**
Causes the subscription to make a new element available on its `AsyncSequence` interface.

It is a programmer error to call this when the receiver was created using ``init(mockAsyncSequence:)``.
*/
internal func emit(_ element: Element) {
switch mode {
case let .default(_, continuation):
continuation.yield(element)
case .mockAsyncSequence:
fatalError("`emit` cannot be called on a Subscription that was created using init(mockAsyncSequence:)")
}
}

public struct AsyncIterator: AsyncIteratorProtocol {
fileprivate enum Mode {
case `default`(iterator: AsyncStream<Element>.AsyncIterator)
case mockAsyncSequence(iterator: AnyNonThrowingAsyncSequence.AsyncIterator)

mutating func next() async -> Element? {
switch self {
case var .default(iterator: iterator):
let next = await iterator.next()
self = .default(iterator: iterator)
return next
case var .mockAsyncSequence(iterator: iterator):
let next = await iterator.next()
self = .mockAsyncSequence(iterator: iterator)
return next
}
}
}

private var mode: Mode

fileprivate init(mode: Mode) {
self.mode = mode
}

public mutating func next() async -> Element? {
fatalError("Not implemented")
await mode.next()
}
}

public func makeAsyncIterator() -> AsyncIterator {
fatalError("Not implemented")
let iteratorMode: AsyncIterator.Mode = switch mode {
case let .default(stream: stream, continuation: _):
.default(iterator: stream.makeAsyncIterator())
case let .mockAsyncSequence(asyncSequence):
.mockAsyncSequence(iterator: asyncSequence.makeAsyncIterator())
}

return .init(mode: iteratorMode)
}
}
26 changes: 26 additions & 0 deletions Tests/AblyChatTests/MessageSubscriptionTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
@testable import AblyChat
import AsyncAlgorithms
import XCTest

class SubscriptionTests: XCTestCase {
func testWithMockAsyncSequence() async {
let subscription = Subscription(mockAsyncSequence: ["First", "Second"].async)

async let emittedElements = Array(subscription.prefix(2))

let awaitedEmittedElements = await emittedElements
XCTAssertEqual(awaitedEmittedElements, ["First", "Second"])
}

func testEmit() async {
let subscription = Subscription<String>(bufferingPolicy: .unbounded)

async let emittedElements = Array(subscription.prefix(2))

subscription.emit("First")
subscription.emit("Second")

let awaitedEmittedElements = await emittedElements
XCTAssertEqual(awaitedEmittedElements, ["First", "Second"])
}
}
54 changes: 54 additions & 0 deletions Tests/AblyChatTests/SubscriptionTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
@testable import AblyChat
import AsyncAlgorithms
import XCTest

private final class MockPaginatedResult<T>: PaginatedResult {
var items: [T] { fatalError("Not implemented") }

var hasNext: Bool { fatalError("Not implemented") }

var isLast: Bool { fatalError("Not implemented") }

var next: (any AblyChat.PaginatedResult<T>)? { fatalError("Not implemented") }

var first: any AblyChat.PaginatedResult<T> { fatalError("Not implemented") }

var current: any AblyChat.PaginatedResult<T> { fatalError("Not implemented") }

init() {}
}

class MessageSubscriptionTests: XCTestCase {
let messages = ["First", "Second"].map {
Message(timeserial: "", clientID: "", roomID: "", text: $0, createdAt: .now, metadata: [:], headers: [:])
}

func testWithMockAsyncSequence() async {
let subscription = MessageSubscription(mockAsyncSequence: messages.async) { _ in fatalError("Not implemented") }

async let emittedElements = Array(subscription.prefix(2))

let awaitedEmittedElements = await emittedElements
XCTAssertEqual(awaitedEmittedElements.map(\.text), ["First", "Second"])
}

func testEmit() async {
let subscription = MessageSubscription(bufferingPolicy: .unbounded)

async let emittedElements = Array(subscription.prefix(2))

subscription.emit(messages[0])
subscription.emit(messages[1])

let awaitedEmittedElements = await emittedElements
XCTAssertEqual(awaitedEmittedElements.map(\.text), ["First", "Second"])
}

func funcTestMockGetPreviousMessages() async throws {
let mockPaginatedResult = MockPaginatedResult<Message>()
let subscription = MessageSubscription(mockAsyncSequence: [].async) { _ in mockPaginatedResult }

let result = try await subscription.getPreviousMessages(params: .init())
XCTAssertIdentical(result, mockPaginatedResult)
}
}

0 comments on commit 202f70a

Please sign in to comment.