Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
added additional tags for stream span
Browse files Browse the repository at this point in the history
  • Loading branch information
Kunal Jha committed Oct 24, 2019
1 parent 0dfb4ff commit a746016
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private StreamingResponseBody stream(final String subscriptionId,
subscriptionValidationService.validatePartitionsToStream(subscription,
streamParameters.getPartitions());
streamer = subscriptionStreamerFactory.build(subscription, streamParameters, output,
connectionReady, blacklistService, parentSubscriptionSpan);
connectionReady, blacklistService, parentSubscriptionSpan, client.getClientId());
streamer.stream();
} catch (final InterruptedException ex) {
LOG.warn("Interrupted while streaming with " + streamer, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public SubscriptionStreamer build(
final StreamParameters streamParameters,
final SubscriptionOutput output,
final AtomicBoolean connectionReady,
final BlacklistService blacklistService, final Span parentSpan)
final BlacklistService blacklistService,
final Span parentSpan, final String clientId)
throws InternalNakadiException, NoSuchEventTypeException {
final Session session = Session.generate(1, streamParameters.getPartitions());
final ZkSubscriptionClient zkClient = zkClientFactory.createClient(
Expand All @@ -97,6 +98,8 @@ public SubscriptionStreamer build(
if (parentSpan != null) {
streamSpan = TracingService.getNewSpanWithReference("streaming_async",
System.currentTimeMillis(), parentSpan.context());
streamSpan.setTag("client", clientId);
streamSpan.setTag("subscription.id", subscription.getId());
} else {
streamSpan = null;
}
Expand Down

0 comments on commit a746016

Please sign in to comment.