Skip to content

Commit

Permalink
Implement the Subscription type
Browse files Browse the repository at this point in the history
Note that XCTAssertEqual doesn’t allow you to write `await` inside its
arguments, hence the indirection to get the result of a couple of `async
let`s. Hopefully we’ll be able to migrate to Swift Testing at some
point, which will resolve this; see #21.
  • Loading branch information
lawrence-forooghian committed Aug 22, 2024
1 parent d33d665 commit 6e82605
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 8 deletions.
4 changes: 4 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ let package = Package(
name: "AblyChatTests",
dependencies: [
"AblyChat",
.product(
name: "AsyncAlgorithms",
package: "swift-async-algorithms"
),
]
),
.executableTarget(
Expand Down
4 changes: 4 additions & 0 deletions [email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ let package = Package(
name: "AblyChatTests",
dependencies: [
"AblyChat",
.product(
name: "AsyncAlgorithms",
package: "swift-async-algorithms"
),
]
),
.executableTarget(
Expand Down
11 changes: 11 additions & 0 deletions Sources/AblyChat/BufferingPolicy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,15 @@ public enum BufferingPolicy: Sendable {
case unbounded
case bufferingOldest(Int)
case bufferingNewest(Int)

internal func asAsyncStreamBufferingPolicy<T>() -> AsyncStream<T>.Continuation.BufferingPolicy {
switch self {
case let .bufferingNewest(count):
.bufferingNewest(count)
case let .bufferingOldest(count):
.bufferingOldest(count)
case .unbounded:
.unbounded
}
}
}
106 changes: 98 additions & 8 deletions Sources/AblyChat/Subscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,112 @@
//
// At some point we should define how this thing behaves when you iterate over it from multiple loops, or when you pass it around. I’m not yet sufficiently experienced with `AsyncSequence` to know what’s idiomatic. I tried the same thing out with `AsyncStream` (two tasks iterating over a single stream) and it appears that each element is delivered to precisely one consumer. But we can leave that for later. On a similar note consider whether it makes a difference whether this is a struct or a class.
//
// TODO: I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so, hence the struct. Try again sometime. We can also revisit our implementation of `AsyncSequence` if we migrate to Swift 6, which adds primary types and typed errors to `AsyncSequence` and should make things easier.
public struct Subscription<Element>: Sendable, AsyncSequence {
// This is a workaround for the fact that, as mentioned above, `Subscription` is a struct when I would have liked it to be a protocol. It allows people mocking our SDK to create a `Subscription` so that they can return it from their mocks. The intention of this initializer is that if you use it, then the created `Subscription` will just replay the sequence that you pass it.
public init<T: AsyncSequence>(mockAsyncSequence _: T) where T.Element == Element {
fatalError("Not implemented")
// TODO: I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so, hence the struct. Try again sometime. We can also revisit our implementation of `AsyncSequence` if we migrate to Swift 6, which adds primary types and typed errors to `AsyncSequence` and should make things easier; see https://github.com/ably-labs/ably-chat-swift/issues/21.
public struct Subscription<Element: Sendable>: Sendable, AsyncSequence {
private enum Mode: Sendable {
case `default`(stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation)
case mockAsyncSequence(TypeErasedNonThrowingAsyncSequence)
}

// (The below is just necessary boilerplate to get this to compile; the key point is that `next()` does not have a `throws` annotation.)
fileprivate struct TypeErasedNonThrowingAsyncSequence: AsyncSequence, Sendable {
private var makeAsyncIteratorImpl: @Sendable () -> AsyncIterator

init<T: AsyncSequence & Sendable>(asyncSequence: T) where T.Element == Element {
makeAsyncIteratorImpl = {
AsyncIterator(asyncIterator: asyncSequence.makeAsyncIterator())
}
}

fileprivate struct AsyncIterator: AsyncIteratorProtocol {
private var nextImpl: () async -> Element?

init<T: AsyncIteratorProtocol>(asyncIterator: T) where T.Element == Element {
var iterator = asyncIterator
nextImpl = { () async -> Element? in
let element: Element?
do {
return try await iterator.next()
} catch {
fatalError("The AsyncSequence passed to Subscription.init(mockAsyncSequence:) threw an error: \(error). This is not supported.")
}
return element
}
}

mutating func next() async -> Element? {
await nextImpl()
}
}

func makeAsyncIterator() -> AsyncIterator {
makeAsyncIteratorImpl()
}
}

private let mode: Mode

internal init(bufferingPolicy: BufferingPolicy) {
let (stream, continuation) = AsyncStream.makeStream(of: Element.self, bufferingPolicy: bufferingPolicy.asAsyncStreamBufferingPolicy())
mode = .default(stream: stream, continuation: continuation)
}

// This is a workaround for the fact that, as mentioned above, `Subscription` is a struct when I would have liked it to be a protocol. It allows people mocking our SDK to create a `Subscription` so that they can return it from their mocks. The intention of this initializer is that if you use it, then the created `Subscription` will just replay the sequence that you pass it. It is a programmer error to pass a throwing AsyncSequence.
public init<T: AsyncSequence & Sendable>(mockAsyncSequence: T) where T.Element == Element {
mode = .mockAsyncSequence(.init(asyncSequence: mockAsyncSequence))
}

/**
Causes the subscription to make a new element available on its `AsyncSequence` interface.
It is a programmer error to call this when the receiver was created using ``init(mockAsyncSequence:)``.
*/
internal func emit(_ element: Element) {
switch mode {
case let .default(_, continuation):
continuation.yield(element)
case .mockAsyncSequence:
fatalError("`emit` cannot be called on a Subscription that was created using init(mockAsyncSequence:)")
}
}

public struct AsyncIterator: AsyncIteratorProtocol {
fileprivate enum Mode {
case `default`(iterator: AsyncStream<Element>.AsyncIterator)
case mockAsyncSequence(iterator: TypeErasedNonThrowingAsyncSequence.AsyncIterator)

mutating func next() async -> Element? {
switch self {
case var .default(iterator: iterator):
let next = await iterator.next()
self = .default(iterator: iterator)
return next
case var .mockAsyncSequence(iterator: iterator):
let next = await iterator.next()
self = .mockAsyncSequence(iterator: iterator)
return next
}
}
}

private var mode: Mode

fileprivate init(mode: Mode) {
self.mode = mode
}

public mutating func next() async -> Element? {
fatalError("Not implemented")
await mode.next()
}
}

public func makeAsyncIterator() -> AsyncIterator {
fatalError("Not implemented")
let iteratorMode: AsyncIterator.Mode = switch mode {
case let .default(stream: stream, continuation: _):
.default(iterator: stream.makeAsyncIterator())
case let .mockAsyncSequence(asyncSequence):
.mockAsyncSequence(iterator: asyncSequence.makeAsyncIterator())
}

return .init(mode: iteratorMode)
}
}
26 changes: 26 additions & 0 deletions Tests/AblyChatTests/SubscriptionTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
@testable import AblyChat
import AsyncAlgorithms
import XCTest

class SubscriptionTests: XCTestCase {
func testWithMockAsyncSequence() async {
let subscription = Subscription(mockAsyncSequence: ["First", "Second"].async)

async let emittedElements = Array(subscription.prefix(2))

let awaitedEmittedElements = await emittedElements
XCTAssertEqual(awaitedEmittedElements, ["First", "Second"])
}

func testEmit() async {
let subscription = Subscription<String>(bufferingPolicy: .unbounded)

async let emittedElements = Array(subscription.prefix(2))

subscription.emit("First")
subscription.emit("Second")

let awaitedEmittedElements = await emittedElements
XCTAssertEqual(awaitedEmittedElements, ["First", "Second"])
}
}

0 comments on commit 6e82605

Please sign in to comment.