Skip to content

Commit

Permalink
Add option to include metadata in server interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
gjcairo committed Jan 24, 2025
1 parent 13ea905 commit 89b90e7
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
let messageReceivedCounter = Atomic(1)
hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { part in
switch part {
case .message(let message):
case .message:
var event = SpanEvent(name: "rpc.message")
event.attributes[GRPCTracingKeys.rpcMessageType] = "RECEIVED"
event.attributes[GRPCTracingKeys.rpcMessageID] =
messageReceivedCounter
messageReceivedCounter
.wrappingAdd(1, ordering: .sequentiallyConsistent)
.oldValue
span.addEvent(event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ package import Tracing
/// - https://opentelemetry.io/docs/specs/semconv/rpc/grpc/
public struct ServerOTelTracingInterceptor: ServerInterceptor {
private let extractor: ServerRequestExtractor
private let traceEachMessage: Bool
private var serverHostname: String
private var networkTransportMethod: String

private let traceEachMessage: Bool
private var includeRequestMetadata: Bool
private var includeResponseMetadata: Bool

/// Create a new instance of a ``ServerOTelTracingInterceptor``.
///
/// - Parameters:
Expand All @@ -42,15 +45,24 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor {
/// `network.transport` attribute in spans.
/// - traceEachMessage: If `true`, each response part sent and request part received will be recorded as a separate
/// event in a tracing span.
/// - includeRequestMetadata: if `true`, **all** metadata keys with string values included in the request will be added to the span as attributes.
/// - includeResponseMetadata: if `true`, **all** metadata keys with string values included in the response will be added to the span as attributes.
///
/// - Important: Be careful when setting `includeRequestMetadata` or `includeResponseMetadata` to `true`,
/// as including all request/response metadata can be a security risk.
public init(
serverHostname: String,
networkTransportMethod: String,
traceEachMessage: Bool = true
traceEachMessage: Bool = true,
includeRequestMetadata: Bool = false,
includeResponseMetadata: Bool = false
) {
self.extractor = ServerRequestExtractor()
self.traceEachMessage = traceEachMessage
self.serverHostname = serverHostname
self.networkTransportMethod = networkTransportMethod
self.includeRequestMetadata = includeRequestMetadata
self.includeResponseMetadata = includeResponseMetadata
}

/// This interceptor will extract whatever `ServiceContext` key-value pairs have been inserted into the
Expand Down Expand Up @@ -109,6 +121,10 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor {
networkTransportMethod: self.networkTransportMethod
)

if self.includeRequestMetadata {
span.setMetadataStringAttributesAsRequestSpanAttributes(request.metadata)
}

var request = request
if self.traceEachMessage {
let messageReceivedCounter = Atomic(1)
Expand All @@ -128,6 +144,10 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor {

var response = try await next(request, context)

if self.includeResponseMetadata {
span.setMetadataStringAttributesAsResponseSpanAttributes(response.metadata)
}

switch response.accepted {
case .success(var success):
let wrappedProducer = success.producer
Expand All @@ -148,11 +168,15 @@ public struct ServerOTelTracingInterceptor: ServerInterceptor {
}
)

let wrappedResult = try await wrappedProducer(
let trailingMetadata = try await wrappedProducer(
RPCWriter(wrapping: eventEmittingWriter)
)

return wrappedResult
if self.includeResponseMetadata {
span.setMetadataStringAttributesAsResponseSpanAttributes(trailingMetadata)
}

return trailingMetadata
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ enum GRPCTracingKeys {
static let networkPeerPort = "network.peer.port"

fileprivate static let requestMetadataPrefix = "rpc.grpc.request.metadata."
fileprivate static let responseMetadataPrefix = "rpc.grpc.response.metadata."
}

extension Span {
Expand Down Expand Up @@ -134,6 +135,13 @@ extension Span {
)
}

func setMetadataStringAttributesAsResponseSpanAttributes(_ metadata: Metadata) {
self.setMetadataStringAttributesAsSpanAttributes(
metadata,
prefix: GRPCTracingKeys.responseMetadataPrefix
)
}

private func setMetadataStringAttributesAsSpanAttributes(_ metadata: Metadata, prefix: String) {
for (key, value) in metadata {
switch value {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ struct OTelTracingClientInterceptorTests {
bodyParts: RPCAsyncSequence(
wrapping: AsyncThrowingStream<StreamingClientResponse.Contents.BodyPart, any Error> {
$0.yield(.message(["response"]))
$0.yield(
.trailingMetadata([
"some-repeated-response-metadata": "some-repeated-response-value1",
"some-repeated-response-metadata": "some-repeated-response-value2",
])
)
$0.finish()
}
)
Expand Down Expand Up @@ -349,10 +355,12 @@ struct OTelTracingClientInterceptorTests {
bodyParts: RPCAsyncSequence(
wrapping: AsyncThrowingStream<StreamingClientResponse.Contents.BodyPart, any Error> {
$0.yield(.message(["response"]))
$0.yield(.trailingMetadata([
"some-repeated-response-metadata": "some-repeated-response-value1",
"some-repeated-response-metadata": "some-repeated-response-value2"
]))
$0.yield(
.trailingMetadata([
"some-repeated-response-metadata": "some-repeated-response-value1",
"some-repeated-response-metadata": "some-repeated-response-value2",
])
)
$0.finish()
}
)
Expand Down Expand Up @@ -845,6 +853,222 @@ struct OTelTracingServerInterceptorTests {
}
}

@Test("All string-valued request metadata is included if opted-in")
func testRequestMetadataOptIn() async throws {
let methodDescriptor = MethodDescriptor(
fullyQualifiedService: "OTelTracingServerInterceptorTests",
method: "testRequestMetadataOptIn"
)
let interceptor = ServerOTelTracingInterceptor(
serverHostname: "someserver.com",
networkTransportMethod: "tcp",
includeRequestMetadata: true
)
let request = ServerRequest(
metadata: [
"some-request-metadata": "some-request-value",
"some-repeated-request-metadata": "some-repeated-request-value1",
"some-repeated-request-metadata": "some-repeated-request-value2",
"some-request-metadata-bin": .binary([1]),
],
message: [UInt8]()
)
let response = try await interceptor.intercept(
tracer: self.tracer,
request: .init(single: request),
context: ServerContext(
descriptor: methodDescriptor,
remotePeer: "ipv4:10.1.2.80:567",
localPeer: "ipv4:10.1.2.90:123",
cancellation: .init()
)
) { request, _ in
for try await _ in request.messages {
// We need to iterate over the messages for the span to be able to record the events.
}

return StreamingServerResponse<String>(
accepted: .success(
.init(
metadata: [
"some-response-metadata": "some-response-value",
"some-response-metadata-bin": .binary([2]),
],
producer: { writer in
try await writer.write("response1")
try await writer.write("response2")
return [
"some-repeated-response-metadata": "some-repeated-response-value1",
"some-repeated-response-metadata": "some-repeated-response-value2",
]
}
)
)
)
}

// Get the response out into a response stream, and assert its contents
let (responseStream, responseStreamContinuation) = AsyncStream<String>.makeStream()
let responseContents = try response.accepted.get()
let trailingMetadata = try await responseContents.producer(
RPCWriter(wrapping: TestWriter(streamContinuation: responseStreamContinuation))
)
responseStreamContinuation.finish()

#expect(
trailingMetadata == [
"some-repeated-response-metadata": "some-repeated-response-value1",
"some-repeated-response-metadata": "some-repeated-response-value2",
]
)
await assertStreamContentsEqual(["response1", "response2"], responseStream)

assertTestSpanComponents(forMethod: methodDescriptor, tracer: self.tracer) { events in
#expect(
events == [
// Recorded when request is received
TestSpanEvent("rpc.message", ["rpc.message.type": "RECEIVED", "rpc.message.id": 1]),
// Recorded when `response1` is sent
TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 1]),
// Recorded when `response2` is sent
TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 2]),
]
)
} assertAttributes: { attributes in
#expect(
attributes == [
"rpc.system": "grpc",
"rpc.method": .string(methodDescriptor.method),
"rpc.service": .string(methodDescriptor.service.fullyQualifiedService),
"server.address": "someserver.com",
"server.port": 123,
"network.peer.address": "10.1.2.90",
"network.peer.port": 123,
"network.transport": "tcp",
"network.type": "ipv4",
"client.address": "10.1.2.80",
"client.port": 567,
"rpc.grpc.request.metadata.some-request-metadata": "some-request-value",
"rpc.grpc.request.metadata.some-repeated-request-metadata": .stringArray([
"some-repeated-request-value1", "some-repeated-request-value2",
]),
]
)
} assertStatus: { status in
#expect(status == nil)
} assertErrors: { errors in
#expect(errors.isEmpty)
}
}

@Test("All string-valued response metadata is included if opted-in")
func testResponseMetadataOptIn() async throws {
let methodDescriptor = MethodDescriptor(
fullyQualifiedService: "OTelTracingServerInterceptorTests",
method: "testResponseMetadataOptIn"
)
let interceptor = ServerOTelTracingInterceptor(
serverHostname: "someserver.com",
networkTransportMethod: "tcp",
includeResponseMetadata: true
)
let request = ServerRequest(
metadata: [
"some-request-metadata": "some-request-value",
"some-repeated-request-metadata": "some-repeated-request-value1",
"some-repeated-request-metadata": "some-repeated-request-value2",
"some-request-metadata-bin": .binary([1]),
],
message: [UInt8]()
)
let response = try await interceptor.intercept(
tracer: self.tracer,
request: .init(single: request),
context: ServerContext(
descriptor: methodDescriptor,
remotePeer: "ipv4:10.1.2.80:567",
localPeer: "ipv4:10.1.2.90:123",
cancellation: .init()
)
) { request, _ in
for try await _ in request.messages {
// We need to iterate over the messages for the span to be able to record the events.
}

return StreamingServerResponse<String>(
accepted: .success(
.init(
metadata: [
"some-response-metadata": "some-response-value",
"some-response-metadata-bin": .binary([2]),
],
producer: { writer in
try await writer.write("response1")
try await writer.write("response2")
return [
"some-repeated-response-metadata": "some-repeated-response-value1",
"some-repeated-response-metadata": "some-repeated-response-value2",
]
}
)
)
)
}

// Get the response out into a response stream, and assert its contents
let (responseStream, responseStreamContinuation) = AsyncStream<String>.makeStream()
let responseContents = try response.accepted.get()
let trailingMetadata = try await responseContents.producer(
RPCWriter(wrapping: TestWriter(streamContinuation: responseStreamContinuation))
)
responseStreamContinuation.finish()

#expect(
trailingMetadata == [
"some-repeated-response-metadata": "some-repeated-response-value1",
"some-repeated-response-metadata": "some-repeated-response-value2",
]
)
await assertStreamContentsEqual(["response1", "response2"], responseStream)

assertTestSpanComponents(forMethod: methodDescriptor, tracer: self.tracer) { events in
#expect(
events == [
// Recorded when request is received
TestSpanEvent("rpc.message", ["rpc.message.type": "RECEIVED", "rpc.message.id": 1]),
// Recorded when `response1` is sent
TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 1]),
// Recorded when `response2` is sent
TestSpanEvent("rpc.message", ["rpc.message.type": "SENT", "rpc.message.id": 2]),
]
)
} assertAttributes: { attributes in
#expect(
attributes == [
"rpc.system": "grpc",
"rpc.method": .string(methodDescriptor.method),
"rpc.service": .string(methodDescriptor.service.fullyQualifiedService),
"server.address": "someserver.com",
"server.port": 123,
"network.peer.address": "10.1.2.90",
"network.peer.port": 123,
"network.transport": "tcp",
"network.type": "ipv4",
"client.address": "10.1.2.80",
"client.port": 567,
"rpc.grpc.response.metadata.some-response-metadata": "some-response-value",
"rpc.grpc.response.metadata.some-repeated-response-metadata": .stringArray([
"some-repeated-response-value1", "some-repeated-response-value2",
]),
]
)
} assertStatus: { status in
#expect(status == nil)
} assertErrors: { errors in
#expect(errors.isEmpty)
}
}

@Test("RPC that throws is correctly recorded")
func testThrowingRPC() async throws {
let methodDescriptor = MethodDescriptor(
Expand Down

0 comments on commit 89b90e7

Please sign in to comment.