diff --git a/Examples/ElizaCocoaPodsApp/Podfile.lock b/Examples/ElizaCocoaPodsApp/Podfile.lock index 727fcb34..51daf1d2 100644 --- a/Examples/ElizaCocoaPodsApp/Podfile.lock +++ b/Examples/ElizaCocoaPodsApp/Podfile.lock @@ -20,4 +20,4 @@ SPEC CHECKSUMS: PODFILE CHECKSUM: b598f373a6ab5add976b09c2ac79029bf2200d48 -COCOAPODS: 1.13.0 +COCOAPODS: 1.15.2 diff --git a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift index 6d06a41e..eacb7224 100644 --- a/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/ConnectInterceptor.swift @@ -83,28 +83,51 @@ extension ConnectInterceptor: UnaryInterceptor { ] = current.value }) - if let encoding = response.headers[HeaderConstants.contentEncoding]?.first, - let compressionPool = self.config.responseCompressionPool(forName: encoding), - let message = response.message.flatMap({ try? compressionPool.decompress(data: $0) }) + let finalResponse: HTTPResponse + let contentType = response.headers[HeaderConstants.contentType]?.first ?? "" + if response.code == .ok && !contentType.hasPrefix("application/\(self.config.codec.name())") { - proceed(HTTPResponse( - code: response.code, - headers: headers, - message: message, - trailers: trailers, - error: response.error, + // If content-type looks like it could be an RPC server's response, consider + // this an internal error. + let code: Code = contentType.hasPrefix("application/") ? .internalError : .unknown + finalResponse = HTTPResponse( + code: code, headers: headers, message: nil, trailers: trailers, + error: ConnectError(code: code, message: "unexpected content-type: \(contentType)"), tracingInfo: response.tracingInfo - )) + ) + } else if let encoding = response.headers[HeaderConstants.contentEncoding]?.first { + if let compressionPool = self.config.responseCompressionPool(forName: encoding), + let message = response.message.flatMap({ try? compressionPool.decompress(data: $0) }) + { + finalResponse = HTTPResponse( + code: response.code, + headers: headers, + message: message, + trailers: trailers, + error: response.error, + tracingInfo: response.tracingInfo + ) + } else { + finalResponse = HTTPResponse( + code: .internalError, + headers: headers, + message: nil, + trailers: trailers, + error: ConnectError(code: .internalError, message: "unexpected encoding"), + tracingInfo: response.tracingInfo + ) + } } else { - proceed(HTTPResponse( + finalResponse = HTTPResponse( code: response.code, headers: headers, message: response.message, trailers: trailers, error: response.error, tracingInfo: response.tracingInfo - )) + ) } + proceed(finalResponse) } } @@ -146,13 +169,36 @@ extension ConnectInterceptor: StreamInterceptor { switch result { case .headers(let headers): self.streamResponseHeaders.value = headers - proceed(result) + let contentType = headers[HeaderConstants.contentType]?.first ?? "" + if contentType != "application/connect+\(self.config.codec.name())" { + // If content-type looks like it could be an RPC server's response, consider + // this an internal error. + let code: Code = contentType.hasPrefix("application/connect+") + ? .internalError + : .unknown + proceed(.complete( + code: code, error: ConnectError( + code: code, message: "unexpected content-type: \(contentType)" + ), trailers: nil + )) + } else { + proceed(result) + } case .message(let data): do { let responseCompressionPool = self.streamResponseHeaders.value?[ HeaderConstants.connectStreamingContentEncoding ]?.first.flatMap { self.config.responseCompressionPool(forName: $0) } + if responseCompressionPool == nil && Envelope.isCompressed(data) { + proceed(.complete( + code: .internalError, error: ConnectError( + code: .internalError, message: "received unexpected compressed message" + ), trailers: nil + )) + return + } + let (headerByte, message) = try Envelope.unpackMessage( data, compressionPool: responseCompressionPool ) diff --git a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift index 63f09381..ada8671a 100644 --- a/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift +++ b/Libraries/Connect/Internal/Interceptors/GRPCWebInterceptor.swift @@ -61,12 +61,39 @@ extension GRPCWebInterceptor: UnaryInterceptor { response.headers, trailers: response.trailers ) + if grpcCode != .ok || connectError != nil { + proceed(HTTPResponse( + // Rewrite the gRPC code if it is "ok" but `connectError` is non-nil. + code: grpcCode == .ok ? .unknown : grpcCode, + headers: response.headers, + message: response.message, + trailers: response.trailers, + error: connectError, + tracingInfo: response.tracingInfo + )) + } else { + proceed(HTTPResponse( + code: .unimplemented, + headers: response.headers, + message: response.message, + trailers: response.trailers, + error: ConnectError( + code: .unimplemented, message: "unary response has no message" + ), + tracingInfo: response.tracingInfo + )) + } + return + } + + let contentType = response.headers[HeaderConstants.contentType]?.first ?? "" + if response.code == .ok && !self.contentTypeIsExpectedGRPCWeb(contentType) { + // If content-type looks like it could be a gRPC server's response, consider + // this an internal error. + let code: Code = self.contentTypeIsGRPCWeb(contentType) ? .internalError : .unknown proceed(HTTPResponse( - code: grpcCode, - headers: response.headers, - message: response.message, - trailers: response.trailers, - error: connectError, + code: code, headers: response.headers, message: nil, trailers: response.trailers, + error: ConnectError(code: code, message: "unexpected content-type: \(contentType)"), tracingInfo: response.tracingInfo )) return @@ -75,6 +102,18 @@ extension GRPCWebInterceptor: UnaryInterceptor { let compressionPool = response.headers[HeaderConstants.grpcContentEncoding]? .first .flatMap { self.config.responseCompressionPool(forName: $0) } + if compressionPool == nil && Envelope.isCompressed(responseData) { + proceed(HTTPResponse( + code: .internalError, headers: response.headers, message: nil, + trailers: response.trailers, + error: ConnectError( + code: .internalError, message: "received unexpected compressed message" + ), + tracingInfo: response.tracingInfo + )) + return + } + do { // gRPC Web returns data in 2 chunks (either/both of which may be compressed): // 1. OPTIONAL (when not trailers-only): The (headers and length prefixed) @@ -107,7 +146,7 @@ extension GRPCWebInterceptor: UnaryInterceptor { } } catch let error { proceed(HTTPResponse( - code: .unknown, + code: .unimplemented, headers: response.headers, message: response.message, trailers: response.trailers, @@ -146,6 +185,19 @@ extension GRPCWebInterceptor: StreamInterceptor { ) { switch result { case .headers(let headers): + let contentType = headers[HeaderConstants.contentType]?.first ?? "" + if !self.contentTypeIsExpectedGRPCWeb(contentType) { + // If content-type looks like it could be a gRPC server's response, consider + // this an internal error. + let code: Code = self.contentTypeIsGRPCWeb(contentType) ? .internalError : .unknown + proceed(.complete( + code: code, error: ConnectError( + code: code, message: "unexpected content-type: \(contentType)" + ), trailers: headers + )) + return + } + if let grpcCode = headers.grpcStatus() { // Headers-only response. proceed(.complete( @@ -193,9 +245,20 @@ extension GRPCWebInterceptor: StreamInterceptor { proceed(result) } } -} -// MARK: - Private + // MARK: - Private + + private func contentTypeIsGRPCWeb(_ contentType: String) -> Bool { + return contentType == "application/grpc-web" + || contentType.hasPrefix("application/grpc-web+") + } + + private func contentTypeIsExpectedGRPCWeb(_ contentType: String) -> Bool { + let codecName = self.config.codec.name() + return (codecName == "proto" && contentType == "application/grpc-web") + || contentType == "application/grpc-web+\(codecName)" + } +} private struct TrailersDecodingError: Error {} @@ -228,13 +291,23 @@ private extension Trailers { private extension HTTPResponse { func withHandledGRPCWebTrailers(_ trailers: Trailers, message: Data?) -> Self { let (grpcCode, error) = ConnectError.parseGRPCHeaders(self.headers, trailers: trailers) - if grpcCode == .ok { + if grpcCode != .ok || error != nil { return HTTPResponse( - code: grpcCode, + // Rewrite the gRPC code if it is "ok" but `connectError` is non-nil. + code: grpcCode == .ok ? .unknown : grpcCode, headers: self.headers, - message: message, + message: nil, trailers: trailers, - error: nil, + error: error, + tracingInfo: self.tracingInfo + ) + } else if message?.isEmpty != false { + return HTTPResponse( + code: .unimplemented, + headers: self.headers, + message: nil, + trailers: trailers, + error: ConnectError(code: .unimplemented, message: "unary response has no message"), tracingInfo: self.tracingInfo ) } else { @@ -243,7 +316,7 @@ private extension HTTPResponse { headers: self.headers, message: message, trailers: trailers, - error: error, + error: nil, tracingInfo: self.tracingInfo ) } diff --git a/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift b/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift index 565fc4b8..f0f2d53c 100644 --- a/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift +++ b/Libraries/Connect/Internal/Streaming/BidirectionalAsyncStream.swift @@ -14,11 +14,14 @@ import SwiftProtobuf -/// Concrete implementation of `BidirectionalAsyncStreamInterface`. +/// Concrete **internal** implementation of `BidirectionalAsyncStreamInterface`. /// Provides the necessary wiring to bridge from closures/callbacks to Swift's `AsyncStream` /// to work with async/await. +/// +/// If the library removes callback support in favor of only supporting async/await in the future, +/// this class can be simplified. @available(iOS 13, *) -final class BidirectionalAsyncStream< +class BidirectionalAsyncStream< Input: ProtobufMessage, Output: ProtobufMessage >: @unchecked Sendable { /// The underlying async stream that will be exposed to the consumer. @@ -71,10 +74,10 @@ final class BidirectionalAsyncStream< } /// Send a result to the consumer over the `results()` `AsyncStream`. - /// Should be called by the protocol client when a result is received. + /// Should be called by the protocol client when a result is received from the network. /// /// - parameter result: The new result that was received. - func receive(_ result: StreamResult) { + func handleResultFromServer(_ result: StreamResult) { self.receiveResult(result) } } @@ -103,7 +106,3 @@ extension BidirectionalAsyncStream: BidirectionalAsyncStreamInterface { self.requestCallbacks?.cancel() } } - -// Conforms to the client-only interface since it matches exactly and the implementation is internal -@available(iOS 13, *) -extension BidirectionalAsyncStream: ClientOnlyAsyncStreamInterface {} diff --git a/Libraries/Connect/Internal/Streaming/BidirectionalStream.swift b/Libraries/Connect/Internal/Streaming/BidirectionalStream.swift index 95195b72..1bf1aa6f 100644 --- a/Libraries/Connect/Internal/Streaming/BidirectionalStream.swift +++ b/Libraries/Connect/Internal/Streaming/BidirectionalStream.swift @@ -14,7 +14,7 @@ import SwiftProtobuf -/// Concrete implementation of `BidirectionalStreamInterface`. +/// Concrete **internal** implementation of `BidirectionalStreamInterface`. final class BidirectionalStream: Sendable { private let requestCallbacks: RequestCallbacks @@ -40,6 +40,3 @@ extension BidirectionalStream: BidirectionalStreamInterface { self.requestCallbacks.cancel() } } - -// Conforms to the client-only interface since it matches exactly and the implementation is internal -extension BidirectionalStream: ClientOnlyStreamInterface {} diff --git a/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift b/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift new file mode 100644 index 00000000..07e18dac --- /dev/null +++ b/Libraries/Connect/Internal/Streaming/ClientOnlyAsyncStream.swift @@ -0,0 +1,50 @@ +// Copyright 2022-2024 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import Foundation + +/// Concrete **internal** implementation of `ClientOnlyAsyncStreamInterface`. +/// Provides the necessary wiring to bridge from closures/callbacks to Swift's `AsyncStream` +/// to work with async/await. +/// +/// This subclasses `BidirectionalAsyncStream` since its behavior is purely additive (it overlays +/// some additional validation) and both types are internal to the package, not public. +@available(iOS 13, *) +final class ClientOnlyAsyncStream< + Input: ProtobufMessage, Output: ProtobufMessage +>: BidirectionalAsyncStream { + private let receivedResults = Locked([StreamResult]()) + + override func handleResultFromServer(_ result: StreamResult) { + let (isComplete, results) = self.receivedResults.perform { results in + results.append(result) + if case .complete = result { + return (true, ClientOnlyStreamValidation.validatedFinalClientStreamResults(results)) + } else { + return (false, []) + } + } + guard isComplete else { + return + } + results.forEach(super.handleResultFromServer) + } +} + +@available(iOS 13, *) +extension ClientOnlyAsyncStream: ClientOnlyAsyncStreamInterface { + func closeAndReceive() { + self.close() + } +} diff --git a/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift b/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift new file mode 100644 index 00000000..85dec6a1 --- /dev/null +++ b/Libraries/Connect/Internal/Streaming/ClientOnlyStream.swift @@ -0,0 +1,85 @@ +// Copyright 2022-2024 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import SwiftProtobuf + +/// Concrete **internal** implementation of `ClientOnlyStreamInterface`. +/// +/// The complexity around configuring callbacks on this type is an artifact of the library +/// supporting both callbacks and async/await. This is internal to the package, and not public. +final class ClientOnlyStream: @unchecked Sendable { + private let onResult: @Sendable (StreamResult) -> Void + private let receivedResults = Locked([StreamResult]()) + /// Callbacks used to send outbound data and close the stream. + /// Optional because these callbacks are not available until the stream is initialized. + private var requestCallbacks: RequestCallbacks? + + private struct NotConfiguredForSendingError: Swift.Error {} + + init(onResult: @escaping @Sendable (StreamResult) -> Void) { + self.onResult = onResult + } + + /// Enable sending data over this stream by providing a set of request callbacks to route data + /// to the network client. Must be called before calling `send()`. + /// + /// - parameter requestCallbacks: Callbacks to use for sending request data and closing the + /// stream. + /// + /// - returns: This instance of the stream (useful for chaining). + @discardableResult + func configureForSending(with requestCallbacks: RequestCallbacks) -> Self { + self.requestCallbacks = requestCallbacks + return self + } + + /// Send a result to the consumer after doing additional validations for client-only streams. + /// Should be called by the protocol client when a result is received from the network. + /// + /// - parameter result: The new result that was received. + func handleResultFromServer(_ result: StreamResult) { + let (isComplete, results) = self.receivedResults.perform { results in + results.append(result) + if case .complete = result { + return (true, ClientOnlyStreamValidation.validatedFinalClientStreamResults(results)) + } else { + return (false, []) + } + } + guard isComplete else { + return + } + results.forEach(self.onResult) + } +} + +extension ClientOnlyStream: ClientOnlyStreamInterface { + @discardableResult + func send(_ input: Input) throws -> Self { + guard let sendData = self.requestCallbacks?.sendData else { + throw NotConfiguredForSendingError() + } + + sendData(input) + return self + } + + func closeAndReceive() { + self.requestCallbacks?.sendClose() + } + + func cancel() { + self.requestCallbacks?.cancel() + } +} diff --git a/Libraries/Connect/Internal/Streaming/ClientOnlyStreamValidation.swift b/Libraries/Connect/Internal/Streaming/ClientOnlyStreamValidation.swift new file mode 100644 index 00000000..101ea08a --- /dev/null +++ b/Libraries/Connect/Internal/Streaming/ClientOnlyStreamValidation.swift @@ -0,0 +1,65 @@ +// Copyright 2022-2024 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import Foundation + +/// Namespace for performing client-only stream validation. +enum ClientOnlyStreamValidation { + /// Applies some validations which are only relevant for client-only streams. + /// + /// Should be called after all values have been received over a client stream. Since client + /// streams only expect 1 result, all values returned from the server should be buffered before + /// being validated here and returned to the caller. + /// + /// - parameter results: The buffered list of results to validate. + /// + /// - returns: The list of stream results which should be returned to the caller. + static func validatedFinalClientStreamResults( + _ results: [StreamResult] + ) -> [StreamResult] { + var messageCount = 0 + for result in results { + switch result { + case .headers: + continue + case .message: + messageCount += 1 + case .complete(let code, _, _): + if code != .ok { + return results + } + } + } + + if messageCount < 1 { + return [ + .complete( + code: .internalError, error: ConnectError( + code: .unimplemented, message: "unary stream has no messages" + ), trailers: nil + ), + ] + } else if messageCount > 1 { + return [ + .complete( + code: .internalError, error: ConnectError( + code: .unimplemented, message: "unary stream has multiple messages" + ), trailers: nil + ), + ] + } else { + return results + } + } +} diff --git a/Libraries/Connect/Internal/Streaming/ServerOnlyAsyncStream.swift b/Libraries/Connect/Internal/Streaming/ServerOnlyAsyncStream.swift index ced2cf83..cf223296 100644 --- a/Libraries/Connect/Internal/Streaming/ServerOnlyAsyncStream.swift +++ b/Libraries/Connect/Internal/Streaming/ServerOnlyAsyncStream.swift @@ -14,7 +14,7 @@ import SwiftProtobuf -/// Concrete implementation of `ServerOnlyAsyncStreamInterface`. +/// Concrete **internal** implementation of `ServerOnlyAsyncStreamInterface`. @available(iOS 13, *) final class ServerOnlyAsyncStream: Sendable { private let bidirectionalStream: BidirectionalAsyncStream diff --git a/Libraries/Connect/Internal/Streaming/ServerOnlyStream.swift b/Libraries/Connect/Internal/Streaming/ServerOnlyStream.swift index c7128556..d70e1778 100644 --- a/Libraries/Connect/Internal/Streaming/ServerOnlyStream.swift +++ b/Libraries/Connect/Internal/Streaming/ServerOnlyStream.swift @@ -14,7 +14,7 @@ import SwiftProtobuf -/// Concrete implementation of `ServerOnlyStreamInterface`. +/// Concrete **internal** implementation of `ServerOnlyStreamInterface`. final class ServerOnlyStream: Sendable { private let bidirectionalStream: BidirectionalStream diff --git a/Libraries/Connect/Internal/Streaming/URLSessionStream.swift b/Libraries/Connect/Internal/Streaming/URLSessionStream.swift index 80e5dab5..a735a1e0 100644 --- a/Libraries/Connect/Internal/Streaming/URLSessionStream.swift +++ b/Libraries/Connect/Internal/Streaming/URLSessionStream.swift @@ -122,13 +122,7 @@ final class URLSessionStream: NSObject, @unchecked Sendable { self.responseCallbacks.receiveClose( code, [:], - ConnectError( - code: code, - message: error.localizedDescription, - exception: nil, - details: [], - metadata: [:] - ) + ConnectError(code: code, message: error.localizedDescription) ) } else { self.responseCallbacks.receiveClose(.ok, [:], nil) diff --git a/Libraries/Connect/Internal/Utilities/Locked.swift b/Libraries/Connect/Internal/Utilities/Locked.swift index bd20c4e1..6dbf8984 100644 --- a/Libraries/Connect/Internal/Utilities/Locked.swift +++ b/Libraries/Connect/Internal/Utilities/Locked.swift @@ -16,12 +16,12 @@ import Foundation /// Class containing an internal lock which can be used to ensure thread-safe access to an /// underlying value. Conforms to `Sendable`, making it accessible from `@Sendable` closures. -final class Locked: @unchecked Sendable { +final class Locked: @unchecked Sendable { private let lock = Lock() - private var wrappedValue: T + private var wrappedValue: Wrapped /// Thread-safe access to the underlying value. - var value: T { + var value: Wrapped { get { self.lock.perform { self.wrappedValue } } set { self.lock.perform { self.wrappedValue = newValue } } } @@ -29,13 +29,16 @@ final class Locked: @unchecked Sendable { /// Perform an action with the underlying value, potentially updating that value. /// /// - parameter action: Closure to perform with the underlying value. - func perform(action: @escaping (inout T) -> Void) { - self.lock.perform { + /// + /// - returns: The value returned by the closure. + @discardableResult + func perform(action: @escaping (inout Wrapped) -> Result) -> Result { + return self.lock.perform { action(&self.wrappedValue) } } - init(_ value: T) { + init(_ value: Wrapped) { self.wrappedValue = value } } diff --git a/Libraries/Connect/PackageInternal/ConnectError+GRPC.swift b/Libraries/Connect/PackageInternal/ConnectError+GRPC.swift index b603c878..38a83f6d 100644 --- a/Libraries/Connect/PackageInternal/ConnectError+GRPC.swift +++ b/Libraries/Connect/PackageInternal/ConnectError+GRPC.swift @@ -15,8 +15,8 @@ import Foundation extension ConnectError { - /// This should not be considered part of Connect's public/stable interface, and is subject - /// to change. When the compiler supports it, this should be package-internal. + /// **This should not be considered part of Connect's public/stable interface, and is subject + /// to change. When the compiler supports it, this should be package-internal.** /// /// Parses gRPC headers and/or trailers to obtain the status and any potential error. /// @@ -29,12 +29,8 @@ extension ConnectError { _ headers: Headers?, trailers: Trailers? ) -> (grpcCode: Code, error: ConnectError?) { // "Trailers-only" responses can be sent in the headers or trailers block. - // Check for a valid gRPC status in the headers first, then in the trailers. - guard let grpcCode = headers?.grpcStatus() ?? trailers?.grpcStatus() else { - return (.unknown, ConnectError( - code: .unknown, message: "RPC response missing status", exception: nil, - details: [], metadata: [:] - )) + guard let grpcCode = trailers?.grpcStatus() ?? headers?.grpcStatus() else { + return (.unknown, ConnectError(code: .unknown, message: "RPC response missing status")) } if grpcCode == .ok { diff --git a/Libraries/Connect/PackageInternal/Envelope.swift b/Libraries/Connect/PackageInternal/Envelope.swift index 261ec8db..e8378134 100644 --- a/Libraries/Connect/PackageInternal/Envelope.swift +++ b/Libraries/Connect/PackageInternal/Envelope.swift @@ -15,11 +15,16 @@ import Foundation import SwiftProtobuf -/// Provides functionality for packing and unpacking (headers and length prefixed) messages. +/// **This should not be considered part of Connect's public/stable interface, and is subject +/// to change. When the compiler supports it, this should be package-internal.** /// -/// This should not be considered part of Connect's public/stable interface, and is subject -/// to change. When the compiler supports it, this should be package-internal. +/// Provides functionality for packing and unpacking (headers and length prefixed) messages. public enum Envelope { + /// The total number of bytes that will prefix a message. + public static var prefixLength: Int { + return 5 // Header flags (1 byte) + message length (4 bytes) + } + /// Packs a message into an "envelope", adding required header bytes and optionally /// applying compression. /// @@ -85,15 +90,14 @@ public enum Envelope { } } - // MARK: - Internal - - enum Error: Swift.Error { - case missingExpectedCompressionPool - } - - /// The total number of bytes that will prefix a message. - static var prefixLength: Int { - return 5 // Header flags (1 byte) + message length (4 bytes) + /// Determines whether the specified data is compressed + /// by assessing its compression prefix flag. + /// + /// - parameter packedData: The packed data to analyze. + /// + /// - returns: True if the data is compressed. + public static func isCompressed(_ packedData: Data) -> Bool { + return !packedData.isEmpty && (0b00000001 & packedData[0] != 0) } /// Computes the length of the message contained by a packed chunk of data. @@ -107,7 +111,7 @@ public enum Envelope { /// - returns: The length of the next expected message in the packed data. If multiple chunks /// are specified, this will return the length of the first. Returns -1 if there is /// not enough prefix data to determine the message length. - static func messageLength(forPackedData data: Data) -> Int { + public static func messageLength(forPackedData data: Data) -> Int { guard data.count >= self.prefixLength else { return -1 } @@ -119,6 +123,12 @@ public enum Envelope { return Int(messageLength) } + // MARK: - Internal + + enum Error: Swift.Error { + case missingExpectedCompressionPool + } + // MARK: - Private private static func write(lengthOf message: Data, to buffer: inout Data) { diff --git a/Libraries/Connect/PackageInternal/Headers+GRPC.swift b/Libraries/Connect/PackageInternal/Headers+GRPC.swift index bada05c6..a9cace45 100644 --- a/Libraries/Connect/PackageInternal/Headers+GRPC.swift +++ b/Libraries/Connect/PackageInternal/Headers+GRPC.swift @@ -15,8 +15,8 @@ import Foundation extension Headers { - /// This should not be considered part of Connect's public/stable interface, and is subject - /// to change. When the compiler supports it, this should be package-internal. + /// **This should not be considered part of Connect's public/stable interface, and is subject + /// to change. When the compiler supports it, this should be package-internal.** /// /// Adds required headers to gRPC and gRPC-Web requests/streams. /// diff --git a/Libraries/Connect/PackageInternal/Trailers+gRPC.swift b/Libraries/Connect/PackageInternal/Trailers+gRPC.swift index 520fea81..b994d9cf 100644 --- a/Libraries/Connect/PackageInternal/Trailers+gRPC.swift +++ b/Libraries/Connect/PackageInternal/Trailers+gRPC.swift @@ -13,8 +13,8 @@ // limitations under the License. extension Trailers { - /// This should not be considered part of Connect's public/stable interface, and is subject - /// to change. When the compiler supports it, this should be package-internal. + /// **This should not be considered part of Connect's public/stable interface, and is subject + /// to change. When the compiler supports it, this should be package-internal.** /// /// Identifies the status code from gRPC and gRPC-Web trailers. /// diff --git a/Libraries/Connect/Public/Implementation/Clients/ProtocolClient.swift b/Libraries/Connect/Public/Implementation/Clients/ProtocolClient.swift index 214aafc8..7fd6f858 100644 --- a/Libraries/Connect/Public/Implementation/Clients/ProtocolClient.swift +++ b/Libraries/Connect/Public/Implementation/Clients/ProtocolClient.swift @@ -183,9 +183,11 @@ extension ProtocolClient: ProtocolClientInterface { headers: Headers, onResult: @escaping @Sendable (StreamResult) -> Void ) -> any ClientOnlyStreamInterface { - return BidirectionalStream(requestCallbacks: self.createRequestCallbacks( - path: path, headers: headers, onResult: onResult - )) + let clientOnly = ClientOnlyStream(onResult: onResult) + let callbacks: RequestCallbacks = self.createRequestCallbacks( + path: path, headers: headers, onResult: { clientOnly.handleResultFromServer($0) } + ) + return clientOnly.configureForSending(with: callbacks) } public func serverOnlyStream< @@ -226,7 +228,8 @@ extension ProtocolClient: ProtocolClientInterface { ) -> any BidirectionalAsyncStreamInterface { let bidirectionalAsync = BidirectionalAsyncStream() let callbacks: RequestCallbacks = self.createRequestCallbacks( - path: path, headers: headers, onResult: { bidirectionalAsync.receive($0) } + path: path, headers: headers, + onResult: { bidirectionalAsync.handleResultFromServer($0) } ) return bidirectionalAsync.configureForSending(with: callbacks) } @@ -236,11 +239,11 @@ extension ProtocolClient: ProtocolClientInterface { path: String, headers: Headers ) -> any ClientOnlyAsyncStreamInterface { - let bidirectionalAsync = BidirectionalAsyncStream() + let clientOnlyAsync = ClientOnlyAsyncStream() let callbacks: RequestCallbacks = self.createRequestCallbacks( - path: path, headers: headers, onResult: { bidirectionalAsync.receive($0) } + path: path, headers: headers, onResult: { clientOnlyAsync.handleResultFromServer($0) } ) - return bidirectionalAsync.configureForSending(with: callbacks) + return clientOnlyAsync.configureForSending(with: callbacks) } @available(iOS 13, *) @@ -250,7 +253,8 @@ extension ProtocolClient: ProtocolClientInterface { ) -> any ServerOnlyAsyncStreamInterface { let bidirectionalAsync = BidirectionalAsyncStream() let callbacks: RequestCallbacks = self.createRequestCallbacks( - path: path, headers: headers, onResult: { bidirectionalAsync.receive($0) } + path: path, headers: headers, + onResult: { bidirectionalAsync.handleResultFromServer($0) } ) return ServerOnlyAsyncStream( bidirectionalStream: bidirectionalAsync.configureForSending(with: callbacks) diff --git a/Libraries/Connect/Public/Implementation/Clients/URLSessionHTTPClient.swift b/Libraries/Connect/Public/Implementation/Clients/URLSessionHTTPClient.swift index 551f6d1f..f045732d 100644 --- a/Libraries/Connect/Public/Implementation/Clients/URLSessionHTTPClient.swift +++ b/Libraries/Connect/Public/Implementation/Clients/URLSessionHTTPClient.swift @@ -70,7 +70,7 @@ open class URLSessionHTTPClient: NSObject, HTTPClientInterface, @unchecked Senda error: ConnectError( code: code, message: error.localizedDescription, - exception: error, details: [], metadata: [:] + exception: error ), tracingInfo: nil )) @@ -83,7 +83,7 @@ open class URLSessionHTTPClient: NSObject, HTTPClientInterface, @unchecked Senda error: ConnectError( code: .unknown, message: "unexpected response type \(type(of: urlResponse))", - exception: error, details: [], metadata: [:] + exception: error ), tracingInfo: nil )) diff --git a/Libraries/Connect/Public/Interfaces/ConnectError.swift b/Libraries/Connect/Public/Interfaces/ConnectError.swift index d7432930..c935c94a 100644 --- a/Libraries/Connect/Public/Interfaces/ConnectError.swift +++ b/Libraries/Connect/Public/Interfaces/ConnectError.swift @@ -48,7 +48,8 @@ public struct ConnectError: Swift.Error, Sendable { } public init( - code: Code, message: String?, exception: Error?, details: [Detail], metadata: Headers + code: Code, message: String?, + exception: Error? = nil, details: [Detail] = [], metadata: Headers = [:] ) { self.code = code self.message = message @@ -113,8 +114,7 @@ extension ConnectError { guard let source = source else { return .init( - code: code, message: "empty error message from source", exception: nil, - details: [], metadata: metadata + code: code, message: "empty error message from source", metadata: metadata ) } @@ -123,16 +123,13 @@ extension ConnectError { } catch let error { return .init( code: code, message: String(data: source, encoding: .utf8), - exception: error, details: [], metadata: metadata + exception: error, metadata: metadata ) } } public static func canceled() -> Self { - return .init( - code: .canceled, message: "request canceled by client", - exception: nil, details: [], metadata: [:] - ) + return .init(code: .canceled, message: "request canceled by client") } } diff --git a/Libraries/Connect/Public/Interfaces/Streaming/AsyncAwait/ClientOnlyAsyncStreamInterface.swift b/Libraries/Connect/Public/Interfaces/Streaming/AsyncAwait/ClientOnlyAsyncStreamInterface.swift index 134f39db..4a1a4116 100644 --- a/Libraries/Connect/Public/Interfaces/Streaming/AsyncAwait/ClientOnlyAsyncStreamInterface.swift +++ b/Libraries/Connect/Public/Interfaces/Streaming/AsyncAwait/ClientOnlyAsyncStreamInterface.swift @@ -39,8 +39,9 @@ public protocol ClientOnlyAsyncStreamInterface { /// - returns: An `AsyncStream` that contains all outputs/results from the stream. func results() -> AsyncStream> - /// Close the stream. No calls to `send()` are valid after calling `close()`. - func close() + /// Close the stream and await a response. + /// No calls to `send()` are valid after calling `closeAndReceive()`. + func closeAndReceive() /// Cancel the stream and return a canceled code. /// No calls to `send()` are valid after calling `cancel()`. diff --git a/Libraries/Connect/Public/Interfaces/Streaming/Callbacks/ClientOnlyStreamInterface.swift b/Libraries/Connect/Public/Interfaces/Streaming/Callbacks/ClientOnlyStreamInterface.swift index 3a8bd5c0..e1ffc133 100644 --- a/Libraries/Connect/Public/Interfaces/Streaming/Callbacks/ClientOnlyStreamInterface.swift +++ b/Libraries/Connect/Public/Interfaces/Streaming/Callbacks/ClientOnlyStreamInterface.swift @@ -28,8 +28,9 @@ public protocol ClientOnlyStreamInterface { @discardableResult func send(_ input: Input) throws -> Self - /// Close the stream. No calls to `send()` are valid after calling `close()`. - func close() + /// Close the stream and await a response. + /// No calls to `send()` are valid after calling `closeAndReceive()`. + func closeAndReceive() /// Cancel the stream and return a canceled code. /// No calls to `send()` are valid after calling `cancel()`. diff --git a/Libraries/ConnectMocks/MockClientOnlyAsyncStream.swift b/Libraries/ConnectMocks/MockClientOnlyAsyncStream.swift index 07dd0289..692a0b94 100644 --- a/Libraries/ConnectMocks/MockClientOnlyAsyncStream.swift +++ b/Libraries/ConnectMocks/MockClientOnlyAsyncStream.swift @@ -27,6 +27,9 @@ import SwiftProtobuf open class MockClientOnlyAsyncStream< Input: ProtobufMessage, Output: ProtobufMessage ->: MockBidirectionalAsyncStream, - ClientOnlyAsyncStreamInterface, - @unchecked Sendable {} +>: MockBidirectionalAsyncStream, ClientOnlyAsyncStreamInterface, @unchecked Sendable +{ + open func closeAndReceive() { + self.close() + } +} diff --git a/Libraries/ConnectMocks/MockClientOnlyStream.swift b/Libraries/ConnectMocks/MockClientOnlyStream.swift index a1f14431..d14b80ab 100644 --- a/Libraries/ConnectMocks/MockClientOnlyStream.swift +++ b/Libraries/ConnectMocks/MockClientOnlyStream.swift @@ -26,6 +26,8 @@ import SwiftProtobuf open class MockClientOnlyStream< Input: ProtobufMessage, Output: ProtobufMessage ->: MockBidirectionalStream, - ClientOnlyStreamInterface, - @unchecked Sendable {} +>: MockBidirectionalStream, ClientOnlyStreamInterface, @unchecked Sendable { + open func closeAndReceive() { + self.close() + } +} diff --git a/Libraries/ConnectNIO/Internal/Extensions/ConnectError+Extensions.swift b/Libraries/ConnectNIO/Internal/Extensions/ConnectError+Extensions.swift index 66103100..29ea36f2 100644 --- a/Libraries/ConnectNIO/Internal/Extensions/ConnectError+Extensions.swift +++ b/Libraries/ConnectNIO/Internal/Extensions/ConnectError+Extensions.swift @@ -16,12 +16,6 @@ import Connect extension ConnectError { static func deadlineExceeded() -> Self { - return .init( - code: .deadlineExceeded, - message: "timed out", - exception: nil, - details: [], - metadata: [:] - ) + return .init(code: .deadlineExceeded, message: "timed out") } } diff --git a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift index 654d5443..b6b5e377 100644 --- a/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift +++ b/Libraries/ConnectNIO/Internal/GRPCInterceptor.swift @@ -59,26 +59,86 @@ extension GRPCInterceptor: UnaryInterceptor { return } + let contentType = response.headers[HeaderConstants.contentType]?.first ?? "" + if !self.contentTypeIsExpectedGRPC(contentType) { + // If content-type looks like it could be a gRPC server's response, consider + // this an internal error. + let code: Code = self.contentTypeIsGRPC(contentType) ? .internalError : .unknown + proceed(HTTPResponse( + code: code, headers: response.headers, message: nil, trailers: response.trailers, + error: ConnectError(code: code, message: "unexpected content-type: \(contentType)"), + tracingInfo: response.tracingInfo + )) + return + } + let (grpcCode, connectError) = ConnectError.parseGRPCHeaders( response.headers, trailers: response.trailers ) guard grpcCode == .ok, let rawData = response.message, !rawData.isEmpty else { + if response.trailers.grpcStatus() == nil && response.message?.isEmpty == false { + proceed(HTTPResponse( + code: .internalError, + headers: response.headers, + message: response.message, + trailers: response.trailers, + error: ConnectError( + code: .internalError, + message: "unary response message should be followed by trailers" + ), + tracingInfo: response.tracingInfo + )) + } else if grpcCode == .ok { + proceed(HTTPResponse( + code: .unimplemented, + headers: response.headers, + message: response.message, + trailers: response.trailers, + error: ConnectError( + code: .unimplemented, message: "unary response has no message" + ), + tracingInfo: response.tracingInfo + )) + } else { + proceed(HTTPResponse( + code: grpcCode, + headers: response.headers, + message: response.message, + trailers: response.trailers, + error: connectError ?? response.error, + tracingInfo: response.tracingInfo + )) + } + return + } + + let compressionPool = response + .headers[HeaderConstants.grpcContentEncoding]? + .first + .flatMap { self.config.responseCompressionPool(forName: $0) } + if compressionPool == nil && Envelope.isCompressed(rawData) { proceed(HTTPResponse( - code: grpcCode, + code: .internalError, headers: response.headers, message: nil, + trailers: response.trailers, error: ConnectError( + code: .internalError, message: "received unexpected compressed message" + ), tracingInfo: response.tracingInfo + )) + return + } else if Envelope.containsMultipleGRPCMessages(rawData) { + proceed(HTTPResponse( + code: .unimplemented, headers: response.headers, - message: response.message, + message: nil, trailers: response.trailers, - error: connectError ?? response.error, + error: ConnectError( + code: .unimplemented, message: "unary response has multiple messages" + ), tracingInfo: response.tracingInfo )) return } - let compressionPool = response - .headers[HeaderConstants.grpcContentEncoding]? - .first - .flatMap { self.config.responseCompressionPool(forName: $0) } do { let messageData = try Envelope.unpackMessage( rawData, compressionPool: compressionPool @@ -133,6 +193,20 @@ extension GRPCInterceptor: StreamInterceptor { switch result { case .headers(let headers): self.streamResponseHeaders.value = headers + + let contentType = headers[HeaderConstants.contentType]?.first ?? "" + if !self.contentTypeIsExpectedGRPC(contentType) { + // If content-type looks like it could be a gRPC server's response, consider + // this an internal error. + let code: Code = self.contentTypeIsGRPC(contentType) ? .internalError : .unknown + proceed(.complete( + code: code, error: ConnectError( + code: code, message: "unexpected content-type: \(contentType)" + ), trailers: headers + )) + return + } + proceed(result) case .message(let rawData): @@ -140,9 +214,19 @@ extension GRPCInterceptor: StreamInterceptor { let responseCompressionPool = self.streamResponseHeaders.value?[ HeaderConstants.grpcContentEncoding ]?.first.flatMap { self.config.responseCompressionPool(forName: $0) } - proceed(.message(try Envelope.unpackMessage( + if responseCompressionPool == nil && Envelope.isCompressed(rawData) { + proceed(.complete( + code: .internalError, error: ConnectError( + code: .internalError, message: "received unexpected compressed message" + ), trailers: [:] + )) + return + } + + let unpackedMessage = try Envelope.unpackMessage( rawData, compressionPool: responseCompressionPool - ).unpacked)) + ).unpacked + proceed(.message(unpackedMessage)) } catch let error { // TODO: Close the stream here? proceed(.complete(code: .unknown, error: error, trailers: nil)) @@ -174,6 +258,26 @@ extension GRPCInterceptor: StreamInterceptor { } } } + + // MARK: - Private + + private func contentTypeIsGRPC(_ contentType: String) -> Bool { + return contentType == "application/grpc" + || contentType.hasPrefix("application/grpc+") + } + + private func contentTypeIsExpectedGRPC(_ contentType: String) -> Bool { + let codecName = self.config.codec.name() + return (codecName == "proto" && contentType == "application/grpc") + || contentType == "application/grpc+\(codecName)" + } +} + +private extension Envelope { + static func containsMultipleGRPCMessages(_ packedData: Data) -> Bool { + let messageLength = self.messageLength(forPackedData: packedData) + return packedData.count > messageLength + self.prefixLength + } } private final class Locked: @unchecked Sendable { diff --git a/Makefile b/Makefile index c3fb7b61..c077052d 100644 --- a/Makefile +++ b/Makefile @@ -78,8 +78,8 @@ $(BIN)/license-headers: Makefile testconformance: ## Run all conformance tests swift build -c release --product ConnectConformanceClient mv ./.build/release/ConnectConformanceClient $(BIN) - PATH="$(abspath $(BIN)):$(PATH)" connectconformance --trace --conf ./Tests/ConformanceClient/InvocationConfigs/urlsession.yaml --known-failing @./Tests/ConformanceClient/InvocationConfigs/urlsession-opt-outs.txt --mode client $(BIN)/ConnectConformanceClient httpclient=urlsession - PATH="$(abspath $(BIN)):$(PATH)" connectconformance --trace --conf ./Tests/ConformanceClient/InvocationConfigs/nio.yaml --known-failing @./Tests/ConformanceClient/InvocationConfigs/nio-opt-outs.txt --mode client $(BIN)/ConnectConformanceClient httpclient=nio + PATH="$(abspath $(BIN)):$(PATH)" connectconformance --trace --conf ./Tests/ConformanceClient/InvocationConfigs/urlsession.yaml --mode client $(BIN)/ConnectConformanceClient httpclient=urlsession + PATH="$(abspath $(BIN)):$(PATH)" connectconformance --trace --conf ./Tests/ConformanceClient/InvocationConfigs/nio.yaml --mode client $(BIN)/ConnectConformanceClient httpclient=nio .PHONY: testunit testunit: ## Run all unit tests diff --git a/Tests/ConformanceClient/InvocationConfigs/nio-opt-outs.txt b/Tests/ConformanceClient/InvocationConfigs/nio-opt-outs.txt deleted file mode 100644 index 260f45de..00000000 --- a/Tests/ConformanceClient/InvocationConfigs/nio-opt-outs.txt +++ /dev/null @@ -1,26 +0,0 @@ -Connect Unexpected Responses/HTTPVersion:2/TLS:false/client-stream/multiple-responses -Connect Unexpected Responses/HTTPVersion:2/TLS:false/client-stream/ok-but-no-response -Connect Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-codec -Connect Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-compressed-message -Connect Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-compression -Connect Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-content-type -Connect Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-stream-codec -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/client-stream/multiple-responses -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/client-stream/ok-but-no-response -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/trailers-only/ignore-header-if-body-present -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/trailers-only/ignore-header-if-trailer-present -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/unary/multiple-responses -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/unary/ok-but-no-response -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-codec -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-compressed-message -gRPC Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-compression -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-in-body/client-stream/multiple-responses -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-in-body/client-stream/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-in-body/unary/multiple-responses -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-in-body/unary/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-only/client-stream/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-only/ignore-header-if-body-present -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/trailers-only/unary/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-codec -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-compressed-message -gRPC-Web Unexpected Responses/HTTPVersion:2/TLS:false/unexpected-compression diff --git a/Tests/ConformanceClient/InvocationConfigs/urlsession-opt-outs.txt b/Tests/ConformanceClient/InvocationConfigs/urlsession-opt-outs.txt deleted file mode 100644 index f184a9d0..00000000 --- a/Tests/ConformanceClient/InvocationConfigs/urlsession-opt-outs.txt +++ /dev/null @@ -1,17 +0,0 @@ -Connect Unexpected Responses/HTTPVersion:1/TLS:false/client-stream/multiple-responses -Connect Unexpected Responses/HTTPVersion:1/TLS:false/client-stream/ok-but-no-response -Connect Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-codec -Connect Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-compressed-message -Connect Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-compression -Connect Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-content-type -Connect Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-stream-codec -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-in-body/client-stream/multiple-responses -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-in-body/client-stream/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-in-body/unary/multiple-responses -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-in-body/unary/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-only/client-stream/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-only/ignore-header-if-body-present -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/trailers-only/unary/ok-but-no-response -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-codec -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-compressed-message -gRPC-Web Unexpected Responses/HTTPVersion:1/TLS:false/unexpected-compression diff --git a/Tests/ConformanceClient/Sources/ConformanceInvoker.swift b/Tests/ConformanceClient/Sources/ConformanceInvoker.swift index ead7bc0b..6cbd921a 100644 --- a/Tests/ConformanceClient/Sources/ConformanceInvoker.swift +++ b/Tests/ConformanceClient/Sources/ConformanceInvoker.swift @@ -265,9 +265,9 @@ final class ConformanceInvoker { stream.cancel() } case .afterNumResponses: // Does not apply to client-only streams. - stream.close() + stream.closeAndReceive() case .none: - stream.close() + stream.closeAndReceive() } var conformanceResult = ConformanceResult() diff --git a/Tests/UnitTests/ConnectLibraryTests/ConnectTests/EnvelopeTests.swift b/Tests/UnitTests/ConnectLibraryTests/ConnectTests/EnvelopeTests.swift index 3e09378d..6fc0ee22 100644 --- a/Tests/UnitTests/ConnectLibraryTests/ConnectTests/EnvelopeTests.swift +++ b/Tests/UnitTests/ConnectLibraryTests/ConnectTests/EnvelopeTests.swift @@ -23,6 +23,7 @@ final class EnvelopeTests: XCTestCase { ) let compressed = try GzipCompressionPool().compress(data: originalData) XCTAssertEqual(packed[0], 1) // Compression flag = true + XCTAssertTrue(Envelope.isCompressed(packed)) XCTAssertEqual(Envelope.messageLength(forPackedData: packed), compressed.count) XCTAssertEqual(packed[5...], compressed) // Post-prefix data should match compressed value @@ -35,6 +36,7 @@ final class EnvelopeTests: XCTestCase { let originalData = Data(repeating: 0xa, count: 50) let packed = Envelope.packMessage(originalData, using: nil) XCTAssertEqual(packed[0], 0) // Compression flag = false + XCTAssertFalse(Envelope.isCompressed(packed)) XCTAssertEqual(Envelope.messageLength(forPackedData: packed), originalData.count) XCTAssertEqual(packed[5...], originalData) // Post-prefix data should match compressed value @@ -50,6 +52,7 @@ final class EnvelopeTests: XCTestCase { originalData, using: .init(minBytes: 100, pool: GzipCompressionPool()) ) XCTAssertEqual(packed[0], 0) // Compression flag = false + XCTAssertFalse(Envelope.isCompressed(packed)) XCTAssertEqual(Envelope.messageLength(forPackedData: packed), originalData.count) XCTAssertEqual(packed[5...], originalData) // Post-prefix data should match compressed value @@ -66,6 +69,7 @@ final class EnvelopeTests: XCTestCase { ) let compressed = try GzipCompressionPool().compress(data: originalData) XCTAssertEqual(packed[0], 1) // Compression flag = true + XCTAssertTrue(Envelope.isCompressed(packed)) XCTAssertEqual(Envelope.messageLength(forPackedData: packed), compressed.count) XCTAssertEqual(packed[5...], compressed) // Post-prefix data should match compressed value