Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add activity on connection #1734

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,10 @@ internal void TakeOver(Connection other)
internal async ValueTask<IConnection> OpenAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

Activity? connectionActivity = RabbitMQActivitySource.OpenConnection(_frameHandler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How/where does this activity get completed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are totally right. Missing some using keyword.
By reading the experimental activity on dotnet 9 http client I don't know if at some point we should link producer and consumer activity to the connection activity?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by "should link producer and consumer activity"? My thinking was that creating the connection is the activity. Once the connection is created/established, the activity can be completed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Link how? Have the connection activity be the parent?

try
{
RabbitMqClientEventSource.Log.ConnectionOpened();

cancellationToken.ThrowIfCancellationRequested();

// Note: this must happen *after* the frame handler is started
Expand All @@ -248,8 +247,9 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken)

return this;
}
catch
catch (Exception ex)
{
connectionActivity?.ReportException(ex);
try
{
var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen");
Expand Down
71 changes: 60 additions & 11 deletions projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ namespace RabbitMQ.Client
{
public static class RabbitMQActivitySource
{
private const string ExceptionEventName = "exception";
private const string ExceptionMessageTag = "exception.message";
private const string ExceptionStackTraceTag = "exception.stacktrace";

private const string ExceptionTypeTag = "exception.type";

// These constants are defined in the OpenTelemetry specification:
// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
internal const string MessageId = "messaging.message.id";
Expand Down Expand Up @@ -38,10 +44,15 @@ public static class RabbitMQActivitySource
private static readonly ActivitySource s_subscriberSource =
new ActivitySource(SubscriberSourceName, AssemblyVersion);

private static readonly ActivitySource s_connectionSource =
new ActivitySource(ConnectionSourceName, AssemblyVersion);

public const string PublisherSourceName = "RabbitMQ.Client.Publisher";
public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber";
public const string ConnectionSourceName = "RabbitMQ.Client.Connection";

public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } = DefaultContextInjector;
public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } =
DefaultContextInjector;

public static Func<IReadOnlyBasicProperties, ActivityContext> ContextExtractor { get; set; } =
DefaultContextExtractor;
Expand All @@ -56,6 +67,20 @@ public static class RabbitMQActivitySource
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
};

internal static Activity? OpenConnection(IFrameHandler frameHandler)
{
if (!s_connectionSource.HasListeners())
{
return null;
}

Comment on lines +72 to +76
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (!s_connectionSource.HasListeners())
{
return null;
}

I don't think this is necessary. ActivitySource.CreateActivity will do this check and return null.

https://github.com/dotnet/runtime/blob/1622f514684d94a521bfb41c88a27079ad943ee7/src/libraries/System.Diagnostics.DiagnosticSource/src/System/Diagnostics/ActivitySource.cs#L192-L200

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take the same logic as the rest of the implem but you are right when I read the Microsoft code this line is useless

Activity? connectionActivity =
s_connectionSource.StartRabbitMQActivity("connection attempt", ActivityKind.Client);
connectionActivity?
.SetNetworkTags(frameHandler);
return connectionActivity;
}

internal static Activity? Send(string routingKey, string exchange, int bodySize,
ActivityContext linkedContext = default)
{
Expand All @@ -66,18 +91,21 @@ public static class RabbitMQActivitySource

Activity? activity = linkedContext == default
? s_publisherSource.StartRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend,
UseRoutingKeyAsOperationName
? $"{routingKey} {MessagingOperationTypeSend}"
: MessagingOperationTypeSend,
ActivityKind.Producer)
: s_publisherSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend,
UseRoutingKeyAsOperationName
? $"{routingKey} {MessagingOperationTypeSend}"
: MessagingOperationTypeSend,
ActivityKind.Producer, linkedContext);
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags(MessagingOperationTypeSend, routingKey, exchange, 0, bodySize, activity);
}

return activity;

}

internal static Activity? ReceiveEmpty(string queue)
Expand All @@ -88,7 +116,9 @@ public static class RabbitMQActivitySource
}

Activity? activity = s_subscriberSource.StartRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{queue} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive,
UseRoutingKeyAsOperationName
? $"{queue} {MessagingOperationTypeReceive}"
: MessagingOperationTypeReceive,
ActivityKind.Consumer);
if (activity != null && activity.IsAllDataRequested)
{
Expand All @@ -110,11 +140,14 @@ public static class RabbitMQActivitySource

// Extract the PropagationContext of the upstream parent from the message headers.
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive, ActivityKind.Consumer,
UseRoutingKeyAsOperationName
? $"{routingKey} {MessagingOperationTypeReceive}"
: MessagingOperationTypeReceive, ActivityKind.Consumer,
ContextExtractor(readOnlyBasicProperties));
if (activity != null && activity.IsAllDataRequested)
{
PopulateMessagingTags(MessagingOperationTypeReceive, routingKey, exchange, deliveryTag, readOnlyBasicProperties,
PopulateMessagingTags(MessagingOperationTypeReceive, routingKey, exchange, deliveryTag,
readOnlyBasicProperties,
bodySize, activity);
}

Expand All @@ -131,7 +164,9 @@ public static class RabbitMQActivitySource

// Extract the PropagationContext of the upstream parent from the message headers.
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeProcess}" : MessagingOperationTypeProcess,
UseRoutingKeyAsOperationName
? $"{routingKey} {MessagingOperationTypeProcess}"
: MessagingOperationTypeProcess,
ActivityKind.Consumer, ContextExtractor(basicProperties));
if (activity != null && activity.IsAllDataRequested)
{
Expand All @@ -142,10 +177,22 @@ public static class RabbitMQActivitySource
return activity;
}

internal static void ReportException(this Activity activity, Exception exception)
{
ActivityTagsCollection exceptionTags = new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to just call .AddTag on the current Activity for each tag instead of adding a new event.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opentelemetry spec states that exception should be recorded as event and not tag on the current activity

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

System.Diagnostics.DiagnosticSource 9.0.0 adds a AddException method to the Activity class.

You can see the implementation here.

Although it's safe to add a dependency to version 9.0.0 of this package, if you don't want to risk that, you can add it as an extension method to this project.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This recordException is basically a copy paste of the MCR method, I don't know if dependency bump is allowed

exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionMessageTag, exception.Message));
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionStackTraceTag, exception.ToString()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionStackTraceTag, exception.ToString()));
// Note that "exception.stacktrace" is the full exception detail, not just the StackTrace property.
// See https://opentelemetry.io/docs/specs/semconv/attributes-registry/exception/
// and https://github.com/open-telemetry/opentelemetry-specification/pull/697#discussion_r453662519
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionStackTraceTag, exception.ToString()));

exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionTypeTag, exception.GetType().ToString()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionTypeTag, exception.GetType().ToString()));
exceptionTags.Add(new KeyValuePair<string, object?>(ExceptionTypeTag, exception.GetType().FullName));

activity.AddEvent(new ActivityEvent(ExceptionEventName, default, exceptionTags));

activity.SetStatus(ActivityStatusCode.Error);
}

private static Activity? StartRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind,
ActivityContext parentContext = default)
{
return source.CreateActivity(name, kind, parentContext, idFormat: ActivityIdFormat.W3C, tags: CreationTags)?.Start();
return source.CreateActivity(name, kind, parentContext, idFormat: ActivityIdFormat.W3C, tags: CreationTags)
?.Start();
}

private static Activity? StartLinkedRabbitMQActivity(this ActivitySource source, string name, ActivityKind kind,
Expand Down Expand Up @@ -273,7 +320,8 @@ private static ActivityContext DefaultContextExtractor(IReadOnlyBasicProperties
return default;
}

DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, DefaultContextGetter, out string? traceParent, out string? traceState);
DistributedContextPropagator.Current.ExtractTraceIdAndState(props.Headers, DefaultContextGetter,
out string? traceParent, out string? traceState);
return ActivityContext.TryParse(traceParent, traceState, out ActivityContext context) ? context : default;
}

Expand All @@ -288,7 +336,8 @@ private static void DefaultContextSetter(object? carrier, string name, string va
carrierDictionary[name] = value;
}

private static void DefaultContextGetter(object? carrier, string name, out string? value, out IEnumerable<string>? values)
private static void DefaultContextGetter(object? carrier, string name, out string? value,
out IEnumerable<string>? values)
{
if (carrier is IDictionary<string, object> carrierDict &&
carrierDict.TryGetValue(name, out object? propsVal) && propsVal is byte[] bytes)
Expand Down
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
const RabbitMQ.Client.RabbitMQActivitySource.ConnectionSourceName = "RabbitMQ.Client.Connection" -> string!
Loading