Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Rebello <[email protected]>
  • Loading branch information
rebello95 committed Jun 6, 2024
1 parent 142a8d8 commit c308226
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 68 deletions.
19 changes: 11 additions & 8 deletions Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,21 @@ import Foundation
final class ClientOnlyAsyncStream<
Input: ProtobufMessage, Output: ProtobufMessage
>: BidirectionalAsyncStream<Input, Output> {
private let receivedMessageCount = Locked(0)
private let receivedResults = Locked([StreamResult<Output>]())

override func handleResultFromServer(_ result: StreamResult<Output>) {
let receivedMessageCount = self.receivedMessageCount.perform { value in
if case .message = result {
value += 1
let (isComplete, results) = self.receivedResults.perform { results in
results.append(result)
if case .complete = result {
return (true, ClientOnlyStreamValidation.validatedFinalClientStreamResults(results))
} else {
return (false, [])
}
return value
}
super.handleResultFromServer(
result.validatedForClientStream(receivedMessageCount: receivedMessageCount)
)
guard isComplete else {
return
}
results.forEach(super.handleResultFromServer)
}
}

Expand Down
17 changes: 11 additions & 6 deletions Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import SwiftProtobuf
/// supporting both callbacks and async/await. This is internal to the package, and not public.
final class ClientOnlyStream<Input: ProtobufMessage, Output: ProtobufMessage>: @unchecked Sendable {
private let onResult: @Sendable (StreamResult<Output>) -> Void
private let receivedMessageCount = Locked(0)
private let receivedResults = Locked([StreamResult<Output>]())
/// Callbacks used to send outbound data and close the stream.
/// Optional because these callbacks are not available until the stream is initialized.
private var requestCallbacks: RequestCallbacks<Input>?
Expand Down Expand Up @@ -49,13 +49,18 @@ final class ClientOnlyStream<Input: ProtobufMessage, Output: ProtobufMessage>: @
///
/// - parameter result: The new result that was received.
func handleResultFromServer(_ result: StreamResult<Output>) {
let receivedMessageCount = self.receivedMessageCount.perform { value in
if case .message = result {
value += 1
let (isComplete, results) = self.receivedResults.perform { results in
results.append(result)
if case .complete = result {
return (true, ClientOnlyStreamValidation.validatedFinalClientStreamResults(results))
} else {
return (false, [])
}
return value
}
self.onResult(result.validatedForClientStream(receivedMessageCount: receivedMessageCount))
guard isComplete else {
return
}
results.forEach(self.onResult)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2022-2024 The Connect Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import Foundation

/// Namespace for performing client-only stream validation.
enum ClientOnlyStreamValidation {
/// Applies some validations which are only relevant for client-only streams.
///
/// Should be called after all values have been received over a client stream. Since client
/// streams only expect 1 result, all values returned from the server should be buffered before
/// being validated here and returned to the caller.
///
/// - parameter results: The buffered list of results to validate.
///
/// - returns: The list of stream results which should be returned to the caller.
static func validatedFinalClientStreamResults<T>(
_ results: [StreamResult<T>]
) -> [StreamResult<T>] {
var messageCount = 0
for result in results {
switch result {
case .headers:
continue
case .message:
messageCount += 1
case .complete(let code, _, _):
if code != .ok {
return results
}
}
}

if messageCount < 1 {
return [
.complete(
code: .internalError, error: ConnectError(
code: .unimplemented, message: "unary stream has no messages"
), trailers: nil
),
]
} else if messageCount > 1 {
return [
.complete(
code: .internalError, error: ConnectError(
code: .unimplemented, message: "unary stream has multiple messages"
), trailers: nil
),
]
} else {
return results
}
}
}

This file was deleted.

0 comments on commit c308226

Please sign in to comment.