Skip to content

Commit

Permalink
EventStreams: rename "terminate" to "while"
Browse files Browse the repository at this point in the history
Use `predicate` when referring to `while` internally, use `while` in the
public API

Invert the value of the return closure.
  • Loading branch information
paulhdk committed Sep 20, 2024
1 parent 59bf51e commit 1487eda
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 29 deletions.
44 changes: 22 additions & 22 deletions Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ where Upstream.Element == ArraySlice<UInt8> {

/// An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API.
/// - Parameter: A byte chunk.
/// - Returns: `True` if the given byte sequence is the terminating byte sequence defined by the API.
private let terminate: (@Sendable (ArraySlice<UInt8>) -> Bool)?
/// - Returns: `True` until the terminating byte sequence is received.
private let predicate: (@Sendable (ArraySlice<UInt8>) -> Bool)?

/// Creates a new sequence.
/// - Parameters:
/// - upstream: The upstream sequence of arbitrary byte chunks.
/// - terminate: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API.
public init(upstream: Upstream, terminate: (@Sendable (ArraySlice<UInt8>) -> Bool)?) {
/// - while: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API.
public init(upstream: Upstream, while predicate: (@Sendable (ArraySlice<UInt8>) -> Bool)?) {
self.upstream = upstream
self.terminate = terminate
self.predicate = predicate
}
}

Expand All @@ -60,13 +60,13 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence {

/// An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API.
/// - Parameter: A byte chunk.
/// - Returns: `True` if the given byte sequence is the terminating byte sequence defined by the API.
let terminate: ((ArraySlice<UInt8>) -> Bool)?
/// - Returns: `True` until the terminating byte sequence is received.
let predicate: ((ArraySlice<UInt8>) -> Bool)?

init(upstream: any AsyncIteratorProtocol, terminate: ((ArraySlice<UInt8>) -> Bool)?) {
init(upstream: any AsyncIteratorProtocol, while predicate: ((ArraySlice<UInt8>) -> Bool)?) {
self.upstream = upstream as! UpstreamIterator
self.stateMachine = .init(terminate: terminate)
self.terminate = terminate
self.stateMachine = .init(while: predicate)
self.predicate = predicate
}

/// Asynchronously advances to the next element and returns it, or ends the
Expand All @@ -91,7 +91,7 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence {
/// Creates the asynchronous iterator that produces elements of this
/// asynchronous sequence.
public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator> {
Iterator(upstream: upstream.makeAsyncIterator(), terminate: terminate)
Iterator(upstream: upstream.makeAsyncIterator(), while: predicate)
}
}

Expand All @@ -102,27 +102,27 @@ extension AsyncSequence where Element == ArraySlice<UInt8>, Self: Sendable {
/// Use this method if the event's `data` field is not JSON, or if you don't want to parse it using `asDecodedServerSentEventsWithJSONData`.
/// - Parameter: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API.
/// - Returns: A sequence that provides the events.
public func asDecodedServerSentEvents(terminate: (@Sendable (ArraySlice<UInt8>) -> Bool)? = nil) -> ServerSentEventsDeserializationSequence<
public func asDecodedServerSentEvents(while predicate: (@Sendable (ArraySlice<UInt8>) -> Bool)? = nil) -> ServerSentEventsDeserializationSequence<
ServerSentEventsLineDeserializationSequence<Self>
> { .init(upstream: ServerSentEventsLineDeserializationSequence(upstream: self), terminate: terminate) }
> { .init(upstream: ServerSentEventsLineDeserializationSequence(upstream: self), while: predicate) }

/// Returns another sequence that decodes each event's data as the provided type using the provided decoder.
///
/// Use this method if the event's `data` field is JSON.
/// - Parameters:
/// - dataType: The type to decode the JSON data into.
/// - decoder: The JSON decoder to use.
/// - terminate: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API.
/// - while: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API.
/// - Returns: A sequence that provides the events with the decoded JSON data.
public func asDecodedServerSentEventsWithJSONData<JSONDataType: Decodable>(
of dataType: JSONDataType.Type = JSONDataType.self,
decoder: JSONDecoder = .init(),
terminate: (@Sendable (ArraySlice<UInt8>) -> Bool)? = nil
while predicate: (@Sendable (ArraySlice<UInt8>) -> Bool)? = nil
) -> AsyncThrowingMapSequence<
ServerSentEventsDeserializationSequence<ServerSentEventsLineDeserializationSequence<Self>>,
ServerSentEventWithJSONData<JSONDataType>
> {
asDecodedServerSentEvents(terminate: terminate)
asDecodedServerSentEvents(while: predicate)
.map { event in
ServerSentEventWithJSONData(
event: event.event,
Expand Down Expand Up @@ -160,13 +160,13 @@ extension ServerSentEventsDeserializationSequence.Iterator {

/// An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API.
/// - Parameter: A sequence of byte chunks.
/// - Returns: `True` if the given byte sequence is the terminating byte sequence defined by the API.
let terminate: ((ArraySlice<UInt8>) -> Bool)?
/// - Returns: `True` until the terminating byte sequence is received.
let predicate: ((ArraySlice<UInt8>) -> Bool)?

/// Creates a new state machine.
init(terminate: ((ArraySlice<UInt8>) -> Bool)? = nil) {
init(while predicate: ((ArraySlice<UInt8>) -> Bool)? = nil) {
self.state = .accumulatingEvent(.init(), buffer: [])
self.terminate = terminate}
self.predicate = predicate}

/// An action returned by the `next` method.
enum NextAction {
Expand Down Expand Up @@ -198,9 +198,9 @@ extension ServerSentEventsDeserializationSequence.Iterator {
// If the last character of data is a newline, strip it.
if event.data?.hasSuffix("\n") ?? false { event.data?.removeLast() }

if let terminate = terminate {
if let predicate = predicate {
if let data = event.data {
if terminate(ArraySlice(Data(data.utf8))) {
if !predicate(ArraySlice(Data(data.utf8))) {
return .returnNil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import XCTest
import Foundation

final class Test_ServerSentEventsDecoding: Test_Runtime {
func _test(input: String, output: [ServerSentEvent], file: StaticString = #filePath, line: UInt = #line, terminate: ((ArraySlice<UInt8>) -> Bool)? = nil, eventCountOffset: Int = 0)
func _test(input: String, output: [ServerSentEvent], file: StaticString = #filePath, line: UInt = #line, while predicate: ((ArraySlice<UInt8>) -> Bool)? = nil, eventCountOffset: Int = 0)
async throws
{
let sequence = asOneBytePerElementSequence(ArraySlice(input.utf8)).asDecodedServerSentEvents()
Expand Down Expand Up @@ -101,8 +101,8 @@ final class Test_ServerSentEventsDecoding: Test_Runtime {
output: [
.init(data: "hello\nworld")
],
terminate: { incomingData in
incomingData == ArraySlice<UInt8>(Data("[DONE]".utf8))
while: { incomingData in
incomingData != ArraySlice<UInt8>(Data("[DONE]".utf8))
},
eventCountOffset: -2
)
Expand All @@ -112,10 +112,10 @@ final class Test_ServerSentEventsDecoding: Test_Runtime {
output: [ServerSentEventWithJSONData<JSONType>],
file: StaticString = #filePath,
line: UInt = #line,
terminate: (@Sendable (ArraySlice<UInt8>) -> Bool)? = nil
while predicate: (@Sendable (ArraySlice<UInt8>) -> Bool)? = nil
) async throws {
let sequence = asOneBytePerElementSequence(ArraySlice(input.utf8))
.asDecodedServerSentEventsWithJSONData(of: JSONType.self, terminate: terminate)
.asDecodedServerSentEventsWithJSONData(of: JSONType.self, while: predicate)
let events = try await [ServerSentEventWithJSONData<JSONType>](collecting: sequence)
XCTAssertEqual(events.count, output.count, file: file, line: line)
for (index, linePair) in zip(events, output).enumerated() {
Expand Down Expand Up @@ -171,8 +171,8 @@ final class Test_ServerSentEventsDecoding: Test_Runtime {
.init(event: "event1", data: TestEvent(index: 1), id: "1"),
.init(event: "event2", data: TestEvent(index: 2), id: "2"),
],
terminate: { incomingData in
incomingData == ArraySlice<UInt8>(Data("[DONE]".utf8))
while: { incomingData in
incomingData != ArraySlice<UInt8>(Data("[DONE]".utf8))
}
)
}
Expand Down

0 comments on commit 1487eda

Please sign in to comment.