From 88d2ad60bb1b6ea0162eb8fa3cd2d006b17c4c55 Mon Sep 17 00:00:00 2001 From: Rajkumar Rangaraj Date: Tue, 26 Nov 2024 11:10:03 -0800 Subject: [PATCH 1/3] [otlp] Grpc Status check and retry (#6000) Co-authored-by: Mikel Blanchard --- .../ExportClient/BaseOtlpGrpcExportClient.cs | 8 +- .../ExportClient/ExportClientGrpcResponse.cs | 12 +- .../ExportClient/ExportClientResponse.cs | 3 - .../ExportClient/Grpc/GrpcProtocolHelpers.cs | 26 ++-- .../ExportClient/Grpc/Status.cs | 7 + .../ExportClient/OtlpGrpcLogExportClient.cs | 2 +- .../OtlpGrpcMetricsExportClient.cs | 2 +- .../ExportClient/OtlpGrpcTraceExportClient.cs | 2 +- .../Implementation/ExportClient/OtlpRetry.cs | 62 ++++++++- .../ProtobufOtlpGrpcExportClient.cs | 128 ++++++++++++++++-- .../ProtobufOtlpHttpExportClient.cs | 2 +- ...penTelemetryProtocolExporterEventSource.cs | 84 ++++++++++++ .../OtlpExporterTransmissionHandler.cs | 2 +- ...ProtobufOtlpExporterTransmissionHandler.cs | 8 +- .../MockCollectorIntegrationTests.cs | 77 ++++++----- .../OtlpRetryTests.cs | 2 +- 16 files changed, 348 insertions(+), 79 deletions(-) diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/BaseOtlpGrpcExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/BaseOtlpGrpcExportClient.cs index b585f2fc088..2eab9778852 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/BaseOtlpGrpcExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/BaseOtlpGrpcExportClient.cs @@ -13,7 +13,13 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie /// Type of export request. internal abstract class BaseOtlpGrpcExportClient : IExportClient { - protected static readonly ExportClientGrpcResponse SuccessExportResponse = new ExportClientGrpcResponse(success: true, deadlineUtc: default, exception: null); + protected static readonly ExportClientGrpcResponse SuccessExportResponse + = new( + success: true, + deadlineUtc: default, + exception: null, + status: null, + grpcStatusDetailsHeader: null); protected BaseOtlpGrpcExportClient(OtlpExporterOptions options) { diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ExportClientGrpcResponse.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ExportClientGrpcResponse.cs index 4a96a7ad7cb..339e0ab78ad 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ExportClientGrpcResponse.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ExportClientGrpcResponse.cs @@ -1,6 +1,8 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient.Grpc; + namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; internal sealed class ExportClientGrpcResponse : ExportClientResponse @@ -8,8 +10,16 @@ internal sealed class ExportClientGrpcResponse : ExportClientResponse public ExportClientGrpcResponse( bool success, DateTime deadlineUtc, - Exception? exception) + Exception? exception, + Status? status, + string? grpcStatusDetailsHeader) : base(success, deadlineUtc, exception) { + this.Status = status; + this.GrpcStatusDetailsHeader = grpcStatusDetailsHeader; } + + public Status? Status { get; } + + public string? GrpcStatusDetailsHeader { get; } } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ExportClientResponse.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ExportClientResponse.cs index 3a14b537256..fcccd151920 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ExportClientResponse.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ExportClientResponse.cs @@ -1,8 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -using System.Diagnostics.CodeAnalysis; - namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; internal abstract class ExportClientResponse @@ -14,7 +12,6 @@ protected ExportClientResponse(bool success, DateTime deadlineUtc, Exception? ex this.DeadlineUtc = deadlineUtc; } - [MemberNotNullWhen(false, nameof(Exception))] public bool Success { get; } public Exception? Exception { get; } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/Grpc/GrpcProtocolHelpers.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/Grpc/GrpcProtocolHelpers.cs index 0a7db69c391..29a0a1b4d39 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/Grpc/GrpcProtocolHelpers.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/Grpc/GrpcProtocolHelpers.cs @@ -15,7 +15,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -using System.Diagnostics.CodeAnalysis; #if NET462 using System.Net.Http; #endif @@ -27,38 +26,30 @@ internal static class GrpcProtocolHelpers { internal const string StatusTrailer = "grpc-status"; internal const string MessageTrailer = "grpc-message"; - internal const string CancelledDetail = "No grpc-status found on response."; - public static Status? GetResponseStatus(HttpHeaders trailingHeaders, HttpResponseMessage httpResponse) + public static Status GetResponseStatus(HttpResponseMessage httpResponse, HttpHeaders trailingHeaders) { - Status? status; try { - var result = trailingHeaders.Any() ? TryGetStatusCore(trailingHeaders, out status) : TryGetStatusCore(httpResponse.Headers, out status); - - if (!result) - { - status = new Status(StatusCode.Cancelled, CancelledDetail); - } + return trailingHeaders.Any() + ? GetStatusCore(trailingHeaders) + : GetStatusCore(httpResponse.Headers); } catch (Exception ex) { // Handle error from parsing badly formed status - status = new Status(StatusCode.Cancelled, ex.Message, ex); + return new Status(StatusCode.Internal, ex.Message, ex); } - - return status; } - public static bool TryGetStatusCore(HttpHeaders headers, [NotNullWhen(true)] out Status? status) + public static Status GetStatusCore(HttpHeaders headers) { var grpcStatus = GetHeaderValue(headers, StatusTrailer); // grpc-status is a required trailer if (grpcStatus == null) { - status = null; - return false; + return Status.NoReply; } int statusValue; @@ -79,8 +70,7 @@ public static bool TryGetStatusCore(HttpHeaders headers, [NotNullWhen(true)] out grpcMessage = Uri.UnescapeDataString(grpcMessage); } - status = new Status((StatusCode)statusValue, grpcMessage ?? string.Empty); - return true; + return new Status((StatusCode)statusValue, grpcMessage ?? string.Empty); } public static string? GetHeaderValue(HttpHeaders? headers, string name, bool first = false) diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/Grpc/Status.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/Grpc/Status.cs index 477177b130d..89445891970 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/Grpc/Status.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/Grpc/Status.cs @@ -25,6 +25,8 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie [DebuggerDisplay("{DebuggerToString(),nq}")] internal struct Status { + public const string NoReplyDetailMessage = "No grpc-status found on response."; + /// /// Default result of a successful RPC. StatusCode=OK, empty details message. /// @@ -35,6 +37,11 @@ internal struct Status /// public static readonly Status DefaultCancelled = new Status(StatusCode.Cancelled, string.Empty); + /// + /// Default result of a cancelled RPC with no grpc-status found on response. + /// + public static readonly Status NoReply = new Status(StatusCode.Internal, NoReplyDetailMessage); + /// /// Initializes a new instance of the struct. /// diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcLogExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcLogExportClient.cs index e90f05ff5d2..dc9d31feeb3 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcLogExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcLogExportClient.cs @@ -39,7 +39,7 @@ public override ExportClientResponse SendExportRequest(OtlpCollector.ExportLogsS { OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex); - return new ExportClientGrpcResponse(success: false, deadlineUtc: deadlineUtc, exception: ex); + return new ExportClientGrpcResponse(success: false, deadlineUtc, ex, null, null); } } } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcMetricsExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcMetricsExportClient.cs index d3c498648e5..b67b0789da0 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcMetricsExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcMetricsExportClient.cs @@ -39,7 +39,7 @@ public override ExportClientResponse SendExportRequest(OtlpCollector.ExportMetri { OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex); - return new ExportClientGrpcResponse(success: false, deadlineUtc: deadlineUtc, exception: ex); + return new ExportClientGrpcResponse(false, deadlineUtc, ex, null, null); } } } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcTraceExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcTraceExportClient.cs index b29c3f91275..b30efe6de18 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcTraceExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcTraceExportClient.cs @@ -39,7 +39,7 @@ public override ExportClientResponse SendExportRequest(OtlpCollector.ExportTrace { OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex); - return new ExportClientGrpcResponse(success: false, deadlineUtc: deadlineUtc, exception: ex); + return new ExportClientGrpcResponse(false, deadlineUtc, ex, null, null); } } } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpRetry.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpRetry.cs index 984cc91c158..2639333bce7 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpRetry.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpRetry.cs @@ -83,12 +83,54 @@ public static bool ShouldHandleHttpRequestException(Exception? exception) public static bool TryGetGrpcRetryResult(ExportClientGrpcResponse response, int retryDelayMilliseconds, out RetryResult retryResult) { + retryResult = default; + if (response.Exception is RpcException rpcException) { return TryGetRetryResult(rpcException.StatusCode, IsGrpcStatusCodeRetryable, response.DeadlineUtc, rpcException.Trailers, TryGetGrpcRetryDelay, retryDelayMilliseconds, out retryResult); } + else if (response.Status != null) + { + var nextRetryDelayMilliseconds = retryDelayMilliseconds; + + if (IsDeadlineExceeded(response.DeadlineUtc)) + { + return false; + } + + var throttleDelay = Grpc.GrpcStatusDeserializer.TryGetGrpcRetryDelay(response.GrpcStatusDetailsHeader); + var retryable = IsGrpcStatusCodeRetryable(response.Status.Value.StatusCode, throttleDelay.HasValue); + + if (!retryable) + { + return false; + } + + var delayDuration = throttleDelay ?? TimeSpan.FromMilliseconds(GetRandomNumber(0, nextRetryDelayMilliseconds)); + + if (IsDeadlineExceeded(response.DeadlineUtc + delayDuration)) + { + return false; + } + + if (throttleDelay.HasValue) + { + try + { + // TODO: Consider making nextRetryDelayMilliseconds a double to avoid the need for convert/overflow handling + nextRetryDelayMilliseconds = Convert.ToInt32(throttleDelay.Value.TotalMilliseconds); + } + catch (OverflowException) + { + nextRetryDelayMilliseconds = MaxBackoffMilliseconds; + } + } + + nextRetryDelayMilliseconds = CalculateNextRetryDelay(nextRetryDelayMilliseconds); + retryResult = new RetryResult(throttleDelay.HasValue, delayDuration, nextRetryDelayMilliseconds); + return true; + } - retryResult = default; return false; } @@ -216,6 +258,24 @@ private static bool IsGrpcStatusCodeRetryable(StatusCode statusCode, bool hasRet } } + private static bool IsGrpcStatusCodeRetryable(Grpc.StatusCode statusCode, bool hasRetryDelay) + { + switch (statusCode) + { + case Grpc.StatusCode.Cancelled: + case Grpc.StatusCode.DeadlineExceeded: + case Grpc.StatusCode.Aborted: + case Grpc.StatusCode.OutOfRange: + case Grpc.StatusCode.Unavailable: + case Grpc.StatusCode.DataLoss: + return true; + case Grpc.StatusCode.ResourceExhausted: + return hasRetryDelay; + default: + return false; + } + } + private static bool IsHttpStatusCodeRetryable(HttpStatusCode statusCode, bool hasRetryDelay) { switch (statusCode) diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpGrpcExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpGrpcExportClient.cs index c4f5ab5fcc6..0cabbcb53ac 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpGrpcExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpGrpcExportClient.cs @@ -5,7 +5,7 @@ using System.Net.Http; #endif using System.Net.Http.Headers; -using Grpc.Core; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient.Grpc; using OpenTelemetry.Internal; namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; @@ -13,10 +13,19 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie /// Base class for sending OTLP export request over gRPC. internal sealed class ProtobufOtlpGrpcExportClient : IProtobufExportClient { + public const string GrpcStatusDetailsHeader = "grpc-status-details-bin"; private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null); private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/grpc"); private static readonly Version Http2RequestVersion = new(2, 0); + private static readonly ExportClientGrpcResponse DefaultExceptionExportClientGrpcResponse + = new( + success: false, + deadlineUtc: default, + exception: null, + status: null, + grpcStatusDetailsHeader: null); + public ProtobufOtlpGrpcExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath) { Guard.ThrowIfNull(options); @@ -44,25 +53,120 @@ public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, try { using var httpRequest = this.CreateHttpRequest(buffer, contentLength); - using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken); try { httpResponse.EnsureSuccessStatusCode(); } - catch (HttpRequestException ex) + catch (HttpRequestException) + { + throw; + } + + var trailingHeaders = httpResponse.TrailingHeaders(); + Status status = GrpcProtocolHelpers.GetResponseStatus(httpResponse, trailingHeaders); + + if (status.Detail.Equals(Status.NoReplyDetailMessage)) + { + using var responseStream = httpResponse.Content.ReadAsStreamAsync().GetAwaiter().GetResult(); + int firstByte = responseStream.ReadByte(); + + if (firstByte == -1) + { + if (status.StatusCode == StatusCode.OK) + { + status = new Status(StatusCode.Internal, "Failed to deserialize response message."); + } + + OpenTelemetryProtocolExporterEventSource.Log.ResponseDeserializationFailed(this.Endpoint.ToString()); + + return new ExportClientGrpcResponse( + success: false, + deadlineUtc: deadlineUtc, + exception: null, + status: status, + grpcStatusDetailsHeader: null); + } + + // Note: Trailing headers might not be fully available until the + // response stream is consumed. gRPC often sends critical + // information like error details or final statuses in trailing + // headers which can only be reliably accessed after reading + // the response body. + trailingHeaders = httpResponse.TrailingHeaders(); + status = GrpcProtocolHelpers.GetResponseStatus(httpResponse, trailingHeaders); + } + + if (status.StatusCode == StatusCode.OK) + { + OpenTelemetryProtocolExporterEventSource.Log.ExportSuccess(this.Endpoint.ToString(), "Export completed successfully."); + return SuccessExportResponse; + } + + string? grpcStatusDetailsHeader = null; + if (status.StatusCode == StatusCode.ResourceExhausted || status.StatusCode == StatusCode.Unavailable) { - return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: httpResponse, ex); + grpcStatusDetailsHeader = GrpcProtocolHelpers.GetHeaderValue(trailingHeaders, GrpcStatusDetailsHeader); } - // TODO: Hande retries & failures. - return SuccessExportResponse; + OpenTelemetryProtocolExporterEventSource.Log.ExportFailure(this.Endpoint.ToString(), "Export failed due to unexpected status code."); + + return new ExportClientGrpcResponse( + success: false, + deadlineUtc: deadlineUtc, + exception: null, + status: status, + grpcStatusDetailsHeader: grpcStatusDetailsHeader); + } + catch (HttpRequestException ex) when (ex.InnerException is TimeoutException || IsTransientNetworkError(ex)) + { + // Handle transient HTTP errors (retryable) + OpenTelemetryProtocolExporterEventSource.Log.TransientHttpError(this.Endpoint, ex); + return new ExportClientGrpcResponse( + success: false, + deadlineUtc: deadlineUtc, + exception: ex, + status: new Status(StatusCode.Unavailable, "Transient HTTP error - retryable"), + grpcStatusDetailsHeader: null); + } + catch (HttpRequestException ex) + { + // Handle non-retryable HTTP errors. + OpenTelemetryProtocolExporterEventSource.Log.HttpRequestFailed(this.Endpoint, ex); + return new ExportClientGrpcResponse( + success: false, + deadlineUtc: deadlineUtc, + exception: ex, + status: null, + grpcStatusDetailsHeader: null); + } + catch (OperationCanceledException ex) when (!cancellationToken.IsCancellationRequested) + { + // Handle unexpected cancellation. + OpenTelemetryProtocolExporterEventSource.Log.OperationUnexpectedlyCanceled(this.Endpoint, ex); + return new ExportClientGrpcResponse( + success: false, + deadlineUtc: deadlineUtc, + exception: ex, + status: new Status(StatusCode.Cancelled, "Operation was canceled unexpectedly."), + grpcStatusDetailsHeader: null); + } + catch (TaskCanceledException ex) when (ex.InnerException is TimeoutException) + { + // Handle TaskCanceledException caused by TimeoutException. + OpenTelemetryProtocolExporterEventSource.Log.RequestTimedOut(this.Endpoint, ex); + return new ExportClientGrpcResponse( + success: false, + deadlineUtc: deadlineUtc, + exception: ex, + status: new Status(StatusCode.DeadlineExceeded, "Request timed out."), + grpcStatusDetailsHeader: null); } - catch (RpcException ex) + catch (Exception ex) { OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex); - return new ExportClientGrpcResponse(success: false, deadlineUtc: deadlineUtc, exception: ex); + return DefaultExceptionExportClientGrpcResponse; } } @@ -99,4 +203,12 @@ public bool Shutdown(int timeoutMilliseconds) this.HttpClient.CancelPendingRequests(); return true; } + + private static bool IsTransientNetworkError(HttpRequestException ex) + { + return ex.InnerException is System.Net.Sockets.SocketException socketEx && + (socketEx.SocketErrorCode == System.Net.Sockets.SocketError.TimedOut || + socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionReset || + socketEx.SocketErrorCode == System.Net.Sockets.SocketError.HostUnreachable); + } } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs index 41ae58d7b69..118d428dcb5 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs @@ -51,7 +51,6 @@ public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, try { using var httpRequest = this.CreateHttpRequest(buffer, contentLength); - using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken); try @@ -60,6 +59,7 @@ public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, } catch (HttpRequestException ex) { + OpenTelemetryProtocolExporterEventSource.Log.HttpRequestFailed(this.Endpoint, ex); return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: httpResponse, ex); } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OpenTelemetryProtocolExporterEventSource.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OpenTelemetryProtocolExporterEventSource.cs index 721fc7359e2..e9544cf981e 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OpenTelemetryProtocolExporterEventSource.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OpenTelemetryProtocolExporterEventSource.cs @@ -49,6 +49,42 @@ public void RetryStoredRequestException(Exception ex) } } + [NonEvent] + public void TransientHttpError(Uri endpoint, Exception ex) + { + if (Log.IsEnabled(EventLevel.Warning, EventKeywords.All)) + { + this.TransientHttpError(endpoint.ToString(), ex.ToInvariantString()); + } + } + + [NonEvent] + public void HttpRequestFailed(Uri endpoint, Exception ex) + { + if (Log.IsEnabled(EventLevel.Error, EventKeywords.All)) + { + this.HttpRequestFailed(endpoint.ToString(), ex.ToInvariantString()); + } + } + + [NonEvent] + public void OperationUnexpectedlyCanceled(Uri endpoint, Exception ex) + { + if (Log.IsEnabled(EventLevel.Warning, EventKeywords.All)) + { + this.OperationUnexpectedlyCanceled(endpoint.ToString(), ex.ToInvariantString()); + } + } + + [NonEvent] + public void RequestTimedOut(Uri endpoint, Exception ex) + { + if (Log.IsEnabled(EventLevel.Warning, EventKeywords.All)) + { + this.RequestTimedOut(endpoint.ToString(), ex.ToInvariantString()); + } + } + [Event(2, Message = "Exporter failed send data to collector to {0} endpoint. Data will not be sent. Exception: {1}", Level = EventLevel.Error)] public void FailedToReachCollector(string rawCollectorUri, string ex) { @@ -121,6 +157,54 @@ public void BufferResizeFailedDueToMemory(string signalType) this.WriteEvent(15, signalType); } + [Event(16, Message = "Transient HTTP error occurred when communicating with {0}. Exception: {1}", Level = EventLevel.Warning)] + public void TransientHttpError(string endpoint, string exceptionMessage) + { + this.WriteEvent(16, endpoint, exceptionMessage); + } + + [Event(17, Message = "HTTP request to {0} failed. Exception: {1}", Level = EventLevel.Error)] + public void HttpRequestFailed(string endpoint, string exceptionMessage) + { + this.WriteEvent(17, endpoint, exceptionMessage); + } + + [Event(18, Message = "Operation unexpectedly canceled for endpoint {0}. Exception: {1}", Level = EventLevel.Warning)] + public void OperationUnexpectedlyCanceled(string endpoint, string exceptionMessage) + { + this.WriteEvent(18, endpoint, exceptionMessage); + } + + [Event(19, Message = "Request to endpoint {0} timed out. Exception: {1}", Level = EventLevel.Warning)] + public void RequestTimedOut(string endpoint, string exceptionMessage) + { + this.WriteEvent(19, endpoint, exceptionMessage); + } + + [Event(20, Message = "Failed to deserialize response from {0}.", Level = EventLevel.Error)] + public void ResponseDeserializationFailed(string endpoint) + { + this.WriteEvent(20, endpoint); + } + + [Event(21, Message = "Export succeeded for {0}. Message: {1}", Level = EventLevel.Informational)] + public void ExportSuccess(string endpoint, string message) + { + this.WriteEvent(21, endpoint, message); + } + + [Event(22, Message = "Export encountered GRPC status warning for {0}. Status code: {1}", Level = EventLevel.Warning)] + public void GrpcStatusWarning(string endpoint, string statusCode) + { + this.WriteEvent(22, endpoint, statusCode); + } + + [Event(23, Message = "Export failed for {0}. Message: {1}", Level = EventLevel.Error)] + public void ExportFailure(string endpoint, string message) + { + this.WriteEvent(23, endpoint, message); + } + void IConfigurationExtensionsLogger.LogInvalidConfigurationValue(string key, string value) { this.InvalidConfigurationValue(key, value); diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/OtlpExporterTransmissionHandler.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/OtlpExporterTransmissionHandler.cs index 12b76bdf83e..9ecb6c4785f 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/OtlpExporterTransmissionHandler.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/OtlpExporterTransmissionHandler.cs @@ -121,7 +121,7 @@ protected bool TryRetryRequest(TRequest request, DateTime deadlineUtc, out Expor response = this.ExportClient.SendExportRequest(request, deadlineUtc); if (!response.Success) { - OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(response.Exception, isRetry: true); + OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(response.Exception!, isRetry: true); return false; } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/ProtobufOtlpExporterTransmissionHandler.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/ProtobufOtlpExporterTransmissionHandler.cs index db7ef77f74c..70dad49f9cd 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/ProtobufOtlpExporterTransmissionHandler.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/ProtobufOtlpExporterTransmissionHandler.cs @@ -119,13 +119,7 @@ protected virtual void OnShutdown(int timeoutMilliseconds) protected bool TryRetryRequest(byte[] request, int contentLength, DateTime deadlineUtc, out ExportClientResponse response) { response = this.ExportClient.SendExportRequest(request, contentLength, deadlineUtc); - if (!response.Success) - { - OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(response.Exception, isRetry: true); - return false; - } - - return true; + return response.Success; } /// diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/MockCollectorIntegrationTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/MockCollectorIntegrationTests.cs index 27b64e08f63..c9e2c19ae09 100644 --- a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/MockCollectorIntegrationTests.cs +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/MockCollectorIntegrationTests.cs @@ -5,7 +5,6 @@ using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Net; -using Google.Protobuf; using Grpc.Core; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; @@ -75,7 +74,7 @@ public async Task TestRecoveryAfterFailedExport() await httpClient.GetAsync($"/MockCollector/SetResponseCodes/{string.Join(",", codes.Select(x => (int)x))}"); var exportResults = new List(); - using var otlpExporter = new OtlpTraceExporter(new OtlpExporterOptions() { Endpoint = new Uri($"http://localhost:{testGrpcPort}") }); + using var otlpExporter = new ProtobufOtlpTraceExporter(new OtlpExporterOptions() { Endpoint = new Uri($"http://localhost:{testGrpcPort}") }); var delegatingExporter = new DelegatingExporter { OnExportFunc = (batch) => @@ -180,10 +179,14 @@ public async Task GrpcRetryTests(bool useRetryTransmissionHandler, ExportResult var exporterOptions = new OtlpExporterOptions() { Endpoint = endpoint, TimeoutMilliseconds = 20000, Protocol = OtlpExportProtocol.Grpc }; var configuration = new ConfigurationBuilder() - .AddInMemoryCollection(new Dictionary { [ExperimentalOptions.OtlpRetryEnvVar] = useRetryTransmissionHandler ? "in_memory" : null }) - .Build(); + .AddInMemoryCollection(new Dictionary + { + [ExperimentalOptions.OtlpRetryEnvVar] = useRetryTransmissionHandler ? "in_memory" : null, + [ExperimentalOptions.OtlpUseCustomSerializer] = "true", + }) + .Build(); - using var otlpExporter = new OtlpTraceExporter(exporterOptions, new SdkLimitOptions(), new ExperimentalOptions(configuration)); + using var otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions, new SdkLimitOptions(), new ExperimentalOptions(configuration)); var activitySourceName = "otel.grpc.retry.test"; using var source = new ActivitySource(activitySourceName); @@ -265,10 +268,14 @@ public async Task HttpRetryTests(bool useRetryTransmissionHandler, ExportResult var exporterOptions = new OtlpExporterOptions() { Endpoint = endpoint, TimeoutMilliseconds = 20000, Protocol = OtlpExportProtocol.HttpProtobuf }; var configuration = new ConfigurationBuilder() - .AddInMemoryCollection(new Dictionary { [ExperimentalOptions.OtlpRetryEnvVar] = useRetryTransmissionHandler ? "in_memory" : null }) - .Build(); + .AddInMemoryCollection(new Dictionary + { + [ExperimentalOptions.OtlpRetryEnvVar] = useRetryTransmissionHandler ? "in_memory" : null, + [ExperimentalOptions.OtlpUseCustomSerializer] = "true", + }) + .Build(); - using var otlpExporter = new OtlpTraceExporter(exporterOptions, new SdkLimitOptions(), new ExperimentalOptions(configuration)); + using var otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions, new SdkLimitOptions(), new ExperimentalOptions(configuration)); var activitySourceName = "otel.http.retry.test"; using var source = new ActivitySource(activitySourceName); @@ -347,31 +354,32 @@ public async Task HttpPersistentStorageRetryTests(bool usePersistentStorageTrans var exporterOptions = new OtlpExporterOptions() { Endpoint = endpoint, TimeoutMilliseconds = 20000 }; - var exportClient = new OtlpHttpTraceExportClient(exporterOptions, new HttpClient()); + var exportClient = new ProtobufOtlpHttpExportClient(exporterOptions, new HttpClient(), "/v1/traces"); // TODO: update this to configure via experimental environment variable. - OtlpExporterTransmissionHandler transmissionHandler; + ProtobufOtlpExporterTransmissionHandler transmissionHandler; MockFileProvider? mockProvider = null; if (usePersistentStorageTransmissionHandler) { mockProvider = new MockFileProvider(); - transmissionHandler = new OtlpExporterPersistentStorageTransmissionHandler( + transmissionHandler = new ProtobufOtlpExporterPersistentStorageTransmissionHandler( mockProvider, exportClient, - exporterOptions.TimeoutMilliseconds, - (byte[] data) => - { - var request = new ExportTraceServiceRequest(); - request.MergeFrom(data); - return request; - }); + exporterOptions.TimeoutMilliseconds); } else { - transmissionHandler = new OtlpExporterTransmissionHandler(exportClient, exporterOptions.TimeoutMilliseconds); + transmissionHandler = new ProtobufOtlpExporterTransmissionHandler(exportClient, exporterOptions.TimeoutMilliseconds); } - using var otlpExporter = new OtlpTraceExporter(exporterOptions, new(), new(), transmissionHandler); + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + [ExperimentalOptions.OtlpUseCustomSerializer] = "true", + }) + .Build(); + + using var otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions, new(), new(configuration), transmissionHandler); var activitySourceName = "otel.http.persistent.storage.retry.test"; using var source = new ActivitySource(activitySourceName); @@ -397,7 +405,7 @@ public async Task HttpPersistentStorageRetryTests(bool usePersistentStorageTrans Assert.Single(mockProvider!.TryGetBlobs()); // Force Retry - Assert.True((transmissionHandler as OtlpExporterPersistentStorageTransmissionHandler)!.InitiateAndWaitForRetryProcess(-1)); + Assert.True((transmissionHandler as ProtobufOtlpExporterPersistentStorageTransmissionHandler)?.InitiateAndWaitForRetryProcess(-1)); Assert.False(mockProvider.TryGetBlob(out _)); } @@ -486,31 +494,32 @@ public async Task GrpcPersistentStorageRetryTests(bool usePersistentStorageTrans var exporterOptions = new OtlpExporterOptions() { Endpoint = endpoint, TimeoutMilliseconds = 20000 }; - var exportClient = new OtlpGrpcTraceExportClient(exporterOptions); + var exportClient = new ProtobufOtlpGrpcExportClient(exporterOptions, new HttpClient(), "opentelemetry.proto.collector.trace.v1.TraceService/Export"); // TODO: update this to configure via experimental environment variable. - OtlpExporterTransmissionHandler transmissionHandler; + ProtobufOtlpExporterTransmissionHandler transmissionHandler; MockFileProvider? mockProvider = null; if (usePersistentStorageTransmissionHandler) { mockProvider = new MockFileProvider(); - transmissionHandler = new OtlpExporterPersistentStorageTransmissionHandler( + transmissionHandler = new ProtobufOtlpExporterPersistentStorageTransmissionHandler( mockProvider, exportClient, - exporterOptions.TimeoutMilliseconds, - (byte[] data) => - { - var request = new ExportTraceServiceRequest(); - request.MergeFrom(data); - return request; - }); + exporterOptions.TimeoutMilliseconds); } else { - transmissionHandler = new OtlpExporterTransmissionHandler(exportClient, exporterOptions.TimeoutMilliseconds); + transmissionHandler = new ProtobufOtlpExporterTransmissionHandler(exportClient, exporterOptions.TimeoutMilliseconds); } - using var otlpExporter = new OtlpTraceExporter(exporterOptions, new(), new(), transmissionHandler); + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + [ExperimentalOptions.OtlpUseCustomSerializer] = "true", + }) + .Build(); + + using var otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions, new(), new(configuration), transmissionHandler); var activitySourceName = "otel.grpc.persistent.storage.retry.test"; using var source = new ActivitySource(activitySourceName); @@ -536,7 +545,7 @@ public async Task GrpcPersistentStorageRetryTests(bool usePersistentStorageTrans Assert.Single(mockProvider.TryGetBlobs()); // Force Retry - Assert.True((transmissionHandler as OtlpExporterPersistentStorageTransmissionHandler)!.InitiateAndWaitForRetryProcess(-1)); + Assert.True((transmissionHandler as ProtobufOtlpExporterPersistentStorageTransmissionHandler)?.InitiateAndWaitForRetryProcess(-1)); Assert.False(mockProvider.TryGetBlob(out _)); } diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpRetryTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpRetryTests.cs index 23ba18d0f6e..e72faee429c 100644 --- a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpRetryTests.cs +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpRetryTests.cs @@ -234,7 +234,7 @@ public GrpcRetryAttempt( this.ThrottleDelay = throttleDelay != null ? throttleDelay.ToTimeSpan() : null; - this.Response = new ExportClientGrpcResponse(expectedSuccess, deadlineUtc, rpcException); + this.Response = new ExportClientGrpcResponse(expectedSuccess, deadlineUtc, rpcException, null, null); this.ExpectedNextRetryDelayMilliseconds = expectedNextRetryDelayMilliseconds; From 7eeddf5f2298a548f5f2afa14599ece953e17d25 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Wed, 27 Nov 2024 11:37:55 -0800 Subject: [PATCH 2/3] [otlp] Refactor shared protobuf otlp export client code into a base class (#6001) --- .../ExportClient/ProtobufOtlpExportClient.cs | 100 ++++++++++++++++++ .../ProtobufOtlpGrpcExportClient.cs | 79 +++----------- .../ProtobufOtlpHttpExportClient.cs | 66 +----------- 3 files changed, 118 insertions(+), 127 deletions(-) create mode 100644 src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpExportClient.cs diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpExportClient.cs new file mode 100644 index 00000000000..519afcd548a --- /dev/null +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpExportClient.cs @@ -0,0 +1,100 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#if NETFRAMEWORK +using System.Net.Http; +#endif +using System.Net.Http.Headers; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; + +internal abstract class ProtobufOtlpExportClient : IProtobufExportClient +{ + private static readonly Version Http2RequestVersion = new(2, 0); + +#if NET + private static readonly bool SynchronousSendSupportedByCurrentPlatform; + + static ProtobufOtlpExportClient() + { +#if NET + // See: https://github.com/dotnet/runtime/blob/280f2a0c60ce0378b8db49adc0eecc463d00fe5d/src/libraries/System.Net.Http/src/System/Net/Http/HttpClientHandler.AnyMobile.cs#L767 + SynchronousSendSupportedByCurrentPlatform = !OperatingSystem.IsAndroid() + && !OperatingSystem.IsIOS() + && !OperatingSystem.IsTvOS() + && !OperatingSystem.IsBrowser(); +#endif + } +#endif + + protected ProtobufOtlpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath) + { + Guard.ThrowIfNull(options); + Guard.ThrowIfNull(httpClient); + Guard.ThrowIfNull(signalPath); + + Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath); + this.Endpoint = new UriBuilder(exporterEndpoint).Uri; + this.Headers = options.GetHeaders>((d, k, v) => d.Add(k, v)); + this.HttpClient = httpClient; + } + + internal HttpClient HttpClient { get; } + + internal Uri Endpoint { get; } + + internal IReadOnlyDictionary Headers { get; } + + internal abstract MediaTypeHeaderValue MediaTypeHeader { get; } + + internal virtual bool RequireHttp2 => false; + + public abstract ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default); + + /// + public bool Shutdown(int timeoutMilliseconds) + { + this.HttpClient.CancelPendingRequests(); + return true; + } + + protected HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength) + { + var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint); + + if (this.RequireHttp2) + { + request.Version = Http2RequestVersion; + +#if NET6_0_OR_GREATER + request.VersionPolicy = HttpVersionPolicy.RequestVersionExact; +#endif + } + + foreach (var header in this.Headers) + { + request.Headers.Add(header.Key, header.Value); + } + + // TODO: Support compression. + + request.Content = new ByteArrayContent(buffer, 0, contentLength); + request.Content.Headers.ContentType = this.MediaTypeHeader; + + return request; + } + + protected HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken) + { +#if NET + // Note: SendAsync must be used with HTTP/2 because synchronous send is + // not supported. + return this.RequireHttp2 || !SynchronousSendSupportedByCurrentPlatform + ? this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult() + : this.HttpClient.Send(request, cancellationToken); +#else + return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult(); +#endif + } +} diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpGrpcExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpGrpcExportClient.cs index 0cabbcb53ac..d70149e8344 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpGrpcExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpGrpcExportClient.cs @@ -6,17 +6,15 @@ #endif using System.Net.Http.Headers; using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient.Grpc; -using OpenTelemetry.Internal; namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; /// Base class for sending OTLP export request over gRPC. -internal sealed class ProtobufOtlpGrpcExportClient : IProtobufExportClient +internal sealed class ProtobufOtlpGrpcExportClient : ProtobufOtlpExportClient { public const string GrpcStatusDetailsHeader = "grpc-status-details-bin"; private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null); private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/grpc"); - private static readonly Version Http2RequestVersion = new(2, 0); private static readonly ExportClientGrpcResponse DefaultExceptionExportClientGrpcResponse = new( @@ -27,49 +25,34 @@ private static readonly ExportClientGrpcResponse DefaultExceptionExportClientGrp grpcStatusDetailsHeader: null); public ProtobufOtlpGrpcExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath) + : base(options, httpClient, signalPath) { - Guard.ThrowIfNull(options); - Guard.ThrowIfNull(httpClient); - Guard.ThrowIfNull(signalPath); - Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds); - - Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath); - this.Endpoint = new UriBuilder(exporterEndpoint).Uri; - this.Headers = options.GetHeaders>((d, k, v) => d.Add(k, v)); - this.HttpClient = httpClient; } - internal HttpClient HttpClient { get; } + internal override MediaTypeHeaderValue MediaTypeHeader => MediaHeaderValue; - internal Uri Endpoint { get; set; } - - internal IReadOnlyDictionary Headers { get; } - - internal int TimeoutMilliseconds { get; } + internal override bool RequireHttp2 => true; /// - public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default) + public override ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default) { try { using var httpRequest = this.CreateHttpRequest(buffer, contentLength); using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken); - try - { - httpResponse.EnsureSuccessStatusCode(); - } - catch (HttpRequestException) - { - throw; - } + httpResponse.EnsureSuccessStatusCode(); var trailingHeaders = httpResponse.TrailingHeaders(); Status status = GrpcProtocolHelpers.GetResponseStatus(httpResponse, trailingHeaders); if (status.Detail.Equals(Status.NoReplyDetailMessage)) { +#if NET + using var responseStream = httpResponse.Content.ReadAsStream(cancellationToken); +#else using var responseStream = httpResponse.Content.ReadAsStreamAsync().GetAwaiter().GetResult(); +#endif int firstByte = responseStream.ReadByte(); if (firstByte == -1) @@ -170,45 +153,11 @@ public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, } } - public HttpRequestMessage CreateHttpRequest(byte[] buffer, int contentLength) - { - var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint); - request.Version = Http2RequestVersion; - -#if NET6_0_OR_GREATER - request.VersionPolicy = HttpVersionPolicy.RequestVersionExact; -#endif - - foreach (var header in this.Headers) - { - request.Headers.Add(header.Key, header.Value); - } - - // TODO: Support compression. - - request.Content = new ByteArrayContent(buffer, 0, contentLength); - request.Content.Headers.ContentType = MediaHeaderValue; - - return request; - } - - public HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken) - { - return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult(); - } - - /// - public bool Shutdown(int timeoutMilliseconds) - { - this.HttpClient.CancelPendingRequests(); - return true; - } - private static bool IsTransientNetworkError(HttpRequestException ex) { - return ex.InnerException is System.Net.Sockets.SocketException socketEx && - (socketEx.SocketErrorCode == System.Net.Sockets.SocketError.TimedOut || - socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionReset || - socketEx.SocketErrorCode == System.Net.Sockets.SocketError.HostUnreachable); + return ex.InnerException is System.Net.Sockets.SocketException socketEx + && (socketEx.SocketErrorCode == System.Net.Sockets.SocketError.TimedOut + || socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionReset + || socketEx.SocketErrorCode == System.Net.Sockets.SocketError.HostUnreachable); } } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs index 118d428dcb5..3d3e62c7bf1 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs @@ -5,48 +5,24 @@ using System.Net.Http; #endif using System.Net.Http.Headers; -using OpenTelemetry.Internal; namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; /// Class for sending OTLP trace export request over HTTP. -internal sealed class ProtobufOtlpHttpExportClient : IProtobufExportClient +internal sealed class ProtobufOtlpHttpExportClient : ProtobufOtlpExportClient { private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/x-protobuf"); private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null); -#if NET - private readonly bool synchronousSendSupportedByCurrentPlatform; -#endif internal ProtobufOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath) + : base(options, httpClient, signalPath) { - Guard.ThrowIfNull(options); - Guard.ThrowIfNull(httpClient); - Guard.ThrowIfNull(signalPath); - Guard.ThrowIfInvalidTimeout(options.TimeoutMilliseconds); - - Uri exporterEndpoint = options.Endpoint.AppendPathIfNotPresent(signalPath); - this.Endpoint = new UriBuilder(exporterEndpoint).Uri; - this.Headers = options.GetHeaders>((d, k, v) => d.Add(k, v)); - this.HttpClient = httpClient; - -#if NET - // See: https://github.com/dotnet/runtime/blob/280f2a0c60ce0378b8db49adc0eecc463d00fe5d/src/libraries/System.Net.Http/src/System/Net/Http/HttpClientHandler.AnyMobile.cs#L767 - this.synchronousSendSupportedByCurrentPlatform = !OperatingSystem.IsAndroid() - && !OperatingSystem.IsIOS() - && !OperatingSystem.IsTvOS() - && !OperatingSystem.IsBrowser(); -#endif } - internal HttpClient HttpClient { get; } - - internal Uri Endpoint { get; set; } - - internal IReadOnlyDictionary Headers { get; } + internal override MediaTypeHeaderValue MediaTypeHeader => MediaHeaderValue; /// - public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default) + public override ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default) { try { @@ -71,38 +47,4 @@ public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, return new ExportClientHttpResponse(success: false, deadlineUtc: deadlineUtc, response: null, exception: ex); } } - - /// - public bool Shutdown(int timeoutMilliseconds) - { - this.HttpClient.CancelPendingRequests(); - return true; - } - - public HttpRequestMessage CreateHttpRequest(byte[] exportRequest, int contentLength) - { - var request = new HttpRequestMessage(HttpMethod.Post, this.Endpoint); - - foreach (var header in this.Headers) - { - request.Headers.Add(header.Key, header.Value); - } - - var content = new ByteArrayContent(exportRequest, 0, contentLength); - content.Headers.ContentType = MediaHeaderValue; - request.Content = content; - - return request; - } - - public HttpResponseMessage SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken) - { -#if NET - return this.synchronousSendSupportedByCurrentPlatform - ? this.HttpClient.Send(request, cancellationToken) - : this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult(); -#else - return this.HttpClient.SendAsync(request, cancellationToken).GetAwaiter().GetResult(); -#endif - } } From 84e6afbebae67f4e9b498f1702edfe4ceccf34ec Mon Sep 17 00:00:00 2001 From: Rajkumar Rangaraj Date: Wed, 27 Nov 2024 14:28:38 -0800 Subject: [PATCH 3/3] [otlp] Replace the current trace implementation with the new one (#6003) --- .../Implementation/ActivityExtensions.cs | 445 ------------------ .../ExportClient/OtlpGrpcTraceExportClient.cs | 45 -- .../ExportClient/OtlpHttpTraceExportClient.cs | 69 --- .../ProtobufOtlpHttpExportClient.cs | 2 +- .../OtlpExporterOptionsExtensions.cs | 52 +- .../OtlpTraceExporter.cs | 49 +- .../OtlpTraceExporterHelperExtensions.cs | 11 +- .../ProtobufOtlpTraceExporter.cs | 102 ---- .../Exporter/OtlpGrpcExporterBenchmarks.cs | 3 +- .../Exporter/OtlpHttpExporterBenchmarks.cs | 3 +- .../OtlpHttpTraceExportClientTests.cs | 23 +- .../MockCollectorIntegrationTests.cs | 10 +- .../OtlpExporterOptionsExtensionsTests.cs | 50 +- .../OtlpTraceExporterTests.cs | 178 ++----- .../TestProtobufExportClient.cs | 40 ++ 15 files changed, 183 insertions(+), 899 deletions(-) delete mode 100644 src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ActivityExtensions.cs delete mode 100644 src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcTraceExportClient.cs delete mode 100644 src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpTraceExportClient.cs delete mode 100644 src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpTraceExporter.cs create mode 100644 test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestProtobufExportClient.cs diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ActivityExtensions.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ActivityExtensions.cs deleted file mode 100644 index 313a8e9f59b..00000000000 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ActivityExtensions.cs +++ /dev/null @@ -1,445 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -using System.Collections.Concurrent; -using System.Diagnostics; -using System.Runtime.CompilerServices; -using Google.Protobuf; -using OpenTelemetry.Internal; -using OpenTelemetry.Proto.Collector.Trace.V1; -using OpenTelemetry.Proto.Common.V1; -using OpenTelemetry.Proto.Resource.V1; -using OpenTelemetry.Proto.Trace.V1; -using OpenTelemetry.Trace; -using OtlpTrace = OpenTelemetry.Proto.Trace.V1; - -namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation; - -internal static class ActivityExtensions -{ - private static readonly ConcurrentBag SpanListPool = new(); - - internal static void AddBatch( - this ExportTraceServiceRequest request, - SdkLimitOptions sdkLimitOptions, - Resource processResource, - in Batch activityBatch) - { - Dictionary spansByLibrary = new Dictionary(); - ResourceSpans resourceSpans = new ResourceSpans - { - Resource = processResource, - }; - request.ResourceSpans.Add(resourceSpans); - - var maxTags = sdkLimitOptions.AttributeCountLimit ?? int.MaxValue; - - foreach (var activity in activityBatch) - { - Span? span = activity.ToOtlpSpan(sdkLimitOptions); - if (span == null) - { - OpenTelemetryProtocolExporterEventSource.Log.CouldNotTranslateActivity( - nameof(ActivityExtensions), - nameof(AddBatch)); - continue; - } - - var activitySourceName = activity.Source.Name; - if (!spansByLibrary.TryGetValue(activitySourceName, out var scopeSpans)) - { - scopeSpans = GetSpanListFromPool(activity.Source, maxTags, sdkLimitOptions.AttributeValueLengthLimit); - - spansByLibrary.Add(activitySourceName, scopeSpans); - resourceSpans.ScopeSpans.Add(scopeSpans); - } - - scopeSpans.Spans.Add(span); - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static void Return(this ExportTraceServiceRequest request) - { - var resourceSpans = request.ResourceSpans.FirstOrDefault(); - if (resourceSpans == null) - { - return; - } - - foreach (var scopeSpan in resourceSpans.ScopeSpans) - { - scopeSpan.Spans.Clear(); - scopeSpan.Scope.Attributes.Clear(); - SpanListPool.Add(scopeSpan); - } - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static ScopeSpans GetSpanListFromPool(ActivitySource activitySource, int maxTags, int? attributeValueLengthLimit) - { - if (!SpanListPool.TryTake(out var scopeSpans)) - { - scopeSpans = new ScopeSpans - { - Scope = new InstrumentationScope - { - Name = activitySource.Name, // Name is enforced to not be null, but it can be empty. - Version = activitySource.Version ?? string.Empty, // NRE throw by proto - }, - }; - } - else - { - scopeSpans.Scope.Name = activitySource.Name; // Name is enforced to not be null, but it can be empty. - scopeSpans.Scope.Version = activitySource.Version ?? string.Empty; // NRE throw by proto - } - - if (activitySource.Tags != null) - { - var scopeAttributes = scopeSpans.Scope.Attributes; - - if (activitySource.Tags is IReadOnlyList> activitySourceTagsList) - { - for (int i = 0; i < activitySourceTagsList.Count; i++) - { - if (scopeAttributes.Count < maxTags) - { - OtlpTagWriter.Instance.TryWriteTag(ref scopeAttributes, activitySourceTagsList[i], attributeValueLengthLimit); - } - else - { - scopeSpans.Scope.DroppedAttributesCount++; - } - } - } - else - { - foreach (var tag in activitySource.Tags) - { - if (scopeAttributes.Count < maxTags) - { - OtlpTagWriter.Instance.TryWriteTag(ref scopeAttributes, tag, attributeValueLengthLimit); - } - else - { - scopeSpans.Scope.DroppedAttributesCount++; - } - } - } - } - - return scopeSpans; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static Span? ToOtlpSpan(this Activity activity, SdkLimitOptions sdkLimitOptions) - { - if (activity.IdFormat != ActivityIdFormat.W3C) - { - // Only ActivityIdFormat.W3C is supported, in principle this should never be - // hit under the OpenTelemetry SDK. - return null; - } - - byte[] traceIdBytes = new byte[16]; - byte[] spanIdBytes = new byte[8]; - - activity.TraceId.CopyTo(traceIdBytes); - activity.SpanId.CopyTo(spanIdBytes); - - var parentSpanIdString = ByteString.Empty; - if (activity.ParentSpanId != default) - { - byte[] parentSpanIdBytes = new byte[8]; - activity.ParentSpanId.CopyTo(parentSpanIdBytes); - parentSpanIdString = UnsafeByteOperations.UnsafeWrap(parentSpanIdBytes); - } - - var startTimeUnixNano = activity.StartTimeUtc.ToUnixTimeNanoseconds(); - var otlpSpan = new Span - { - Name = activity.DisplayName, - - // There is an offset of 1 on the OTLP enum. - Kind = (Span.Types.SpanKind)(activity.Kind + 1), - - TraceId = UnsafeByteOperations.UnsafeWrap(traceIdBytes), - SpanId = UnsafeByteOperations.UnsafeWrap(spanIdBytes), - ParentSpanId = parentSpanIdString, - TraceState = activity.TraceStateString ?? string.Empty, - - StartTimeUnixNano = (ulong)startTimeUnixNano, - EndTimeUnixNano = (ulong)(startTimeUnixNano + activity.Duration.ToNanoseconds()), - }; - - TagEnumerationState otlpTags = new() - { - SdkLimitOptions = sdkLimitOptions, - Span = otlpSpan, - }; - otlpTags.EnumerateTags(activity, sdkLimitOptions.SpanAttributeCountLimit ?? int.MaxValue); - - if (activity.Kind == ActivityKind.Client || activity.Kind == ActivityKind.Producer) - { - PeerServiceResolver.Resolve(ref otlpTags, out string? peerServiceName, out bool addAsTag); - - if (peerServiceName != null && addAsTag) - { - otlpSpan.Attributes.Add( - new KeyValue - { - Key = SemanticConventions.AttributePeerService, - Value = new AnyValue { StringValue = peerServiceName }, - }); - } - } - - otlpSpan.Status = activity.ToOtlpStatus(ref otlpTags); - - EventEnumerationState otlpEvents = new() - { - SdkLimitOptions = sdkLimitOptions, - Span = otlpSpan, - }; - otlpEvents.EnumerateEvents(activity, sdkLimitOptions.SpanEventCountLimit ?? int.MaxValue); - - LinkEnumerationState otlpLinks = new() - { - SdkLimitOptions = sdkLimitOptions, - Span = otlpSpan, - }; - otlpLinks.EnumerateLinks(activity, sdkLimitOptions.SpanLinkCountLimit ?? int.MaxValue); - - otlpSpan.Flags = ToOtlpSpanFlags(activity.Context.TraceFlags, activity.HasRemoteParent); - - return otlpSpan; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static OtlpTrace.Status? ToOtlpStatus(this Activity activity, ref TagEnumerationState otlpTags) - { - var statusCodeForTagValue = StatusHelper.GetStatusCodeForTagValue(otlpTags.StatusCode); - if (activity.Status == ActivityStatusCode.Unset && statusCodeForTagValue == null) - { - return null; - } - - OtlpTrace.Status.Types.StatusCode otlpActivityStatusCode = OtlpTrace.Status.Types.StatusCode.Unset; - string? otlpStatusDescription = null; - if (activity.Status != ActivityStatusCode.Unset) - { - // The numerical values of the two enumerations match, a simple cast is enough. - otlpActivityStatusCode = (OtlpTrace.Status.Types.StatusCode)(int)activity.Status; - if (activity.Status == ActivityStatusCode.Error && !string.IsNullOrEmpty(activity.StatusDescription)) - { - otlpStatusDescription = activity.StatusDescription; - } - } - else - { - if (statusCodeForTagValue != StatusCode.Unset) - { - // The numerical values of the two enumerations match, a simple cast is enough. - otlpActivityStatusCode = (OtlpTrace.Status.Types.StatusCode)(int)statusCodeForTagValue!; - if (statusCodeForTagValue == StatusCode.Error && !string.IsNullOrEmpty(otlpTags.StatusDescription)) - { - otlpStatusDescription = otlpTags.StatusDescription; - } - } - } - - var otlpStatus = new OtlpTrace.Status { Code = otlpActivityStatusCode }; - if (!string.IsNullOrEmpty(otlpStatusDescription)) - { - otlpStatus.Message = otlpStatusDescription; - } - - return otlpStatus; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static Span.Types.Link ToOtlpLink(in ActivityLink activityLink, SdkLimitOptions sdkLimitOptions) - { - byte[] traceIdBytes = new byte[16]; - byte[] spanIdBytes = new byte[8]; - - activityLink.Context.TraceId.CopyTo(traceIdBytes); - activityLink.Context.SpanId.CopyTo(spanIdBytes); - - var otlpLink = new Span.Types.Link - { - TraceId = UnsafeByteOperations.UnsafeWrap(traceIdBytes), - SpanId = UnsafeByteOperations.UnsafeWrap(spanIdBytes), - }; - - int maxTags = sdkLimitOptions.SpanLinkAttributeCountLimit ?? int.MaxValue; - - var otlpLinkAttributes = otlpLink.Attributes; - - foreach (ref readonly var tag in activityLink.EnumerateTagObjects()) - { - if (otlpLinkAttributes.Count == maxTags) - { - otlpLink.DroppedAttributesCount++; - continue; - } - - OtlpTagWriter.Instance.TryWriteTag(ref otlpLinkAttributes, tag, sdkLimitOptions.AttributeValueLengthLimit); - } - - otlpLink.Flags = ToOtlpSpanFlags(activityLink.Context.TraceFlags, activityLink.Context.IsRemote); - - return otlpLink; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static Span.Types.Event ToOtlpEvent(in ActivityEvent activityEvent, SdkLimitOptions sdkLimitOptions) - { - var otlpEvent = new Span.Types.Event - { - Name = activityEvent.Name, - TimeUnixNano = (ulong)activityEvent.Timestamp.ToUnixTimeNanoseconds(), - }; - - int maxTags = sdkLimitOptions.SpanEventAttributeCountLimit ?? int.MaxValue; - - var otlpEventAttributes = otlpEvent.Attributes; - - foreach (ref readonly var tag in activityEvent.EnumerateTagObjects()) - { - if (otlpEventAttributes.Count == maxTags) - { - otlpEvent.DroppedAttributesCount++; - continue; - } - - OtlpTagWriter.Instance.TryWriteTag(ref otlpEventAttributes, tag, sdkLimitOptions.AttributeValueLengthLimit); - } - - return otlpEvent; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static uint ToOtlpSpanFlags(ActivityTraceFlags activityTraceFlags, bool isRemote) - { - SpanFlags flags = (SpanFlags)activityTraceFlags; - - flags |= SpanFlags.ContextHasIsRemoteMask; - - if (isRemote) - { - flags |= SpanFlags.ContextIsRemoteMask; - } - - return (uint)flags; - } - - private struct TagEnumerationState : PeerServiceResolver.IPeerServiceState - { - public SdkLimitOptions SdkLimitOptions; - - public Span Span; - - public string? StatusCode; - - public string? StatusDescription; - - public string? PeerService { get; set; } - - public int? PeerServicePriority { get; set; } - - public string? HostName { get; set; } - - public string? IpAddress { get; set; } - - public long Port { get; set; } - - public void EnumerateTags(Activity activity, int maxTags) - { - var otlpSpanAttributes = this.Span.Attributes; - - foreach (ref readonly var tag in activity.EnumerateTagObjects()) - { - if (tag.Value == null) - { - continue; - } - - var key = tag.Key; - - switch (key) - { - case SpanAttributeConstants.StatusCodeKey: - this.StatusCode = tag.Value as string; - continue; - case SpanAttributeConstants.StatusDescriptionKey: - this.StatusDescription = tag.Value as string; - continue; - } - - if (otlpSpanAttributes.Count == maxTags) - { - this.Span.DroppedAttributesCount++; - } - else - { - OtlpTagWriter.Instance.TryWriteTag(ref otlpSpanAttributes, tag, this.SdkLimitOptions.AttributeValueLengthLimit); - } - - if (tag.Value is string tagStringValue) - { - PeerServiceResolver.InspectTag(ref this, key, tagStringValue); - } - else if (tag.Value is int tagIntValue) - { - PeerServiceResolver.InspectTag(ref this, key, tagIntValue); - } - } - } - } - - private struct EventEnumerationState - { - public SdkLimitOptions SdkLimitOptions; - - public Span Span; - - public void EnumerateEvents(Activity activity, int maxEvents) - { - foreach (ref readonly var @event in activity.EnumerateEvents()) - { - if (this.Span.Events.Count < maxEvents) - { - this.Span.Events.Add(ToOtlpEvent(in @event, this.SdkLimitOptions)); - } - else - { - this.Span.DroppedEventsCount++; - } - } - } - } - - private struct LinkEnumerationState - { - public SdkLimitOptions SdkLimitOptions; - - public Span Span; - - public void EnumerateLinks(Activity activity, int maxLinks) - { - foreach (ref readonly var link in activity.EnumerateLinks()) - { - if (this.Span.Links.Count < maxLinks) - { - this.Span.Links.Add(ToOtlpLink(in link, this.SdkLimitOptions)); - } - else - { - this.Span.DroppedLinksCount++; - } - } - } - } -} diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcTraceExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcTraceExportClient.cs deleted file mode 100644 index b30efe6de18..00000000000 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpGrpcTraceExportClient.cs +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -using Grpc.Core; -using OtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1; - -namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; - -/// Class for sending OTLP trace export request over gRPC. -internal sealed class OtlpGrpcTraceExportClient : BaseOtlpGrpcExportClient -{ - private readonly OtlpCollector.TraceService.TraceServiceClient traceClient; - - public OtlpGrpcTraceExportClient(OtlpExporterOptions options, OtlpCollector.TraceService.TraceServiceClient? traceServiceClient = null) - : base(options) - { - if (traceServiceClient != null) - { - this.traceClient = traceServiceClient; - } - else - { - this.Channel = options.CreateChannel(); - this.traceClient = new OtlpCollector.TraceService.TraceServiceClient(this.Channel); - } - } - - /// - public override ExportClientResponse SendExportRequest(OtlpCollector.ExportTraceServiceRequest request, DateTime deadlineUtc, CancellationToken cancellationToken = default) - { - try - { - this.traceClient.Export(request, headers: this.Headers, deadline: deadlineUtc, cancellationToken: cancellationToken); - - // We do not need to return back response and deadline for successful response so using cached value. - return SuccessExportResponse; - } - catch (RpcException ex) - { - OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex); - - return new ExportClientGrpcResponse(false, deadlineUtc, ex, null, null); - } - } -} diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpTraceExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpTraceExportClient.cs deleted file mode 100644 index cff10229bcd..00000000000 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpTraceExportClient.cs +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -using System.Net; -#if NETFRAMEWORK -using System.Net.Http; -#endif -using System.Net.Http.Headers; -using System.Runtime.CompilerServices; -using Google.Protobuf; -using OtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1; - -namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; - -/// Class for sending OTLP trace export request over HTTP. -internal sealed class OtlpHttpTraceExportClient : BaseOtlpHttpExportClient -{ - internal const string MediaContentType = "application/x-protobuf"; - private const string TracesExportPath = "v1/traces"; - - public OtlpHttpTraceExportClient(OtlpExporterOptions options, HttpClient httpClient) - : base(options, httpClient, TracesExportPath) - { - } - - protected override HttpContent CreateHttpContent(OtlpCollector.ExportTraceServiceRequest exportRequest) - { - return new ExportRequestContent(exportRequest); - } - - internal sealed class ExportRequestContent : HttpContent - { - private static readonly MediaTypeHeaderValue ProtobufMediaTypeHeader = new(MediaContentType); - - private readonly OtlpCollector.ExportTraceServiceRequest exportRequest; - - public ExportRequestContent(OtlpCollector.ExportTraceServiceRequest exportRequest) - { - this.exportRequest = exportRequest; - this.Headers.ContentType = ProtobufMediaTypeHeader; - } - -#if NET - protected override void SerializeToStream(Stream stream, TransportContext? context, CancellationToken cancellationToken) - { - this.SerializeToStreamInternal(stream); - } -#endif - - protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context) - { - this.SerializeToStreamInternal(stream); - return Task.CompletedTask; - } - - protected override bool TryComputeLength(out long length) - { - // We can't know the length of the content being pushed to the output stream. - length = -1; - return false; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void SerializeToStreamInternal(Stream stream) - { - this.exportRequest.WriteTo(stream); - } - } -} diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs index 3d3e62c7bf1..11bc932fa5a 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/ProtobufOtlpHttpExportClient.cs @@ -11,7 +11,7 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie /// Class for sending OTLP trace export request over HTTP. internal sealed class ProtobufOtlpHttpExportClient : ProtobufOtlpExportClient { - private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/x-protobuf"); + internal static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/x-protobuf"); private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null); internal ProtobufOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath) diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExporterOptionsExtensions.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExporterOptionsExtensions.cs index 361980ca938..57198880c47 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExporterOptionsExtensions.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpExporterOptionsExtensions.cs @@ -16,7 +16,6 @@ using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission; using LogOtlpCollector = OpenTelemetry.Proto.Collector.Logs.V1; using MetricsOtlpCollector = OpenTelemetry.Proto.Collector.Metrics.V1; -using TraceOtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1; namespace OpenTelemetry.Exporter; @@ -99,42 +98,6 @@ public static THeaders GetHeaders(this OtlpExporterOptions options, Ac return headers; } - public static OtlpExporterTransmissionHandler GetTraceExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions) - { - var exportClient = GetTraceExportClient(options); - - // `HttpClient.Timeout.TotalMilliseconds` would be populated with the correct timeout value for both the exporter configuration cases: - // 1. User provides their own HttpClient. This case is straightforward as the user wants to use their `HttpClient` and thereby the same client's timeout value. - // 2. If the user configures timeout via the exporter options, then the timeout set for the `HttpClient` initialized by the exporter will be set to user provided value. - double timeoutMilliseconds = exportClient is OtlpHttpTraceExportClient httpTraceExportClient - ? httpTraceExportClient.HttpClient.Timeout.TotalMilliseconds - : options.TimeoutMilliseconds; - - if (experimentalOptions.EnableInMemoryRetry) - { - return new OtlpExporterRetryTransmissionHandler(exportClient, timeoutMilliseconds); - } - else if (experimentalOptions.EnableDiskRetry) - { - Debug.Assert(!string.IsNullOrEmpty(experimentalOptions.DiskRetryDirectoryPath), $"{nameof(experimentalOptions.DiskRetryDirectoryPath)} is null or empty"); - - return new OtlpExporterPersistentStorageTransmissionHandler( - exportClient, - timeoutMilliseconds, - (byte[] data) => - { - var request = new TraceOtlpCollector.ExportTraceServiceRequest(); - request.MergeFrom(data); - return request; - }, - Path.Combine(experimentalOptions.DiskRetryDirectoryPath, "traces")); - } - else - { - return new OtlpExporterTransmissionHandler(exportClient, timeoutMilliseconds); - } - } - public static ProtobufOtlpExporterTransmissionHandler GetProtobufExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions, OtlpSignalType otlpSignalType) { var exportClient = GetProtobufExportClient(options, otlpSignalType); @@ -169,6 +132,11 @@ public static IProtobufExportClient GetProtobufExportClient(this OtlpExporterOpt { var httpClient = options.HttpClientFactory?.Invoke() ?? throw new InvalidOperationException("OtlpExporterOptions was missing HttpClientFactory or it returned null."); + if (options.Protocol != OtlpExportProtocol.Grpc && options.Protocol != OtlpExportProtocol.HttpProtobuf) + { + throw new NotSupportedException($"Protocol {options.Protocol} is not supported."); + } + return otlpSignalType switch { OtlpSignalType.Traces => options.Protocol == OtlpExportProtocol.Grpc @@ -255,16 +223,6 @@ public static IProtobufExportClient GetProtobufExportClient(this OtlpExporterOpt } } - public static IExportClient GetTraceExportClient(this OtlpExporterOptions options) => - options.Protocol switch - { - OtlpExportProtocol.Grpc => new OtlpGrpcTraceExportClient(options), - OtlpExportProtocol.HttpProtobuf => new OtlpHttpTraceExportClient( - options, - options.HttpClientFactory?.Invoke() ?? throw new InvalidOperationException("OtlpExporterOptions was missing HttpClientFactory or it returned null.")), - _ => throw new NotSupportedException($"Protocol {options.Protocol} is not supported."), - }; - public static IExportClient GetMetricsExportClient(this OtlpExporterOptions options) => options.Protocol switch { diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpTraceExporter.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpTraceExporter.cs index da92667e033..bc7e062f648 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpTraceExporter.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpTraceExporter.cs @@ -1,11 +1,12 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +using System.Buffers.Binary; using System.Diagnostics; using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer; using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission; -using OtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1; -using OtlpResource = OpenTelemetry.Proto.Resource.V1; +using OpenTelemetry.Resources; namespace OpenTelemetry.Exporter; @@ -16,9 +17,15 @@ namespace OpenTelemetry.Exporter; public class OtlpTraceExporter : BaseExporter { private readonly SdkLimitOptions sdkLimitOptions; - private readonly OtlpExporterTransmissionHandler transmissionHandler; + private readonly ProtobufOtlpExporterTransmissionHandler transmissionHandler; + private readonly int startWritePosition; - private OtlpResource.Resource? processResource; + private Resource? resource; + + // Initial buffer size set to ~732KB. + // This choice allows us to gradually grow the buffer while targeting a final capacity of around 100 MB, + // by the 7th doubling to maintain efficient allocation without frequent resizing. + private byte[] buffer = new byte[750000]; /// /// Initializes a new instance of the class. @@ -40,17 +47,17 @@ internal OtlpTraceExporter( OtlpExporterOptions exporterOptions, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, - OtlpExporterTransmissionHandler? transmissionHandler = null) + ProtobufOtlpExporterTransmissionHandler? transmissionHandler = null) { Debug.Assert(exporterOptions != null, "exporterOptions was null"); Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null"); this.sdkLimitOptions = sdkLimitOptions!; - - this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetTraceExportTransmissionHandler(experimentalOptions); + this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0; + this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions, OtlpSignalType.Traces); } - internal OtlpResource.Resource ProcessResource => this.processResource ??= this.ParentProvider.GetResource().ToOtlpResource(); + internal Resource Resource => this.resource ??= this.ParentProvider.GetResource(); /// public override ExportResult Export(in Batch activityBatch) @@ -58,13 +65,22 @@ public override ExportResult Export(in Batch activityBatch) // Prevents the exporter's gRPC and HTTP operations from being instrumented. using var scope = SuppressInstrumentationScope.Begin(); - var request = new OtlpCollector.ExportTraceServiceRequest(); - try { - request.AddBatch(this.sdkLimitOptions, this.ProcessResource, activityBatch); + int writePosition = ProtobufOtlpTraceSerializer.WriteTraceData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.Resource, activityBatch); + + if (this.startWritePosition == 5) + { + // Grpc payload consists of 3 parts + // byte 0 - Specifying if the payload is compressed. + // 1-4 byte - Specifies the length of payload in big endian format. + // 5 and above - Protobuf serialized data. + Span data = new Span(this.buffer, 1, 4); + var dataLength = writePosition - 5; + BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength); + } - if (!this.transmissionHandler.TrySubmitRequest(request)) + if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition)) { return ExportResult.Failure; } @@ -74,17 +90,10 @@ public override ExportResult Export(in Batch activityBatch) OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex); return ExportResult.Failure; } - finally - { - request.Return(); - } return ExportResult.Success; } /// - protected override bool OnShutdown(int timeoutMilliseconds) - { - return this.transmissionHandler.Shutdown(timeoutMilliseconds); - } + protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler.Shutdown(timeoutMilliseconds); } diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpTraceExporterHelperExtensions.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpTraceExporterHelperExtensions.cs index 731010fbdc1..3a18b3da423 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpTraceExporterHelperExtensions.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpTraceExporterHelperExtensions.cs @@ -136,16 +136,7 @@ internal static BaseProcessor BuildOtlpExporterProcessor( exporterOptions!.TryEnableIHttpClientFactoryIntegration(serviceProvider!, "OtlpTraceExporter"); - BaseExporter otlpExporter; - - if (experimentalOptions != null && experimentalOptions.UseCustomProtobufSerializer) - { - otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!); - } - else - { - otlpExporter = new OtlpTraceExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!); - } + BaseExporter otlpExporter = new OtlpTraceExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!); if (configureExporterInstance != null) { diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpTraceExporter.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpTraceExporter.cs deleted file mode 100644 index b9723cee465..00000000000 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/ProtobufOtlpTraceExporter.cs +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -using System.Buffers.Binary; -using System.Diagnostics; -using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation; -using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer; -using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission; -using OpenTelemetry.Resources; - -namespace OpenTelemetry.Exporter; - -/// -/// Exporter consuming and exporting the data using -/// the OpenTelemetry protocol (OTLP). -/// -internal sealed class ProtobufOtlpTraceExporter : BaseExporter -{ - private readonly SdkLimitOptions sdkLimitOptions; - private readonly ProtobufOtlpExporterTransmissionHandler transmissionHandler; - private readonly int startWritePosition; - - private Resource? resource; - - // Initial buffer size set to ~732KB. - // This choice allows us to gradually grow the buffer while targeting a final capacity of around 100 MB, - // by the 7th doubling to maintain efficient allocation without frequent resizing. - private byte[] buffer = new byte[750000]; - - /// - /// Initializes a new instance of the class. - /// - /// Configuration options for the export. - public ProtobufOtlpTraceExporter(OtlpExporterOptions options) - : this(options, sdkLimitOptions: new(), experimentalOptions: new(), transmissionHandler: null) - { - } - - /// - /// Initializes a new instance of the class. - /// - /// . - /// . - /// . - /// . - internal ProtobufOtlpTraceExporter( - OtlpExporterOptions exporterOptions, - SdkLimitOptions sdkLimitOptions, - ExperimentalOptions experimentalOptions, - ProtobufOtlpExporterTransmissionHandler? transmissionHandler = null) - { - Debug.Assert(exporterOptions != null, "exporterOptions was null"); - Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null"); - - this.sdkLimitOptions = sdkLimitOptions!; - this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0; - this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions, OtlpSignalType.Traces); - } - - internal Resource Resource => this.resource ??= this.ParentProvider.GetResource(); - - /// - public override ExportResult Export(in Batch activityBatch) - { - // Prevents the exporter's gRPC and HTTP operations from being instrumented. - using var scope = SuppressInstrumentationScope.Begin(); - - try - { - int writePosition = ProtobufOtlpTraceSerializer.WriteTraceData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.Resource, activityBatch); - - if (this.startWritePosition == 5) - { - // Grpc payload consists of 3 parts - // byte 0 - Specifying if the payload is compressed. - // 1-4 byte - Specifies the length of payload in big endian format. - // 5 and above - Protobuf serialized data. - Span data = new Span(this.buffer, 1, 4); - var dataLength = writePosition - 5; - BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength); - } - - if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition)) - { - return ExportResult.Failure; - } - } - catch (Exception ex) - { - OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex); - return ExportResult.Failure; - } - - return ExportResult.Success; - } - - /// - protected override bool OnShutdown(int timeoutMilliseconds) - { - return this.transmissionHandler.Shutdown(timeoutMilliseconds); - } -} diff --git a/test/Benchmarks/Exporter/OtlpGrpcExporterBenchmarks.cs b/test/Benchmarks/Exporter/OtlpGrpcExporterBenchmarks.cs index 5f41ac0a3f1..ba7163a179a 100644 --- a/test/Benchmarks/Exporter/OtlpGrpcExporterBenchmarks.cs +++ b/test/Benchmarks/Exporter/OtlpGrpcExporterBenchmarks.cs @@ -12,7 +12,6 @@ using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation; using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission; -using OpenTelemetryProtocol::OpenTelemetry.Proto.Collector.Trace.V1; namespace Benchmarks.Exporter; @@ -36,7 +35,7 @@ public void GlobalSetup() options, new SdkLimitOptions(), new ExperimentalOptions(), - new OtlpExporterTransmissionHandler(new OtlpGrpcTraceExportClient(options, new TestTraceServiceClient()), options.TimeoutMilliseconds)); + new ProtobufOtlpExporterTransmissionHandler(new ProtobufOtlpGrpcExportClient(options, options.HttpClientFactory(), "opentelemetry.proto.collector.trace.v1.TraceService/Export"), options.TimeoutMilliseconds)); this.activity = ActivityHelper.CreateTestActivity(); this.activityBatch = new CircularBuffer(this.NumberOfSpans); diff --git a/test/Benchmarks/Exporter/OtlpHttpExporterBenchmarks.cs b/test/Benchmarks/Exporter/OtlpHttpExporterBenchmarks.cs index 1d6c0ad5c14..9603c147ac4 100644 --- a/test/Benchmarks/Exporter/OtlpHttpExporterBenchmarks.cs +++ b/test/Benchmarks/Exporter/OtlpHttpExporterBenchmarks.cs @@ -13,7 +13,6 @@ using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation; using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission; -using OpenTelemetryProtocol::OpenTelemetry.Proto.Collector.Trace.V1; namespace Benchmarks.Exporter; @@ -64,7 +63,7 @@ public void GlobalSetup() options, new SdkLimitOptions(), new ExperimentalOptions(), - new OtlpExporterTransmissionHandler(new OtlpHttpTraceExportClient(options, options.HttpClientFactory()), options.TimeoutMilliseconds)); + new ProtobufOtlpExporterTransmissionHandler(new ProtobufOtlpHttpExportClient(options, options.HttpClientFactory(), "v1/traces"), options.TimeoutMilliseconds)); this.activity = ActivityHelper.CreateTestActivity(); this.activityBatch = new CircularBuffer(this.NumberOfSpans); diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/Implementation/ExportClient/OtlpHttpTraceExportClientTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/Implementation/ExportClient/OtlpHttpTraceExportClientTests.cs index 0b932f87d39..b156204429f 100644 --- a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/Implementation/ExportClient/OtlpHttpTraceExportClientTests.cs +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/Implementation/ExportClient/OtlpHttpTraceExportClientTests.cs @@ -7,6 +7,7 @@ #endif using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation; using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer; using OpenTelemetry.Resources; using OpenTelemetry.Trace; using Xunit; @@ -43,7 +44,7 @@ public void NewOtlpHttpTraceExportClient_OtlpExporterOptions_ExporterHasCorrectP Headers = $"{header1.Name}={header1.Value}, {header2.Name} = {header2.Value}", }; - var client = new OtlpHttpTraceExportClient(options, options.HttpClientFactory()); + var client = new ProtobufOtlpHttpExportClient(options, options.HttpClientFactory(), "/v1/traces"); Assert.NotNull(client.HttpClient); @@ -85,7 +86,7 @@ public void SendExportRequest_ExportTraceServiceRequest_SendsCorrectHttpRequest( var httpClient = new HttpClient(testHttpHandler); - var exportClient = new OtlpHttpTraceExportClient(options, httpClient); + var exportClient = new ProtobufOtlpHttpExportClient(options, httpClient, string.Empty); var resourceBuilder = ResourceBuilder.CreateEmpty(); if (includeServiceNameInResource) @@ -131,10 +132,10 @@ void RunTest(Batch batch) var deadlineUtc = DateTime.UtcNow.AddMilliseconds(httpClient.Timeout.TotalMilliseconds); var request = new OtlpCollector.ExportTraceServiceRequest(); - request.AddBatch(DefaultSdkLimitOptions, resourceBuilder.Build().ToOtlpResource(), batch); + var (buffer, contentLength) = CreateTraceExportRequest(DefaultSdkLimitOptions, batch, resourceBuilder.Build()); // Act - var result = exportClient.SendExportRequest(request, deadlineUtc); + var result = exportClient.SendExportRequest(buffer, contentLength, deadlineUtc); var httpRequest = testHttpHandler.HttpRequestMessage; @@ -154,8 +155,11 @@ void RunTest(Batch batch) } Assert.NotNull(testHttpHandler.HttpRequestContent); - Assert.IsType(httpRequest.Content); - Assert.Contains(httpRequest.Content.Headers, h => h.Key == "Content-Type" && h.Value.First() == OtlpHttpTraceExportClient.MediaContentType); + + // TODO: Revisit once the HttpClient part is overridden. + // Assert.IsType(httpRequest.Content); + Assert.NotNull(httpRequest.Content); + Assert.Contains(httpRequest.Content.Headers, h => h.Key == "Content-Type" && h.Value.First() == ProtobufOtlpHttpExportClient.MediaHeaderValue.ToString()); var exportTraceRequest = OtlpCollector.ExportTraceServiceRequest.Parser.ParseFrom(testHttpHandler.HttpRequestContent); Assert.NotNull(exportTraceRequest); @@ -173,4 +177,11 @@ void RunTest(Batch batch) } } } + + private static (byte[] Buffer, int ContentLength) CreateTraceExportRequest(SdkLimitOptions sdkOptions, in Batch batch, Resource resource) + { + var buffer = new byte[4096]; + var writePosition = ProtobufOtlpTraceSerializer.WriteTraceData(buffer, 0, sdkOptions, resource, batch); + return (buffer, writePosition); + } } diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/MockCollectorIntegrationTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/MockCollectorIntegrationTests.cs index c9e2c19ae09..e0fa32e5970 100644 --- a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/MockCollectorIntegrationTests.cs +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/MockCollectorIntegrationTests.cs @@ -74,7 +74,7 @@ public async Task TestRecoveryAfterFailedExport() await httpClient.GetAsync($"/MockCollector/SetResponseCodes/{string.Join(",", codes.Select(x => (int)x))}"); var exportResults = new List(); - using var otlpExporter = new ProtobufOtlpTraceExporter(new OtlpExporterOptions() { Endpoint = new Uri($"http://localhost:{testGrpcPort}") }); + using var otlpExporter = new OtlpTraceExporter(new OtlpExporterOptions() { Endpoint = new Uri($"http://localhost:{testGrpcPort}") }); var delegatingExporter = new DelegatingExporter { OnExportFunc = (batch) => @@ -186,7 +186,7 @@ public async Task GrpcRetryTests(bool useRetryTransmissionHandler, ExportResult }) .Build(); - using var otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions, new SdkLimitOptions(), new ExperimentalOptions(configuration)); + using var otlpExporter = new OtlpTraceExporter(exporterOptions, new SdkLimitOptions(), new ExperimentalOptions(configuration)); var activitySourceName = "otel.grpc.retry.test"; using var source = new ActivitySource(activitySourceName); @@ -275,7 +275,7 @@ public async Task HttpRetryTests(bool useRetryTransmissionHandler, ExportResult }) .Build(); - using var otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions, new SdkLimitOptions(), new ExperimentalOptions(configuration)); + using var otlpExporter = new OtlpTraceExporter(exporterOptions, new SdkLimitOptions(), new ExperimentalOptions(configuration)); var activitySourceName = "otel.http.retry.test"; using var source = new ActivitySource(activitySourceName); @@ -379,7 +379,7 @@ public async Task HttpPersistentStorageRetryTests(bool usePersistentStorageTrans }) .Build(); - using var otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions, new(), new(configuration), transmissionHandler); + using var otlpExporter = new OtlpTraceExporter(exporterOptions, new(), new(configuration), transmissionHandler); var activitySourceName = "otel.http.persistent.storage.retry.test"; using var source = new ActivitySource(activitySourceName); @@ -519,7 +519,7 @@ public async Task GrpcPersistentStorageRetryTests(bool usePersistentStorageTrans }) .Build(); - using var otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions, new(), new(configuration), transmissionHandler); + using var otlpExporter = new OtlpTraceExporter(exporterOptions, new(), new(configuration), transmissionHandler); var activitySourceName = "otel.grpc.persistent.storage.retry.test"; using var source = new ActivitySource(activitySourceName); diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpExporterOptionsExtensionsTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpExporterOptionsExtensionsTests.cs index 4018998b890..4e139cd99ec 100644 --- a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpExporterOptionsExtensionsTests.cs +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpExporterOptionsExtensionsTests.cs @@ -91,8 +91,8 @@ public void GetHeaders_NoOptionHeaders_ReturnsStandardHeaders(string? optionHead } [Theory] - [InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcTraceExportClient))] - [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient))] + [InlineData(OtlpExportProtocol.Grpc, typeof(ProtobufOtlpGrpcExportClient))] + [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient))] public void GetTraceExportClient_SupportedProtocol_ReturnsCorrectExportClient(OtlpExportProtocol protocol, Type expectedExportClientType) { var options = new OtlpExporterOptions @@ -100,7 +100,7 @@ public void GetTraceExportClient_SupportedProtocol_ReturnsCorrectExportClient(Ot Protocol = protocol, }; - var exportClient = options.GetTraceExportClient(); + var exportClient = options.GetProtobufExportClient(OtlpSignalType.Traces); Assert.Equal(expectedExportClientType, exportClient.GetType()); } @@ -113,7 +113,7 @@ public void GetTraceExportClient_UnsupportedProtocol_Throws() Protocol = (OtlpExportProtocol)123, }; - Assert.Throws(() => options.GetTraceExportClient()); + Assert.Throws(() => options.GetProtobufExportClient(OtlpSignalType.Traces)); } [Theory] @@ -131,27 +131,27 @@ public void AppendPathIfNotPresent_TracesPath_AppendsCorrectly(string inputUri, } [Theory] - [InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcTraceExportClient), false, 10000, null)] - [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient), false, 10000, null)] - [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient), true, 8000, null)] + [InlineData(OtlpExportProtocol.Grpc, typeof(ProtobufOtlpGrpcExportClient), false, 10000, null)] + [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient), false, 10000, null)] + [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient), true, 8000, null)] [InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcMetricsExportClient), false, 10000, null)] [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpMetricsExportClient), false, 10000, null)] [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpMetricsExportClient), true, 8000, null)] [InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcLogExportClient), false, 10000, null)] [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpLogExportClient), false, 10000, null)] [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpLogExportClient), true, 8000, null)] - [InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcTraceExportClient), false, 10000, "in_memory")] - [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient), false, 10000, "in_memory")] - [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient), true, 8000, "in_memory")] + [InlineData(OtlpExportProtocol.Grpc, typeof(ProtobufOtlpGrpcExportClient), false, 10000, "in_memory")] + [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient), false, 10000, "in_memory")] + [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient), true, 8000, "in_memory")] [InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcMetricsExportClient), false, 10000, "in_memory")] [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpMetricsExportClient), false, 10000, "in_memory")] [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpMetricsExportClient), true, 8000, "in_memory")] [InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcLogExportClient), false, 10000, "in_memory")] [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpLogExportClient), false, 10000, "in_memory")] [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpLogExportClient), true, 8000, "in_memory")] - [InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcTraceExportClient), false, 10000, "disk")] - [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient), false, 10000, "disk")] - [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient), true, 8000, "disk")] + [InlineData(OtlpExportProtocol.Grpc, typeof(ProtobufOtlpGrpcExportClient), false, 10000, "disk")] + [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient), false, 10000, "disk")] + [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient), true, 8000, "disk")] [InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcMetricsExportClient), false, 10000, "disk")] [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpMetricsExportClient), false, 10000, "disk")] [InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpMetricsExportClient), true, 8000, "disk")] @@ -173,9 +173,9 @@ public void GetTransmissionHandler_InitializesCorrectHandlerExportClientAndTimeo .AddInMemoryCollection(new Dictionary { [ExperimentalOptions.OtlpRetryEnvVar] = retryStrategy }) .Build(); - if (exportClientType == typeof(OtlpGrpcTraceExportClient) || exportClientType == typeof(OtlpHttpTraceExportClient)) + if (exportClientType == typeof(ProtobufOtlpGrpcExportClient) || exportClientType == typeof(ProtobufOtlpHttpExportClient)) { - var transmissionHandler = exporterOptions.GetTraceExportTransmissionHandler(new ExperimentalOptions(configuration)); + var transmissionHandler = exporterOptions.GetProtobufExportTransmissionHandler(new ExperimentalOptions(configuration), OtlpSignalType.Traces); AssertTransmissionHandler(transmissionHandler, exportClientType, expectedTimeoutMilliseconds, retryStrategy); } @@ -212,4 +212,24 @@ private static void AssertTransmissionHandler(OtlpExporterTransmissionHandler Assert.Equal(expectedTimeoutMilliseconds, transmissionHandler.TimeoutMilliseconds); } + + private static void AssertTransmissionHandler(ProtobufOtlpExporterTransmissionHandler transmissionHandler, Type exportClientType, int expectedTimeoutMilliseconds, string? retryStrategy) + { + if (retryStrategy == "in_memory") + { + Assert.True(transmissionHandler is ProtobufOtlpExporterRetryTransmissionHandler); + } + else if (retryStrategy == "disk") + { + Assert.True(transmissionHandler is ProtobufOtlpExporterPersistentStorageTransmissionHandler); + } + else + { + Assert.True(transmissionHandler is ProtobufOtlpExporterTransmissionHandler); + } + + Assert.Equal(exportClientType, transmissionHandler.ExportClient.GetType()); + + Assert.Equal(expectedTimeoutMilliseconds, transmissionHandler.TimeoutMilliseconds); + } } diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpTraceExporterTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpTraceExporterTests.cs index eda9a6fa037..6c876dda3d6 100644 --- a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpTraceExporterTests.cs +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpTraceExporterTests.cs @@ -128,11 +128,9 @@ public void ServiceProviderHttpClientFactoryInvoked() } [Theory] - [InlineData(true, true)] - [InlineData(false, true)] - [InlineData(true, false)] - [InlineData(false, false)] - public void ToOtlpResourceSpansTest(bool includeServiceNameInResource, bool useCustomSerializer) + [InlineData(true)] + [InlineData(false)] + public void ToOtlpResourceSpansTest(bool includeServiceNameInResource) { var evenTags = new[] { new KeyValuePair("k0", "v0") }; var oddTags = new[] { new KeyValuePair("k1", "v1") }; @@ -175,16 +173,7 @@ public void ToOtlpResourceSpansTest(bool includeServiceNameInResource, bool useC void RunTest(SdkLimitOptions sdkOptions, Batch batch) { - var request = new OtlpCollector.ExportTraceServiceRequest(); - - if (useCustomSerializer) - { - request = CreateTraceExportRequest(sdkOptions, batch, resourceBuilder.Build()); - } - else - { - request.AddBatch(sdkOptions, resourceBuilder.Build().ToOtlpResource(), batch); - } + var request = CreateTraceExportRequest(sdkOptions, batch, resourceBuilder.Build()); Assert.Single(request.ResourceSpans); var otlpResource = request.ResourceSpans.First().Resource; @@ -231,10 +220,8 @@ void RunTest(SdkLimitOptions sdkOptions, Batch batch) } } - [Theory] - [InlineData(true)] - [InlineData(false)] - public void ScopeAttributesRemainConsistentAcrossMultipleBatches(bool useCustomSerializer) + [Fact] + public void ScopeAttributesRemainConsistentAcrossMultipleBatches() { var activitySourceTags = new TagList { @@ -275,16 +262,7 @@ public void ScopeAttributesRemainConsistentAcrossMultipleBatches(bool useCustomS void RunTest(SdkLimitOptions sdkOptions, Batch batch, ActivitySource activitySource) { - var request = new OtlpCollector.ExportTraceServiceRequest(); - - if (useCustomSerializer) - { - request = CreateTraceExportRequest(sdkOptions, batch, resourceBuilder.Build()); - } - else - { - request.AddBatch(sdkOptions, resourceBuilder.Build().ToOtlpResource(), batch); - } + var request = CreateTraceExportRequest(sdkOptions, batch, resourceBuilder.Build()); var resourceSpans = request.ResourceSpans.First(); Assert.NotNull(request.ResourceSpans.First()); @@ -305,8 +283,7 @@ void RunTest(SdkLimitOptions sdkOptions, Batch batch, ActivitySource a } // Return and re-add batch to simulate reuse - request.Return(); - request.AddBatch(DefaultSdkLimitOptions, ResourceBuilder.CreateDefault().Build().ToOtlpResource(), batch); + request = CreateTraceExportRequest(DefaultSdkLimitOptions, batch, ResourceBuilder.CreateDefault().Build()); resourceSpans = request.ResourceSpans.First(); scopeSpans = resourceSpans.ScopeSpans.First(); @@ -320,16 +297,11 @@ void RunTest(SdkLimitOptions sdkOptions, Batch batch, ActivitySource a { Assert.Contains(scope.Attributes, (kvp) => kvp.Key == tag.Key && kvp.Value.StringValue == (string?)tag.Value); } - - // Return and re-add batch to simulate reuse - request.Return(); } } - [Theory] - [InlineData(true)] - [InlineData(false)] - public void ScopeAttributesLimitsTest(bool useCustomSerializer) + [Fact] + public void ScopeAttributesLimitsTest() { var sdkOptions = new SdkLimitOptions() { @@ -367,16 +339,7 @@ public void ScopeAttributesLimitsTest(bool useCustomSerializer) void RunTest(SdkLimitOptions sdkOptions, Batch batch) { - var request = new OtlpCollector.ExportTraceServiceRequest(); - - if (useCustomSerializer) - { - request = CreateTraceExportRequest(sdkOptions, batch, resourceBuilder.Build()); - } - else - { - request.AddBatch(sdkOptions, resourceBuilder.Build().ToOtlpResource(), batch); - } + var request = CreateTraceExportRequest(sdkOptions, batch, resourceBuilder.Build()); var resourceSpans = request.ResourceSpans.First(); Assert.NotNull(request.ResourceSpans.First()); @@ -392,19 +355,11 @@ void RunTest(SdkLimitOptions sdkOptions, Batch batch) Assert.Equal("1234", scope.Attributes[0].Value.StringValue); this.ArrayValueAsserts(scope.Attributes[1].Value.ArrayValue.Values); Assert.Equal(new object().ToString()!.Substring(0, 4), scope.Attributes[2].Value.StringValue); - - // Return and re-add batch to simulate reuse - if (!useCustomSerializer) - { - request.Return(); - } } } - [Theory] - [InlineData(true)] - [InlineData(false)] - public void SpanLimitsTest(bool useCustomSerializer) + [Fact] + public void SpanLimitsTest() { var sdkOptions = new SdkLimitOptions() { @@ -439,7 +394,7 @@ public void SpanLimitsTest(bool useCustomSerializer) activity.AddEvent(event1); activity.AddEvent(event2); - var otlpSpan = useCustomSerializer ? ToOtlpSpan(sdkOptions, activity) : activity.ToOtlpSpan(sdkOptions); + var otlpSpan = ToOtlpSpan(sdkOptions, activity); Assert.NotNull(otlpSpan); Assert.Equal(3, otlpSpan.Attributes.Count); @@ -465,10 +420,8 @@ public void SpanLimitsTest(bool useCustomSerializer) Assert.Equal(new object().ToString()!.Substring(0, 4), otlpSpan.Links[0].Attributes[2].Value.StringValue); } - [Theory] - [InlineData(true)] - [InlineData(false)] - public void ToOtlpSpanTest(bool useCustomSerializer) + [Fact] + public void ToOtlpSpanTest() { using var activitySource = new ActivitySource(nameof(this.ToOtlpSpanTest)); @@ -510,7 +463,7 @@ public void ToOtlpSpanTest(bool useCustomSerializer) rootActivity.TraceId.CopyTo(traceIdSpan); var traceId = traceIdSpan.ToArray(); - var otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, rootActivity) : rootActivity.ToOtlpSpan(DefaultSdkLimitOptions); + var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, rootActivity); Assert.NotNull(otlpSpan); Assert.Equal("root", otlpSpan.Name); @@ -546,7 +499,7 @@ public void ToOtlpSpanTest(bool useCustomSerializer) rootActivity.Context.SpanId.CopyTo(parentIdSpan); var parentId = parentIdSpan.ToArray(); - otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, childActivity) : childActivity.ToOtlpSpan(DefaultSdkLimitOptions); + otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, childActivity); Assert.NotNull(otlpSpan); Assert.Equal("child", otlpSpan.Name); @@ -581,10 +534,8 @@ public void ToOtlpSpanTest(bool useCustomSerializer) Assert.False(flags.HasFlag(OtlpTrace.SpanFlags.ContextIsRemoteMask)); } - [Theory] - [InlineData(true)] - [InlineData(false)] - public void ToOtlpSpanActivitiesWithNullArrayTest(bool useCustomSerializer) + [Fact] + public void ToOtlpSpanActivitiesWithNullArrayTest() { using var activitySource = new ActivitySource(nameof(this.ToOtlpSpanTest)); @@ -594,7 +545,7 @@ public void ToOtlpSpanActivitiesWithNullArrayTest(bool useCustomSerializer) var stringArr = new string?[] { "test", string.Empty, null }; rootActivity.SetTag("stringArray", stringArr); - var otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, rootActivity) : rootActivity.ToOtlpSpan(DefaultSdkLimitOptions); + var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, rootActivity); Assert.NotNull(otlpSpan); @@ -607,20 +558,17 @@ public void ToOtlpSpanActivitiesWithNullArrayTest(bool useCustomSerializer) } [Theory] - [InlineData(ActivityStatusCode.Unset, "Description will be ignored if status is Unset.", true)] - [InlineData(ActivityStatusCode.Ok, "Description will be ignored if status is Okay.", true)] - [InlineData(ActivityStatusCode.Error, "Description will be kept if status is Error.", true)] - [InlineData(ActivityStatusCode.Unset, "Description will be ignored if status is Unset.", false)] - [InlineData(ActivityStatusCode.Ok, "Description will be ignored if status is Okay.", false)] - [InlineData(ActivityStatusCode.Error, "Description will be kept if status is Error.", false)] - public void ToOtlpSpanNativeActivityStatusTest(ActivityStatusCode expectedStatusCode, string statusDescription, bool useCustomSerializer) + [InlineData(ActivityStatusCode.Unset, "Description will be ignored if status is Unset.")] + [InlineData(ActivityStatusCode.Ok, "Description will be ignored if status is Okay.")] + [InlineData(ActivityStatusCode.Error, "Description will be kept if status is Error.")] + public void ToOtlpSpanNativeActivityStatusTest(ActivityStatusCode expectedStatusCode, string statusDescription) { using var activitySource = new ActivitySource(nameof(this.ToOtlpSpanTest)); using var activity = activitySource.StartActivity("Name"); Assert.NotNull(activity); activity.SetStatus(expectedStatusCode, statusDescription); - var otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, activity) : activity.ToOtlpSpan(DefaultSdkLimitOptions); + var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, activity); Assert.NotNull(otlpSpan); if (expectedStatusCode == ActivityStatusCode.Unset) { @@ -655,7 +603,7 @@ public void ToOtlpSpanStatusTagTest(StatusCode expectedStatusCode, string status activity.SetTag(SpanAttributeConstants.StatusCodeKey, statusCodeTagValue); activity.SetTag(SpanAttributeConstants.StatusDescriptionKey, statusDescription); - var otlpSpan = activity.ToOtlpSpan(DefaultSdkLimitOptions); + var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, activity); Assert.NotNull(otlpSpan); Assert.NotNull(otlpSpan.Status); @@ -683,7 +631,7 @@ public void ToOtlpSpanStatusTagIsCaseInsensitiveTest(StatusCode expectedStatusCo Assert.NotNull(activity); activity.SetTag(SpanAttributeConstants.StatusCodeKey, statusCodeTagValue); - var otlpSpan = activity.ToOtlpSpan(DefaultSdkLimitOptions); + var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, activity); Assert.NotNull(otlpSpan); Assert.NotNull(otlpSpan.Status); @@ -702,7 +650,7 @@ public void ToOtlpSpanActivityStatusTakesPrecedenceOverStatusTagsWhenActivitySta activity.SetTag(SpanAttributeConstants.StatusCodeKey, "ERROR"); activity.SetTag(SpanAttributeConstants.StatusDescriptionKey, tagDescriptionOnError); - var otlpSpan = activity.ToOtlpSpan(DefaultSdkLimitOptions); + var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, activity); Assert.NotNull(otlpSpan); Assert.NotNull(otlpSpan.Status); @@ -721,7 +669,7 @@ public void ToOtlpSpanActivityStatusTakesPrecedenceOverStatusTagsWhenActivitySta activity.SetStatus(ActivityStatusCode.Error, statusDescriptionOnError); activity.SetTag(SpanAttributeConstants.StatusCodeKey, "OK"); - var otlpSpan = activity.ToOtlpSpan(DefaultSdkLimitOptions); + var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, activity); Assert.NotNull(otlpSpan); Assert.NotNull(otlpSpan.Status); @@ -730,11 +678,9 @@ public void ToOtlpSpanActivityStatusTakesPrecedenceOverStatusTagsWhenActivitySta } [Theory] - [InlineData(true, true)] - [InlineData(false, true)] - [InlineData(true, false)] - [InlineData(false, false)] - public void ToOtlpSpanTraceStateTest(bool traceStateWasSet, bool useCustomSerializer) + [InlineData(true)] + [InlineData(false)] + public void ToOtlpSpanTraceStateTest(bool traceStateWasSet) { using var activitySource = new ActivitySource(nameof(this.ToOtlpSpanTest)); using var activity = activitySource.StartActivity("Name"); @@ -745,7 +691,7 @@ public void ToOtlpSpanTraceStateTest(bool traceStateWasSet, bool useCustomSerial activity.TraceStateString = tracestate; } - var otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, activity) : activity.ToOtlpSpan(DefaultSdkLimitOptions); + var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, activity); Assert.NotNull(otlpSpan); if (traceStateWasSet) @@ -759,26 +705,6 @@ public void ToOtlpSpanTraceStateTest(bool traceStateWasSet, bool useCustomSerial } } - [Fact] - public void ToOtlpSpanPeerServiceTest() - { - using var activitySource = new ActivitySource(nameof(this.ToOtlpSpanTest)); - - using var rootActivity = activitySource.StartActivity("root", ActivityKind.Client); - - Assert.NotNull(rootActivity); - rootActivity.SetTag(SemanticConventions.AttributeHttpHost, "opentelemetry.io"); - - var otlpSpan = rootActivity.ToOtlpSpan(DefaultSdkLimitOptions); - - Assert.NotNull(otlpSpan); - - var peerService = otlpSpan.Attributes.FirstOrDefault(kvp => kvp.Key == SemanticConventions.AttributePeerService); - - Assert.NotNull(peerService); - Assert.Equal("opentelemetry.io", peerService.Value.StringValue); - } - [Fact] public void UseOpenTelemetryProtocolActivityExporterWithCustomActivityProcessor() { @@ -817,10 +743,10 @@ public void UseOpenTelemetryProtocolActivityExporterWithCustomActivityProcessor( [Fact] public void Shutdown_ClientShutdownIsCalled() { - var exportClientMock = new TestExportClient(); + var exportClientMock = new TestProtobufExportClient(); var exporterOptions = new OtlpExporterOptions(); - var transmissionHandler = new OtlpExporterTransmissionHandler(exportClientMock, exporterOptions.TimeoutMilliseconds); + var transmissionHandler = new ProtobufOtlpExporterTransmissionHandler(exportClientMock, exporterOptions.TimeoutMilliseconds); using var exporter = new OtlpTraceExporter(new OtlpExporterOptions(), DefaultSdkLimitOptions, DefaultExperimentalOptions, transmissionHandler); exporter.Shutdown(); @@ -934,15 +860,11 @@ public void NamedOptionsMutateSeparateInstancesTest() } [Theory] - [InlineData(true, true, true)] - [InlineData(true, false, true)] - [InlineData(false, true, true)] - [InlineData(false, false, true)] - [InlineData(true, true, false)] - [InlineData(true, false, false)] - [InlineData(false, true, false)] - [InlineData(false, false, false)] - public void SpanFlagsTest(bool isRecorded, bool isRemote, bool useCustomSerializer) + [InlineData(true, true)] + [InlineData(true, false)] + [InlineData(false, true)] + [InlineData(false, false)] + public void SpanFlagsTest(bool isRecorded, bool isRemote) { using var activitySource = new ActivitySource(nameof(this.SpanFlagsTest)); @@ -955,7 +877,7 @@ public void SpanFlagsTest(bool isRecorded, bool isRemote, bool useCustomSerializ using var rootActivity = activitySource.StartActivity("root", ActivityKind.Server, ctx); Assert.NotNull(rootActivity); - var otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, rootActivity) : rootActivity.ToOtlpSpan(DefaultSdkLimitOptions); + var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, rootActivity); Assert.NotNull(otlpSpan); var flags = (OtlpTrace.SpanFlags)otlpSpan.Flags; @@ -984,15 +906,11 @@ public void SpanFlagsTest(bool isRecorded, bool isRemote, bool useCustomSerializ } [Theory] - [InlineData(true, true, true)] - [InlineData(true, false, true)] - [InlineData(false, true, true)] - [InlineData(false, false, true)] - [InlineData(true, true, false)] - [InlineData(true, false, false)] - [InlineData(false, true, false)] - [InlineData(false, false, false)] - public void SpanLinkFlagsTest(bool isRecorded, bool isRemote, bool useCustomSerializer) + [InlineData(true, true)] + [InlineData(true, false)] + [InlineData(false, true)] + [InlineData(false, false)] + public void SpanLinkFlagsTest(bool isRecorded, bool isRemote) { using var activitySource = new ActivitySource(nameof(this.SpanLinkFlagsTest)); @@ -1010,7 +928,7 @@ public void SpanLinkFlagsTest(bool isRecorded, bool isRemote, bool useCustomSeri using var rootActivity = activitySource.StartActivity("root", ActivityKind.Server, default(ActivityContext), links: links); Assert.NotNull(rootActivity); - var otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, rootActivity) : rootActivity.ToOtlpSpan(DefaultSdkLimitOptions); + var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, rootActivity); Assert.NotNull(otlpSpan); var spanLink = Assert.Single(otlpSpan.Links); diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestProtobufExportClient.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestProtobufExportClient.cs new file mode 100644 index 00000000000..28fab7ea74c --- /dev/null +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestProtobufExportClient.cs @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; + +namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests; + +internal class TestProtobufExportClient(bool throwException = false) : IProtobufExportClient +{ + public bool SendExportRequestCalled { get; private set; } + + public bool ShutdownCalled { get; private set; } + + public bool ThrowException { get; set; } = throwException; + + public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default) + { + if (this.ThrowException) + { + throw new Exception("Exception thrown from SendExportRequest"); + } + + this.SendExportRequestCalled = true; + return new TestExportClientResponse(true, deadlineUtc, null); + } + + public bool Shutdown(int timeoutMilliseconds) + { + this.ShutdownCalled = true; + return true; + } + + private class TestExportClientResponse : ExportClientResponse + { + public TestExportClientResponse(bool success, DateTime deadline, Exception? exception) + : base(success, deadline, exception) + { + } + } +}