Skip to content

Commit

Permalink
Integrate OpenTelemetry into the proxy
Browse files Browse the repository at this point in the history
OpenCensus is a deprecated protocol and is slated to be removed from upstream collectors soon.

This wires up the proxy to optionally use OpenTelmetry as the format for exported traces. Currently, this defaults to the existing OpenCensus exporter, and we can switch the default later.

[#10111](linkerd/linkerd2#10111)

Signed-off-by: Scott Fleener <[email protected]>
  • Loading branch information
sfleen committed Sep 27, 2024
1 parent 642c2af commit 673f71f
Show file tree
Hide file tree
Showing 24 changed files with 600 additions and 274 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,7 @@ dependencies = [
"linkerd-app-outbound",
"linkerd-error",
"linkerd-opencensus",
"linkerd-opentelemetry",
"linkerd-tonic-stream",
"rangemap",
"regex",
Expand Down Expand Up @@ -1184,6 +1185,7 @@ dependencies = [
"linkerd-meshtls",
"linkerd-metrics",
"linkerd-opencensus",
"linkerd-opentelemetry",
"linkerd-proxy-api-resolve",
"linkerd-proxy-balance",
"linkerd-proxy-client-policy",
Expand Down Expand Up @@ -1743,6 +1745,7 @@ dependencies = [
"http-body",
"linkerd-error",
"linkerd-metrics",
"linkerd-trace-context",
"opencensus-proto",
"tokio",
"tokio-stream",
Expand All @@ -1759,6 +1762,7 @@ dependencies = [
"http-body",
"linkerd-error",
"linkerd-metrics",
"linkerd-trace-context",
"opentelemetry",
"opentelemetry-proto",
"opentelemetry_sdk",
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ linkerd-app-inbound = { path = "./inbound" }
linkerd-app-outbound = { path = "./outbound" }
linkerd-error = { path = "../error" }
linkerd-opencensus = { path = "../opencensus" }
linkerd-opentelemetry = { path = "../opentelemetry" }
linkerd-tonic-stream = { path = "../tonic-stream" }
rangemap = "1"
regex = "1"
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ linkerd-io = { path = "../../io" }
linkerd-meshtls = { path = "../../meshtls", default-features = false }
linkerd-metrics = { path = "../../metrics", features = ["process", "stack"] }
linkerd-opencensus = { path = "../../opencensus" }
linkerd-opentelemetry = { path = "../../opentelemetry" }
linkerd-proxy-api-resolve = { path = "../../proxy/api-resolve" }
linkerd-proxy-balance = { path = "../../proxy/balance" }
linkerd-proxy-core = { path = "../../proxy/core" }
Expand Down
165 changes: 51 additions & 114 deletions linkerd/app/core/src/http_tracing.rs
Original file line number Diff line number Diff line change
@@ -1,139 +1,76 @@
use linkerd_error::Error;
use linkerd_opencensus::proto::trace::v1 as oc;
use linkerd_stack::layer;
use linkerd_trace_context::{self as trace_context, TraceContext};
use std::{collections::HashMap, sync::Arc};
use thiserror::Error;
use linkerd_trace_context::{
self as trace_context,
export::{ExportSpan, SpanKind, SpanLabels},
Span, TraceContext,
};
use std::{str::FromStr, sync::Arc};
use tokio::sync::mpsc;

pub type OpenCensusSink = Option<mpsc::Sender<oc::Span>>;
pub type Labels = Arc<HashMap<String, String>>;

/// SpanConverter converts trace_context::Span objects into OpenCensus agent
/// protobuf span objects. SpanConverter receives trace_context::Span objects by
/// implmenting the SpanSink trait. For each span that it receives, it converts
/// it to an OpenCensus span and then sends it on the provided mpsc::Sender.
#[derive(Clone)]
pub struct SpanConverter {
kind: Kind,
sink: mpsc::Sender<oc::Span>,
labels: Labels,
#[derive(Debug, Copy, Clone, Default)]
pub enum CollectorProtocol {
#[default]
OpenCensus,
OpenTelemetry,
}

#[derive(Debug, Error)]
#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)]
pub struct IdLengthError {
id: Vec<u8>,
expected_size: usize,
actual_size: usize,
impl FromStr for CollectorProtocol {
type Err = ();

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.eq_ignore_ascii_case("opencensus") {
Ok(Self::OpenCensus)
} else if s.eq_ignore_ascii_case("opentelemetry") {
Ok(Self::OpenTelemetry)

Check warning on line 25 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L21-L25

Added lines #L21 - L25 were not covered by tests
} else {
Err(())

Check warning on line 27 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L27

Added line #L27 was not covered by tests
}
}
}

pub type SpanSink = mpsc::Sender<ExportSpan>;

pub fn server<S>(
sink: OpenCensusSink,
labels: impl Into<Labels>,
sink: Option<SpanSink>,
labels: impl Into<SpanLabels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
SpanConverter::layer(Kind::Server, sink, labels)
TraceContext::layer(sink.map(move |sink| SpanConverter {
kind: SpanKind::Server,
sink,
labels: labels.into(),

Check warning on line 41 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L39-L41

Added lines #L39 - L41 were not covered by tests
}))
}

pub fn client<S>(
sink: OpenCensusSink,
labels: impl Into<Labels>,
sink: Option<SpanSink>,
labels: impl Into<SpanLabels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
SpanConverter::layer(Kind::Client, sink, labels)
}

#[derive(Copy, Clone, Debug, PartialEq)]
enum Kind {
Server = 1,
Client = 2,
TraceContext::layer(sink.map(move |sink| SpanConverter {
kind: SpanKind::Client,
sink,
labels: labels.into(),

Check warning on line 52 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L50-L52

Added lines #L50 - L52 were not covered by tests
}))
}

impl SpanConverter {
fn layer<S>(
kind: Kind,
sink: OpenCensusSink,
labels: impl Into<Labels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<Self>, S>> + Clone {
TraceContext::layer(sink.map(move |sink| Self {
kind,
sink,
labels: labels.into(),
}))
}

fn mk_span(&self, mut span: trace_context::Span) -> Result<oc::Span, IdLengthError> {
let mut attributes = HashMap::<String, oc::AttributeValue>::new();
for (k, v) in self.labels.iter() {
attributes.insert(
k.clone(),
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(
v.clone(),
))),
},
);
}
for (k, v) in span.labels.drain() {
attributes.insert(
k.to_string(),
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(v))),
},
);
}
Ok(oc::Span {
trace_id: into_bytes(span.trace_id, 16)?,
span_id: into_bytes(span.span_id, 8)?,
tracestate: None,
parent_span_id: into_bytes(span.parent_id, 8)?,
name: Some(truncatable(span.span_name)),
kind: self.kind as i32,
start_time: Some(span.start.into()),
end_time: Some(span.end.into()),
attributes: Some(oc::span::Attributes {
attribute_map: attributes,
dropped_attributes_count: 0,
}),
stack_trace: None,
time_events: None,
links: None,
status: None, // TODO: this is gRPC status; we must read response trailers to populate this
resource: None,
same_process_as_parent_span: Some(self.kind == Kind::Client),
child_span_count: None,
})
}
#[derive(Clone)]
pub struct SpanConverter {
kind: SpanKind,
sink: SpanSink,
labels: SpanLabels,
}

impl trace_context::SpanSink for SpanConverter {
#[inline]
fn is_enabled(&self) -> bool {
true
}

fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> {
let span = self.mk_span(span)?;
self.sink.try_send(span).map_err(Into::into)
}
}

fn into_bytes(id: trace_context::Id, size: usize) -> Result<Vec<u8>, IdLengthError> {
let bytes: Vec<u8> = id.into();
if bytes.len() == size {
Ok(bytes)
} else {
let actual_size = bytes.len();
Err(IdLengthError {
id: bytes,
expected_size: size,
actual_size,
})
}
}

fn truncatable(value: String) -> oc::TruncatableString {
oc::TruncatableString {
value,
truncated_byte_count: 0,
fn try_send(&mut self, span: Span) -> Result<(), Error> {
self.sink.try_send(ExportSpan {
span,
kind: self.kind,
labels: Arc::clone(&self.labels),

Check warning on line 72 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L68-L72

Added lines #L68 - L72 were not covered by tests
})?;
Ok(())

Check warning on line 74 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L74

Added line #L74 was not covered by tests
}
}
3 changes: 2 additions & 1 deletion linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub use linkerd_http_metrics as http_metrics;
pub use linkerd_idle_cache as idle_cache;
pub use linkerd_io as io;
pub use linkerd_opencensus as opencensus;
pub use linkerd_opentelemetry as opentelemetry;
pub use linkerd_service_profiles as profiles;
pub use linkerd_stack_metrics as stack_metrics;
pub use linkerd_stack_tracing as stack_tracing;
Expand All @@ -65,7 +66,7 @@ pub struct ProxyRuntime {
pub identity: identity::creds::Receiver,
pub metrics: metrics::Proxy,
pub tap: proxy::tap::Registry,
pub span_sink: http_tracing::OpenCensusSink,
pub span_sink: Option<http_tracing::SpanSink>,
pub drain: drain::Watch,
}

Expand Down
6 changes: 5 additions & 1 deletion linkerd/app/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
pub use crate::transport::labels::{TargetAddr, TlsAccept};
use crate::{
classify::Class,
control, http_metrics, opencensus, profiles, stack_metrics,
control, http_metrics, opencensus, opentelemetry, profiles, stack_metrics,
svc::Param,
tls,
transport::{self, labels::TlsConnect},
Expand Down Expand Up @@ -39,6 +39,7 @@ pub struct Metrics {
pub proxy: Proxy,
pub control: ControlHttp,
pub opencensus: opencensus::metrics::Registry,
pub opentelemetry: opentelemetry::metrics::Registry,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -191,11 +192,13 @@ impl Metrics {
};

let (opencensus, opencensus_report) = opencensus::metrics::new();
let (opentelemetry, opentelemetry_report) = opentelemetry::metrics::new();

let metrics = Metrics {
proxy,
control,
opencensus,
opentelemetry,
};

let report = endpoint_report
Expand All @@ -205,6 +208,7 @@ impl Metrics {
.and_report(control_report)
.and_report(transport_report)
.and_report(opencensus_report)
.and_report(opentelemetry_report)
.and_report(stack);

(metrics, report)
Expand Down
9 changes: 4 additions & 5 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ mod server;
#[cfg(any(test, feature = "test-util", fuzzing))]
pub mod test_util;

#[cfg(fuzzing)]
pub use self::http::fuzz as http_fuzz;
pub use self::{metrics::InboundMetrics, policy::DefaultPolicy};
use linkerd_app_core::{
config::{ConnectConfig, ProxyConfig, QueueConfig},
drain,
http_tracing::OpenCensusSink,
http_tracing::SpanSink,
identity, io,
proxy::{tap, tcp},
svc,
Expand All @@ -33,9 +35,6 @@ use std::{fmt::Debug, time::Duration};
use thiserror::Error;
use tracing::debug_span;

#[cfg(fuzzing)]
pub use self::http::fuzz as http_fuzz;

#[derive(Clone, Debug)]
pub struct Config {
pub allow_discovery: NameMatch,
Expand Down Expand Up @@ -67,7 +66,7 @@ struct Runtime {
metrics: InboundMetrics,
identity: identity::creds::Receiver,
tap: tap::Registry,
span_sink: OpenCensusSink,
span_sink: Option<SpanSink>,
drain: drain::Watch,
}

Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<T> Outbound<svc::ArcNewCloneHttp<T>> {
.check_new_service::<T, http::Request<_>>()
.push(ServerRescue::layer(config.emit_headers))
.check_new_service::<T, http::Request<_>>()
// Initiates OpenCensus tracing.
// Initiates OpenTelemetry tracing.
.push_on_service(http_tracing::server(rt.span_sink.clone(), trace_labels()))
.push_on_service(http::BoxResponse::layer())
// Convert origin form HTTP/1 URIs to absolute form for Hyper's
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
#![allow(opaque_hidden_inferred_bound)]
#![forbid(unsafe_code)]

use linkerd_app_core::http_tracing::SpanSink;
use linkerd_app_core::{
config::{ProxyConfig, QueueConfig},
drain,
exp_backoff::ExponentialBackoff,
http_tracing::OpenCensusSink,
identity, io,
metrics::prom,
profiles,
Expand Down Expand Up @@ -95,7 +95,7 @@ struct Runtime {
metrics: OutboundMetrics,
identity: identity::NewClient,
tap: tap::Registry,
span_sink: OpenCensusSink,
span_sink: Option<SpanSink>,
drain: drain::Watch,
}

Expand Down
Loading

0 comments on commit 673f71f

Please sign in to comment.