Skip to content

Commit

Permalink
fix(observability): propagate tracing span context in stream sink req…
Browse files Browse the repository at this point in the history
…uest building (#19712)

* fix(observability): propagate tracing span context in stream sink request building

* changelog

* feedback js
  • Loading branch information
neuronull authored Jan 26, 2024
1 parent cacb44f commit 4195071
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
The following metrics now correctly have the `component_kind`, `component_type`, and `component_id` tags:
- `component_errors_total`
- `component_discarded_events_total`

For the following sinks:
- `splunk_hec`
- `clickhouse`
- `loki`
- `redis`
- `azure_blob`
- `azure_monitor_logs`
- `webhdfs`
- `appsignal`
- `amqp`
- `aws_kinesis`
- `statsd`
- `honeycomb`
- `gcp_stackdriver_metrics`
- `gcs_chronicle_unstructured`
- `gcp_stackdriver_logs`
- `gcp_pubsub`
- `gcp_cloud_storage`
- `nats`
- `http`
- `kafka`
- `new_relic`
- `datadog_metrics`
- `datadog_traces`
- `datadog_events`
- `databend`
- `prometheus_remote_write`
- `pulsar`
- `aws_s3`
- `aws_sqs`
- `aws_sns`
- `elasticsearch`
8 changes: 8 additions & 0 deletions src/sinks/util/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
use futures_util::{stream::Map, Stream, StreamExt};
use pin_project::pin_project;
use tower::Service;
use tracing::Span;
use vector_lib::stream::{
batcher::{config::BatchConfig, Batcher},
ConcurrentMap, Driver, DriverResponse, ExpirationQueue, PartitionedBatcher,
Expand Down Expand Up @@ -115,10 +116,17 @@ pub trait SinkBuilderExt: Stream {
{
let builder = Arc::new(builder);

// The future passed into the concurrent map is spawned in a tokio thread so we must preserve
// the span context in order to propagate the sink's automatic tags.
let span = Arc::new(Span::current());

self.concurrent_map(limit, move |input| {
let builder = Arc::clone(&builder);
let span = Arc::clone(&span);

Box::pin(async move {
let _entered = span.enter();

// Split the input into metadata and events.
let (metadata, request_metadata_builder, events) = builder.split_input(input);

Expand Down

0 comments on commit 4195071

Please sign in to comment.