From 4195071d984a4d2107a2f5888bca82db0bab4b5c Mon Sep 17 00:00:00 2001 From: neuronull Date: Fri, 26 Jan 2024 13:12:26 -0700 Subject: [PATCH] fix(observability): propagate tracing span context in stream sink request building (#19712) * fix(observability): propagate tracing span context in stream sink request building * changelog * feedback js --- ...ontext_stream_sink_request_building.fix.md | 36 +++++++++++++++++++ src/sinks/util/builder.rs | 8 +++++ 2 files changed, 44 insertions(+) create mode 100644 changelog.d/19712_propagate_tracing_span_context_stream_sink_request_building.fix.md diff --git a/changelog.d/19712_propagate_tracing_span_context_stream_sink_request_building.fix.md b/changelog.d/19712_propagate_tracing_span_context_stream_sink_request_building.fix.md new file mode 100644 index 0000000000000..8bc7a2229da0b --- /dev/null +++ b/changelog.d/19712_propagate_tracing_span_context_stream_sink_request_building.fix.md @@ -0,0 +1,36 @@ +The following metrics now correctly have the `component_kind`, `component_type`, and `component_id` tags: + - `component_errors_total` + - `component_discarded_events_total` + +For the following sinks: + - `splunk_hec` + - `clickhouse` + - `loki` + - `redis` + - `azure_blob` + - `azure_monitor_logs` + - `webhdfs` + - `appsignal` + - `amqp` + - `aws_kinesis` + - `statsd` + - `honeycomb` + - `gcp_stackdriver_metrics` + - `gcs_chronicle_unstructured` + - `gcp_stackdriver_logs` + - `gcp_pubsub` + - `gcp_cloud_storage` + - `nats` + - `http` + - `kafka` + - `new_relic` + - `datadog_metrics` + - `datadog_traces` + - `datadog_events` + - `databend` + - `prometheus_remote_write` + - `pulsar` + - `aws_s3` + - `aws_sqs` + - `aws_sns` + - `elasticsearch` diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs index bdf98a21df1f4..45c01ce41dcdd 100644 --- a/src/sinks/util/builder.rs +++ b/src/sinks/util/builder.rs @@ -12,6 +12,7 @@ use std::{ use futures_util::{stream::Map, Stream, StreamExt}; use pin_project::pin_project; use tower::Service; +use tracing::Span; use vector_lib::stream::{ batcher::{config::BatchConfig, Batcher}, ConcurrentMap, Driver, DriverResponse, ExpirationQueue, PartitionedBatcher, @@ -115,10 +116,17 @@ pub trait SinkBuilderExt: Stream { { let builder = Arc::new(builder); + // The future passed into the concurrent map is spawned in a tokio thread so we must preserve + // the span context in order to propagate the sink's automatic tags. + let span = Arc::new(Span::current()); + self.concurrent_map(limit, move |input| { let builder = Arc::clone(&builder); + let span = Arc::clone(&span); Box::pin(async move { + let _entered = span.enter(); + // Split the input into metadata and events. let (metadata, request_metadata_builder, events) = builder.split_input(input);