From 02c3cbbbb38ab7a34625f12e728773e61eb0747e Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 16 Jan 2025 16:35:46 +0000 Subject: [PATCH 01/16] Fix license check script --- dev/license-check.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/license-check.sh b/dev/license-check.sh index ce643ac..a0ffd42 100755 --- a/dev/license-check.sh +++ b/dev/license-check.sh @@ -88,7 +88,7 @@ check_copyright_headers() { actual_sha=$(head -n "$((drop_first + expected_lines))" "$filename" \ | tail -n "$expected_lines" \ - | sed -e 's/201[56789]-20[12][0-9]/YEARS/' -e 's/20[12][0-9]/YEARS/' \ + | sed -e 's/20[12][0-9]-20[12][0-9]/YEARS/' -e 's/20[12][0-9]/YEARS/' \ | shasum \ | awk '{print $1}') From 424b8aaaee0d766c0de17ee129990758df90d18d Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 16 Jan 2025 16:52:37 +0000 Subject: [PATCH 02/16] Add OTel attributes and rename to ClienOTelTracingInterceptor --- .../ClientTracingInterceptor.swift | 140 ------------- .../ClientOTelTracingInterceptor.swift | 194 ++++++++++++++++++ .../SpanAttributes+RPCAttributes.swift | 60 ++++++ 3 files changed, 254 insertions(+), 140 deletions(-) delete mode 100644 Sources/GRPCInterceptors/ClientTracingInterceptor.swift create mode 100644 Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift create mode 100644 Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift diff --git a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift deleted file mode 100644 index a4c85c9..0000000 --- a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright 2024, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -public import GRPCCore -internal import Tracing - -/// A client interceptor that injects tracing information into the request. -/// -/// The tracing information is taken from the current `ServiceContext`, and injected into the request's -/// metadata. It will then be picked up by the server-side ``ServerTracingInterceptor``. -/// -/// For more information, refer to the documentation for `swift-distributed-tracing`. -public struct ClientTracingInterceptor: ClientInterceptor { - private let injector: ClientRequestInjector - private let emitEventOnEachWrite: Bool - - /// Create a new instance of a ``ClientTracingInterceptor``. - /// - /// - Parameter emitEventOnEachWrite: If `true`, each request part sent and response part - /// received will be recorded as a separate event in a tracing span. Otherwise, only the request/response - /// start and end will be recorded as events. - public init(emitEventOnEachWrite: Bool = false) { - self.injector = ClientRequestInjector() - self.emitEventOnEachWrite = emitEventOnEachWrite - } - - /// This interceptor will inject as the request's metadata whatever `ServiceContext` key-value pairs - /// have been made available by the tracing implementation bootstrapped in your application. - /// - /// Which key-value pairs are injected will depend on the specific tracing implementation - /// that has been configured when bootstrapping `swift-distributed-tracing` in your application. - public func intercept( - request: StreamingClientRequest, - context: ClientContext, - next: ( - StreamingClientRequest, - ClientContext - ) async throws -> StreamingClientResponse - ) async throws -> StreamingClientResponse where Input: Sendable, Output: Sendable { - var request = request - let tracer = InstrumentationSystem.tracer - let serviceContext = ServiceContext.current ?? .topLevel - - tracer.inject( - serviceContext, - into: &request.metadata, - using: self.injector - ) - - return try await tracer.withSpan( - context.descriptor.fullyQualifiedMethod, - context: serviceContext, - ofKind: .client - ) { span in - span.addEvent("Request started") - - if self.emitEventOnEachWrite { - let wrappedProducer = request.producer - request.producer = { writer in - let eventEmittingWriter = HookedWriter( - wrapping: writer, - beforeEachWrite: { - span.addEvent("Sending request part") - }, - afterEachWrite: { - span.addEvent("Sent request part") - } - ) - - do { - try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter)) - } catch { - span.addEvent("Error encountered") - throw error - } - - span.addEvent("Request end") - } - } - - var response: StreamingClientResponse - do { - response = try await next(request, context) - } catch { - span.addEvent("Error encountered") - throw error - } - - switch response.accepted { - case .success(var success): - if self.emitEventOnEachWrite { - let onEachPartRecordingSequence = success.bodyParts.map { element in - span.addEvent("Received response part") - return element - } - let onFinishRecordingSequence = OnFinishAsyncSequence( - wrapping: onEachPartRecordingSequence - ) { - span.addEvent("Received response end") - } - success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence) - response.accepted = .success(success) - } else { - let onFinishRecordingSequence = OnFinishAsyncSequence(wrapping: success.bodyParts) { - span.addEvent("Received response end") - } - success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence) - response.accepted = .success(success) - } - case .failure: - span.addEvent("Received error response") - } - - return response - } - } -} - -/// An injector responsible for injecting the required instrumentation keys from the `ServiceContext` into -/// the request metadata. -struct ClientRequestInjector: Instrumentation.Injector { - typealias Carrier = Metadata - - func inject(_ value: String, forKey key: String, into carrier: inout Carrier) { - carrier.addString(value, forKey: key) - } -} diff --git a/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift new file mode 100644 index 0000000..c379c25 --- /dev/null +++ b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift @@ -0,0 +1,194 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public import GRPCCore +internal import Tracing +internal import Synchronization + +/// A client interceptor that injects tracing information into the request. +/// +/// The tracing information is taken from the current `ServiceContext`, and injected into the request's +/// metadata. It will then be picked up by the server-side ``ServerTracingInterceptor``. +/// +/// For more information, refer to the documentation for `swift-distributed-tracing`. +public struct ClientOTelTracingInterceptor: ClientInterceptor { + private let injector: ClientRequestInjector + private let emitEventOnEachWrite: Bool + private var serverHostname: String + private var networkTransportMethod: String + + /// Create a new instance of a ``ClientOTelTracingInterceptor``. + /// + /// - Parameters: + /// - severHostname: The hostname of the RPC server. This will be the value for the `server.address` attribute in spans. + /// - networkTransportMethod: The transport in use (e.g. "tcp", "udp"). This will be the value for the + /// `network.transport` attribute in spans. + /// - emitEventOnEachWrite: If `true`, each request part sent and response part received will be recorded as a separate + /// event in a tracing span. Otherwise, only the request/response start and end will be recorded as events. + public init( + serverHostname: String, + networkTransportMethod: String, + emitEventOnEachWrite: Bool = false + ) { + self.injector = ClientRequestInjector() + self.serverHostname = serverHostname + self.networkTransportMethod = networkTransportMethod + self.emitEventOnEachWrite = emitEventOnEachWrite + } + + /// This interceptor will inject as the request's metadata whatever `ServiceContext` key-value pairs + /// have been made available by the tracing implementation bootstrapped in your application. + /// + /// Which key-value pairs are injected will depend on the specific tracing implementation + /// that has been configured when bootstrapping `swift-distributed-tracing` in your application. + /// + /// It will also inject all required and recommended span and event attributes, and set span status, as defined by OpenTelemetry's + /// documentation on: + /// - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans + /// - https://opentelemetry.io/docs/specs/semconv/rpc/grpc/ + public func intercept( + request: StreamingClientRequest, + context: ClientContext, + next: ( + StreamingClientRequest, + ClientContext + ) async throws -> StreamingClientResponse + ) async throws -> StreamingClientResponse where Input: Sendable, Output: Sendable { + var request = request + let tracer = InstrumentationSystem.tracer + let serviceContext = ServiceContext.current ?? .topLevel + + tracer.inject( + serviceContext, + into: &request.metadata, + using: self.injector + ) + + return try await tracer.withSpan( + context.descriptor.fullyQualifiedMethod, + context: serviceContext, + ofKind: .client + ) { span in + self.setOTelSpanAttributes(into: span, context: context) + + span.addEvent("Request started") + + let wrappedProducer = request.producer + if self.emitEventOnEachWrite { + request.producer = { writer in + let messageSentCounter = Atomic(1) + let eventEmittingWriter = HookedWriter( + wrapping: writer, + beforeEachWrite: {}, + afterEachWrite: { + var event = SpanEvent(name: "rpc.message") + event.attributes.rpc.messageType = "SENT" + event.attributes.rpc.messageID = messageSentCounter + .wrappingAdd(1, ordering: .sequentiallyConsistent) + .oldValue + span.addEvent(event) + } + ) + try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter)) + span.addEvent("Request ended") + } + } else { + request.producer = { writer in + try await wrappedProducer(RPCWriter(wrapping: writer)) + span.addEvent("Request ended") + } + } + + var response = try await next(request, context) + switch response.accepted { + case .success(var success): + span.addEvent("Received response start") + span.attributes.rpc.grpcStatusCode = 0 + if self.emitEventOnEachWrite { + let messageReceivedCounter = Atomic(1) + let onEachPartRecordingSequence = success.bodyParts.map { element in + var event = SpanEvent(name: "rpc.message") + event.attributes.rpc.messageType = "RECEIVED" + event.attributes.rpc.messageID = messageReceivedCounter + .wrappingAdd(1, ordering: .sequentiallyConsistent) + .oldValue + span.addEvent(event) + return element + } + + let onFinishRecordingSequence = OnFinishAsyncSequence( + wrapping: onEachPartRecordingSequence + ) { + span.addEvent("Received response end") + } + + success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence) + response.accepted = .success(success) + } else { + let onFinishRecordingSequence = OnFinishAsyncSequence(wrapping: success.bodyParts) { + span.addEvent("Received response end") + } + + success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence) + response.accepted = .success(success) + } + + case .failure(let error): + span.attributes.rpc.grpcStatusCode = error.code.rawValue + span.setStatus(SpanStatus(code: .error)) + span.addEvent("Received error response") + span.recordError(error) + } + + return response + } + } + + private func setOTelSpanAttributes(into span: any Span, context: ClientContext) { + span.attributes.rpc.system = "grpc" + span.attributes.rpc.service = context.descriptor.service.fullyQualifiedService + span.attributes.rpc.method = context.descriptor.method + span.attributes.rpc.serverAddress = self.serverHostname + span.attributes.rpc.networkTransport = self.networkTransportMethod + + let peer = context.remotePeer + // We expect this address to be of either of these two formats: + // - :: for ipv4 and ipv6 addresses + // - unix: for UNIX domain sockets + let components = peer.split(separator: ":") + if components.count == 2 { + // This is the UDS case + span.attributes.rpc.networkType = String(components[0]) + span.attributes.rpc.networkPeerAddress = String(components[1]) + } else if components.count == 3 { + // This is the ipv4 or ipv6 case + span.attributes.rpc.networkType = String(components[0]) + span.attributes.rpc.networkPeerAddress = String(components[1]) + span.attributes.rpc.networkPeerPort = Int(components[2]) + span.attributes.rpc.serverPort = Int(components[2]) + } + } +} + +/// An injector responsible for injecting the required instrumentation keys from the `ServiceContext` into +/// the request metadata. +struct ClientRequestInjector: Instrumentation.Injector { + typealias Carrier = Metadata + + func inject(_ value: String, forKey key: String, into carrier: inout Carrier) { + carrier.addString(value, forKey: key) + } +} diff --git a/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift b/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift new file mode 100644 index 0000000..1f36613 --- /dev/null +++ b/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift @@ -0,0 +1,60 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Tracing + +@dynamicMemberLookup +package struct RPCAttributes: SpanAttributeNamespace { + var attributes: SpanAttributes + + init(attributes: SpanAttributes) { + self.attributes = attributes + } + + struct NestedSpanAttributes: NestedSpanAttributesProtocol { + init() {} + + var system: Key { "rpc.system" } + var method: Key { "rpc.method" } + var service: Key { "rpc.service" } + var messageID: Key { "rpc.message.id" } + var messageType: Key { "rpc.message.type" } + var grpcStatusCode: Key { "rpc.grpc.status_code" } + + var serverAddress: Key{ "server.address" } + var serverPort: Key { "server.port" } + + var clientAddress: Key { "client.address" } + var clientPort: Key { "client.port" } + + var networkTransport: Key { "network.transport" } + var networkType: Key { "network.type" } + var networkPeerAddress: Key { "network.peer.address" } + var networkPeerPort: Key { "network.peer.port" } + } +} + +package extension SpanAttributes { + /// Semantic conventions for RPC spans. + var rpc: RPCAttributes { + get { + .init(attributes: self) + } + set { + self = newValue.attributes + } + } +} From 7cada1d5f6b0f9b2a750f57c8eae08208bb01257 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 16 Jan 2025 16:53:32 +0000 Subject: [PATCH 03/16] Small changes to server interceptor --- .../ServerTracingInterceptor.swift | 22 +++++-------------- 1 file changed, 5 insertions(+), 17 deletions(-) rename Sources/GRPCInterceptors/{ => Tracing}/ServerTracingInterceptor.swift (89%) diff --git a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift b/Sources/GRPCInterceptors/Tracing/ServerTracingInterceptor.swift similarity index 89% rename from Sources/GRPCInterceptors/ServerTracingInterceptor.swift rename to Sources/GRPCInterceptors/Tracing/ServerTracingInterceptor.swift index 413752d..0df9094 100644 --- a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/Tracing/ServerTracingInterceptor.swift @@ -98,35 +98,23 @@ public struct ServerTracingInterceptor: ServerInterceptor { } ) - let wrappedResult: Metadata - do { - wrappedResult = try await wrappedProducer( - RPCWriter(wrapping: eventEmittingWriter) - ) - } catch { - span.addEvent("Error encountered") - throw error - } + let wrappedResult = try await wrappedProducer( + RPCWriter(wrapping: eventEmittingWriter) + ) span.addEvent("Sent response end") return wrappedResult } } else { success.producer = { writer in - let wrappedResult: Metadata - do { - wrappedResult = try await wrappedProducer(writer) - } catch { - span.addEvent("Error encountered") - throw error - } - + let wrappedResult = try await wrappedProducer(writer) span.addEvent("Sent response end") return wrappedResult } } response = .init(accepted: .success(success)) + case .failure: span.addEvent("Sent error response") } From ab38ee30729d05dabdc666126c8e1c6f815c7deb Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 16 Jan 2025 16:53:39 +0000 Subject: [PATCH 04/16] Update tests --- .../TracingInterceptorTests.swift | 397 +++++++++++++++--- .../TracingTestsUtilities.swift | 65 ++- 2 files changed, 393 insertions(+), 69 deletions(-) diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index cf847ca..205a1f0 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -28,7 +28,10 @@ final class TracingInterceptorTests: XCTestCase { func testClientInterceptor() async throws { var serviceContext = ServiceContext.topLevel let traceIDString = UUID().uuidString - let interceptor = ClientTracingInterceptor(emitEventOnEachWrite: false) + let interceptor = ClientOTelTracingInterceptor( + serverHostname: "someserver.com", + networkTransportMethod: "tcp" + ) let (stream, continuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString @@ -45,7 +48,7 @@ final class TracingInterceptorTests: XCTestCase { try await writer.write(contentsOf: ["request1"]) try await writer.write(contentsOf: ["request2"]) }), - context: ClientContext(descriptor: methodDescriptor, remotePeer: "", localPeer: "") + context: self.getClientContext(forMethod: methodDescriptor) ) { stream, _ in // Assert the metadata contains the injected context key-value. XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) @@ -66,30 +69,111 @@ final class TracingInterceptorTests: XCTestCase { ) } - var streamIterator = stream.makeAsyncIterator() - var element = await streamIterator.next() - XCTAssertEqual(element, "request1") - element = await streamIterator.next() - XCTAssertEqual(element, "request2") - element = await streamIterator.next() - XCTAssertNil(element) - - var messages = response.messages.makeAsyncIterator() - var message = try await messages.next() - XCTAssertEqual(message, ["response"]) - message = try await messages.next() - XCTAssertNil(message) - - let tracer = InstrumentationSystem.tracer as! TestTracer - XCTAssertEqual( - tracer.getEventsForTestSpan(ofOperationName: methodDescriptor.fullyQualifiedMethod).map { - $0.name - }, - [ + await AssertStreamContentsEqual(["request1", "request2"], stream) + try await AssertStreamContentsEqual([["response"]], response.messages) + + AssertTestSpanComponents(forMethod: methodDescriptor) { events in + XCTAssertEqual(events.map({ $0.name }), [ "Request started", - "Received response end", - ] + "Request ended", + "Received response start", + "Received response end" + ]) + } assertAttributes: { attributes in + XCTAssertEqual(attributes, [ + "rpc.system": .string("grpc"), + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": .int(0), + "server.address": .string("someserver.com"), + "server.port": .int(567), + "network.peer.address": .string("10.1.2.80"), + "network.peer.port": .int(567), + "network.transport": .string("tcp"), + "network.type": .string("ipv4") + ]) + } assertStatus: { status in + XCTAssertNil(status) + } assertErrors: { errors in + XCTAssertEqual(errors, []) + } + } + } + + func testClientInterceptor_UDS() async throws { + var serviceContext = ServiceContext.topLevel + let traceIDString = UUID().uuidString + let interceptor = ClientOTelTracingInterceptor( + serverHostname: "someserver.com", + networkTransportMethod: "tcp" + ) + let (stream, continuation) = AsyncStream.makeStream() + serviceContext.traceID = traceIDString + + // FIXME: use 'ServiceContext.withValue(serviceContext)' + // + // This is blocked on: https://github.com/apple/swift-service-context/pull/46 + try await ServiceContext.$current.withValue(serviceContext) { + let methodDescriptor = MethodDescriptor( + fullyQualifiedService: "TracingInterceptorTests", + method: "testClientInterceptor" ) + let response = try await interceptor.intercept( + request: .init(producer: { writer in + try await writer.write(contentsOf: ["request1"]) + try await writer.write(contentsOf: ["request2"]) + }), + context: ClientContext( + descriptor: methodDescriptor, + remotePeer: "unix:some-path", + localPeer: "unix:some-path" + ) + ) { stream, _ in + // Assert the metadata contains the injected context key-value. + XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) + + // Write into the response stream to make sure the `producer` closure's called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) + try await stream.producer(writer) + continuation.finish() + + return .init( + metadata: [], + bodyParts: RPCAsyncSequence( + wrapping: AsyncThrowingStream { + $0.yield(.message(["response"])) + $0.finish() + } + ) + ) + } + + await AssertStreamContentsEqual(["request1", "request2"], stream) + try await AssertStreamContentsEqual([["response"]], response.messages) + + AssertTestSpanComponents(forMethod: methodDescriptor) { events in + XCTAssertEqual(events.map({ $0.name }), [ + "Request started", + "Request ended", + "Received response start", + "Received response end" + ]) + } assertAttributes: { attributes in + XCTAssertEqual(attributes, [ + "rpc.system": .string("grpc"), + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": .int(0), + "server.address": .string("someserver.com"), + "network.peer.address": .string("some-path"), + "network.transport": .string("tcp"), + "network.type": .string("unix") + ]) + } assertStatus: { status in + XCTAssertNil(status) + } assertErrors: { errors in + XCTAssertEqual(errors, []) + } } } @@ -100,7 +184,11 @@ final class TracingInterceptorTests: XCTestCase { ) var serviceContext = ServiceContext.topLevel let traceIDString = UUID().uuidString - let interceptor = ClientTracingInterceptor(emitEventOnEachWrite: true) + let interceptor = ClientOTelTracingInterceptor( + serverHostname: "someserver.com", + networkTransportMethod: "tcp", + emitEventOnEachWrite: true + ) let (stream, continuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString @@ -113,7 +201,7 @@ final class TracingInterceptorTests: XCTestCase { try await writer.write(contentsOf: ["request1"]) try await writer.write(contentsOf: ["request2"]) }), - context: ClientContext(descriptor: methodDescriptor, remotePeer: "", localPeer: "") + context: self.getClientContext(forMethod: methodDescriptor) ) { stream, _ in // Assert the metadata contains the injected context key-value. XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) @@ -134,41 +222,185 @@ final class TracingInterceptorTests: XCTestCase { ) } - var streamIterator = stream.makeAsyncIterator() - var element = await streamIterator.next() - XCTAssertEqual(element, "request1") - element = await streamIterator.next() - XCTAssertEqual(element, "request2") - element = await streamIterator.next() - XCTAssertNil(element) - - var messages = response.messages.makeAsyncIterator() - var message = try await messages.next() - XCTAssertEqual(message, ["response"]) - message = try await messages.next() - XCTAssertNil(message) - - let tracer = InstrumentationSystem.tracer as! TestTracer - XCTAssertEqual( - tracer.getEventsForTestSpan(ofOperationName: methodDescriptor.fullyQualifiedMethod).map { - $0.name - }, - [ - "Request started", + await AssertStreamContentsEqual(["request1", "request2"], stream) + try await AssertStreamContentsEqual([["response"]], response.messages) + + AssertTestSpanComponents(forMethod: methodDescriptor) { events in + XCTAssertEqual(events, [ + TestSpanEvent("Request started", [:]), // Recorded when `request1` is sent - "Sending request part", - "Sent request part", + TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 1]), // Recorded when `request2` is sent - "Sending request part", - "Sent request part", + TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 2]), // Recorded after all request parts have been sent - "Request end", + TestSpanEvent("Request ended", [:]), // Recorded when receiving response part - "Received response part", + TestSpanEvent("Received response start", [:]), + TestSpanEvent("rpc.message", ["rpc.message.type": "RECEIVED", "rpc.message.id": 1]), // Recorded at end of response - "Received response end", - ] + TestSpanEvent("Received response end", [:]), + ]) + } assertAttributes: { attributes in + XCTAssertEqual(attributes, [ + "rpc.system": .string("grpc"), + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": .int(0), + "server.address": .string("someserver.com"), + "server.port": .int(567), + "network.peer.address": .string("10.1.2.80"), + "network.peer.port": .int(567), + "network.transport": .string("tcp"), + "network.type": .string("ipv4") + ]) + } assertStatus: { status in + XCTAssertNil(status) + } assertErrors: { errors in + XCTAssertEqual(errors, []) + } + } + } + + func testClientInterceptorErrorEncountered() async throws { + var serviceContext = ServiceContext.topLevel + let traceIDString = UUID().uuidString + let interceptor = ClientOTelTracingInterceptor( + serverHostname: "someserver.com", + networkTransportMethod: "tcp" + ) + let (_, continuation) = AsyncStream.makeStream() + serviceContext.traceID = traceIDString + + // FIXME: use 'ServiceContext.withValue(serviceContext)' + // + // This is blocked on: https://github.com/apple/swift-service-context/pull/46 + await ServiceContext.$current.withValue(serviceContext) { + let methodDescriptor = MethodDescriptor( + fullyQualifiedService: "TracingInterceptorTests", + method: "testClientInterceptorErrorEncountered" ) + do { + _ = try await interceptor.intercept( + request: .init(producer: { writer in + try await writer.write("request") + throw TracingInterceptorTestError.testError + }), + context: self.getClientContext(forMethod: methodDescriptor) + ) { stream, _ in + // Assert the metadata contains the injected context key-value. + XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) + + // Write into the response stream to make sure the `producer` closure's called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) + try await stream.producer(writer) + continuation.finish() + + return .init( + metadata: [], + bodyParts: RPCAsyncSequence( + wrapping: AsyncThrowingStream { + $0.yield(.message(["response"])) + $0.finish() + } + ) + ) + } + XCTFail("Should have thrown") + } catch { + AssertTestSpanComponents(forMethod: methodDescriptor) { events in + // We errored when writing our request, so this is the only event we should be seeing + XCTAssertEqual(events.map({ $0.name }), ["Request started"]) + } assertAttributes: { attributes in + // The attributes should not contain a grpc status code, as the request was never even sent. + XCTAssertEqual(attributes, [ + "rpc.system": .string("grpc"), + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "server.address": .string("someserver.com"), + "server.port": .int(567), + "network.peer.address": .string("10.1.2.80"), + "network.peer.port": .int(567), + "network.transport": .string("tcp"), + "network.type": .string("ipv4") + ]) + } assertStatus: { status in + XCTAssertNil(status) + } assertErrors: { errors in + XCTAssertEqual(errors, [.testError]) + } + } + } + } + + func testClientInterceptorErrorReponse() async throws { + var serviceContext = ServiceContext.topLevel + let traceIDString = UUID().uuidString + let interceptor = ClientOTelTracingInterceptor( + serverHostname: "someserver.com", + networkTransportMethod: "tcp" + ) + let (stream, continuation) = AsyncStream.makeStream() + serviceContext.traceID = traceIDString + + // FIXME: use 'ServiceContext.withValue(serviceContext)' + // + // This is blocked on: https://github.com/apple/swift-service-context/pull/46 + try await ServiceContext.$current.withValue(serviceContext) { + let methodDescriptor = MethodDescriptor( + fullyQualifiedService: "TracingInterceptorTests", + method: "testClientInterceptor" + ) + let response: StreamingClientResponse = try await interceptor.intercept( + request: .init(producer: { writer in + try await writer.write(contentsOf: ["request"]) + }), + context: self.getClientContext(forMethod: methodDescriptor) + ) { stream, _ in + // Assert the metadata contains the injected context key-value. + XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) + + // Write into the response stream to make sure the `producer` closure's called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) + try await stream.producer(writer) + continuation.finish() + + return .init(error: RPCError(code: .unavailable, message: "This should not work")) + } + + await AssertStreamContentsEqual(["request"], stream) + + switch response.accepted { + case .success: + XCTFail("Response should have failed") + + case .failure(let failure): + XCTAssertEqual(failure, RPCError(code: .unavailable, message: "This should not work")) + } + + AssertTestSpanComponents(forMethod: methodDescriptor) { events in + XCTAssertEqual(events.map({ $0.name }), [ + "Request started", + "Request ended", + "Received error response" + ]) + } assertAttributes: { attributes in + XCTAssertEqual(attributes, [ + "rpc.system": .string("grpc"), + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": .int(14), // this is unavailable's raw code + "server.address": .string("someserver.com"), + "server.port": .int(567), + "network.peer.address": .string("10.1.2.80"), + "network.peer.port": .int(567), + "network.transport": .string("tcp"), + "network.type": .string("ipv4") + ]) + } assertStatus: { status in + XCTAssertEqual(status, .some(.init(code: .error))) + } assertErrors: { errors in + XCTAssertEqual(errors.count, 1) + } } } @@ -194,7 +426,7 @@ final class TracingInterceptorTests: XCTestCase { let tracer = InstrumentationSystem.tracer as! TestTracer XCTAssertEqual( - tracer.getEventsForTestSpan(ofOperationName: methodDescriptor.fullyQualifiedMethod).map { + tracer.getEventsForTestSpan(ofOperation: methodDescriptor.fullyQualifiedMethod).map { $0.name }, [ @@ -264,7 +496,7 @@ final class TracingInterceptorTests: XCTestCase { let tracer = InstrumentationSystem.tracer as! TestTracer XCTAssertEqual( - tracer.getEventsForTestSpan(ofOperationName: methodDescriptor.fullyQualifiedMethod).map { + tracer.getEventsForTestSpan(ofOperation: methodDescriptor.fullyQualifiedMethod).map { $0.name }, [ @@ -334,7 +566,7 @@ final class TracingInterceptorTests: XCTestCase { let tracer = InstrumentationSystem.tracer as! TestTracer XCTAssertEqual( - tracer.getEventsForTestSpan(ofOperationName: methodDescriptor.fullyQualifiedMethod).map { + tracer.getEventsForTestSpan(ofOperation: methodDescriptor.fullyQualifiedMethod).map { $0.name }, [ @@ -351,4 +583,53 @@ final class TracingInterceptorTests: XCTestCase { ] ) } + + private func getClientContext(forMethod method: MethodDescriptor) -> ClientContext { + ClientContext( + descriptor: method, + remotePeer: "ipv4:10.1.2.80:567", + localPeer: "ipv6:localhost:1234" + ) + } + + private func getTestSpanForMethod(_ methodDescriptor: MethodDescriptor) -> TestSpan { + let tracer = InstrumentationSystem.tracer as! TestTracer + return tracer.getSpan(ofOperation: methodDescriptor.fullyQualifiedMethod)! + } + + private func AssertTestSpanComponents( + forMethod method: MethodDescriptor, + assertEvents: ([TestSpanEvent]) -> Void, + assertAttributes: (SpanAttributes) -> Void, + assertStatus: (SpanStatus?) -> Void, + assertErrors: ([TracingInterceptorTestError]) -> Void + ) { + let span = self.getTestSpanForMethod(method) + assertEvents(span.events.map({ TestSpanEvent($0) })) + assertAttributes(span.attributes) + assertStatus(span.status) + assertErrors(span.errors) + } + + private func AssertStreamContentsEqual( + _ array: [T], + _ stream: any AsyncSequence + ) async throws { + var streamElements = [T]() + for try await element in stream { + streamElements.append(element) + } + XCTAssertEqual(streamElements, array) + } + + private func AssertStreamContentsEqual( + _ array: [T], + _ stream: any AsyncSequence + ) async { + var streamElements = [T]() + for await element in stream { + streamElements.append(element) + } + XCTAssertEqual(streamElements, array) + } } diff --git a/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift b/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift index a5af7df..6b7c868 100644 --- a/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift +++ b/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift @@ -23,9 +23,12 @@ final class TestTracer: Tracer { private let testSpans: Mutex<[String: TestSpan]> = .init([:]) - func getEventsForTestSpan(ofOperationName operationName: String) -> [SpanEvent] { - let span = self.testSpans.withLock({ $0[operationName] }) - return span?.events ?? [] + func getSpan(ofOperation operationName: String) -> TestSpan? { + self.testSpans.withLock { $0[operationName] } + } + + func getEventsForTestSpan(ofOperation operationName: String) -> [SpanEvent] { + self.getSpan(ofOperation: operationName)?.events ?? [] } func extract( @@ -75,6 +78,7 @@ final class TestSpan: Span, Sendable { var attributes: Tracing.SpanAttributes var status: Tracing.SpanStatus? var events: [Tracing.SpanEvent] = [] + var errors: [TracingInterceptorTestError] } private let state: Mutex @@ -98,13 +102,26 @@ final class TestSpan: Span, Sendable { self.state.withLock { $0.events } } + var status: SpanStatus? { + self.state.withLock { $0.status } + } + + var errors: [TracingInterceptorTestError] { + self.state.withLock { $0.errors } + } + init( context: ServiceContextModule.ServiceContext, operationName: String, attributes: Tracing.SpanAttributes = [:], isRecording: Bool = true ) { - let state = State(context: context, operationName: operationName, attributes: attributes) + let state = State( + context: context, + operationName: operationName, + attributes: attributes, + errors: [] + ) self.state = Mutex(state) self.isRecording = isRecording } @@ -122,12 +139,8 @@ final class TestSpan: Span, Sendable { attributes: Tracing.SpanAttributes, at instant: @autoclosure () -> Instant ) where Instant: Tracing.TracerInstant { - self.setStatus( - .init( - code: .error, - message: "Error: \(error), attributes: \(attributes), at instant: \(instant())" - ) - ) + // For the purposes of these tests, we don't really care about the error being thrown + self.state.withLock { $0.errors.append(TracingInterceptorTestError.testError) } } func addLink(_ link: Tracing.SpanLink) { @@ -137,7 +150,7 @@ final class TestSpan: Span, Sendable { } func end(at instant: @autoclosure () -> Instant) where Instant: Tracing.TracerInstant { - self.setStatus(.init(code: .ok, message: "Ended at instant: \(instant())")) + // no-op } } @@ -192,3 +205,33 @@ struct TestWriter: RPCWriterProtocol { } } } + +struct TestSpanEvent: Equatable, CustomDebugStringConvertible { + var name: String + var attributes: SpanAttributes + + var debugDescription: String { + var attributesDescription = "" + self.attributes.forEach { key, value in + attributesDescription += " \(key): \(value)," + } + + return """ + (name: \(self.name), attributes: [\(attributesDescription)]) + """ + } + + init(_ name: String, _ attributes: SpanAttributes) { + self.name = name + self.attributes = attributes + } + + init(_ spanEvent: SpanEvent) { + self.name = spanEvent.name + self.attributes = spanEvent.attributes + } +} + +enum TracingInterceptorTestError: Error, Equatable { + case testError +} From 7a9559435f3faf6b25cc8b1bd66ab9959bc8e9ce Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 16 Jan 2025 16:54:06 +0000 Subject: [PATCH 05/16] Formatting --- .../ClientOTelTracingInterceptor.swift | 8 +- .../SpanAttributes+RPCAttributes.swift | 6 +- .../TracingInterceptorTests.swift | 203 ++++++++++-------- .../TracingTestsUtilities.swift | 4 +- 4 files changed, 125 insertions(+), 96 deletions(-) diff --git a/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift index c379c25..488fa69 100644 --- a/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift @@ -15,8 +15,8 @@ */ public import GRPCCore -internal import Tracing internal import Synchronization +internal import Tracing /// A client interceptor that injects tracing information into the request. /// @@ -96,7 +96,8 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { afterEachWrite: { var event = SpanEvent(name: "rpc.message") event.attributes.rpc.messageType = "SENT" - event.attributes.rpc.messageID = messageSentCounter + event.attributes.rpc.messageID = + messageSentCounter .wrappingAdd(1, ordering: .sequentiallyConsistent) .oldValue span.addEvent(event) @@ -122,7 +123,8 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { let onEachPartRecordingSequence = success.bodyParts.map { element in var event = SpanEvent(name: "rpc.message") event.attributes.rpc.messageType = "RECEIVED" - event.attributes.rpc.messageID = messageReceivedCounter + event.attributes.rpc.messageID = + messageReceivedCounter .wrappingAdd(1, ordering: .sequentiallyConsistent) .oldValue span.addEvent(event) diff --git a/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift b/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift index 1f36613..c998ef6 100644 --- a/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift +++ b/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift @@ -34,7 +34,7 @@ package struct RPCAttributes: SpanAttributeNamespace { var messageType: Key { "rpc.message.type" } var grpcStatusCode: Key { "rpc.grpc.status_code" } - var serverAddress: Key{ "server.address" } + var serverAddress: Key { "server.address" } var serverPort: Key { "server.port" } var clientAddress: Key { "client.address" } @@ -47,9 +47,9 @@ package struct RPCAttributes: SpanAttributeNamespace { } } -package extension SpanAttributes { +extension SpanAttributes { /// Semantic conventions for RPC spans. - var rpc: RPCAttributes { + package var rpc: RPCAttributes { get { .init(attributes: self) } diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index 205a1f0..bb759de 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -73,25 +73,31 @@ final class TracingInterceptorTests: XCTestCase { try await AssertStreamContentsEqual([["response"]], response.messages) AssertTestSpanComponents(forMethod: methodDescriptor) { events in - XCTAssertEqual(events.map({ $0.name }), [ - "Request started", - "Request ended", - "Received response start", - "Received response end" - ]) + XCTAssertEqual( + events.map({ $0.name }), + [ + "Request started", + "Request ended", + "Received response start", + "Received response end", + ] + ) } assertAttributes: { attributes in - XCTAssertEqual(attributes, [ - "rpc.system": .string("grpc"), - "rpc.method": .string(methodDescriptor.method), - "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), - "rpc.grpc.status_code": .int(0), - "server.address": .string("someserver.com"), - "server.port": .int(567), - "network.peer.address": .string("10.1.2.80"), - "network.peer.port": .int(567), - "network.transport": .string("tcp"), - "network.type": .string("ipv4") - ]) + XCTAssertEqual( + attributes, + [ + "rpc.system": .string("grpc"), + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": .int(0), + "server.address": .string("someserver.com"), + "server.port": .int(567), + "network.peer.address": .string("10.1.2.80"), + "network.peer.port": .int(567), + "network.transport": .string("tcp"), + "network.type": .string("ipv4"), + ] + ) } assertStatus: { status in XCTAssertNil(status) } assertErrors: { errors in @@ -152,23 +158,29 @@ final class TracingInterceptorTests: XCTestCase { try await AssertStreamContentsEqual([["response"]], response.messages) AssertTestSpanComponents(forMethod: methodDescriptor) { events in - XCTAssertEqual(events.map({ $0.name }), [ - "Request started", - "Request ended", - "Received response start", - "Received response end" - ]) + XCTAssertEqual( + events.map({ $0.name }), + [ + "Request started", + "Request ended", + "Received response start", + "Received response end", + ] + ) } assertAttributes: { attributes in - XCTAssertEqual(attributes, [ - "rpc.system": .string("grpc"), - "rpc.method": .string(methodDescriptor.method), - "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), - "rpc.grpc.status_code": .int(0), - "server.address": .string("someserver.com"), - "network.peer.address": .string("some-path"), - "network.transport": .string("tcp"), - "network.type": .string("unix") - ]) + XCTAssertEqual( + attributes, + [ + "rpc.system": .string("grpc"), + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": .int(0), + "server.address": .string("someserver.com"), + "network.peer.address": .string("some-path"), + "network.transport": .string("tcp"), + "network.type": .string("unix"), + ] + ) } assertStatus: { status in XCTAssertNil(status) } assertErrors: { errors in @@ -226,33 +238,39 @@ final class TracingInterceptorTests: XCTestCase { try await AssertStreamContentsEqual([["response"]], response.messages) AssertTestSpanComponents(forMethod: methodDescriptor) { events in - XCTAssertEqual(events, [ - TestSpanEvent("Request started", [:]), - // Recorded when `request1` is sent - TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 1]), - // Recorded when `request2` is sent - TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 2]), - // Recorded after all request parts have been sent - TestSpanEvent("Request ended", [:]), - // Recorded when receiving response part - TestSpanEvent("Received response start", [:]), - TestSpanEvent("rpc.message", ["rpc.message.type": "RECEIVED", "rpc.message.id": 1]), - // Recorded at end of response - TestSpanEvent("Received response end", [:]), - ]) + XCTAssertEqual( + events, + [ + TestSpanEvent("Request started", [:]), + // Recorded when `request1` is sent + TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 1]), + // Recorded when `request2` is sent + TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 2]), + // Recorded after all request parts have been sent + TestSpanEvent("Request ended", [:]), + // Recorded when receiving response part + TestSpanEvent("Received response start", [:]), + TestSpanEvent("rpc.message", ["rpc.message.type": "RECEIVED", "rpc.message.id": 1]), + // Recorded at end of response + TestSpanEvent("Received response end", [:]), + ] + ) } assertAttributes: { attributes in - XCTAssertEqual(attributes, [ - "rpc.system": .string("grpc"), - "rpc.method": .string(methodDescriptor.method), - "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), - "rpc.grpc.status_code": .int(0), - "server.address": .string("someserver.com"), - "server.port": .int(567), - "network.peer.address": .string("10.1.2.80"), - "network.peer.port": .int(567), - "network.transport": .string("tcp"), - "network.type": .string("ipv4") - ]) + XCTAssertEqual( + attributes, + [ + "rpc.system": .string("grpc"), + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": .int(0), + "server.address": .string("someserver.com"), + "server.port": .int(567), + "network.peer.address": .string("10.1.2.80"), + "network.peer.port": .int(567), + "network.transport": .string("tcp"), + "network.type": .string("ipv4"), + ] + ) } assertStatus: { status in XCTAssertNil(status) } assertErrors: { errors in @@ -312,17 +330,20 @@ final class TracingInterceptorTests: XCTestCase { XCTAssertEqual(events.map({ $0.name }), ["Request started"]) } assertAttributes: { attributes in // The attributes should not contain a grpc status code, as the request was never even sent. - XCTAssertEqual(attributes, [ - "rpc.system": .string("grpc"), - "rpc.method": .string(methodDescriptor.method), - "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), - "server.address": .string("someserver.com"), - "server.port": .int(567), - "network.peer.address": .string("10.1.2.80"), - "network.peer.port": .int(567), - "network.transport": .string("tcp"), - "network.type": .string("ipv4") - ]) + XCTAssertEqual( + attributes, + [ + "rpc.system": .string("grpc"), + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "server.address": .string("someserver.com"), + "server.port": .int(567), + "network.peer.address": .string("10.1.2.80"), + "network.peer.port": .int(567), + "network.transport": .string("tcp"), + "network.type": .string("ipv4"), + ] + ) } assertStatus: { status in XCTAssertNil(status) } assertErrors: { errors in @@ -378,24 +399,30 @@ final class TracingInterceptorTests: XCTestCase { } AssertTestSpanComponents(forMethod: methodDescriptor) { events in - XCTAssertEqual(events.map({ $0.name }), [ - "Request started", - "Request ended", - "Received error response" - ]) + XCTAssertEqual( + events.map({ $0.name }), + [ + "Request started", + "Request ended", + "Received error response", + ] + ) } assertAttributes: { attributes in - XCTAssertEqual(attributes, [ - "rpc.system": .string("grpc"), - "rpc.method": .string(methodDescriptor.method), - "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), - "rpc.grpc.status_code": .int(14), // this is unavailable's raw code - "server.address": .string("someserver.com"), - "server.port": .int(567), - "network.peer.address": .string("10.1.2.80"), - "network.peer.port": .int(567), - "network.transport": .string("tcp"), - "network.type": .string("ipv4") - ]) + XCTAssertEqual( + attributes, + [ + "rpc.system": .string("grpc"), + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": .int(14), // this is unavailable's raw code + "server.address": .string("someserver.com"), + "server.port": .int(567), + "network.peer.address": .string("10.1.2.80"), + "network.peer.port": .int(567), + "network.transport": .string("tcp"), + "network.type": .string("ipv4"), + ] + ) } assertStatus: { status in XCTAssertEqual(status, .some(.init(code: .error))) } assertErrors: { errors in diff --git a/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift b/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift index 6b7c868..a6b64b3 100644 --- a/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift +++ b/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift @@ -217,8 +217,8 @@ struct TestSpanEvent: Equatable, CustomDebugStringConvertible { } return """ - (name: \(self.name), attributes: [\(attributesDescription)]) - """ + (name: \(self.name), attributes: [\(attributesDescription)]) + """ } init(_ name: String, _ attributes: SpanAttributes) { From dd7bc2319ef9735e8b93b727ba17c7fc86d8c2c8 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Mon, 20 Jan 2025 16:05:46 +0000 Subject: [PATCH 06/16] Handle response failures + do not record custom events --- .../HookedAsyncSequence.swift | 95 +++++++++++++++++++ .../OnFinishAsyncSequence.swift | 56 ----------- .../ClientOTelTracingInterceptor.swift | 69 +++++++------- .../TracingInterceptorTests.swift | 58 ++++------- 4 files changed, 146 insertions(+), 132 deletions(-) create mode 100644 Sources/GRPCInterceptors/HookedAsyncSequence.swift delete mode 100644 Sources/GRPCInterceptors/OnFinishAsyncSequence.swift diff --git a/Sources/GRPCInterceptors/HookedAsyncSequence.swift b/Sources/GRPCInterceptors/HookedAsyncSequence.swift new file mode 100644 index 0000000..83de5e0 --- /dev/null +++ b/Sources/GRPCInterceptors/HookedAsyncSequence.swift @@ -0,0 +1,95 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +internal struct HookedRPCAsyncSequence: AsyncSequence, Sendable where Wrapped.Element: Sendable { + private let wrapped: Wrapped + + private let forEachElement: @Sendable (Wrapped.Element) -> Void + private let onFinish: @Sendable () -> Void + private let onFailure: @Sendable (any Error) -> Void + + init( + wrapping sequence: Wrapped, + forEachElement: @escaping @Sendable (Wrapped.Element) -> Void, + onFinish: @escaping @Sendable () -> Void, + onFailure: @escaping @Sendable (any Error) -> Void + ) { + self.wrapped = sequence + self.forEachElement = forEachElement + self.onFinish = onFinish + self.onFailure = onFailure + } + + func makeAsyncIterator() -> HookedAsyncIterator { + HookedAsyncIterator( + self.wrapped, + forEachElement: self.forEachElement, + onFinish: self.onFinish, + onFailure: self.onFailure + ) + } + + struct HookedAsyncIterator: AsyncIteratorProtocol { + typealias Element = Wrapped.Element + + private var wrapped: Wrapped.AsyncIterator + private let forEachElement: @Sendable (Wrapped.Element) -> Void + private let onFinish: @Sendable () -> Void + private let onFailure: @Sendable (any Error) -> Void + + init( + _ sequence: Wrapped, + forEachElement: @escaping @Sendable (Wrapped.Element) -> Void, + onFinish: @escaping @Sendable () -> Void, + onFailure: @escaping @Sendable (any Error) -> Void + ) { + self.wrapped = sequence.makeAsyncIterator() + self.forEachElement = forEachElement + self.onFinish = onFinish + self.onFailure = onFailure + } + + mutating func next(isolation actor: isolated (any Actor)?) async throws(Wrapped.Failure) -> Wrapped.Element? { + do { + if let element = try await self.wrapped.next(isolation: actor) { + self.forEachElement(element) + return element + } + + self.onFinish() + return nil + } catch { + self.onFailure(error) + throw error + } + } + + mutating func next() async throws -> Wrapped.Element? { + do { + if let element = try await self.wrapped.next() { + self.forEachElement(element) + return element + } + + self.onFinish() + return nil + } catch { + self.onFailure(error) + throw error + } + } + } +} diff --git a/Sources/GRPCInterceptors/OnFinishAsyncSequence.swift b/Sources/GRPCInterceptors/OnFinishAsyncSequence.swift deleted file mode 100644 index f7a8f64..0000000 --- a/Sources/GRPCInterceptors/OnFinishAsyncSequence.swift +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2024, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -struct OnFinishAsyncSequence: AsyncSequence, Sendable { - private let _makeAsyncIterator: @Sendable () -> AsyncIterator - - init( - wrapping other: S, - onFinish: @escaping @Sendable () -> Void - ) where S.Element == Element, S: Sendable { - self._makeAsyncIterator = { - AsyncIterator(wrapping: other.makeAsyncIterator(), onFinish: onFinish) - } - } - - func makeAsyncIterator() -> AsyncIterator { - self._makeAsyncIterator() - } - - struct AsyncIterator: AsyncIteratorProtocol { - private var iterator: any AsyncIteratorProtocol - private var onFinish: (@Sendable () -> Void)? - - fileprivate init( - wrapping other: Iterator, - onFinish: @escaping @Sendable () -> Void - ) where Iterator: AsyncIteratorProtocol, Iterator.Element == Element { - self.iterator = other - self.onFinish = onFinish - } - - mutating func next() async throws -> Element? { - let elem = try await self.iterator.next() - - if elem == nil { - self.onFinish?() - self.onFinish = nil - } - - return elem as? Element - } - } -} diff --git a/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift index 488fa69..f8badf7 100644 --- a/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift @@ -26,7 +26,7 @@ internal import Tracing /// For more information, refer to the documentation for `swift-distributed-tracing`. public struct ClientOTelTracingInterceptor: ClientInterceptor { private let injector: ClientRequestInjector - private let emitEventOnEachWrite: Bool + private let traceEachMessage: Bool private var serverHostname: String private var networkTransportMethod: String @@ -36,17 +36,17 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { /// - severHostname: The hostname of the RPC server. This will be the value for the `server.address` attribute in spans. /// - networkTransportMethod: The transport in use (e.g. "tcp", "udp"). This will be the value for the /// `network.transport` attribute in spans. - /// - emitEventOnEachWrite: If `true`, each request part sent and response part received will be recorded as a separate + /// - traceEachMessage: If `true`, each request part sent and response part received will be recorded as a separate /// event in a tracing span. Otherwise, only the request/response start and end will be recorded as events. public init( serverHostname: String, networkTransportMethod: String, - emitEventOnEachWrite: Bool = false + traceEachMessage: Bool = true ) { self.injector = ClientRequestInjector() self.serverHostname = serverHostname self.networkTransportMethod = networkTransportMethod - self.emitEventOnEachWrite = emitEventOnEachWrite + self.traceEachMessage = traceEachMessage } /// This interceptor will inject as the request's metadata whatever `ServiceContext` key-value pairs @@ -84,10 +84,8 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { ) { span in self.setOTelSpanAttributes(into: span, context: context) - span.addEvent("Request started") - - let wrappedProducer = request.producer - if self.emitEventOnEachWrite { + if self.traceEachMessage { + let wrappedProducer = request.producer request.producer = { writer in let messageSentCounter = Atomic(1) let eventEmittingWriter = HookedWriter( @@ -104,54 +102,53 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { } ) try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter)) - span.addEvent("Request ended") - } - } else { - request.producer = { writer in - try await wrappedProducer(RPCWriter(wrapping: writer)) - span.addEvent("Request ended") } } var response = try await next(request, context) switch response.accepted { case .success(var success): - span.addEvent("Received response start") - span.attributes.rpc.grpcStatusCode = 0 - if self.emitEventOnEachWrite { + let hookedSequence: HookedRPCAsyncSequence< + RPCAsyncSequence.Contents.BodyPart, any Error> + > + if self.traceEachMessage { let messageReceivedCounter = Atomic(1) - let onEachPartRecordingSequence = success.bodyParts.map { element in + hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { _ in var event = SpanEvent(name: "rpc.message") event.attributes.rpc.messageType = "RECEIVED" - event.attributes.rpc.messageID = - messageReceivedCounter + event.attributes.rpc.messageID = messageReceivedCounter .wrappingAdd(1, ordering: .sequentiallyConsistent) .oldValue span.addEvent(event) - return element - } - - let onFinishRecordingSequence = OnFinishAsyncSequence( - wrapping: onEachPartRecordingSequence - ) { - span.addEvent("Received response end") + } onFinish: { + span.attributes.rpc.grpcStatusCode = 0 + } onFailure: { error in + if let rpcError = error as? RPCError { + span.attributes.rpc.grpcStatusCode = rpcError.code.rawValue + } + span.setStatus(SpanStatus(code: .error)) + span.recordError(error) } - - success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence) - response.accepted = .success(success) } else { - let onFinishRecordingSequence = OnFinishAsyncSequence(wrapping: success.bodyParts) { - span.addEvent("Received response end") + hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { _ in + // Nothing to do if traceEachMessage is false + } onFinish: { + span.attributes.rpc.grpcStatusCode = 0 + } onFailure: { error in + if let rpcError = error as? RPCError { + span.attributes.rpc.grpcStatusCode = rpcError.code.rawValue + } + span.setStatus(SpanStatus(code: .error)) + span.recordError(error) } - - success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence) - response.accepted = .success(success) } + success.bodyParts = RPCAsyncSequence(wrapping: hookedSequence) + response.accepted = .success(success) + case .failure(let error): span.attributes.rpc.grpcStatusCode = error.code.rawValue span.setStatus(SpanStatus(code: .error)) - span.addEvent("Received error response") span.recordError(error) } diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index bb759de..b6dcc02 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -30,7 +30,8 @@ final class TracingInterceptorTests: XCTestCase { let traceIDString = UUID().uuidString let interceptor = ClientOTelTracingInterceptor( serverHostname: "someserver.com", - networkTransportMethod: "tcp" + networkTransportMethod: "tcp", + traceEachMessage: false ) let (stream, continuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString @@ -73,15 +74,8 @@ final class TracingInterceptorTests: XCTestCase { try await AssertStreamContentsEqual([["response"]], response.messages) AssertTestSpanComponents(forMethod: methodDescriptor) { events in - XCTAssertEqual( - events.map({ $0.name }), - [ - "Request started", - "Request ended", - "Received response start", - "Received response end", - ] - ) + // No events are recorded + XCTAssertTrue(events.isEmpty) } assertAttributes: { attributes in XCTAssertEqual( attributes, @@ -111,7 +105,8 @@ final class TracingInterceptorTests: XCTestCase { let traceIDString = UUID().uuidString let interceptor = ClientOTelTracingInterceptor( serverHostname: "someserver.com", - networkTransportMethod: "tcp" + networkTransportMethod: "tcp", + traceEachMessage: false ) let (stream, continuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString @@ -158,15 +153,8 @@ final class TracingInterceptorTests: XCTestCase { try await AssertStreamContentsEqual([["response"]], response.messages) AssertTestSpanComponents(forMethod: methodDescriptor) { events in - XCTAssertEqual( - events.map({ $0.name }), - [ - "Request started", - "Request ended", - "Received response start", - "Received response end", - ] - ) + // No events are recorded + XCTAssertTrue(events.isEmpty) } assertAttributes: { attributes in XCTAssertEqual( attributes, @@ -199,7 +187,7 @@ final class TracingInterceptorTests: XCTestCase { let interceptor = ClientOTelTracingInterceptor( serverHostname: "someserver.com", networkTransportMethod: "tcp", - emitEventOnEachWrite: true + traceEachMessage: true ) let (stream, continuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString @@ -241,18 +229,12 @@ final class TracingInterceptorTests: XCTestCase { XCTAssertEqual( events, [ - TestSpanEvent("Request started", [:]), // Recorded when `request1` is sent TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 1]), // Recorded when `request2` is sent TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 2]), - // Recorded after all request parts have been sent - TestSpanEvent("Request ended", [:]), // Recorded when receiving response part - TestSpanEvent("Received response start", [:]), - TestSpanEvent("rpc.message", ["rpc.message.type": "RECEIVED", "rpc.message.id": 1]), - // Recorded at end of response - TestSpanEvent("Received response end", [:]), + TestSpanEvent("rpc.message", ["rpc.message.type": "RECEIVED", "rpc.message.id": 1]) ] ) } assertAttributes: { attributes in @@ -284,7 +266,8 @@ final class TracingInterceptorTests: XCTestCase { let traceIDString = UUID().uuidString let interceptor = ClientOTelTracingInterceptor( serverHostname: "someserver.com", - networkTransportMethod: "tcp" + networkTransportMethod: "tcp", + traceEachMessage: false ) let (_, continuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString @@ -326,8 +309,8 @@ final class TracingInterceptorTests: XCTestCase { XCTFail("Should have thrown") } catch { AssertTestSpanComponents(forMethod: methodDescriptor) { events in - // We errored when writing our request, so this is the only event we should be seeing - XCTAssertEqual(events.map({ $0.name }), ["Request started"]) + // No events are recorded + XCTAssertTrue(events.isEmpty) } assertAttributes: { attributes in // The attributes should not contain a grpc status code, as the request was never even sent. XCTAssertEqual( @@ -358,7 +341,8 @@ final class TracingInterceptorTests: XCTestCase { let traceIDString = UUID().uuidString let interceptor = ClientOTelTracingInterceptor( serverHostname: "someserver.com", - networkTransportMethod: "tcp" + networkTransportMethod: "tcp", + traceEachMessage: false ) let (stream, continuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString @@ -399,14 +383,8 @@ final class TracingInterceptorTests: XCTestCase { } AssertTestSpanComponents(forMethod: methodDescriptor) { events in - XCTAssertEqual( - events.map({ $0.name }), - [ - "Request started", - "Request ended", - "Received error response", - ] - ) + // No events are recorded + XCTAssertTrue(events.isEmpty) } assertAttributes: { attributes in XCTAssertEqual( attributes, From a2adc5eaf9cac80994954ced26241b8d274bdb03 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Mon, 20 Jan 2025 16:23:36 +0000 Subject: [PATCH 07/16] Add PeerAddress and refactor span attributes --- Sources/GRPCInterceptors/HookedWriter.swift | 5 - .../ClientOTelTracingInterceptor.swift | 34 +--- .../Tracing/ServerTracingInterceptor.swift | 3 - .../SpanAttributes+RPCAttributes.swift | 149 ++++++++++++++- .../TracingInterceptorTests.swift | 174 +++++++++++++----- 5 files changed, 280 insertions(+), 85 deletions(-) diff --git a/Sources/GRPCInterceptors/HookedWriter.swift b/Sources/GRPCInterceptors/HookedWriter.swift index b93c491..1baec5b 100644 --- a/Sources/GRPCInterceptors/HookedWriter.swift +++ b/Sources/GRPCInterceptors/HookedWriter.swift @@ -18,27 +18,22 @@ internal import Tracing struct HookedWriter: RPCWriterProtocol { private let writer: any RPCWriterProtocol - private let beforeEachWrite: @Sendable () -> Void private let afterEachWrite: @Sendable () -> Void init( wrapping other: some RPCWriterProtocol, - beforeEachWrite: @Sendable @escaping () -> Void, afterEachWrite: @Sendable @escaping () -> Void ) { self.writer = other - self.beforeEachWrite = beforeEachWrite self.afterEachWrite = afterEachWrite } func write(_ element: Element) async throws { - self.beforeEachWrite() try await self.writer.write(element) self.afterEachWrite() } func write(contentsOf elements: some Sequence) async throws { - self.beforeEachWrite() try await self.writer.write(contentsOf: elements) self.afterEachWrite() } diff --git a/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift index f8badf7..e3c1dd3 100644 --- a/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift @@ -1,5 +1,5 @@ /* - * Copyright 2024, gRPC Authors All rights reserved. + * Copyright 2024-2025, gRPC Authors All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -82,7 +82,11 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { context: serviceContext, ofKind: .client ) { span in - self.setOTelSpanAttributes(into: span, context: context) + span.setOTelClientSpanGRPCAttributes( + context: context, + serverHostname: self.serverHostname, + networkTransportMethod: self.networkTransportMethod + ) if self.traceEachMessage { let wrappedProducer = request.producer @@ -90,7 +94,6 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { let messageSentCounter = Atomic(1) let eventEmittingWriter = HookedWriter( wrapping: writer, - beforeEachWrite: {}, afterEachWrite: { var event = SpanEvent(name: "rpc.message") event.attributes.rpc.messageType = "SENT" @@ -155,31 +158,6 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { return response } } - - private func setOTelSpanAttributes(into span: any Span, context: ClientContext) { - span.attributes.rpc.system = "grpc" - span.attributes.rpc.service = context.descriptor.service.fullyQualifiedService - span.attributes.rpc.method = context.descriptor.method - span.attributes.rpc.serverAddress = self.serverHostname - span.attributes.rpc.networkTransport = self.networkTransportMethod - - let peer = context.remotePeer - // We expect this address to be of either of these two formats: - // - :: for ipv4 and ipv6 addresses - // - unix: for UNIX domain sockets - let components = peer.split(separator: ":") - if components.count == 2 { - // This is the UDS case - span.attributes.rpc.networkType = String(components[0]) - span.attributes.rpc.networkPeerAddress = String(components[1]) - } else if components.count == 3 { - // This is the ipv4 or ipv6 case - span.attributes.rpc.networkType = String(components[0]) - span.attributes.rpc.networkPeerAddress = String(components[1]) - span.attributes.rpc.networkPeerPort = Int(components[2]) - span.attributes.rpc.serverPort = Int(components[2]) - } - } } /// An injector responsible for injecting the required instrumentation keys from the `ServiceContext` into diff --git a/Sources/GRPCInterceptors/Tracing/ServerTracingInterceptor.swift b/Sources/GRPCInterceptors/Tracing/ServerTracingInterceptor.swift index 0df9094..eeaea16 100644 --- a/Sources/GRPCInterceptors/Tracing/ServerTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/Tracing/ServerTracingInterceptor.swift @@ -90,9 +90,6 @@ public struct ServerTracingInterceptor: ServerInterceptor { success.producer = { writer in let eventEmittingWriter = HookedWriter( wrapping: writer, - beforeEachWrite: { - span.addEvent("Sending response part") - }, afterEachWrite: { span.addEvent("Sent response part") } diff --git a/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift b/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift index c998ef6..dc95e4d 100644 --- a/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift +++ b/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift @@ -14,7 +14,8 @@ * limitations under the License. */ -import Tracing +internal import Tracing +internal import GRPCCore @dynamicMemberLookup package struct RPCAttributes: SpanAttributeNamespace { @@ -58,3 +59,149 @@ extension SpanAttributes { } } } + +extension Span { + // See: https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/ + func setOTelClientSpanGRPCAttributes( + context: ClientContext, + serverHostname: String, + networkTransportMethod: String + ) { + self.attributes.rpc.system = "grpc" + self.attributes.rpc.serverAddress = serverHostname + self.attributes.rpc.networkTransport = networkTransportMethod + self.attributes.rpc.service = context.descriptor.service.fullyQualifiedService + self.attributes.rpc.method = context.descriptor.method + + // Set server address information + switch PeerAddress(context.remotePeer) { + case .ipv4(let address, let port): + self.attributes.rpc.networkType = "ipv4" + self.attributes.rpc.networkPeerAddress = address + self.attributes.rpc.networkPeerPort = port + self.attributes.rpc.serverPort = port + + case .ipv6(let address, let port): + self.attributes.rpc.networkType = "ipv6" + self.attributes.rpc.networkPeerAddress = address + self.attributes.rpc.networkPeerPort = port + self.attributes.rpc.serverPort = port + + case .unixDomainSocket(let path): + self.attributes.rpc.networkType = "unix" + self.attributes.rpc.networkPeerAddress = path + + case .other(let address): + // We can't nicely format the span attributes to contain the appropriate information here, + // so include the whole thing as part of the server address. + self.attributes.rpc.serverAddress = address + } + } + + func setOTelServerSpanGRPCAttributes( + context: ServerContext, + serverHostname: String, + networkTransportMethod: String + ) { + self.attributes.rpc.system = "grpc" + self.attributes.rpc.serverAddress = serverHostname + self.attributes.rpc.networkTransport = networkTransportMethod + self.attributes.rpc.service = context.descriptor.service.fullyQualifiedService + self.attributes.rpc.method = context.descriptor.method + + // Set server address information + switch PeerAddress(context.localPeer) { + case .ipv4(let address, let port): + self.attributes.rpc.networkType = "ipv4" + self.attributes.rpc.networkPeerAddress = address + self.attributes.rpc.networkPeerPort = port + self.attributes.rpc.serverPort = port + + case .ipv6(let address, let port): + self.attributes.rpc.networkType = "ipv6" + self.attributes.rpc.networkPeerAddress = address + self.attributes.rpc.networkPeerPort = port + self.attributes.rpc.serverPort = port + + case .unixDomainSocket(let path): + self.attributes.rpc.networkType = "unix" + self.attributes.rpc.networkPeerAddress = path + + case .other(let address): + // We can't nicely format the span attributes to contain the appropriate information here, + // so include the whole thing as part of the server address. + self.attributes.rpc.serverAddress = address + } + + switch PeerAddress(context.remotePeer) { + case .ipv4(let address, let port): + self.attributes.rpc.clientAddress = address + self.attributes.rpc.clientPort = port + + case .ipv6(let address, let port): + self.attributes.rpc.clientAddress = address + self.attributes.rpc.clientPort = port + + case .unixDomainSocket(let path): + self.attributes.rpc.clientAddress = path + + case .other(let address): + self.attributes.rpc.clientAddress = address + } + } +} + +private enum PeerAddress { + case ipv4(address: String, port: Int?) + case ipv6(address: String, port: Int?) + case unixDomainSocket(path: String) + case other(String) + + init(_ address: String) { + // We expect this address to be of one of these formats: + // - ipv4:: for ipv4 addresses + // - ipv6:[]: for ipv6 addresses + // - unix: for UNIX domain sockets + let addressComponents = address.split(separator: ":", omittingEmptySubsequences: false) + + guard addressComponents.count > 1 else { + // This is some unexpected/unknown format, so we have no way of splitting it up nicely. + self = .other(address) + return + } + + // Check what type the transport is... + switch addressComponents[0] { + case "ipv4": + guard addressComponents.count == 3, let port = Int(addressComponents[2]) else { + // This is some unexpected/unknown format, so we have no way of splitting it up nicely. + self = .other(address) + return + } + self = .ipv4(address: String(addressComponents[1]), port: port) + + case "ipv6": + guard addressComponents.count > 2, let port = Int(addressComponents.last!) else { + // This is some unexpected/unknown format, so we have no way of splitting it up nicely. + self = .other(address) + return + } + self = .ipv6( + address: String(addressComponents[1...makeStream() + let (requestStream, requestStreamContinuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString // FIXME: use 'ServiceContext.withValue(serviceContext)' @@ -49,15 +51,19 @@ final class TracingInterceptorTests: XCTestCase { try await writer.write(contentsOf: ["request1"]) try await writer.write(contentsOf: ["request2"]) }), - context: self.getClientContext(forMethod: methodDescriptor) + context: ClientContext( + descriptor: methodDescriptor, + remotePeer: "ipv4:10.1.2.80:567", + localPeer: "ipv4:10.1.2.80:123" + ) ) { stream, _ in // Assert the metadata contains the injected context key-value. XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) - // Write into the response stream to make sure the `producer` closure's called. - let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) + // Write into the request stream to make sure the `producer` closure's called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: requestStreamContinuation)) try await stream.producer(writer) - continuation.finish() + requestStreamContinuation.finish() return .init( metadata: [], @@ -70,7 +76,7 @@ final class TracingInterceptorTests: XCTestCase { ) } - await AssertStreamContentsEqual(["request1", "request2"], stream) + await AssertStreamContentsEqual(["request1", "request2"], requestStream) try await AssertStreamContentsEqual([["response"]], response.messages) AssertTestSpanComponents(forMethod: methodDescriptor) { events in @@ -100,6 +106,85 @@ final class TracingInterceptorTests: XCTestCase { } } + func testClientInterceptor_IPv6() async throws { + var serviceContext = ServiceContext.topLevel + let traceIDString = UUID().uuidString + let interceptor = ClientOTelTracingInterceptor( + serverHostname: "someserver.com", + networkTransportMethod: "tcp", + traceEachMessage: false + ) + let (requestStream, requestStreamContinuation) = AsyncStream.makeStream() + serviceContext.traceID = traceIDString + + // FIXME: use 'ServiceContext.withValue(serviceContext)' + // + // This is blocked on: https://github.com/apple/swift-service-context/pull/46 + try await ServiceContext.$current.withValue(serviceContext) { + let methodDescriptor = MethodDescriptor( + fullyQualifiedService: "TracingInterceptorTests", + method: "testClientInterceptor" + ) + let response = try await interceptor.intercept( + request: .init(producer: { writer in + try await writer.write(contentsOf: ["request1"]) + try await writer.write(contentsOf: ["request2"]) + }), + context: ClientContext( + descriptor: methodDescriptor, + remotePeer: "ipv6:2001::130F:::09C0:876A:130B:1234", + localPeer: "ipv6:ff06:0:0:0:0:0:0:c3:5678" + ) + ) { stream, _ in + // Assert the metadata contains the injected context key-value. + XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) + + // Write into the request stream to make sure the `producer` closure's called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: requestStreamContinuation)) + try await stream.producer(writer) + requestStreamContinuation.finish() + + return .init( + metadata: [], + bodyParts: RPCAsyncSequence( + wrapping: AsyncThrowingStream { + $0.yield(.message(["response"])) + $0.finish() + } + ) + ) + } + + await AssertStreamContentsEqual(["request1", "request2"], requestStream) + try await AssertStreamContentsEqual([["response"]], response.messages) + + AssertTestSpanComponents(forMethod: methodDescriptor) { events in + // No events are recorded + XCTAssertTrue(events.isEmpty) + } assertAttributes: { attributes in + XCTAssertEqual( + attributes, + [ + "rpc.system": .string("grpc"), + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": .int(0), + "server.address": .string("someserver.com"), + "server.port": .int(1234), + "network.peer.address": .string("2001::130F:::09C0:876A:130B"), + "network.peer.port": .int(1234), + "network.transport": .string("tcp"), + "network.type": .string("ipv6"), + ] + ) + } assertStatus: { status in + XCTAssertNil(status) + } assertErrors: { errors in + XCTAssertEqual(errors, []) + } + } + } + func testClientInterceptor_UDS() async throws { var serviceContext = ServiceContext.topLevel let traceIDString = UUID().uuidString @@ -108,7 +193,7 @@ final class TracingInterceptorTests: XCTestCase { networkTransportMethod: "tcp", traceEachMessage: false ) - let (stream, continuation) = AsyncStream.makeStream() + let (requestStream, requestStreamContinuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString // FIXME: use 'ServiceContext.withValue(serviceContext)' @@ -133,10 +218,10 @@ final class TracingInterceptorTests: XCTestCase { // Assert the metadata contains the injected context key-value. XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) - // Write into the response stream to make sure the `producer` closure's called. - let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) + // Write into the request stream to make sure the `producer` closure's called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: requestStreamContinuation)) try await stream.producer(writer) - continuation.finish() + requestStreamContinuation.finish() return .init( metadata: [], @@ -149,7 +234,7 @@ final class TracingInterceptorTests: XCTestCase { ) } - await AssertStreamContentsEqual(["request1", "request2"], stream) + await AssertStreamContentsEqual(["request1", "request2"], requestStream) try await AssertStreamContentsEqual([["response"]], response.messages) AssertTestSpanComponents(forMethod: methodDescriptor) { events in @@ -189,7 +274,7 @@ final class TracingInterceptorTests: XCTestCase { networkTransportMethod: "tcp", traceEachMessage: true ) - let (stream, continuation) = AsyncStream.makeStream() + let (requestStream, requestStreamContinuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString // FIXME: use 'ServiceContext.withValue(serviceContext)' @@ -201,15 +286,19 @@ final class TracingInterceptorTests: XCTestCase { try await writer.write(contentsOf: ["request1"]) try await writer.write(contentsOf: ["request2"]) }), - context: self.getClientContext(forMethod: methodDescriptor) + context: ClientContext( + descriptor: methodDescriptor, + remotePeer: "ipv4:10.1.2.80:567", + localPeer: "ipv4:10.1.2.80:123" + ) ) { stream, _ in // Assert the metadata contains the injected context key-value. XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) - // Write into the response stream to make sure the `producer` closure's called. - let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) + // Write into the request stream to make sure the `producer` closure's called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: requestStreamContinuation)) try await stream.producer(writer) - continuation.finish() + requestStreamContinuation.finish() return .init( metadata: [], @@ -222,7 +311,7 @@ final class TracingInterceptorTests: XCTestCase { ) } - await AssertStreamContentsEqual(["request1", "request2"], stream) + await AssertStreamContentsEqual(["request1", "request2"], requestStream) try await AssertStreamContentsEqual([["response"]], response.messages) AssertTestSpanComponents(forMethod: methodDescriptor) { events in @@ -263,13 +352,12 @@ final class TracingInterceptorTests: XCTestCase { func testClientInterceptorErrorEncountered() async throws { var serviceContext = ServiceContext.topLevel - let traceIDString = UUID().uuidString let interceptor = ClientOTelTracingInterceptor( serverHostname: "someserver.com", networkTransportMethod: "tcp", traceEachMessage: false ) - let (_, continuation) = AsyncStream.makeStream() + let traceIDString = UUID().uuidString serviceContext.traceID = traceIDString // FIXME: use 'ServiceContext.withValue(serviceContext)' @@ -281,30 +369,18 @@ final class TracingInterceptorTests: XCTestCase { method: "testClientInterceptorErrorEncountered" ) do { - _ = try await interceptor.intercept( - request: .init(producer: { writer in - try await writer.write("request") - throw TracingInterceptorTestError.testError - }), - context: self.getClientContext(forMethod: methodDescriptor) + let _: StreamingClientResponse = try await interceptor.intercept( + request: StreamingClientRequest(of: Void.self, producer: { writer in }), + context: ClientContext( + descriptor: methodDescriptor, + remotePeer: "ipv4:10.1.2.80:567", + localPeer: "ipv4:10.1.2.80:123" + ) ) { stream, _ in // Assert the metadata contains the injected context key-value. XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) - // Write into the response stream to make sure the `producer` closure's called. - let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) - try await stream.producer(writer) - continuation.finish() - - return .init( - metadata: [], - bodyParts: RPCAsyncSequence( - wrapping: AsyncThrowingStream { - $0.yield(.message(["response"])) - $0.finish() - } - ) - ) + throw TracingInterceptorTestError.testError } XCTFail("Should have thrown") } catch { @@ -344,7 +420,7 @@ final class TracingInterceptorTests: XCTestCase { networkTransportMethod: "tcp", traceEachMessage: false ) - let (stream, continuation) = AsyncStream.makeStream() + let (requestStream, requestStreamContinuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString // FIXME: use 'ServiceContext.withValue(serviceContext)' @@ -359,20 +435,24 @@ final class TracingInterceptorTests: XCTestCase { request: .init(producer: { writer in try await writer.write(contentsOf: ["request"]) }), - context: self.getClientContext(forMethod: methodDescriptor) + context: ClientContext( + descriptor: methodDescriptor, + remotePeer: "ipv4:10.1.2.80:567", + localPeer: "ipv4:10.1.2.80:123" + ) ) { stream, _ in // Assert the metadata contains the injected context key-value. XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) - // Write into the response stream to make sure the `producer` closure's called. - let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) + // Write into the request stream to make sure the `producer` closure's called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: requestStreamContinuation)) try await stream.producer(writer) - continuation.finish() + requestStreamContinuation.finish() return .init(error: RPCError(code: .unavailable, message: "This should not work")) } - await AssertStreamContentsEqual(["request"], stream) + await AssertStreamContentsEqual(["request"], requestStream) switch response.accepted { case .success: @@ -578,10 +658,8 @@ final class TracingInterceptorTests: XCTestCase { "Received request start", "Received request end", // Recorded when `response1` is sent - "Sending response part", "Sent response part", // Recorded when `response2` is sent - "Sending response part", "Sent response part", // Recorded when we're done sending response "Sent response end", From 4c11858056ebfa014c2da151e54542a0dd40c33f Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Mon, 20 Jan 2025 16:24:17 +0000 Subject: [PATCH 08/16] Formatting --- Sources/GRPCInterceptors/HookedAsyncSequence.swift | 7 +++++-- .../Tracing/ClientOTelTracingInterceptor.swift | 10 ++++++---- .../Tracing/SpanAttributes+RPCAttributes.swift | 6 ++++-- .../TracingInterceptorTests.swift | 2 +- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/Sources/GRPCInterceptors/HookedAsyncSequence.swift b/Sources/GRPCInterceptors/HookedAsyncSequence.swift index 83de5e0..1054f6d 100644 --- a/Sources/GRPCInterceptors/HookedAsyncSequence.swift +++ b/Sources/GRPCInterceptors/HookedAsyncSequence.swift @@ -14,7 +14,8 @@ * limitations under the License. */ -internal struct HookedRPCAsyncSequence: AsyncSequence, Sendable where Wrapped.Element: Sendable { +internal struct HookedRPCAsyncSequence: AsyncSequence, Sendable +where Wrapped.Element: Sendable { private let wrapped: Wrapped private let forEachElement: @Sendable (Wrapped.Element) -> Void @@ -62,7 +63,9 @@ internal struct HookedRPCAsyncSequence: Async self.onFailure = onFailure } - mutating func next(isolation actor: isolated (any Actor)?) async throws(Wrapped.Failure) -> Wrapped.Element? { + mutating func next( + isolation actor: isolated (any Actor)? + ) async throws(Wrapped.Failure) -> Wrapped.Element? { do { if let element = try await self.wrapped.next(isolation: actor) { self.forEachElement(element) diff --git a/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift index e3c1dd3..a511b92 100644 --- a/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift @@ -111,15 +111,17 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { var response = try await next(request, context) switch response.accepted { case .success(var success): - let hookedSequence: HookedRPCAsyncSequence< - RPCAsyncSequence.Contents.BodyPart, any Error> - > + let hookedSequence: + HookedRPCAsyncSequence< + RPCAsyncSequence.Contents.BodyPart, any Error> + > if self.traceEachMessage { let messageReceivedCounter = Atomic(1) hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { _ in var event = SpanEvent(name: "rpc.message") event.attributes.rpc.messageType = "RECEIVED" - event.attributes.rpc.messageID = messageReceivedCounter + event.attributes.rpc.messageID = + messageReceivedCounter .wrappingAdd(1, ordering: .sequentiallyConsistent) .oldValue span.addEvent(event) diff --git a/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift b/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift index dc95e4d..38e3d1e 100644 --- a/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift +++ b/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift @@ -14,8 +14,8 @@ * limitations under the License. */ -internal import Tracing internal import GRPCCore +internal import Tracing @dynamicMemberLookup package struct RPCAttributes: SpanAttributeNamespace { @@ -187,7 +187,9 @@ private enum PeerAddress { return } self = .ipv6( - address: String(addressComponents[1.. Date: Mon, 20 Jan 2025 17:43:21 +0000 Subject: [PATCH 09/16] Parametrise tests and move to swift-testing --- .../ClientOTelTracingInterceptor.swift | 21 +- .../TracingInterceptorTests.swift | 495 ++++++++---------- 2 files changed, 238 insertions(+), 278 deletions(-) diff --git a/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift index a511b92..cad60b4 100644 --- a/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift @@ -16,7 +16,7 @@ public import GRPCCore internal import Synchronization -internal import Tracing +package import Tracing /// A client interceptor that injects tracing information into the request. /// @@ -66,9 +66,26 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { StreamingClientRequest, ClientContext ) async throws -> StreamingClientResponse + ) async throws -> StreamingClientResponse where Input: Sendable, Output: Sendable { + try await self.intercept( + tracer: InstrumentationSystem.tracer, + request: request, + context: context, + next: next + ) + } + + /// Same as ``intercept(request:context:next:)``, but allows specifying a `Tracer` for testing purposes. + package func intercept( + tracer: any Tracer, + request: StreamingClientRequest, + context: ClientContext, + next: ( + StreamingClientRequest, + ClientContext + ) async throws -> StreamingClientResponse ) async throws -> StreamingClientResponse where Input: Sendable, Output: Sendable { var request = request - let tracer = InstrumentationSystem.tracer let serviceContext = ServiceContext.current ?? .topLevel tracer.inject( diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index 7c19b76..28650b8 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -17,24 +17,26 @@ import GRPCCore import Tracing import XCTest +import Testing +import GRPCInterceptors -@testable import GRPCInterceptors +@Suite("OTel Tracing Client Interceptor Tests") +struct OTelTracingClientInterceptorTests { + private let tracer: TestTracer -final class TracingInterceptorTests: XCTestCase { - override class func setUp() { - InstrumentationSystem.bootstrap(TestTracer()) + init() { + self.tracer = TestTracer() } // - MARK: Client Interceptor Tests - func testClientInterceptor_IPv4() async throws { + @Test( + "Successful RPC is recorded correctly", + arguments: OTelTracingInterceptorTestAddressType.allCases + ) + func testSuccessfulRPC(addressType: OTelTracingInterceptorTestAddressType) async throws { var serviceContext = ServiceContext.topLevel let traceIDString = UUID().uuidString - let interceptor = ClientOTelTracingInterceptor( - serverHostname: "someserver.com", - networkTransportMethod: "tcp", - traceEachMessage: false - ) let (requestStream, requestStreamContinuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString @@ -42,102 +44,30 @@ final class TracingInterceptorTests: XCTestCase { // // This is blocked on: https://github.com/apple/swift-service-context/pull/46 try await ServiceContext.$current.withValue(serviceContext) { - let methodDescriptor = MethodDescriptor( - fullyQualifiedService: "TracingInterceptorTests", - method: "testClientInterceptor" + let interceptor = ClientOTelTracingInterceptor( + serverHostname: "someserver.com", + networkTransportMethod: "tcp", + traceEachMessage: false ) - let response = try await interceptor.intercept( - request: .init(producer: { writer in - try await writer.write(contentsOf: ["request1"]) - try await writer.write(contentsOf: ["request2"]) - }), - context: ClientContext( - descriptor: methodDescriptor, - remotePeer: "ipv4:10.1.2.80:567", - localPeer: "ipv4:10.1.2.80:123" - ) - ) { stream, _ in - // Assert the metadata contains the injected context key-value. - XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) - - // Write into the request stream to make sure the `producer` closure's called. - let writer = RPCWriter(wrapping: TestWriter(streamContinuation: requestStreamContinuation)) - try await stream.producer(writer) - requestStreamContinuation.finish() - - return .init( - metadata: [], - bodyParts: RPCAsyncSequence( - wrapping: AsyncThrowingStream { - $0.yield(.message(["response"])) - $0.finish() - } - ) - ) - } - - await AssertStreamContentsEqual(["request1", "request2"], requestStream) - try await AssertStreamContentsEqual([["response"]], response.messages) - - AssertTestSpanComponents(forMethod: methodDescriptor) { events in - // No events are recorded - XCTAssertTrue(events.isEmpty) - } assertAttributes: { attributes in - XCTAssertEqual( - attributes, - [ - "rpc.system": .string("grpc"), - "rpc.method": .string(methodDescriptor.method), - "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), - "rpc.grpc.status_code": .int(0), - "server.address": .string("someserver.com"), - "server.port": .int(567), - "network.peer.address": .string("10.1.2.80"), - "network.peer.port": .int(567), - "network.transport": .string("tcp"), - "network.type": .string("ipv4"), - ] - ) - } assertStatus: { status in - XCTAssertNil(status) - } assertErrors: { errors in - XCTAssertEqual(errors, []) - } - } - } - - func testClientInterceptor_IPv6() async throws { - var serviceContext = ServiceContext.topLevel - let traceIDString = UUID().uuidString - let interceptor = ClientOTelTracingInterceptor( - serverHostname: "someserver.com", - networkTransportMethod: "tcp", - traceEachMessage: false - ) - let (requestStream, requestStreamContinuation) = AsyncStream.makeStream() - serviceContext.traceID = traceIDString - - // FIXME: use 'ServiceContext.withValue(serviceContext)' - // - // This is blocked on: https://github.com/apple/swift-service-context/pull/46 - try await ServiceContext.$current.withValue(serviceContext) { let methodDescriptor = MethodDescriptor( fullyQualifiedService: "TracingInterceptorTests", method: "testClientInterceptor" ) + let testValues = self.getTestValues(addressType: addressType, methodDescriptor: methodDescriptor) let response = try await interceptor.intercept( + tracer: self.tracer, request: .init(producer: { writer in try await writer.write(contentsOf: ["request1"]) try await writer.write(contentsOf: ["request2"]) }), context: ClientContext( descriptor: methodDescriptor, - remotePeer: "ipv6:2001::130F:::09C0:876A:130B:1234", - localPeer: "ipv6:ff06:0:0:0:0:0:0:c3:5678" + remotePeer: testValues.remotePeerAddress, + localPeer: testValues.localPeerAddress ) ) { stream, _ in // Assert the metadata contains the injected context key-value. - XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) + #expect(stream.metadata == ["trace-id": "\(traceIDString)"]) // Write into the request stream to make sure the `producer` closure's called. let writer = RPCWriter(wrapping: TestWriter(streamContinuation: requestStreamContinuation)) @@ -155,44 +85,27 @@ final class TracingInterceptorTests: XCTestCase { ) } - await AssertStreamContentsEqual(["request1", "request2"], requestStream) - try await AssertStreamContentsEqual([["response"]], response.messages) + await assertStreamContentsEqual(["request1", "request2"], requestStream) + try await assertStreamContentsEqual([["response"]], response.messages) - AssertTestSpanComponents(forMethod: methodDescriptor) { events in + assertTestSpanComponents(forMethod: methodDescriptor) { events in // No events are recorded - XCTAssertTrue(events.isEmpty) + #expect(events.isEmpty) } assertAttributes: { attributes in - XCTAssertEqual( - attributes, - [ - "rpc.system": .string("grpc"), - "rpc.method": .string(methodDescriptor.method), - "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), - "rpc.grpc.status_code": .int(0), - "server.address": .string("someserver.com"), - "server.port": .int(1234), - "network.peer.address": .string("2001::130F:::09C0:876A:130B"), - "network.peer.port": .int(1234), - "network.transport": .string("tcp"), - "network.type": .string("ipv6"), - ] - ) + #expect(attributes == testValues.expectedSpanAttributes) } assertStatus: { status in - XCTAssertNil(status) + #expect(status == nil) } assertErrors: { errors in - XCTAssertEqual(errors, []) + #expect(errors == []) } } } - func testClientInterceptor_UDS() async throws { + @Test("All events are recorded when traceEachMessage is true") + func testAllEventsRecorded() async throws { var serviceContext = ServiceContext.topLevel let traceIDString = UUID().uuidString - let interceptor = ClientOTelTracingInterceptor( - serverHostname: "someserver.com", - networkTransportMethod: "tcp", - traceEachMessage: false - ) + let (requestStream, requestStreamContinuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString @@ -200,100 +113,30 @@ final class TracingInterceptorTests: XCTestCase { // // This is blocked on: https://github.com/apple/swift-service-context/pull/46 try await ServiceContext.$current.withValue(serviceContext) { + let interceptor = ClientOTelTracingInterceptor( + serverHostname: "someserver.com", + networkTransportMethod: "tcp", + traceEachMessage: true + ) let methodDescriptor = MethodDescriptor( fullyQualifiedService: "TracingInterceptorTests", - method: "testClientInterceptor" + method: "testClientInterceptorAllEventsRecorded" ) + let testValues = self.getTestValues(addressType: .ipv4, methodDescriptor: methodDescriptor) let response = try await interceptor.intercept( + tracer: self.tracer, request: .init(producer: { writer in try await writer.write(contentsOf: ["request1"]) try await writer.write(contentsOf: ["request2"]) }), context: ClientContext( descriptor: methodDescriptor, - remotePeer: "unix:some-path", - localPeer: "unix:some-path" - ) - ) { stream, _ in - // Assert the metadata contains the injected context key-value. - XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) - - // Write into the request stream to make sure the `producer` closure's called. - let writer = RPCWriter(wrapping: TestWriter(streamContinuation: requestStreamContinuation)) - try await stream.producer(writer) - requestStreamContinuation.finish() - - return .init( - metadata: [], - bodyParts: RPCAsyncSequence( - wrapping: AsyncThrowingStream { - $0.yield(.message(["response"])) - $0.finish() - } - ) - ) - } - - await AssertStreamContentsEqual(["request1", "request2"], requestStream) - try await AssertStreamContentsEqual([["response"]], response.messages) - - AssertTestSpanComponents(forMethod: methodDescriptor) { events in - // No events are recorded - XCTAssertTrue(events.isEmpty) - } assertAttributes: { attributes in - XCTAssertEqual( - attributes, - [ - "rpc.system": .string("grpc"), - "rpc.method": .string(methodDescriptor.method), - "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), - "rpc.grpc.status_code": .int(0), - "server.address": .string("someserver.com"), - "network.peer.address": .string("some-path"), - "network.transport": .string("tcp"), - "network.type": .string("unix"), - ] - ) - } assertStatus: { status in - XCTAssertNil(status) - } assertErrors: { errors in - XCTAssertEqual(errors, []) - } - } - } - - func testClientInterceptorAllEventsRecorded() async throws { - let methodDescriptor = MethodDescriptor( - fullyQualifiedService: "TracingInterceptorTests", - method: "testClientInterceptorAllEventsRecorded" - ) - var serviceContext = ServiceContext.topLevel - let traceIDString = UUID().uuidString - let interceptor = ClientOTelTracingInterceptor( - serverHostname: "someserver.com", - networkTransportMethod: "tcp", - traceEachMessage: true - ) - let (requestStream, requestStreamContinuation) = AsyncStream.makeStream() - serviceContext.traceID = traceIDString - - // FIXME: use 'ServiceContext.withValue(serviceContext)' - // - // This is blocked on: https://github.com/apple/swift-service-context/pull/46 - try await ServiceContext.$current.withValue(serviceContext) { - let response = try await interceptor.intercept( - request: .init(producer: { writer in - try await writer.write(contentsOf: ["request1"]) - try await writer.write(contentsOf: ["request2"]) - }), - context: ClientContext( - descriptor: methodDescriptor, - remotePeer: "ipv4:10.1.2.80:567", - localPeer: "ipv4:10.1.2.80:123" + remotePeer: testValues.remotePeerAddress, + localPeer: testValues.localPeerAddress ) ) { stream, _ in // Assert the metadata contains the injected context key-value. - XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) + #expect(stream.metadata == ["trace-id": "\(traceIDString)"]) // Write into the request stream to make sure the `producer` closure's called. let writer = RPCWriter(wrapping: TestWriter(streamContinuation: requestStreamContinuation)) @@ -311,12 +154,11 @@ final class TracingInterceptorTests: XCTestCase { ) } - await AssertStreamContentsEqual(["request1", "request2"], requestStream) - try await AssertStreamContentsEqual([["response"]], response.messages) + await assertStreamContentsEqual(["request1", "request2"], requestStream) + try await assertStreamContentsEqual([["response"]], response.messages) - AssertTestSpanComponents(forMethod: methodDescriptor) { events in - XCTAssertEqual( - events, + assertTestSpanComponents(forMethod: methodDescriptor) { events in + #expect(events == [ // Recorded when `request1` is sent TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 1]), @@ -327,36 +169,18 @@ final class TracingInterceptorTests: XCTestCase { ] ) } assertAttributes: { attributes in - XCTAssertEqual( - attributes, - [ - "rpc.system": .string("grpc"), - "rpc.method": .string(methodDescriptor.method), - "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), - "rpc.grpc.status_code": .int(0), - "server.address": .string("someserver.com"), - "server.port": .int(567), - "network.peer.address": .string("10.1.2.80"), - "network.peer.port": .int(567), - "network.transport": .string("tcp"), - "network.type": .string("ipv4"), - ] - ) + #expect(attributes == testValues.expectedSpanAttributes) } assertStatus: { status in - XCTAssertNil(status) + #expect(status == nil) } assertErrors: { errors in - XCTAssertEqual(errors, []) + #expect(errors == []) } } } + @Test("RPC resulting in error is correctly recorded") func testClientInterceptorErrorEncountered() async throws { var serviceContext = ServiceContext.topLevel - let interceptor = ClientOTelTracingInterceptor( - serverHostname: "someserver.com", - networkTransportMethod: "tcp", - traceEachMessage: false - ) let traceIDString = UUID().uuidString serviceContext.traceID = traceIDString @@ -364,12 +188,18 @@ final class TracingInterceptorTests: XCTestCase { // // This is blocked on: https://github.com/apple/swift-service-context/pull/46 await ServiceContext.$current.withValue(serviceContext) { + let interceptor = ClientOTelTracingInterceptor( + serverHostname: "someserver.com", + networkTransportMethod: "tcp", + traceEachMessage: false + ) let methodDescriptor = MethodDescriptor( fullyQualifiedService: "TracingInterceptorTests", method: "testClientInterceptorErrorEncountered" ) do { let _: StreamingClientResponse = try await interceptor.intercept( + tracer: self.tracer, request: StreamingClientRequest(of: Void.self, producer: { writer in }), context: ClientContext( descriptor: methodDescriptor, @@ -378,48 +208,43 @@ final class TracingInterceptorTests: XCTestCase { ) ) { stream, _ in // Assert the metadata contains the injected context key-value. - XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) - + #expect(stream.metadata == ["trace-id": "\(traceIDString)"]) + // Now throw throw TracingInterceptorTestError.testError } - XCTFail("Should have thrown") + Issue.record("Should have thrown") } catch { - AssertTestSpanComponents(forMethod: methodDescriptor) { events in + assertTestSpanComponents(forMethod: methodDescriptor) { events in // No events are recorded - XCTAssertTrue(events.isEmpty) + #expect(events.isEmpty) } assertAttributes: { attributes in // The attributes should not contain a grpc status code, as the request was never even sent. - XCTAssertEqual( - attributes, + #expect(attributes == [ - "rpc.system": .string("grpc"), + "rpc.system": "grpc", "rpc.method": .string(methodDescriptor.method), "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), - "server.address": .string("someserver.com"), - "server.port": .int(567), - "network.peer.address": .string("10.1.2.80"), - "network.peer.port": .int(567), - "network.transport": .string("tcp"), - "network.type": .string("ipv4"), + "server.address": "someserver.com", + "server.port": 567, + "network.peer.address": "10.1.2.80", + "network.peer.port": 567, + "network.transport": "tcp", + "network.type": "ipv4", ] ) } assertStatus: { status in - XCTAssertNil(status) + #expect(status == nil) } assertErrors: { errors in - XCTAssertEqual(errors, [.testError]) + #expect(errors == [.testError]) } } } } + @Test("RPC with error response is correctly recorded") func testClientInterceptorErrorReponse() async throws { var serviceContext = ServiceContext.topLevel let traceIDString = UUID().uuidString - let interceptor = ClientOTelTracingInterceptor( - serverHostname: "someserver.com", - networkTransportMethod: "tcp", - traceEachMessage: false - ) let (requestStream, requestStreamContinuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString @@ -427,11 +252,17 @@ final class TracingInterceptorTests: XCTestCase { // // This is blocked on: https://github.com/apple/swift-service-context/pull/46 try await ServiceContext.$current.withValue(serviceContext) { + let interceptor = ClientOTelTracingInterceptor( + serverHostname: "someserver.com", + networkTransportMethod: "tcp", + traceEachMessage: false + ) let methodDescriptor = MethodDescriptor( fullyQualifiedService: "TracingInterceptorTests", method: "testClientInterceptor" ) let response: StreamingClientResponse = try await interceptor.intercept( + tracer: self.tracer, request: .init(producer: { writer in try await writer.write(contentsOf: ["request"]) }), @@ -442,7 +273,7 @@ final class TracingInterceptorTests: XCTestCase { ) ) { stream, _ in // Assert the metadata contains the injected context key-value. - XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) + #expect(stream.metadata == ["trace-id": "\(traceIDString)"]) // Write into the request stream to make sure the `producer` closure's called. let writer = RPCWriter(wrapping: TestWriter(streamContinuation: requestStreamContinuation)) @@ -452,43 +283,149 @@ final class TracingInterceptorTests: XCTestCase { return .init(error: RPCError(code: .unavailable, message: "This should not work")) } - await AssertStreamContentsEqual(["request"], requestStream) + await assertStreamContentsEqual(["request"], requestStream) switch response.accepted { case .success: - XCTFail("Response should have failed") + Issue.record("Response should have failed") + return case .failure(let failure): - XCTAssertEqual(failure, RPCError(code: .unavailable, message: "This should not work")) + #expect(failure == RPCError(code: .unavailable, message: "This should not work")) } - AssertTestSpanComponents(forMethod: methodDescriptor) { events in + assertTestSpanComponents(forMethod: methodDescriptor) { events in // No events are recorded - XCTAssertTrue(events.isEmpty) + #expect(events.isEmpty) } assertAttributes: { attributes in - XCTAssertEqual( - attributes, + #expect(attributes == [ - "rpc.system": .string("grpc"), + "rpc.system": "grpc", "rpc.method": .string(methodDescriptor.method), "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), - "rpc.grpc.status_code": .int(14), // this is unavailable's raw code - "server.address": .string("someserver.com"), - "server.port": .int(567), - "network.peer.address": .string("10.1.2.80"), - "network.peer.port": .int(567), - "network.transport": .string("tcp"), - "network.type": .string("ipv4"), + "rpc.grpc.status_code": 14, // this is unavailable's raw code + "server.address": "someserver.com", + "server.port": 567, + "network.peer.address": "10.1.2.80", + "network.peer.port": 567, + "network.transport": "tcp", + "network.type": "ipv4", ] ) } assertStatus: { status in - XCTAssertEqual(status, .some(.init(code: .error))) + #expect(status == .some(.init(code: .error))) } assertErrors: { errors in - XCTAssertEqual(errors.count, 1) + #expect(errors.count == 1) } } } + // - MARK: Utilities + + private func getTestValues(addressType: OTelTracingInterceptorTestAddressType, methodDescriptor: MethodDescriptor) -> OTelTracingInterceptorTestCaseValues { + switch addressType { + case .ipv4: + return OTelTracingInterceptorTestCaseValues( + remotePeerAddress: "ipv4:10.1.2.80:567", + localPeerAddress: "ipv4:10.1.2.80:123", + expectedSpanAttributes: [ + "rpc.system": "grpc", + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": 0, + "server.address": "someserver.com", + "server.port": 567, + "network.peer.address": "10.1.2.80", + "network.peer.port": 567, + "network.transport": "tcp", + "network.type": "ipv4", + ] + ) + + case .ipv6: + return OTelTracingInterceptorTestCaseValues( + remotePeerAddress: "ipv6:2001::130F:::09C0:876A:130B:1234", + localPeerAddress: "ipv6:ff06:0:0:0:0:0:0:c3:5678", + expectedSpanAttributes: [ + "rpc.system": "grpc", + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": 0, + "server.address": "someserver.com", + "server.port": 1234, + "network.peer.address": "2001::130F:::09C0:876A:130B", + "network.peer.port": 1234, + "network.transport": "tcp", + "network.type": "ipv6", + ] + ) + + case .uds: + return OTelTracingInterceptorTestCaseValues( + remotePeerAddress: "unix:some-path", + localPeerAddress: "unix:some-path", + expectedSpanAttributes: [ + "rpc.system": "grpc", + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": 0, + "server.address": "someserver.com", + "network.peer.address": "some-path", + "network.transport": "tcp", + "network.type": "unix", + ] + ) + } + } + + private func getTestSpanForMethod(_ methodDescriptor: MethodDescriptor) -> TestSpan { + return self.tracer.getSpan(ofOperation: methodDescriptor.fullyQualifiedMethod)! + } + + private func assertTestSpanComponents( + forMethod method: MethodDescriptor, + assertEvents: ([TestSpanEvent]) -> Void, + assertAttributes: (SpanAttributes) -> Void, + assertStatus: (SpanStatus?) -> Void, + assertErrors: ([TracingInterceptorTestError]) -> Void + ) { + let span = self.getTestSpanForMethod(method) + assertEvents(span.events.map({ TestSpanEvent($0) })) + assertAttributes(span.attributes) + assertStatus(span.status) + assertErrors(span.errors) + } + + private func assertStreamContentsEqual( + _ array: [T], + _ stream: any AsyncSequence + ) async throws { + var streamElements = [T]() + for try await element in stream { + streamElements.append(element) + } + #expect(streamElements == array) + } + + private func assertStreamContentsEqual( + _ array: [T], + _ stream: any AsyncSequence + ) async { + var streamElements = [T]() + for await element in stream { + streamElements.append(element) + } + #expect(streamElements == array) + } +} + +final class TracingInterceptorTests: XCTestCase { + override class func setUp() { + InstrumentationSystem.bootstrap(TestTracer()) + } + + // - MARK: Server Interceptor Tests + func testServerInterceptorErrorResponse() async throws { let methodDescriptor = MethodDescriptor( fullyQualifiedService: "TracingInterceptorTests", @@ -667,20 +604,12 @@ final class TracingInterceptorTests: XCTestCase { ) } - private func getClientContext(forMethod method: MethodDescriptor) -> ClientContext { - ClientContext( - descriptor: method, - remotePeer: "ipv4:10.1.2.80:567", - localPeer: "ipv6:localhost:1234" - ) - } - private func getTestSpanForMethod(_ methodDescriptor: MethodDescriptor) -> TestSpan { let tracer = InstrumentationSystem.tracer as! TestTracer return tracer.getSpan(ofOperation: methodDescriptor.fullyQualifiedMethod)! } - private func AssertTestSpanComponents( + private func assertTestSpanComponents( forMethod method: MethodDescriptor, assertEvents: ([TestSpanEvent]) -> Void, assertAttributes: (SpanAttributes) -> Void, @@ -694,7 +623,7 @@ final class TracingInterceptorTests: XCTestCase { assertErrors(span.errors) } - private func AssertStreamContentsEqual( + private func assertStreamContentsEqual( _ array: [T], _ stream: any AsyncSequence ) async throws { @@ -705,7 +634,7 @@ final class TracingInterceptorTests: XCTestCase { XCTAssertEqual(streamElements, array) } - private func AssertStreamContentsEqual( + private func assertStreamContentsEqual( _ array: [T], _ stream: any AsyncSequence ) async { @@ -716,3 +645,17 @@ final class TracingInterceptorTests: XCTestCase { XCTAssertEqual(streamElements, array) } } + +enum OTelTracingInterceptorTestAddressType { + case ipv4 + case ipv6 + case uds + + static let allCases: [Self] = [.ipv4, .ipv6, .uds] +} + +struct OTelTracingInterceptorTestCaseValues { + let remotePeerAddress: String + let localPeerAddress: String + let expectedSpanAttributes: SpanAttributes +} From 4d24cd9e2d4c6e8c4b46894004d9b0e75af058c8 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Mon, 20 Jan 2025 17:43:34 +0000 Subject: [PATCH 10/16] Formatting --- .../TracingInterceptorTests.swift | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index 28650b8..5cfd3db 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -15,10 +15,10 @@ */ import GRPCCore +import GRPCInterceptors +import Testing import Tracing import XCTest -import Testing -import GRPCInterceptors @Suite("OTel Tracing Client Interceptor Tests") struct OTelTracingClientInterceptorTests { @@ -53,7 +53,10 @@ struct OTelTracingClientInterceptorTests { fullyQualifiedService: "TracingInterceptorTests", method: "testClientInterceptor" ) - let testValues = self.getTestValues(addressType: addressType, methodDescriptor: methodDescriptor) + let testValues = self.getTestValues( + addressType: addressType, + methodDescriptor: methodDescriptor + ) let response = try await interceptor.intercept( tracer: self.tracer, request: .init(producer: { writer in @@ -158,8 +161,8 @@ struct OTelTracingClientInterceptorTests { try await assertStreamContentsEqual([["response"]], response.messages) assertTestSpanComponents(forMethod: methodDescriptor) { events in - #expect(events == - [ + #expect( + events == [ // Recorded when `request1` is sent TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 1]), // Recorded when `request2` is sent @@ -219,8 +222,8 @@ struct OTelTracingClientInterceptorTests { #expect(events.isEmpty) } assertAttributes: { attributes in // The attributes should not contain a grpc status code, as the request was never even sent. - #expect(attributes == - [ + #expect( + attributes == [ "rpc.system": "grpc", "rpc.method": .string(methodDescriptor.method), "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), @@ -298,8 +301,8 @@ struct OTelTracingClientInterceptorTests { // No events are recorded #expect(events.isEmpty) } assertAttributes: { attributes in - #expect(attributes == - [ + #expect( + attributes == [ "rpc.system": "grpc", "rpc.method": .string(methodDescriptor.method), "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), @@ -322,7 +325,10 @@ struct OTelTracingClientInterceptorTests { // - MARK: Utilities - private func getTestValues(addressType: OTelTracingInterceptorTestAddressType, methodDescriptor: MethodDescriptor) -> OTelTracingInterceptorTestCaseValues { + private func getTestValues( + addressType: OTelTracingInterceptorTestAddressType, + methodDescriptor: MethodDescriptor + ) -> OTelTracingInterceptorTestCaseValues { switch addressType { case .ipv4: return OTelTracingInterceptorTestCaseValues( From 34bbf9fd599872907f41c136f21c7f13e6c9fb7c Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Mon, 20 Jan 2025 18:01:29 +0000 Subject: [PATCH 11/16] Add new test --- .../TracingInterceptorTests.swift | 114 ++++++++++++++++-- 1 file changed, 102 insertions(+), 12 deletions(-) diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index 5cfd3db..9c0361e 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -50,8 +50,8 @@ struct OTelTracingClientInterceptorTests { traceEachMessage: false ) let methodDescriptor = MethodDescriptor( - fullyQualifiedService: "TracingInterceptorTests", - method: "testClientInterceptor" + fullyQualifiedService: "OTelTracingClientInterceptorTests", + method: "testSuccessfulRPC" ) let testValues = self.getTestValues( addressType: addressType, @@ -122,8 +122,8 @@ struct OTelTracingClientInterceptorTests { traceEachMessage: true ) let methodDescriptor = MethodDescriptor( - fullyQualifiedService: "TracingInterceptorTests", - method: "testClientInterceptorAllEventsRecorded" + fullyQualifiedService: "OTelTracingClientInterceptorTests", + method: "testAllEventsRecorded" ) let testValues = self.getTestValues(addressType: .ipv4, methodDescriptor: methodDescriptor) let response = try await interceptor.intercept( @@ -181,8 +181,8 @@ struct OTelTracingClientInterceptorTests { } } - @Test("RPC resulting in error is correctly recorded") - func testClientInterceptorErrorEncountered() async throws { + @Test("RPC that throws is correctly recorded") + func testThrowingRPC() async throws { var serviceContext = ServiceContext.topLevel let traceIDString = UUID().uuidString serviceContext.traceID = traceIDString @@ -197,8 +197,8 @@ struct OTelTracingClientInterceptorTests { traceEachMessage: false ) let methodDescriptor = MethodDescriptor( - fullyQualifiedService: "TracingInterceptorTests", - method: "testClientInterceptorErrorEncountered" + fullyQualifiedService: "OTelTracingClientInterceptorTests", + method: "testThrowingRPC" ) do { let _: StreamingClientResponse = try await interceptor.intercept( @@ -244,8 +244,8 @@ struct OTelTracingClientInterceptorTests { } } - @Test("RPC with error response is correctly recorded") - func testClientInterceptorErrorReponse() async throws { + @Test("RPC with a failure response is correctly recorded") + func testFailedRPC() async throws { var serviceContext = ServiceContext.topLevel let traceIDString = UUID().uuidString let (requestStream, requestStreamContinuation) = AsyncStream.makeStream() @@ -261,8 +261,8 @@ struct OTelTracingClientInterceptorTests { traceEachMessage: false ) let methodDescriptor = MethodDescriptor( - fullyQualifiedService: "TracingInterceptorTests", - method: "testClientInterceptor" + fullyQualifiedService: "OTelTracingClientInterceptorTests", + method: "testFailedRPC" ) let response: StreamingClientResponse = try await interceptor.intercept( tracer: self.tracer, @@ -323,6 +323,96 @@ struct OTelTracingClientInterceptorTests { } } + @Test("Accepted server-streaming RPC that throws error during response is correctly recorded") + func testAcceptedRPCWithError() async throws { + var serviceContext = ServiceContext.topLevel + let traceIDString = UUID().uuidString + serviceContext.traceID = traceIDString + + // FIXME: use 'ServiceContext.withValue(serviceContext)' + // + // This is blocked on: https://github.com/apple/swift-service-context/pull/46 + try await ServiceContext.$current.withValue(serviceContext) { + let interceptor = ClientOTelTracingInterceptor( + serverHostname: "someserver.com", + networkTransportMethod: "tcp", + traceEachMessage: false + ) + let methodDescriptor = MethodDescriptor( + fullyQualifiedService: "OTelTracingClientInterceptorTests", + method: "testAcceptedRPCWithError" + ) + let response: StreamingClientResponse = try await interceptor.intercept( + tracer: self.tracer, + request: .init(producer: { writer in + try await writer.write(contentsOf: ["request"]) + }), + context: ClientContext( + descriptor: methodDescriptor, + remotePeer: "ipv4:10.1.2.80:567", + localPeer: "ipv4:10.1.2.80:123" + ) + ) { stream, _ in + // Assert the metadata contains the injected context key-value. + #expect(stream.metadata == ["trace-id": "\(traceIDString)"]) + + return .init( + metadata: [], + bodyParts: RPCAsyncSequence( + wrapping: AsyncThrowingStream { + $0.finish(throwing: RPCError(code: .unavailable, message: "This should be thrown")) + } + ) + ) + } + + switch response.accepted { + case .success(let success): + do { + for try await _ in success.bodyParts { + // We don't care about any received messages here - we're not even writing any. + } + } catch { + #expect( + error as? RPCError + == RPCError( + code: .unavailable, + message: "This should be thrown" + ) + ) + } + + case .failure: + Issue.record("Response should have been successful") + return + } + + assertTestSpanComponents(forMethod: methodDescriptor) { events in + // No events are recorded + #expect(events.isEmpty) + } assertAttributes: { attributes in + #expect( + attributes == [ + "rpc.system": "grpc", + "rpc.method": .string(methodDescriptor.method), + "rpc.service": .string(methodDescriptor.service.fullyQualifiedService), + "rpc.grpc.status_code": 14, // this is unavailable's raw code + "server.address": "someserver.com", + "server.port": 567, + "network.peer.address": "10.1.2.80", + "network.peer.port": 567, + "network.transport": "tcp", + "network.type": "ipv4", + ] + ) + } assertStatus: { status in + #expect(status == .some(.init(code: .error))) + } assertErrors: { errors in + #expect(errors.count == 1) + } + } + } + // - MARK: Utilities private func getTestValues( From 01186b1c23c937bee2b528c0e5bc0864eaf47f7d Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Tue, 21 Jan 2025 11:11:37 +0000 Subject: [PATCH 12/16] PR changes --- .../HookedAsyncSequence.swift | 38 +--- .../ClientOTelTracingInterceptor.swift | 56 +++-- .../SpanAttributes+GRPCTracingKeys.swift | 134 +++++++++++ .../SpanAttributes+RPCAttributes.swift | 209 ------------------ .../PeerAddressTests.swift | 45 ++++ .../TracingInterceptorTests.swift | 2 +- 6 files changed, 226 insertions(+), 258 deletions(-) create mode 100644 Sources/GRPCInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift delete mode 100644 Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift create mode 100644 Tests/GRPCInterceptorsTests/PeerAddressTests.swift diff --git a/Sources/GRPCInterceptors/HookedAsyncSequence.swift b/Sources/GRPCInterceptors/HookedAsyncSequence.swift index 1054f6d..fdbe100 100644 --- a/Sources/GRPCInterceptors/HookedAsyncSequence.swift +++ b/Sources/GRPCInterceptors/HookedAsyncSequence.swift @@ -19,27 +19,23 @@ where Wrapped.Element: Sendable { private let wrapped: Wrapped private let forEachElement: @Sendable (Wrapped.Element) -> Void - private let onFinish: @Sendable () -> Void - private let onFailure: @Sendable (any Error) -> Void + private let onFinish: @Sendable ((any Error)?) -> Void init( wrapping sequence: Wrapped, forEachElement: @escaping @Sendable (Wrapped.Element) -> Void, - onFinish: @escaping @Sendable () -> Void, - onFailure: @escaping @Sendable (any Error) -> Void + onFinish: @escaping @Sendable ((any Error)?) -> Void ) { self.wrapped = sequence self.forEachElement = forEachElement self.onFinish = onFinish - self.onFailure = onFailure } func makeAsyncIterator() -> HookedAsyncIterator { HookedAsyncIterator( self.wrapped, forEachElement: self.forEachElement, - onFinish: self.onFinish, - onFailure: self.onFailure + onFinish: self.onFinish ) } @@ -48,19 +44,16 @@ where Wrapped.Element: Sendable { private var wrapped: Wrapped.AsyncIterator private let forEachElement: @Sendable (Wrapped.Element) -> Void - private let onFinish: @Sendable () -> Void - private let onFailure: @Sendable (any Error) -> Void + private let onFinish: @Sendable ((any Error)?) -> Void init( _ sequence: Wrapped, forEachElement: @escaping @Sendable (Wrapped.Element) -> Void, - onFinish: @escaping @Sendable () -> Void, - onFailure: @escaping @Sendable (any Error) -> Void + onFinish: @escaping @Sendable ((any Error)?) -> Void ) { self.wrapped = sequence.makeAsyncIterator() self.forEachElement = forEachElement self.onFinish = onFinish - self.onFailure = onFailure } mutating func next( @@ -70,29 +63,18 @@ where Wrapped.Element: Sendable { if let element = try await self.wrapped.next(isolation: actor) { self.forEachElement(element) return element + } else { + self.onFinish(nil) + return nil } - - self.onFinish() - return nil } catch { - self.onFailure(error) + self.onFinish(error) throw error } } mutating func next() async throws -> Wrapped.Element? { - do { - if let element = try await self.wrapped.next() { - self.forEachElement(element) - return element - } - - self.onFinish() - return nil - } catch { - self.onFailure(error) - throw error - } + try await self.next(isolation: nil) } } } diff --git a/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift index cad60b4..b51834c 100644 --- a/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift @@ -34,7 +34,7 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { /// /// - Parameters: /// - severHostname: The hostname of the RPC server. This will be the value for the `server.address` attribute in spans. - /// - networkTransportMethod: The transport in use (e.g. "tcp", "udp"). This will be the value for the + /// - networkTransportMethod: The transport in use (e.g. "tcp", "unix"). This will be the value for the /// `network.transport` attribute in spans. /// - traceEachMessage: If `true`, each request part sent and response part received will be recorded as a separate /// event in a tracing span. Otherwise, only the request/response start and end will be recorded as events. @@ -113,8 +113,8 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { wrapping: writer, afterEachWrite: { var event = SpanEvent(name: "rpc.message") - event.attributes.rpc.messageType = "SENT" - event.attributes.rpc.messageID = + event.attributes[GRPCTracingKeys.rpcMessageType] = "SENT" + event.attributes[GRPCTracingKeys.rpcMessageID] = messageSentCounter .wrappingAdd(1, ordering: .sequentiallyConsistent) .oldValue @@ -136,32 +136,36 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { let messageReceivedCounter = Atomic(1) hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { _ in var event = SpanEvent(name: "rpc.message") - event.attributes.rpc.messageType = "RECEIVED" - event.attributes.rpc.messageID = + event.attributes[GRPCTracingKeys.rpcMessageType] = "RECEIVED" + event.attributes[GRPCTracingKeys.rpcMessageID] = messageReceivedCounter .wrappingAdd(1, ordering: .sequentiallyConsistent) .oldValue span.addEvent(event) - } onFinish: { - span.attributes.rpc.grpcStatusCode = 0 - } onFailure: { error in - if let rpcError = error as? RPCError { - span.attributes.rpc.grpcStatusCode = rpcError.code.rawValue + } onFinish: { error in + if let error { + if let errorCode = error.grpcErrorCode { + span.attributes[GRPCTracingKeys.grpcStatusCode] = errorCode.rawValue + } + span.setStatus(SpanStatus(code: .error)) + span.recordError(error) + } else { + span.attributes[GRPCTracingKeys.grpcStatusCode] = 0 } - span.setStatus(SpanStatus(code: .error)) - span.recordError(error) } } else { hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { _ in // Nothing to do if traceEachMessage is false - } onFinish: { - span.attributes.rpc.grpcStatusCode = 0 - } onFailure: { error in - if let rpcError = error as? RPCError { - span.attributes.rpc.grpcStatusCode = rpcError.code.rawValue + } onFinish: { error in + if let error { + if let errorCode = error.grpcErrorCode { + span.attributes[GRPCTracingKeys.grpcStatusCode] = errorCode.rawValue + } + span.setStatus(SpanStatus(code: .error)) + span.recordError(error) + } else { + span.attributes[GRPCTracingKeys.grpcStatusCode] = 0 } - span.setStatus(SpanStatus(code: .error)) - span.recordError(error) } } @@ -169,7 +173,7 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor { response.accepted = .success(success) case .failure(let error): - span.attributes.rpc.grpcStatusCode = error.code.rawValue + span.attributes[GRPCTracingKeys.grpcStatusCode] = error.code.rawValue span.setStatus(SpanStatus(code: .error)) span.recordError(error) } @@ -188,3 +192,15 @@ struct ClientRequestInjector: Instrumentation.Injector { carrier.addString(value, forKey: key) } } + +extension Error { + var grpcErrorCode: RPCError.Code? { + if let rpcError = self as? RPCError { + return rpcError.code + } else if let rpcError = self as? any RPCErrorConvertible { + return rpcError.rpcErrorCode + } else { + return nil + } + } +} diff --git a/Sources/GRPCInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift b/Sources/GRPCInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift new file mode 100644 index 0000000..c04a43e --- /dev/null +++ b/Sources/GRPCInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift @@ -0,0 +1,134 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +internal import GRPCCore +internal import Tracing + +public enum GRPCTracingKeys { + static let rpcSystem = "rpc.system" + static let rpcMethod = "rpc.method" + static let rpcService = "rpc.service" + static let rpcMessageID = "rpc.message.id" + static let rpcMessageType = "rpc.message.type" + static let grpcStatusCode = "rpc.grpc.status_code" + + static let serverAddress = "server.address" + static let serverPort = "server.port" + + static let clientAddress = "client.address" + static let clientPort = "client.port" + + static let networkTransport = "network.transport" + static let networkType = "network.type" + static let networkPeerAddress = "network.peer.address" + static let networkPeerPort = "network.peer.port" +} + +extension Span { + // See: https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/ + func setOTelClientSpanGRPCAttributes( + context: ClientContext, + serverHostname: String, + networkTransportMethod: String + ) { + self.attributes[GRPCTracingKeys.rpcSystem] = "grpc" + self.attributes[GRPCTracingKeys.serverAddress] = serverHostname + self.attributes[GRPCTracingKeys.networkTransport] = networkTransportMethod + self.attributes[GRPCTracingKeys.rpcService] = context.descriptor.service.fullyQualifiedService + self.attributes[GRPCTracingKeys.rpcMethod] = context.descriptor.method + + // Set server address information + switch PeerAddress(context.remotePeer) { + case .ipv4(let address, let port): + self.attributes[GRPCTracingKeys.networkType] = "ipv4" + self.attributes[GRPCTracingKeys.networkPeerAddress] = address + self.attributes[GRPCTracingKeys.networkPeerPort] = port + self.attributes[GRPCTracingKeys.serverPort] = port + + case .ipv6(let address, let port): + self.attributes[GRPCTracingKeys.networkType] = "ipv6" + self.attributes[GRPCTracingKeys.networkPeerAddress] = address + self.attributes[GRPCTracingKeys.networkPeerPort] = port + self.attributes[GRPCTracingKeys.serverPort] = port + + case .unixDomainSocket(let path): + self.attributes[GRPCTracingKeys.networkType] = "unix" + self.attributes[GRPCTracingKeys.networkPeerAddress] = path + + case .other(let address): + // We can't nicely format the span attributes to contain the appropriate information here, + // so include the whole thing as part of the peer address. + self.attributes[GRPCTracingKeys.networkPeerAddress] = address + } + } +} + +package enum PeerAddress: Equatable { + case ipv4(address: String, port: Int?) + case ipv6(address: String, port: Int?) + case unixDomainSocket(path: String) + case other(String) + + package init(_ address: String) { + // We expect this address to be of one of these formats: + // - ipv4:: for ipv4 addresses + // - ipv6:[]: for ipv6 addresses + // - unix: for UNIX domain sockets + let addressComponents = address.split(separator: ":", omittingEmptySubsequences: false) + + guard addressComponents.count > 1 else { + // This is some unexpected/unknown format, so we have no way of splitting it up nicely. + self = .other(address) + return + } + + // Check what type the transport is... + switch addressComponents[0] { + case "ipv4": + guard addressComponents.count == 3, let port = Int(addressComponents[2]) else { + // This is some unexpected/unknown format, so we have no way of splitting it up nicely. + self = .other(address) + return + } + self = .ipv4(address: String(addressComponents[1]), port: port) + + case "ipv6": + guard addressComponents.count > 2, let port = Int(addressComponents.last!) else { + // This is some unexpected/unknown format, so we have no way of splitting it up nicely. + self = .other(address) + return + } + self = .ipv6( + address: String( + addressComponents[1 ..< addressComponents.count - 1].joined(separator: ":") + ), + port: port + ) + + case "unix": + guard addressComponents.count == 2 else { + // This is some unexpected/unknown format, so we have no way of splitting it up nicely. + self = .other(address) + return + } + self = .unixDomainSocket(path: String(addressComponents[1])) + + default: + // This is some unexpected/unknown format, so we have no way of splitting it up nicely. + self = .other(address) + } + } +} diff --git a/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift b/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift deleted file mode 100644 index 38e3d1e..0000000 --- a/Sources/GRPCInterceptors/Tracing/SpanAttributes+RPCAttributes.swift +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Copyright 2025, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -internal import GRPCCore -internal import Tracing - -@dynamicMemberLookup -package struct RPCAttributes: SpanAttributeNamespace { - var attributes: SpanAttributes - - init(attributes: SpanAttributes) { - self.attributes = attributes - } - - struct NestedSpanAttributes: NestedSpanAttributesProtocol { - init() {} - - var system: Key { "rpc.system" } - var method: Key { "rpc.method" } - var service: Key { "rpc.service" } - var messageID: Key { "rpc.message.id" } - var messageType: Key { "rpc.message.type" } - var grpcStatusCode: Key { "rpc.grpc.status_code" } - - var serverAddress: Key { "server.address" } - var serverPort: Key { "server.port" } - - var clientAddress: Key { "client.address" } - var clientPort: Key { "client.port" } - - var networkTransport: Key { "network.transport" } - var networkType: Key { "network.type" } - var networkPeerAddress: Key { "network.peer.address" } - var networkPeerPort: Key { "network.peer.port" } - } -} - -extension SpanAttributes { - /// Semantic conventions for RPC spans. - package var rpc: RPCAttributes { - get { - .init(attributes: self) - } - set { - self = newValue.attributes - } - } -} - -extension Span { - // See: https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/ - func setOTelClientSpanGRPCAttributes( - context: ClientContext, - serverHostname: String, - networkTransportMethod: String - ) { - self.attributes.rpc.system = "grpc" - self.attributes.rpc.serverAddress = serverHostname - self.attributes.rpc.networkTransport = networkTransportMethod - self.attributes.rpc.service = context.descriptor.service.fullyQualifiedService - self.attributes.rpc.method = context.descriptor.method - - // Set server address information - switch PeerAddress(context.remotePeer) { - case .ipv4(let address, let port): - self.attributes.rpc.networkType = "ipv4" - self.attributes.rpc.networkPeerAddress = address - self.attributes.rpc.networkPeerPort = port - self.attributes.rpc.serverPort = port - - case .ipv6(let address, let port): - self.attributes.rpc.networkType = "ipv6" - self.attributes.rpc.networkPeerAddress = address - self.attributes.rpc.networkPeerPort = port - self.attributes.rpc.serverPort = port - - case .unixDomainSocket(let path): - self.attributes.rpc.networkType = "unix" - self.attributes.rpc.networkPeerAddress = path - - case .other(let address): - // We can't nicely format the span attributes to contain the appropriate information here, - // so include the whole thing as part of the server address. - self.attributes.rpc.serverAddress = address - } - } - - func setOTelServerSpanGRPCAttributes( - context: ServerContext, - serverHostname: String, - networkTransportMethod: String - ) { - self.attributes.rpc.system = "grpc" - self.attributes.rpc.serverAddress = serverHostname - self.attributes.rpc.networkTransport = networkTransportMethod - self.attributes.rpc.service = context.descriptor.service.fullyQualifiedService - self.attributes.rpc.method = context.descriptor.method - - // Set server address information - switch PeerAddress(context.localPeer) { - case .ipv4(let address, let port): - self.attributes.rpc.networkType = "ipv4" - self.attributes.rpc.networkPeerAddress = address - self.attributes.rpc.networkPeerPort = port - self.attributes.rpc.serverPort = port - - case .ipv6(let address, let port): - self.attributes.rpc.networkType = "ipv6" - self.attributes.rpc.networkPeerAddress = address - self.attributes.rpc.networkPeerPort = port - self.attributes.rpc.serverPort = port - - case .unixDomainSocket(let path): - self.attributes.rpc.networkType = "unix" - self.attributes.rpc.networkPeerAddress = path - - case .other(let address): - // We can't nicely format the span attributes to contain the appropriate information here, - // so include the whole thing as part of the server address. - self.attributes.rpc.serverAddress = address - } - - switch PeerAddress(context.remotePeer) { - case .ipv4(let address, let port): - self.attributes.rpc.clientAddress = address - self.attributes.rpc.clientPort = port - - case .ipv6(let address, let port): - self.attributes.rpc.clientAddress = address - self.attributes.rpc.clientPort = port - - case .unixDomainSocket(let path): - self.attributes.rpc.clientAddress = path - - case .other(let address): - self.attributes.rpc.clientAddress = address - } - } -} - -private enum PeerAddress { - case ipv4(address: String, port: Int?) - case ipv6(address: String, port: Int?) - case unixDomainSocket(path: String) - case other(String) - - init(_ address: String) { - // We expect this address to be of one of these formats: - // - ipv4:: for ipv4 addresses - // - ipv6:[]: for ipv6 addresses - // - unix: for UNIX domain sockets - let addressComponents = address.split(separator: ":", omittingEmptySubsequences: false) - - guard addressComponents.count > 1 else { - // This is some unexpected/unknown format, so we have no way of splitting it up nicely. - self = .other(address) - return - } - - // Check what type the transport is... - switch addressComponents[0] { - case "ipv4": - guard addressComponents.count == 3, let port = Int(addressComponents[2]) else { - // This is some unexpected/unknown format, so we have no way of splitting it up nicely. - self = .other(address) - return - } - self = .ipv4(address: String(addressComponents[1]), port: port) - - case "ipv6": - guard addressComponents.count > 2, let port = Int(addressComponents.last!) else { - // This is some unexpected/unknown format, so we have no way of splitting it up nicely. - self = .other(address) - return - } - self = .ipv6( - address: String( - addressComponents[1 ..< addressComponents.count - 1].joined(separator: ":") - ), - port: port - ) - - case "unix": - guard addressComponents.count == 2 else { - // This is some unexpected/unknown format, so we have no way of splitting it up nicely. - self = .other(address) - return - } - self = .unixDomainSocket(path: String(addressComponents[1])) - - default: - // This is some unexpected/unknown format, so we have no way of splitting it up nicely. - self = .other(address) - } - } -} diff --git a/Tests/GRPCInterceptorsTests/PeerAddressTests.swift b/Tests/GRPCInterceptorsTests/PeerAddressTests.swift new file mode 100644 index 0000000..e383a88 --- /dev/null +++ b/Tests/GRPCInterceptorsTests/PeerAddressTests.swift @@ -0,0 +1,45 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCInterceptors +import Testing + +@Suite("PeerAddress tests") +struct PeerAddressTests { + @Test("IPv4 addresses are correctly parsed") + func testIPv4() { + let address = PeerAddress("ipv4:10.1.2.80:567") + #expect(address == .ipv4(address: "10.1.2.80", port: 567)) + } + + @Test("IPv6 addresses are correctly parsed") + func testIPv6() { + let address = PeerAddress("ipv6:2001::130F:::09C0:876A:130B:1234") + #expect(address == .ipv6(address: "2001::130F:::09C0:876A:130B", port: 1234)) + } + + @Test("Unix domain sockets are correctly parsed") + func testUDS() { + let address = PeerAddress("unix:some-path") + #expect(address == .unixDomainSocket(path: "some-path")) + } + + @Test("Unrecognised addresses are correctly parsed") + func testOther() { + let address = PeerAddress("in-process:1234") + #expect(address == .other("in-process:1234")) + } +} diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index 9c0361e..5b20c6c 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -1,5 +1,5 @@ /* - * Copyright 2024, gRPC Authors All rights reserved. + * Copyright 2024-2025, gRPC Authors All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 3275f726adf41d84085cd2a6390909fc91d81a2a Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Tue, 21 Jan 2025 19:42:36 +0000 Subject: [PATCH 13/16] Bring over PeerAddress parsing changes --- .../SpanAttributes+GRPCTracingKeys.swift | 34 +++++++++---------- .../PeerAddressTests.swift | 2 +- .../TracingInterceptorTests.swift | 4 +-- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/Sources/GRPCInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift b/Sources/GRPCInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift index c04a43e..6857914 100644 --- a/Sources/GRPCInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift +++ b/Sources/GRPCInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift @@ -17,7 +17,7 @@ internal import GRPCCore internal import Tracing -public enum GRPCTracingKeys { +enum GRPCTracingKeys { static let rpcSystem = "rpc.system" static let rpcMethod = "rpc.method" static let rpcService = "rpc.service" @@ -87,7 +87,9 @@ package enum PeerAddress: Equatable { // - ipv4:: for ipv4 addresses // - ipv6:[]: for ipv6 addresses // - unix: for UNIX domain sockets - let addressComponents = address.split(separator: ":", omittingEmptySubsequences: false) + + // First get the first component so that we know what type of address we're dealing with + let addressComponents = address.split(separator: ":", maxSplits: 1) guard addressComponents.count > 1 else { // This is some unexpected/unknown format, so we have no way of splitting it up nicely. @@ -98,32 +100,28 @@ package enum PeerAddress: Equatable { // Check what type the transport is... switch addressComponents[0] { case "ipv4": - guard addressComponents.count == 3, let port = Int(addressComponents[2]) else { + let ipv4AddressComponents = addressComponents[1].split(separator: ":") + if ipv4AddressComponents.count == 2, let port = Int(ipv4AddressComponents[1]) { + self = .ipv4(address: String(ipv4AddressComponents[0]), port: port) + } else { // This is some unexpected/unknown format, so we have no way of splitting it up nicely. self = .other(address) - return } - self = .ipv4(address: String(addressComponents[1]), port: port) case "ipv6": - guard addressComponents.count > 2, let port = Int(addressComponents.last!) else { + // At this point, we are looking at an address with format: [
]: + // We drop the first character ('[') and split by ']:' to keep two components: the address + // and the port. + let ipv6AddressComponents = addressComponents[1].dropFirst().split(separator: "]:") + if ipv6AddressComponents.count == 2, let port = Int(ipv6AddressComponents[1]) { + self = .ipv6(address: String(ipv6AddressComponents[0]), port: port) + } else { // This is some unexpected/unknown format, so we have no way of splitting it up nicely. self = .other(address) - return } - self = .ipv6( - address: String( - addressComponents[1 ..< addressComponents.count - 1].joined(separator: ":") - ), - port: port - ) case "unix": - guard addressComponents.count == 2 else { - // This is some unexpected/unknown format, so we have no way of splitting it up nicely. - self = .other(address) - return - } + // Whatever comes after "unix:" is the self = .unixDomainSocket(path: String(addressComponents[1])) default: diff --git a/Tests/GRPCInterceptorsTests/PeerAddressTests.swift b/Tests/GRPCInterceptorsTests/PeerAddressTests.swift index e383a88..b7b4140 100644 --- a/Tests/GRPCInterceptorsTests/PeerAddressTests.swift +++ b/Tests/GRPCInterceptorsTests/PeerAddressTests.swift @@ -27,7 +27,7 @@ struct PeerAddressTests { @Test("IPv6 addresses are correctly parsed") func testIPv6() { - let address = PeerAddress("ipv6:2001::130F:::09C0:876A:130B:1234") + let address = PeerAddress("ipv6:[2001::130F:::09C0:876A:130B]:1234") #expect(address == .ipv6(address: "2001::130F:::09C0:876A:130B", port: 1234)) } diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index 5b20c6c..ce29402 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -440,8 +440,8 @@ struct OTelTracingClientInterceptorTests { case .ipv6: return OTelTracingInterceptorTestCaseValues( - remotePeerAddress: "ipv6:2001::130F:::09C0:876A:130B:1234", - localPeerAddress: "ipv6:ff06:0:0:0:0:0:0:c3:5678", + remotePeerAddress: "ipv6:[2001::130F:::09C0:876A:130B]:1234", + localPeerAddress: "ipv6:[ff06:0:0:0:0:0:0:c3]:5678", expectedSpanAttributes: [ "rpc.system": "grpc", "rpc.method": .string(methodDescriptor.method), From 10189c7235e2f6ba7baf4eedd5eb8ebf56c3ec0c Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 22 Jan 2025 16:34:53 +0000 Subject: [PATCH 14/16] PR changes --- .../SpanAttributes+GRPCTracingKeys.swift | 42 +++++++++---------- .../PeerAddressTests.swift | 22 ++++++++-- 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/Sources/GRPCInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift b/Sources/GRPCInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift index 6857914..186195a 100644 --- a/Sources/GRPCInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift +++ b/Sources/GRPCInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift @@ -65,13 +65,11 @@ extension Span { self.attributes[GRPCTracingKeys.serverPort] = port case .unixDomainSocket(let path): - self.attributes[GRPCTracingKeys.networkType] = "unix" self.attributes[GRPCTracingKeys.networkPeerAddress] = path - case .other(let address): - // We can't nicely format the span attributes to contain the appropriate information here, - // so include the whole thing as part of the peer address. - self.attributes[GRPCTracingKeys.networkPeerAddress] = address + case .none: + // We don't recognise this address format, so don't populate any fields. + () } } } @@ -80,9 +78,8 @@ package enum PeerAddress: Equatable { case ipv4(address: String, port: Int?) case ipv6(address: String, port: Int?) case unixDomainSocket(path: String) - case other(String) - package init(_ address: String) { + package init?(_ address: String) { // We expect this address to be of one of these formats: // - ipv4:: for ipv4 addresses // - ipv6:[]: for ipv6 addresses @@ -92,9 +89,8 @@ package enum PeerAddress: Equatable { let addressComponents = address.split(separator: ":", maxSplits: 1) guard addressComponents.count > 1 else { - // This is some unexpected/unknown format, so we have no way of splitting it up nicely. - self = .other(address) - return + // This is some unexpected/unknown format + return nil } // Check what type the transport is... @@ -104,20 +100,22 @@ package enum PeerAddress: Equatable { if ipv4AddressComponents.count == 2, let port = Int(ipv4AddressComponents[1]) { self = .ipv4(address: String(ipv4AddressComponents[0]), port: port) } else { - // This is some unexpected/unknown format, so we have no way of splitting it up nicely. - self = .other(address) + return nil } case "ipv6": - // At this point, we are looking at an address with format: [
]: - // We drop the first character ('[') and split by ']:' to keep two components: the address - // and the port. - let ipv6AddressComponents = addressComponents[1].dropFirst().split(separator: "]:") - if ipv6AddressComponents.count == 2, let port = Int(ipv6AddressComponents[1]) { - self = .ipv6(address: String(ipv6AddressComponents[0]), port: port) + if addressComponents[1].first == "[" { + // At this point, we are looking at an address with format: [
]: + // We drop the first character ('[') and split by ']:' to keep two components: the address + // and the port. + let ipv6AddressComponents = addressComponents[1].dropFirst().split(separator: "]:") + if ipv6AddressComponents.count == 2, let port = Int(ipv6AddressComponents[1]) { + self = .ipv6(address: String(ipv6AddressComponents[0]), port: port) + } else { + return nil + } } else { - // This is some unexpected/unknown format, so we have no way of splitting it up nicely. - self = .other(address) + return nil } case "unix": @@ -125,8 +123,8 @@ package enum PeerAddress: Equatable { self = .unixDomainSocket(path: String(addressComponents[1])) default: - // This is some unexpected/unknown format, so we have no way of splitting it up nicely. - self = .other(address) + // This is some unexpected/unknown format + return nil } } } diff --git a/Tests/GRPCInterceptorsTests/PeerAddressTests.swift b/Tests/GRPCInterceptorsTests/PeerAddressTests.swift index b7b4140..dc4249e 100644 --- a/Tests/GRPCInterceptorsTests/PeerAddressTests.swift +++ b/Tests/GRPCInterceptorsTests/PeerAddressTests.swift @@ -37,9 +37,23 @@ struct PeerAddressTests { #expect(address == .unixDomainSocket(path: "some-path")) } - @Test("Unrecognised addresses are correctly parsed") - func testOther() { - let address = PeerAddress("in-process:1234") - #expect(address == .other("in-process:1234")) + @Test( + "Unrecognised addresses return nil", + arguments: [ + "", + "unknown", + "in-process:1234", + "ipv4:", + "ipv4:1234", + "ipv6:", + "ipv6:123:456:789:123", + "ipv6:123:456:789]:123", + "ipv6:123:456:789]", + "unix", + ] + ) + func testOther(address: String) { + let address = PeerAddress(address) + #expect(address == nil) } } From 324a6ee5f165fa95523da7f64279ea7d0cd44090 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 23 Jan 2025 13:31:20 +0000 Subject: [PATCH 15/16] Fix test --- Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index ce29402..cead2ab 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -468,7 +468,6 @@ struct OTelTracingClientInterceptorTests { "server.address": "someserver.com", "network.peer.address": "some-path", "network.transport": "tcp", - "network.type": "unix", ] ) } From e160b26d0b1e5ab3d3c2ed7f67bdc2b350366583 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 23 Jan 2025 13:40:35 +0000 Subject: [PATCH 16/16] Depend on grpc-swift-protobuf main branch --- Package.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Package.swift b/Package.swift index db38820..63028e8 100644 --- a/Package.swift +++ b/Package.swift @@ -47,7 +47,7 @@ let dependencies: [Package.Dependency] = [ ), .package( url: "https://github.com/grpc/grpc-swift-protobuf.git", - exact: "1.0.0-beta.3" + branch: "main" ), .package( url: "https://github.com/apple/swift-protobuf.git",