-
Notifications
You must be signed in to change notification settings - Fork 592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add activity on connection #1734
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -11,6 +11,12 @@ namespace RabbitMQ.Client | |||||||||||
{ | ||||||||||||
public static class RabbitMQActivitySource | ||||||||||||
{ | ||||||||||||
private const string ExceptionEventName = "exception"; | ||||||||||||
private const string ExceptionMessageTag = "exception.message"; | ||||||||||||
private const string ExceptionStackTraceTag = "exception.stacktrace"; | ||||||||||||
|
||||||||||||
private const string ExceptionTypeTag = "exception.type"; | ||||||||||||
|
||||||||||||
// These constants are defined in the OpenTelemetry specification: | ||||||||||||
// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes | ||||||||||||
internal const string MessageId = "messaging.message.id"; | ||||||||||||
|
@@ -38,10 +44,15 @@ public static class RabbitMQActivitySource | |||||||||||
private static readonly ActivitySource s_subscriberSource = | ||||||||||||
new ActivitySource(SubscriberSourceName, AssemblyVersion); | ||||||||||||
|
||||||||||||
private static readonly ActivitySource s_connectionSource = | ||||||||||||
new ActivitySource(ConnectionSourceName, AssemblyVersion); | ||||||||||||
|
||||||||||||
public const string PublisherSourceName = "RabbitMQ.Client.Publisher"; | ||||||||||||
public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber"; | ||||||||||||
public const string ConnectionSourceName = "RabbitMQ.Client.Connection"; | ||||||||||||
|
||||||||||||
public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } = DefaultContextInjector; | ||||||||||||
public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } = | ||||||||||||
DefaultContextInjector; | ||||||||||||
|
||||||||||||
public static Func<IReadOnlyBasicProperties, ActivityContext> ContextExtractor { get; set; } = | ||||||||||||
DefaultContextExtractor; | ||||||||||||
|
@@ -56,6 +67,20 @@ public static class RabbitMQActivitySource | |||||||||||
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1") | ||||||||||||
}; | ||||||||||||
|
||||||||||||
internal static Activity? OpenConnection(IFrameHandler frameHandler) | ||||||||||||
{ | ||||||||||||
if (!s_connectionSource.HasListeners()) | ||||||||||||
{ | ||||||||||||
return null; | ||||||||||||
} | ||||||||||||
|
||||||||||||
Comment on lines
+72
to
+76
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I don't think this is necessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I take the same logic as the rest of the implem but you are right when I read the Microsoft code this line is useless |
||||||||||||
Activity? connectionActivity = | ||||||||||||
s_connectionSource.StartRabbitMQActivity("connection attempt", ActivityKind.Client); | ||||||||||||
connectionActivity? | ||||||||||||
.SetNetworkTags(frameHandler); | ||||||||||||
return connectionActivity; | ||||||||||||
} | ||||||||||||
|
||||||||||||
internal static Activity? Send(string routingKey, string exchange, int bodySize, | ||||||||||||
ActivityContext linkedContext = default) | ||||||||||||
{ | ||||||||||||
|
@@ -66,18 +91,21 @@ public static class RabbitMQActivitySource | |||||||||||
|
||||||||||||
Activity? activity = linkedContext == default | ||||||||||||
? s_publisherSource.StartRabbitMQActivity( | ||||||||||||
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend, | ||||||||||||
UseRoutingKeyAsOperationName | ||||||||||||
? $"{routingKey} {MessagingOperationTypeSend}" | ||||||||||||
: MessagingOperationTypeSend, | ||||||||||||
ActivityKind.Producer) | ||||||||||||
: s_publisherSource.StartLinkedRabbitMQActivity( | ||||||||||||
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend, | ||||||||||||
UseRoutingKeyAsOperationName | ||||||||||||
? $"{routingKey} {MessagingOperationTypeSend}" | ||||||||||||
: MessagingOperationTypeSend, | ||||||||||||
ActivityKind.Producer, linkedContext); | ||||||||||||
if (activity != null && activity.IsAllDataRequested) | ||||||||||||
{ | ||||||||||||
PopulateMessagingTags(MessagingOperationTypeSend, routingKey, exchange, 0, bodySize, activity); | ||||||||||||
} | ||||||||||||
|
||||||||||||
return activity; | ||||||||||||
|
||||||||||||
} | ||||||||||||
|
||||||||||||
internal static Activity? ReceiveEmpty(string queue) | ||||||||||||
|
@@ -88,7 +116,9 @@ public static class RabbitMQActivitySource | |||||||||||
} | ||||||||||||
|
||||||||||||
Activity? activity = s_subscriberSource.StartRabbitMQActivity( | ||||||||||||
UseRoutingKeyAsOperationName ? $"{queue} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive, | ||||||||||||
UseRoutingKeyAsOperationName | ||||||||||||
? $"{queue} {MessagingOperationTypeReceive}" | ||||||||||||
: MessagingOperationTypeReceive, | ||||||||||||
ActivityKind.Consumer); | ||||||||||||
if (activity != null && activity.IsAllDataRequested) | ||||||||||||
{ | ||||||||||||
|
@@ -110,11 +140,14 @@ public static class RabbitMQActivitySource | |||||||||||
|
||||||||||||
// Extract the PropagationContext of the upstream parent from the message headers. | ||||||||||||
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity( | ||||||||||||
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive, ActivityKind.Consumer, | ||||||||||||
UseRoutingKeyAsOperationName | ||||||||||||
? $"{routingKey} {MessagingOperationTypeReceive}" | ||||||||||||
: MessagingOperationTypeReceive, ActivityKind.Consumer, | ||||||||||||
ContextExtractor(readOnlyBasicProperties)); | ||||||||||||
if (activity != null && activity.IsAllDataRequested) | ||||||||||||
{ | ||||||||||||
PopulateMessagingTags(MessagingOperationTypeReceive, routingKey, exchange, deliveryTag, readOnlyBasicProperties, | ||||||||||||
PopulateMessagingTags(MessagingOperationTypeReceive, routingKey, exchange, deliveryTag, | ||||||||||||
readOnlyBasicProperties, | ||||||||||||
bodySize, activity); | ||||||||||||
} | ||||||||||||
|
||||||||||||
|
@@ -131,7 +164,9 @@ public static class RabbitMQActivitySource | |||||||||||
|
||||||||||||
// Extract the PropagationContext of the upstream parent from the message headers. | ||||||||||||
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity( | ||||||||||||
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeProcess}" : MessagingOperationTypeProcess, | ||||||||||||
UseRoutingKeyAsOperationName | ||||||||||||
? $"{routingKey} {MessagingOperationTypeProcess}" | ||||||||||||
: MessagingOperationTypeProcess, | ||||||||||||
ActivityKind.Consumer, ContextExtractor(basicProperties)); | ||||||||||||
if (activity != null && activity.IsAllDataRequested) | ||||||||||||
{ | ||||||||||||
|
@@ -142,10 +177,22 @@ public static class RabbitMQActivitySource | |||||||||||
return activity; | ||||||||||||
} | ||||||||||||
|
||||||||||||
internal static void ReportException(this Activity activity, Exception exception) | ||||||||||||
{ | ||||||||||||
ActivityTagsCollection exceptionTags = new(); | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should be able to just call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Opentelemetry spec states that exception should be recorded as event and not tag on the current activity There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. System.Diagnostics.DiagnosticSource 9.0.0 adds a You can see the implementation here. Although it's safe to add a dependency to version 9.0.0 of this package, if you don't want to risk that, you can add it as an extension method to this project. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This recordException is basically a copy paste of the MCR method, I don't know if dependency bump is allowed |
||||||||||||
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionMessageTag, exception.Message)); | ||||||||||||
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionStackTraceTag, exception.ToString())); | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionTypeTag, exception.GetType().ToString())); | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
activity.AddEvent(new ActivityEvent(ExceptionEventName, default, exceptionTags)); | ||||||||||||
|
||||||||||||
activity.SetStatus(ActivityStatusCode.Error); | ||||||||||||
} | ||||||||||||
|
||||||||||||
private static Activity? StartRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind, | ||||||||||||
ActivityContext parentContext = default) | ||||||||||||
{ | ||||||||||||
return source.CreateActivity(name, kind, parentContext, idFormat: ActivityIdFormat.W3C, tags: CreationTags)?.Start(); | ||||||||||||
return source.CreateActivity(name, kind, parentContext, idFormat: ActivityIdFormat.W3C, tags: CreationTags) | ||||||||||||
?.Start(); | ||||||||||||
} | ||||||||||||
|
||||||||||||
private static Activity? StartLinkedRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind, | ||||||||||||
|
@@ -273,7 +320,8 @@ private static ActivityContext DefaultContextExtractor(IReadOnlyBasicProperties | |||||||||||
return default; | ||||||||||||
} | ||||||||||||
|
||||||||||||
DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, DefaultContextGetter, out string? traceParent, out string? traceState); | ||||||||||||
DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, DefaultContextGetter, | ||||||||||||
out string? traceParent, out string? traceState); | ||||||||||||
return ActivityContext.TryParse(traceParent, traceState, out ActivityContext context) ? context : default; | ||||||||||||
} | ||||||||||||
|
||||||||||||
|
@@ -288,7 +336,8 @@ private static void DefaultContextSetter(object? carrier, string name, string va | |||||||||||
carrierDictionary[name] = value; | ||||||||||||
} | ||||||||||||
|
||||||||||||
private static void DefaultContextGetter(object? carrier, string name, out string? value, out IEnumerable<string>? values) | ||||||||||||
private static void DefaultContextGetter(object? carrier, string name, out string? value, | ||||||||||||
out IEnumerable<string>? values) | ||||||||||||
{ | ||||||||||||
if (carrier is IDictionary<string, object> carrierDict && | ||||||||||||
carrierDict.TryGetValue(name, out object? propsVal) && propsVal is byte[] bytes) | ||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
const RabbitMQ.Client.RabbitMQActivitySource.ConnectionSourceName = "RabbitMQ.Client.Connection" -> string! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How/where does this activity get completed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are totally right. Missing some using keyword.
By reading the experimental activity on dotnet 9 http client I don't know if at some point we should link producer and consumer activity to the connection activity?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by "should link producer and consumer activity"? My thinking was that creating the connection is the activity. Once the connection is created/established, the activity can be completed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Link how? Have the connection activity be the parent?