diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java index 00d9f59df0..f88052d69e 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java @@ -221,7 +221,7 @@ private StreamingResponseBody stream(final String subscriptionId, subscriptionValidationService.validatePartitionsToStream(subscription, streamParameters.getPartitions()); streamer = subscriptionStreamerFactory.build(subscription, streamParameters, output, - connectionReady, blacklistService, parentSubscriptionSpan); + connectionReady, blacklistService, parentSubscriptionSpan, client.getClientId()); streamer.stream(); } catch (final InterruptedException ex) { LOG.warn("Interrupted while streaming with " + streamer, ex); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java index 85b5276fc0..296c30b00d 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionStreamerFactory.java @@ -86,7 +86,8 @@ public SubscriptionStreamer build( final StreamParameters streamParameters, final SubscriptionOutput output, final AtomicBoolean connectionReady, - final BlacklistService blacklistService, final Span parentSpan) + final BlacklistService blacklistService, + final Span parentSpan, final String clientId) throws InternalNakadiException, NoSuchEventTypeException { final Session session = Session.generate(1, streamParameters.getPartitions()); final ZkSubscriptionClient zkClient = zkClientFactory.createClient( @@ -97,6 +98,8 @@ public SubscriptionStreamer build( if (parentSpan != null) { streamSpan = TracingService.getNewSpanWithReference("streaming_async", System.currentTimeMillis(), parentSpan.context()); + streamSpan.setTag("client", clientId); + streamSpan.setTag("subscription.id", subscription.getId()); } else { streamSpan = null; }