From 8bb03cb3ef7ab50e01186b40357715d903e683ca Mon Sep 17 00:00:00 2001 From: Scott Fleener Date: Mon, 23 Sep 2024 20:14:19 +0000 Subject: [PATCH] Integrate OpenTelemetry into the proxy OpenCensus is a deprecated protocol and is slated to be removed from upstream collectors soon. This wires up the proxy to optionally use OpenTelemetry as the format for exported traces. Currently, this defaults to the existing OpenCensus exporter, and we can switch the default later. [#10111](linkerd/linkerd2#10111) Signed-off-by: Scott Fleener --- Cargo.lock | 4 + linkerd/app/Cargo.toml | 1 + linkerd/app/core/Cargo.toml | 1 + linkerd/app/core/src/http_tracing.rs | 162 ++++++------------ linkerd/app/core/src/lib.rs | 3 +- linkerd/app/core/src/metrics.rs | 6 +- linkerd/app/inbound/src/lib.rs | 12 +- linkerd/app/outbound/src/http/server.rs | 2 +- linkerd/app/outbound/src/lib.rs | 4 +- linkerd/app/src/env.rs | 28 ++- .../app/src/env/{opencensus.rs => trace.rs} | 0 linkerd/app/src/lib.rs | 50 +++--- linkerd/app/src/oc_collector.rs | 103 ----------- linkerd/app/src/trace_collector.rs | 120 +++++++++++++ .../app/src/trace_collector/oc_collector.rs | 63 +++++++ .../app/src/trace_collector/otel_collector.rs | 118 +++++++++++++ linkerd/opencensus/Cargo.toml | 1 + linkerd/opencensus/src/lib.rs | 75 +++++++- linkerd/opentelemetry/Cargo.toml | 1 + linkerd/opentelemetry/src/lib.rs | 57 +++++- linkerd/trace-context/src/export.rs | 18 ++ linkerd/trace-context/src/lib.rs | 22 +++ linkerd2-proxy/src/main.rs | 11 +- tools/src/bin/gen-protos.rs | 41 ++++- 24 files changed, 626 insertions(+), 277 deletions(-) rename linkerd/app/src/env/{opencensus.rs => trace.rs} (100%) delete mode 100644 linkerd/app/src/oc_collector.rs create mode 100644 linkerd/app/src/trace_collector.rs create mode 100644 linkerd/app/src/trace_collector/oc_collector.rs create mode 100644 linkerd/app/src/trace_collector/otel_collector.rs create mode 100644 linkerd/trace-context/src/export.rs diff --git a/Cargo.lock b/Cargo.lock index 
bce8c03238..60fb24d2ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1126,6 +1126,7 @@ dependencies = [ "linkerd-app-outbound", "linkerd-error", "linkerd-opencensus", + "linkerd-opentelemetry", "linkerd-tonic-stream", "rangemap", "regex", @@ -1184,6 +1185,7 @@ dependencies = [ "linkerd-meshtls", "linkerd-metrics", "linkerd-opencensus", + "linkerd-opentelemetry", "linkerd-proxy-api-resolve", "linkerd-proxy-balance", "linkerd-proxy-client-policy", @@ -1743,6 +1745,7 @@ dependencies = [ "http-body", "linkerd-error", "linkerd-metrics", + "linkerd-trace-context", "opencensus-proto", "tokio", "tokio-stream", @@ -1759,6 +1762,7 @@ dependencies = [ "http-body", "linkerd-error", "linkerd-metrics", + "linkerd-trace-context", "opentelemetry", "opentelemetry-proto", "opentelemetry_sdk", diff --git a/linkerd/app/Cargo.toml b/linkerd/app/Cargo.toml index af443530a3..1420c6edfe 100644 --- a/linkerd/app/Cargo.toml +++ b/linkerd/app/Cargo.toml @@ -25,6 +25,7 @@ linkerd-app-inbound = { path = "./inbound" } linkerd-app-outbound = { path = "./outbound" } linkerd-error = { path = "../error" } linkerd-opencensus = { path = "../opencensus" } +linkerd-opentelemetry = { path = "../opentelemetry" } linkerd-tonic-stream = { path = "../tonic-stream" } rangemap = "1" regex = "1" diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index 43e19a2816..fff82289f7 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -47,6 +47,7 @@ linkerd-io = { path = "../../io" } linkerd-meshtls = { path = "../../meshtls", default-features = false } linkerd-metrics = { path = "../../metrics", features = ["process", "stack"] } linkerd-opencensus = { path = "../../opencensus" } +linkerd-opentelemetry = { path = "../../opentelemetry" } linkerd-proxy-api-resolve = { path = "../../proxy/api-resolve" } linkerd-proxy-balance = { path = "../../proxy/balance" } linkerd-proxy-core = { path = "../../proxy/core" } diff --git a/linkerd/app/core/src/http_tracing.rs 
b/linkerd/app/core/src/http_tracing.rs index 6b24c599e3..b1d61080ee 100644 --- a/linkerd/app/core/src/http_tracing.rs +++ b/linkerd/app/core/src/http_tracing.rs @@ -1,139 +1,73 @@ use linkerd_error::Error; -use linkerd_opencensus::proto::trace::v1 as oc; use linkerd_stack::layer; -use linkerd_trace_context::{self as trace_context, TraceContext}; -use std::{collections::HashMap, sync::Arc}; -use thiserror::Error; +use linkerd_trace_context as trace_context; +use linkerd_trace_context::export::{ExportSpan, SpanKind, SpanLabels}; +use linkerd_trace_context::{Span, TraceContext}; +use std::str::FromStr; +use std::sync::Arc; use tokio::sync::mpsc; -pub type OpenCensusSink = Option>; -pub type Labels = Arc>; - -/// SpanConverter converts trace_context::Span objects into OpenCensus agent -/// protobuf span objects. SpanConverter receives trace_context::Span objects by -/// implmenting the SpanSink trait. For each span that it receives, it converts -/// it to an OpenCensus span and then sends it on the provided mpsc::Sender. 
-#[derive(Clone)] -pub struct SpanConverter { - kind: Kind, - sink: mpsc::Sender, - labels: Labels, +#[derive(Debug, Copy, Clone, Default)] +pub enum CollectorProtocol { + #[default] + OpenCensus, + OpenTelemetry, } -#[derive(Debug, Error)] -#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)] -pub struct IdLengthError { - id: Vec, - expected_size: usize, - actual_size: usize, +impl FromStr for CollectorProtocol { + type Err = (); + + fn from_str(s: &str) -> Result { + match s { + "opencensus" => Ok(CollectorProtocol::OpenCensus), + "opentelemetry" => Ok(CollectorProtocol::OpenTelemetry), + _ => Err(()), + } + } } +pub type SpanSink = mpsc::Sender; + pub fn server( - sink: OpenCensusSink, - labels: impl Into, + sink: Option, + labels: impl Into, ) -> impl layer::Layer, S>> + Clone { - SpanConverter::layer(Kind::Server, sink, labels) + TraceContext::layer(sink.map(move |sink| SpanConverter { + kind: SpanKind::Server, + sink, + labels: labels.into(), + })) } pub fn client( - sink: OpenCensusSink, - labels: impl Into, + sink: Option, + labels: impl Into, ) -> impl layer::Layer, S>> + Clone { - SpanConverter::layer(Kind::Client, sink, labels) -} - -#[derive(Copy, Clone, Debug, PartialEq)] -enum Kind { - Server = 1, - Client = 2, + TraceContext::layer(sink.map(move |sink| SpanConverter { + kind: SpanKind::Client, + sink, + labels: labels.into(), + })) } -impl SpanConverter { - fn layer( - kind: Kind, - sink: OpenCensusSink, - labels: impl Into, - ) -> impl layer::Layer, S>> + Clone { - TraceContext::layer(sink.map(move |sink| Self { - kind, - sink, - labels: labels.into(), - })) - } - - fn mk_span(&self, mut span: trace_context::Span) -> Result { - let mut attributes = HashMap::::new(); - for (k, v) in self.labels.iter() { - attributes.insert( - k.clone(), - oc::AttributeValue { - value: Some(oc::attribute_value::Value::StringValue(truncatable( - v.clone(), - ))), - }, - ); - } - for (k, v) in span.labels.drain() { - 
attributes.insert( - k.to_string(), - oc::AttributeValue { - value: Some(oc::attribute_value::Value::StringValue(truncatable(v))), - }, - ); - } - Ok(oc::Span { - trace_id: into_bytes(span.trace_id, 16)?, - span_id: into_bytes(span.span_id, 8)?, - tracestate: None, - parent_span_id: into_bytes(span.parent_id, 8)?, - name: Some(truncatable(span.span_name)), - kind: self.kind as i32, - start_time: Some(span.start.into()), - end_time: Some(span.end.into()), - attributes: Some(oc::span::Attributes { - attribute_map: attributes, - dropped_attributes_count: 0, - }), - stack_trace: None, - time_events: None, - links: None, - status: None, // TODO: this is gRPC status; we must read response trailers to populate this - resource: None, - same_process_as_parent_span: Some(self.kind == Kind::Client), - child_span_count: None, - }) - } +#[derive(Clone)] +pub struct SpanConverter { + kind: SpanKind, + sink: SpanSink, + labels: SpanLabels, } impl trace_context::SpanSink for SpanConverter { - #[inline] fn is_enabled(&self) -> bool { true } - fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> { - let span = self.mk_span(span)?; - self.sink.try_send(span).map_err(Into::into) - } -} - -fn into_bytes(id: trace_context::Id, size: usize) -> Result, IdLengthError> { - let bytes: Vec = id.into(); - if bytes.len() == size { - Ok(bytes) - } else { - let actual_size = bytes.len(); - Err(IdLengthError { - id: bytes, - expected_size: size, - actual_size, - }) - } -} - -fn truncatable(value: String) -> oc::TruncatableString { - oc::TruncatableString { - value, - truncated_byte_count: 0, + fn try_send(&mut self, span: Span) -> Result<(), Error> { + self.sink.try_send(ExportSpan { + span, + kind: self.kind, + labels: Arc::clone(&self.labels), + })?; + Ok(()) } } diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index fc98a3b621..8d3a433791 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -40,6 +40,7 @@ pub use 
linkerd_http_metrics as http_metrics; pub use linkerd_idle_cache as idle_cache; pub use linkerd_io as io; pub use linkerd_opencensus as opencensus; +pub use linkerd_opentelemetry as opentelemetry; pub use linkerd_service_profiles as profiles; pub use linkerd_stack_metrics as stack_metrics; pub use linkerd_stack_tracing as stack_tracing; @@ -65,7 +66,7 @@ pub struct ProxyRuntime { pub identity: identity::creds::Receiver, pub metrics: metrics::Proxy, pub tap: proxy::tap::Registry, - pub span_sink: http_tracing::OpenCensusSink, + pub span_sink: Option, pub drain: drain::Watch, } diff --git a/linkerd/app/core/src/metrics.rs b/linkerd/app/core/src/metrics.rs index 3fd374031b..084403e365 100644 --- a/linkerd/app/core/src/metrics.rs +++ b/linkerd/app/core/src/metrics.rs @@ -9,7 +9,7 @@ pub use crate::transport::labels::{TargetAddr, TlsAccept}; use crate::{ classify::Class, - control, http_metrics, opencensus, profiles, stack_metrics, + control, http_metrics, opencensus, opentelemetry, profiles, stack_metrics, svc::Param, tls, transport::{self, labels::TlsConnect}, @@ -39,6 +39,7 @@ pub struct Metrics { pub proxy: Proxy, pub control: ControlHttp, pub opencensus: opencensus::metrics::Registry, + pub opentelemetry: opentelemetry::metrics::Registry, } #[derive(Clone, Debug)] @@ -191,11 +192,13 @@ impl Metrics { }; let (opencensus, opencensus_report) = opencensus::metrics::new(); + let (opentelemetry, opentelemetry_report) = opentelemetry::metrics::new(); let metrics = Metrics { proxy, control, opencensus, + opentelemetry, }; let report = endpoint_report @@ -205,6 +208,7 @@ impl Metrics { .and_report(control_report) .and_report(transport_report) .and_report(opencensus_report) + .and_report(opentelemetry_report) .and_report(stack); (metrics, report) diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index 88ec18afb8..5074979d0c 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -18,12 +18,13 @@ mod server; 
#[cfg(any(test, feature = "test-util", fuzzing))] pub mod test_util; +#[cfg(fuzzing)] +pub use self::http::fuzz as http_fuzz; pub use self::{metrics::InboundMetrics, policy::DefaultPolicy}; +use linkerd_app_core::http_tracing::SpanSink; use linkerd_app_core::{ config::{ConnectConfig, ProxyConfig, QueueConfig}, - drain, - http_tracing::OpenCensusSink, - identity, io, + drain, identity, io, proxy::{tap, tcp}, svc, transport::{self, Remote, ServerAddr}, @@ -33,9 +34,6 @@ use std::{fmt::Debug, time::Duration}; use thiserror::Error; use tracing::debug_span; -#[cfg(fuzzing)] -pub use self::http::fuzz as http_fuzz; - #[derive(Clone, Debug)] pub struct Config { pub allow_discovery: NameMatch, @@ -67,7 +65,7 @@ struct Runtime { metrics: InboundMetrics, identity: identity::creds::Receiver, tap: tap::Registry, - span_sink: OpenCensusSink, + span_sink: Option, drain: drain::Watch, } diff --git a/linkerd/app/outbound/src/http/server.rs b/linkerd/app/outbound/src/http/server.rs index dc8f77cb1c..0fb34ded58 100644 --- a/linkerd/app/outbound/src/http/server.rs +++ b/linkerd/app/outbound/src/http/server.rs @@ -47,7 +47,7 @@ impl Outbound> { .check_new_service::>() .push(ServerRescue::layer(config.emit_headers)) .check_new_service::>() - // Initiates OpenCensus tracing. + // Initiates OpenTelemetry tracing. 
.push_on_service(http_tracing::server(rt.span_sink.clone(), trace_labels())) .push_on_service(http::BoxResponse::layer()) // Convert origin form HTTP/1 URIs to absolute form for Hyper's diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 75ee6c70d5..2cc8fccecc 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -6,11 +6,11 @@ #![allow(opaque_hidden_inferred_bound)] #![forbid(unsafe_code)] +use linkerd_app_core::http_tracing::SpanSink; use linkerd_app_core::{ config::{ProxyConfig, QueueConfig}, drain, exp_backoff::ExponentialBackoff, - http_tracing::OpenCensusSink, identity, io, metrics::prom, profiles, @@ -95,7 +95,7 @@ struct Runtime { metrics: OutboundMetrics, identity: identity::NewClient, tap: tap::Registry, - span_sink: OpenCensusSink, + span_sink: Option, drain: drain::Watch, } diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 49e00a88ba..de526955ed 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -1,4 +1,5 @@ -use crate::{dns, gateway, identity, inbound, oc_collector, outbound, policy, spire}; +use crate::{dns, gateway, identity, inbound, outbound, policy, spire, trace_collector}; +use linkerd_app_core::http_tracing::CollectorProtocol; use linkerd_app_core::{ addr, config::*, @@ -19,7 +20,7 @@ use tracing::{debug, error, info, warn}; mod control; mod http2; -mod opencensus; +mod trace; mod types; use self::types::*; @@ -146,6 +147,7 @@ const ENV_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS: &str = "LINKERD2_PROXY_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS"; pub const ENV_TRACE_ATTRIBUTES_PATH: &str = "LINKERD2_PROXY_TRACE_ATTRIBUTES_PATH"; +pub const ENV_TRACE_PROTOCOL: &str = "LINKERD2_PROXY_TRACE_PROTOCOL"; /// Constrains which destination names may be used for profile/route discovery. 
/// @@ -428,7 +430,8 @@ pub fn parse_config(strings: &S) -> Result let hostname = strings.get(ENV_HOSTNAME); - let oc_attributes_file_path = strings.get(ENV_TRACE_ATTRIBUTES_PATH); + let trace_attributes_file_path = strings.get(ENV_TRACE_ATTRIBUTES_PATH); + let trace_protocol = strings.get(ENV_TRACE_PROTOCOL); let trace_collector_addr = parse_control_addr(strings, ENV_TRACE_COLLECTOR_SVC_BASE); @@ -813,8 +816,8 @@ pub fn parse_config(strings: &S) -> Result .into(), }; - let oc_collector = match trace_collector_addr? { - None => oc_collector::Config::Disabled, + let trace_collector = match trace_collector_addr? { + None => trace_collector::Config::Disabled, Some(addr) => { let connect = if addr.addr.is_loopback() { inbound.proxy.connect.clone() @@ -826,14 +829,20 @@ pub fn parse_config(strings: &S) -> Result } else { outbound.http_request_queue.failfast_timeout }; - let attributes = oc_attributes_file_path + let attributes = trace_attributes_file_path .map(|path| match path.and_then(|p| p.parse::().ok()) { - Some(path) => opencensus::read_trace_attributes(&path), + Some(path) => trace::read_trace_attributes(&path), None => HashMap::new(), }) .unwrap_or_default(); - oc_collector::Config::Enabled(Box::new(oc_collector::EnabledConfig { + let trace_protocol = trace_protocol + .map(|proto| proto.and_then(|p| p.parse::().ok())) + .ok() + .flatten() + .unwrap_or_default(); + + trace_collector::Config::Enabled(Box::new(trace_collector::EnabledConfig { attributes, hostname: hostname?, control: ControlConfig { @@ -844,6 +853,7 @@ pub fn parse_config(strings: &S) -> Result failfast_timeout, }, }, + kind: trace_protocol, })) } }; @@ -923,7 +933,7 @@ pub fn parse_config(strings: &S) -> Result dns, dst, tap, - oc_collector, + trace_collector, policy, identity, outbound, diff --git a/linkerd/app/src/env/opencensus.rs b/linkerd/app/src/env/trace.rs similarity index 100% rename from linkerd/app/src/env/opencensus.rs rename to linkerd/app/src/env/trace.rs diff --git 
a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index aa73893851..70e6639e86 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -7,10 +7,10 @@ pub mod dst; pub mod env; pub mod identity; -pub mod oc_collector; pub mod policy; pub mod spire; pub mod tap; +pub mod trace_collector; pub use self::metrics::Metrics; use futures::{future, Future, FutureExt}; @@ -60,7 +60,7 @@ pub struct Config { pub policy: policy::Config, pub admin: admin::Config, pub tap: tap::Config, - pub oc_collector: oc_collector::Config, + pub trace_collector: trace_collector::Config, /// Grace period for graceful shutdowns. /// @@ -75,7 +75,7 @@ pub struct App { dst: ControlAddr, identity: identity::Identity, inbound_addr: Local, - oc_collector: oc_collector::OcCollector, + trace_collector: trace_collector::TraceCollector, outbound_addr: Local, outbound_addr_additional: Option>, start_proxy: Pin + Send + 'static>>, @@ -123,7 +123,7 @@ impl Config { policy, identity, inbound, - oc_collector, + trace_collector, outbound, gateway, tap, @@ -188,16 +188,25 @@ impl Config { }) }?; - debug!(config = ?oc_collector, "Building client"); - let oc_collector = { - let control_metrics = - ControlMetrics::register(registry.sub_registry_with_prefix("opencensus")); + debug!(config = ?trace_collector, "Building client"); + let trace_collector = { + let control_metrics = ControlMetrics::register( + registry.sub_registry_with_prefix(trace_collector.metrics_prefix()), + ); let identity = identity.receiver().new_client(); let dns = dns.resolver; let client_metrics = metrics.control.clone(); - let metrics = metrics.opencensus; - info_span!("opencensus").in_scope(|| { - oc_collector.build(identity, dns, metrics, control_metrics, client_metrics) + let otel_metrics = metrics.opentelemetry; + let oc_metrics = metrics.opencensus; + trace_collector.info_span().in_scope(|| { + trace_collector.build( + identity, + dns, + oc_metrics, + otel_metrics, + control_metrics, + client_metrics, + ) }) }?; @@ -205,7 
+214,7 @@ impl Config { identity: identity.receiver(), metrics: metrics.proxy, tap: tap.registry(), - span_sink: oc_collector.span_sink(), + span_sink: trace_collector.span_sink(), drain: drain_rx.clone(), }; let inbound = Inbound::new(inbound, runtime.clone()); @@ -309,7 +318,7 @@ impl Config { drain: drain_tx, identity, inbound_addr, - oc_collector, + trace_collector, outbound_addr, outbound_addr_additional, start_proxy, @@ -369,10 +378,10 @@ impl App { self.identity.receiver().local_id().clone() } - pub fn opencensus_addr(&self) -> Option<&ControlAddr> { - match self.oc_collector { - oc_collector::OcCollector::Disabled { .. } => None, - oc_collector::OcCollector::Enabled(ref oc) => Some(&oc.addr), + pub fn tracing_addr(&self) -> Option<&ControlAddr> { + match self.trace_collector { + trace_collector::TraceCollector::Disabled { .. } => None, + crate::trace_collector::TraceCollector::Enabled(ref oc) => Some(&oc.addr), } } @@ -381,7 +390,7 @@ impl App { admin, drain, identity, - oc_collector, + trace_collector: collector, start_proxy, tap, .. 
@@ -446,8 +455,9 @@ impl App { tokio::spawn(serve.instrument(info_span!("tap").or_current())); } - if let oc_collector::OcCollector::Enabled(oc) = oc_collector { - tokio::spawn(oc.task.instrument(info_span!("opencensus").or_current())); + if let trace_collector::TraceCollector::Enabled(collector) = collector { + let span = collector.info_span().or_current(); + tokio::spawn(collector.task.instrument(span)); } // we don't care if the admin shutdown channel is diff --git a/linkerd/app/src/oc_collector.rs b/linkerd/app/src/oc_collector.rs deleted file mode 100644 index e8f25e23c5..0000000000 --- a/linkerd/app/src/oc_collector.rs +++ /dev/null @@ -1,103 +0,0 @@ -use linkerd_app_core::{ - control, dns, identity, metrics::ControlHttp as HttpMetrics, svc::NewService, Error, -}; -use linkerd_opencensus::{self as opencensus, metrics, proto}; -use std::{collections::HashMap, future::Future, pin::Pin, time::SystemTime}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tracing::Instrument; - -#[derive(Clone, Debug)] -pub enum Config { - Disabled, - Enabled(Box), -} - -#[derive(Clone, Debug)] -pub struct EnabledConfig { - pub control: control::Config, - pub attributes: HashMap, - pub hostname: Option, -} - -pub type Task = Pin + Send + 'static>>; - -pub type SpanSink = mpsc::Sender; - -pub enum OcCollector { - Disabled, - Enabled(Box), -} - -pub struct EnabledCollector { - pub addr: control::ControlAddr, - pub span_sink: SpanSink, - pub task: Task, -} - -impl Config { - const SPAN_BUFFER_CAPACITY: usize = 100; - const SERVICE_NAME: &'static str = "linkerd-proxy"; - - pub fn build( - self, - identity: identity::NewClient, - dns: dns::Resolver, - legacy_metrics: metrics::Registry, - control_metrics: control::Metrics, - client_metrics: HttpMetrics, - ) -> Result { - match self { - Config::Disabled => Ok(OcCollector::Disabled), - Config::Enabled(inner) => { - let addr = inner.control.addr.clone(); - let svc = inner - .control - .build(dns, client_metrics, 
control_metrics, identity) - .new_service(()); - - let (span_sink, spans_rx) = mpsc::channel(Self::SPAN_BUFFER_CAPACITY); - let spans_rx = ReceiverStream::new(spans_rx); - - let task = { - use self::proto::agent::common::v1 as oc; - - let node = oc::Node { - identifier: Some(oc::ProcessIdentifier { - host_name: inner.hostname.unwrap_or_default(), - pid: std::process::id(), - start_timestamp: Some(SystemTime::now().into()), - }), - service_info: Some(oc::ServiceInfo { - name: Self::SERVICE_NAME.to_string(), - }), - attributes: inner.attributes, - ..oc::Node::default() - }; - - let addr = addr.clone(); - Box::pin( - opencensus::export_spans(svc, node, spans_rx, legacy_metrics).instrument( - tracing::debug_span!("opencensus", peer.addr = %addr).or_current(), - ), - ) - }; - - Ok(OcCollector::Enabled(Box::new(EnabledCollector { - addr, - task, - span_sink, - }))) - } - } - } -} - -impl OcCollector { - pub fn span_sink(&self) -> Option { - match self { - OcCollector::Disabled => None, - OcCollector::Enabled(inner) => Some(inner.span_sink.clone()), - } - } -} diff --git a/linkerd/app/src/trace_collector.rs b/linkerd/app/src/trace_collector.rs new file mode 100644 index 0000000000..1f7462ee00 --- /dev/null +++ b/linkerd/app/src/trace_collector.rs @@ -0,0 +1,120 @@ +use linkerd_app_core::http_tracing::{CollectorProtocol, SpanSink}; +use linkerd_app_core::metrics::ControlHttp as HttpMetrics; +use linkerd_app_core::svc::NewService; +use linkerd_app_core::{control, dns, identity, opencensus, opentelemetry}; +use linkerd_error::Error; +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use tracing::info_span; + +pub mod oc_collector; +pub mod otel_collector; + +#[derive(Clone, Debug)] +pub enum Config { + Disabled, + Enabled(Box), +} + +#[derive(Clone, Debug)] +pub struct EnabledConfig { + pub control: control::Config, + pub attributes: HashMap, + pub hostname: Option, + pub kind: CollectorProtocol, +} + +pub type Task = Pin + Send + 'static>>; + 
+pub enum TraceCollector { + Disabled, + Enabled(Box), +} + +pub struct EnabledCollector { + pub addr: control::ControlAddr, + pub kind: CollectorProtocol, + pub span_sink: SpanSink, + pub task: Task, +} + +impl EnabledCollector { + pub fn info_span(&self) -> tracing::Span { + match self.kind { + CollectorProtocol::OpenCensus => info_span!("opencensus"), + CollectorProtocol::OpenTelemetry => info_span!("opentelemetry"), + } + } +} + +impl TraceCollector { + pub fn span_sink(&self) -> Option { + match self { + TraceCollector::Disabled => None, + TraceCollector::Enabled(inner) => Some(inner.span_sink.clone()), + } + } +} + +impl Config { + pub fn info_span(&self) -> tracing::Span { + match self { + Config::Disabled => info_span!("disabled_tracing"), + Config::Enabled(config) => match config.kind { + CollectorProtocol::OpenCensus => info_span!("opencensus"), + CollectorProtocol::OpenTelemetry => info_span!("opentelemetry"), + }, + } + } + + pub fn metrics_prefix(&self) -> &'static str { + match self { + Config::Disabled => "disabled_tracing", + Config::Enabled(config) => match config.kind { + CollectorProtocol::OpenCensus => "opencensus", + CollectorProtocol::OpenTelemetry => "opentelemetry", + }, + } + } + + pub fn build( + self, + identity: identity::NewClient, + dns: dns::Resolver, + legacy_oc_metrics: opencensus::metrics::Registry, + legacy_otel_metrics: opentelemetry::metrics::Registry, + control_metrics: control::Metrics, + client_metrics: HttpMetrics, + ) -> Result { + match self { + Config::Disabled => Ok(TraceCollector::Disabled), + Config::Enabled(inner) => { + let addr = inner.control.addr.clone(); + let svc = inner + .control + .build(dns, client_metrics, control_metrics, identity) + .new_service(()); + + let collector = match inner.kind { + CollectorProtocol::OpenCensus => oc_collector::create_collector( + addr.clone(), + inner.hostname, + inner.attributes, + svc, + legacy_oc_metrics, + ), + CollectorProtocol::OpenTelemetry => 
otel_collector::create_collector( + addr.clone(), + inner.hostname, + inner.attributes, + svc, + legacy_otel_metrics, + ), + }; + + Ok(TraceCollector::Enabled(Box::new(collector))) + } + } + } +} diff --git a/linkerd/app/src/trace_collector/oc_collector.rs b/linkerd/app/src/trace_collector/oc_collector.rs new file mode 100644 index 0000000000..ee2fb7e0e7 --- /dev/null +++ b/linkerd/app/src/trace_collector/oc_collector.rs @@ -0,0 +1,63 @@ +use crate::trace_collector::EnabledCollector; +use linkerd_app_core::control::ControlAddr; +use linkerd_app_core::http_tracing::CollectorProtocol; +use linkerd_app_core::proxy::http::HttpBody; +use linkerd_app_core::Error; +use linkerd_opencensus::{self as opencensus, metrics, proto}; +use std::{collections::HashMap, time::SystemTime}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::body::BoxBody; +use tonic::client::GrpcService; +use tracing::Instrument; + +const SPAN_BUFFER_CAPACITY: usize = 100; +const SERVICE_NAME: &str = "linkerd-proxy"; + +pub(super) fn create_collector( + addr: ControlAddr, + hostname: Option, + attributes: HashMap, + svc: S, + legacy_metrics: metrics::Registry, +) -> EnabledCollector +where + S: GrpcService + Clone + Send + 'static, + S::Error: Into, + S::Future: Send, + S::ResponseBody: Default + HttpBody + Send + 'static, + ::Error: Into + Send, +{ + let (span_sink, spans_rx) = mpsc::channel(SPAN_BUFFER_CAPACITY); + let spans_rx = ReceiverStream::new(spans_rx); + + let task = { + use self::proto::agent::common::v1 as oc; + + let node = oc::Node { + identifier: Some(oc::ProcessIdentifier { + host_name: hostname.unwrap_or_default(), + pid: std::process::id(), + start_timestamp: Some(SystemTime::now().into()), + }), + service_info: Some(oc::ServiceInfo { + name: SERVICE_NAME.to_string(), + }), + attributes, + ..oc::Node::default() + }; + + let addr = addr.clone(); + Box::pin( + opencensus::export_spans(svc, node, spans_rx, legacy_metrics) + 
.instrument(tracing::debug_span!("opencensus", peer.addr = %addr).or_current()), + ) + }; + + EnabledCollector { + addr, + task, + span_sink, + kind: CollectorProtocol::OpenCensus, + } +} diff --git a/linkerd/app/src/trace_collector/otel_collector.rs b/linkerd/app/src/trace_collector/otel_collector.rs new file mode 100644 index 0000000000..daa6fa1f72 --- /dev/null +++ b/linkerd/app/src/trace_collector/otel_collector.rs @@ -0,0 +1,118 @@ +use super::EnabledCollector; +use linkerd_app_core::control::ControlAddr; +use linkerd_app_core::http_tracing::CollectorProtocol; +use linkerd_app_core::proxy::http::HttpBody; +use linkerd_app_core::Error; +use linkerd_opentelemetry::{ + self as opentelemetry, metrics, + proto::proto::common::v1::{any_value, AnyValue, KeyValue}, + proto::transform::common::ResourceAttributesWithSchema, +}; +use std::time::UNIX_EPOCH; +use std::{collections::HashMap, time::SystemTime}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::body::BoxBody; +use tonic::client::GrpcService; +use tracing::Instrument; + +const SPAN_BUFFER_CAPACITY: usize = 100; +const SERVICE_NAME: &str = "linkerd-proxy"; + +pub(super) fn create_collector( + addr: ControlAddr, + hostname: Option, + attributes: HashMap, + svc: S, + legacy_metrics: metrics::Registry, +) -> EnabledCollector +where + S: GrpcService + Clone + Send + 'static, + S::Error: Into, + S::Future: Send, + S::ResponseBody: Default + HttpBody + Send + 'static, + ::Error: Into + Send, +{ + let (span_sink, spans_rx) = mpsc::channel(SPAN_BUFFER_CAPACITY); + let spans_rx = ReceiverStream::new(spans_rx); + + let mut resources = ResourceAttributesWithSchema::default(); + + resources + .attributes + .0 + .push(SERVICE_NAME.with_key("service.name")); + resources + .attributes + .0 + .push((std::process::id() as i64).with_key("process.pid")); + + resources.attributes.0.push( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or_else(|e| 
-(e.duration().as_secs() as i64)) + .with_key("process.start_timestamp"), + ); + resources + .attributes + .0 + .push(hostname.unwrap_or_default().with_key("host.name")); + + resources.attributes.0.extend( + attributes + .into_iter() + .map(|(key, value)| value.with_key(&key)), + ); + + let addr = addr.clone(); + let task = Box::pin( + opentelemetry::export_spans(svc, spans_rx, resources, legacy_metrics) + .instrument(tracing::debug_span!("opentelemetry", peer.addr = %addr).or_current()), + ); + + EnabledCollector { + addr, + task, + span_sink, + kind: CollectorProtocol::OpenTelemetry, + } +} + +trait IntoAnyValue +where + Self: Sized, +{ + fn into_any_value(self) -> AnyValue; + + fn with_key(self, key: &str) -> KeyValue { + KeyValue { + key: key.to_string(), + value: Some(self.into_any_value()), + } + } +} + +impl IntoAnyValue for String { + fn into_any_value(self) -> AnyValue { + AnyValue { + value: Some(any_value::Value::StringValue(self)), + } + } +} + +impl IntoAnyValue for &str { + fn into_any_value(self) -> AnyValue { + AnyValue { + value: Some(any_value::Value::StringValue(self.to_string())), + } + } +} + +impl IntoAnyValue for i64 { + fn into_any_value(self) -> AnyValue { + AnyValue { + value: Some(any_value::Value::IntValue(self)), + } + } +} diff --git a/linkerd/opencensus/Cargo.toml b/linkerd/opencensus/Cargo.toml index 65d1b45eed..87699ec356 100644 --- a/linkerd/opencensus/Cargo.toml +++ b/linkerd/opencensus/Cargo.toml @@ -12,6 +12,7 @@ http = "0.2" http-body = "0.4" linkerd-error = { path = "../error" } linkerd-metrics = { path = "../metrics" } +linkerd-trace-context = { path = "../trace-context" } opencensus-proto = { path = "../../opencensus-proto" } tonic = { version = "0.10", default-features = false, features = [ "prost", diff --git a/linkerd/opencensus/src/lib.rs b/linkerd/opencensus/src/lib.rs index dd7d0a156c..65fbf2c4b0 100644 --- a/linkerd/opencensus/src/lib.rs +++ b/linkerd/opencensus/src/lib.rs @@ -6,17 +6,19 @@ pub mod metrics; use 
futures::stream::{Stream, StreamExt};
 use http_body::Body as HttpBody;
 use linkerd_error::Error;
+use linkerd_trace_context::export::{ExportSpan, SpanKind};
 use metrics::Registry;
 pub use opencensus_proto as proto;
 use opencensus_proto::agent::common::v1::Node;
 use opencensus_proto::agent::trace::v1::{
     trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
 };
-use opencensus_proto::trace::v1::Span;
+use opencensus_proto::trace::v1::{Span, TruncatableString};
+use std::collections::HashMap;
 use tokio::{sync::mpsc, time};
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::{self as grpc, body::BoxBody, client::GrpcService};
-use tracing::{debug, trace};
+use tracing::{debug, info, trace};
 
 pub async fn export_spans<T, S>(client: T, node: Node, spans: S, metrics: Registry)
 where
@@ -24,7 +26,7 @@ where
     T::Error: Into<Error>,
     T::ResponseBody: Default + HttpBody<Data = tonic::codegen::Bytes> + Send + 'static,
     <T::ResponseBody as HttpBody>::Error: Into<Error> + Send,
-    S: Stream<Item = Span> + Unpin,
+    S: Stream<Item = ExportSpan> + Unpin,
 {
     debug!("Span exporter running");
     SpanExporter::new(client, node, spans, metrics).run().await
@@ -49,7 +51,7 @@ where
     T::Error: Into<Error>,
     T::ResponseBody: Default + HttpBody<Data = tonic::codegen::Bytes> + Send + 'static,
     <T::ResponseBody as HttpBody>::Error: Into<Error> + Send,
-    S: Stream<Item = Span> + Unpin,
+    S: Stream<Item = ExportSpan> + Unpin,
 {
     const MAX_BATCH_SIZE: usize = 1000;
     const MAX_BATCH_IDLE: time::Duration = time::Duration::from_secs(10);
@@ -175,6 +177,13 @@ where
                 res = spans.next() => match res {
                     Some(span) => {
                         trace!(?span, "Adding to batch");
+                        let span = match convert_span(span) {
+                            Ok(span) => span,
+                            Err(error) => {
+                                info!(%error, "Span dropped");
+                                continue;
+                            }
+                        };
                         accum.push(span);
                     }
                     None => return Err(SpanRxClosed),
@@ -192,3 +201,67 @@ where
         }
     }
 }
+
+/// Converts a proxy [`ExportSpan`] into an OpenCensus [`Span`].
+///
+/// The exporter-wide `labels` are added first; labels recorded on the span
+/// itself are added afterwards and win on key collisions. Fails if the trace,
+/// span, or parent ID does not have the length OpenCensus requires.
+fn convert_span(span: ExportSpan) -> Result<Span, Error> {
+    use proto::trace::v1 as oc;
+
+    let ExportSpan {
+        mut span,
+        kind,
+        labels,
+    } = span;
+
+    let mut attributes = HashMap::<String, oc::AttributeValue>::new();
+    for (k, v) in labels.iter() {
+        attributes.insert(
+            k.clone(),
+            oc::AttributeValue {
+                value: Some(oc::attribute_value::Value::StringValue(truncatable(
+                    v.clone(),
+                ))),
+            },
+        );
+    }
+    for (k, v) in span.labels.drain() {
+        attributes.insert(
+            k.to_string(),
+            oc::AttributeValue {
+                value: Some(oc::attribute_value::Value::StringValue(truncatable(v))),
+            },
+        );
+    }
+    Ok(Span {
+        trace_id: span.trace_id.into_bytes::<16>()?.to_vec(),
+        span_id: span.span_id.into_bytes::<8>()?.to_vec(),
+        tracestate: None,
+        parent_span_id: span.parent_id.into_bytes::<8>()?.to_vec(),
+        name: Some(truncatable(span.span_name)),
+        kind: kind as i32,
+        start_time: Some(span.start.into()),
+        end_time: Some(span.end.into()),
+        attributes: Some(oc::span::Attributes {
+            attribute_map: attributes,
+            dropped_attributes_count: 0,
+        }),
+        stack_trace: None,
+        time_events: None,
+        links: None,
+        status: None, // TODO: this is gRPC status; we must read response trailers to populate this
+        resource: None,
+        same_process_as_parent_span: Some(kind == SpanKind::Client),
+        child_span_count: None,
+    })
+}
+
+/// Wraps `value` in a [`TruncatableString`] that reports no truncated bytes.
+fn truncatable(value: String) -> TruncatableString {
+    TruncatableString {
+        value,
+        truncated_byte_count: 0,
+    }
+}
diff --git a/linkerd/opentelemetry/Cargo.toml b/linkerd/opentelemetry/Cargo.toml
index 78c0ae7f72..dbd51627d5 100644
--- a/linkerd/opentelemetry/Cargo.toml
+++ b/linkerd/opentelemetry/Cargo.toml
@@ -12,6 +12,7 @@ http = "0.2"
 http-body = "0.4"
 linkerd-error = { path = "../error" }
 linkerd-metrics = { path = "../metrics" }
+linkerd-trace-context = { path = "../trace-context" }
 opentelemetry = { version = "0.23", default-features = false, features = ["trace"] }
 opentelemetry_sdk = { version = "0.23", default-features = false, features = ["trace"] }
 opentelemetry-proto = { path = "../../opentelemetry-proto" }
diff --git a/linkerd/opentelemetry/src/lib.rs b/linkerd/opentelemetry/src/lib.rs
index 2f983cade7..90c0caeef6 100644
--- a/linkerd/opentelemetry/src/lib.rs
+++ b/linkerd/opentelemetry/src/lib.rs
@@ -6,17 +6,26 @@ pub mod metrics;
 use futures::stream::{Stream, StreamExt};
 use http_body::Body as HttpBody;
 use linkerd_error::Error;
+use linkerd_trace_context as trace_context;
 use metrics::Registry;
+pub use opentelemetry as otel;
+use opentelemetry::trace::{
+    SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
+};
+use opentelemetry::KeyValue;
 pub use opentelemetry_proto as proto;
 use opentelemetry_proto::proto::collector::trace::v1::trace_service_client::TraceServiceClient;
 use opentelemetry_proto::proto::collector::trace::v1::ExportTraceServiceRequest;
 use opentelemetry_proto::proto::trace::v1::ResourceSpans;
 use opentelemetry_proto::transform::common::ResourceAttributesWithSchema;
 use opentelemetry_proto::transform::trace::group_spans_by_resource_and_scope;
+pub use opentelemetry_sdk as sdk;
 pub use opentelemetry_sdk::export::trace::SpanData;
+use opentelemetry_sdk::trace::SpanLinks;
 use tokio::{sync::mpsc, time};
 use tonic::{self as grpc, body::BoxBody, client::GrpcService};
-use tracing::{debug, trace};
+use trace_context::export::ExportSpan;
+use tracing::{debug, info, trace};
 
 pub async fn export_spans<T, S>(
     client: T,
@@ -28,7 +37,7 @@ pub async fn export_spans<T, S>(
     T::Error: Into<Error>,
     T::ResponseBody: Default + HttpBody<Data = tonic::codegen::Bytes> + Send + 'static,
     <T::ResponseBody as HttpBody>::Error: Into<Error> + Send,
-    S: Stream<Item = SpanData> + Unpin,
+    S: Stream<Item = ExportSpan> + Unpin,
 {
     debug!("Span exporter running");
     SpanExporter::new(client, spans, resource, metrics)
@@ -55,7 +64,7 @@ where
     T::Error: Into<Error>,
     T::ResponseBody: Default + HttpBody<Data = tonic::codegen::Bytes> + Send + 'static,
     <T::ResponseBody as HttpBody>::Error: Into<Error> + Send,
-    S: Stream<Item = SpanData> + Unpin,
+    S: Stream<Item = ExportSpan> + Unpin,
 {
     const MAX_BATCH_SIZE: usize = 1000;
     const MAX_BATCH_IDLE: time::Duration = time::Duration::from_secs(10);
@@ -189,6 +198,14 @@ where
                 res = span_stream.next() => match res {
                     Some(span) => {
                         trace!(?span, "Adding to batch");
+                        let span = match convert_span(span) {
+                            Ok(span) => span,
+                            Err(error) => {
+                                info!(%error, "Span dropped");
+                                continue;
+                            }
+                        };
+
                         input_accum.push(span);
                     }
                     None => break Err(SpanRxClosed),
@@ -210,3 +227,46 @@ where
         res
     }
 }
+
+/// Converts a proxy [`ExportSpan`] into an OpenTelemetry SDK [`SpanData`].
+///
+/// Both the exporter-wide `labels` and the labels recorded on the span itself
+/// become attributes, mirroring the OpenCensus exporter. Fails if the trace,
+/// span, or parent ID does not have the length OpenTelemetry requires.
+fn convert_span(span: ExportSpan) -> Result<SpanData, Error> {
+    let ExportSpan { span, kind, labels } = span;
+
+    let mut attributes = Vec::<KeyValue>::new();
+    for (k, v) in labels.iter() {
+        attributes.push(KeyValue::new(k.clone(), v.clone()));
+    }
+    // Per-span labels must be exported as well, not only the shared ones.
+    for (k, v) in span.labels {
+        attributes.push(KeyValue::new(k, v));
+    }
+    let is_remote = kind != trace_context::export::SpanKind::Client;
+    Ok(SpanData {
+        parent_span_id: SpanId::from_bytes(span.parent_id.into_bytes()?),
+        span_kind: match kind {
+            trace_context::export::SpanKind::Server => SpanKind::Server,
+            trace_context::export::SpanKind::Client => SpanKind::Client,
+        },
+        name: span.span_name.into(),
+        start_time: span.start,
+        end_time: span.end,
+        attributes,
+        dropped_attributes_count: 0,
+        links: SpanLinks::default(),
+        status: Status::Unset, // TODO: this is gRPC status; we must read response trailers to populate this
+        resource: Default::default(),
+        span_context: SpanContext::new(
+            TraceId::from_bytes(span.trace_id.into_bytes()?),
+            SpanId::from_bytes(span.span_id.into_bytes()?),
+            TraceFlags::default(),
+            is_remote,
+            TraceState::NONE,
+        ),
+        events: Default::default(),
+        instrumentation_lib: Default::default(),
+    })
+}
diff --git a/linkerd/trace-context/src/export.rs b/linkerd/trace-context/src/export.rs
new file mode 100644
index 0000000000..262c98ee7f
--- /dev/null
+++ b/linkerd/trace-context/src/export.rs
@@ -0,0 +1,21 @@
+use crate::Span;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// The role of a span; the discriminant is cast directly into the OpenCensus `kind` field.
+#[derive(Copy, Clone, Debug, PartialEq)]
+pub enum SpanKind {
+    Server = 1,
+    Client = 2,
+}
+
+/// Reference-counted label map attached to an [`ExportSpan`].
+pub type SpanLabels = Arc<HashMap<String, String>>;
+
+/// A recorded span bundled with its kind and labels, as handed to an exporter.
+#[derive(Debug)]
+pub struct ExportSpan {
+    pub span: Span,
+    pub kind: SpanKind,
+    pub labels: SpanLabels,
+}
diff --git a/linkerd/trace-context/src/lib.rs b/linkerd/trace-context/src/lib.rs
index ed8c36017c..f88b04bca4 100644
--- a/linkerd/trace-context/src/lib.rs
+++ b/linkerd/trace-context/src/lib.rs
@@ -1,6 +1,7 @@
 #![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)]
 #![forbid(unsafe_code)]
 
+pub mod export;
 mod propagation;
 mod service;
 
@@ -18,6 +19,27 @@ const SPAN_ID_LEN: usize = 8;
 #[derive(Debug, Default)]
 pub struct Id(Vec<u8>);
 
+#[derive(Debug, Error)]
+#[error("ID '{:?}' should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)]
+pub struct IdLengthError {
+    id: Vec<u8>, // bytes of the rejected ID
+    expected_size: usize, // the `N` requested from `Id::into_bytes`
+    actual_size: usize, // number of bytes the ID actually held
+}
+
+impl Id {
+    pub fn into_bytes<const N: usize>(self) -> Result<[u8; N], IdLengthError> {
+        self.as_ref().try_into().map_err(|_| {
+            let bytes: Vec<u8> = self.into(); // reclaim the bytes for the error report
+            IdLengthError {
+                expected_size: N,
+                actual_size: bytes.len(),
+                id: bytes,
+            }
+        })
+    }
+}
+
 #[derive(Debug, Default)]
 pub struct Flags(u8);
 
diff --git a/linkerd2-proxy/src/main.rs b/linkerd2-proxy/src/main.rs
index 61c54102df..f198d135e2 100644
--- a/linkerd2-proxy/src/main.rs
+++ b/linkerd2-proxy/src/main.rs
@@ -108,14 +108,11 @@ fn main() {
             ),
         }
 
-        if let Some(oc) = app.opencensus_addr() {
-            match oc.identity.value() {
-                None => info!("OpenCensus tracing collector at {}", oc.addr),
+        if let Some(tracing) = app.tracing_addr() {
+            match tracing.identity.value() {
+                None => info!("Tracing collector at {}", tracing.addr),
                 Some(tls) => {
-                    info!(
-                        "OpenCensus tracing collector at {} ({})",
-                        oc.addr, tls.server_id
-                    )
+                    info!("Tracing collector at {} ({})", tracing.addr, tls.server_id)
                 }
             }
         }
diff --git a/tools/src/bin/gen-protos.rs b/tools/src/bin/gen-protos.rs
index 4e85f3bffe..7100d21633 100644
--- a/tools/src/bin/gen-protos.rs
+++ b/tools/src/bin/gen-protos.rs
@@ -1,8 +1,13 @@
+use std::path::{Path, PathBuf};
+
 fn main() {
-    let opencensus_dir = {
-        let manifest_dir = std::path::PathBuf::from(std::env!("CARGO_MANIFEST_DIR"));
-        manifest_dir.parent().unwrap().join("opencensus-proto")
-    };
+    generate_opentelemetry_protos();
+    generate_opencensus_protos();
+}
+
+/// Generates the OpenCensus protobuf bindings into `opencensus-proto/src/gen`.
+fn generate_opencensus_protos() {
+    let opencensus_dir = get_proto_dir("opencensus-proto");
 
     let out_dir = opencensus_dir.join("src").join("gen");
 
@@ -17,12 +22,42 @@ fn main() {
         ]
     };
 
+    generate_protos(&out_dir, iface_files, &opencensus_dir);
+}
+
+/// Generates the OpenTelemetry protobuf bindings into `opentelemetry-proto/src/gen`.
+fn generate_opentelemetry_protos() {
+    let opentelemetry_dir = get_proto_dir("opentelemetry-proto");
+
+    let out_dir = opentelemetry_dir.join("src").join("gen");
+
+    let iface_files = {
+        let proto_dir = opentelemetry_dir.join("opentelemetry").join("proto");
+        &[
+            proto_dir.join("collector/trace/v1/trace_service.proto"),
+            proto_dir.join("common/v1/common.proto"),
+            proto_dir.join("resource/v1/resource.proto"),
+            proto_dir.join("trace/v1/trace.proto"),
+        ]
+    };
+
+    generate_protos(&out_dir, iface_files, &opentelemetry_dir);
+}
+
+/// Resolves `name` relative to the parent of this crate's `CARGO_MANIFEST_DIR`.
+fn get_proto_dir(name: &str) -> PathBuf {
+    let manifest_dir = std::path::PathBuf::from(std::env!("CARGO_MANIFEST_DIR"));
+    manifest_dir.parent().unwrap().join(name)
+}
+
+/// Builds client-only bindings for `iface_files` into `out_dir`, printing any failure.
+fn generate_protos(out_dir: &Path, iface_files: &[PathBuf], includes: &Path) {
     if let Err(error) = tonic_build::configure()
         .build_client(true)
         .build_server(false)
         .emit_rerun_if_changed(false)
         .out_dir(out_dir)
-        .compile(iface_files, &[opencensus_dir])
+        .compile(iface_files, &[includes])
     {
         eprintln!("\nfailed to compile protos: {}", error);
     }