diff --git a/Cargo.lock b/Cargo.lock index aca9ab8f2f..e7d34d2666 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1126,6 +1126,7 @@ dependencies = [ "linkerd-app-outbound", "linkerd-error", "linkerd-opencensus", + "linkerd-opentelemetry", "linkerd-tonic-stream", "rangemap", "regex", @@ -1184,6 +1185,7 @@ dependencies = [ "linkerd-meshtls", "linkerd-metrics", "linkerd-opencensus", + "linkerd-opentelemetry", "linkerd-proxy-api-resolve", "linkerd-proxy-balance", "linkerd-proxy-client-policy", @@ -1743,6 +1745,7 @@ dependencies = [ "http-body", "linkerd-error", "linkerd-metrics", + "linkerd-trace-context", "opencensus-proto", "tokio", "tokio-stream", @@ -1759,6 +1762,7 @@ dependencies = [ "http-body", "linkerd-error", "linkerd-metrics", + "linkerd-trace-context", "opentelemetry", "opentelemetry-proto", "opentelemetry_sdk", diff --git a/linkerd/app/Cargo.toml b/linkerd/app/Cargo.toml index af443530a3..1420c6edfe 100644 --- a/linkerd/app/Cargo.toml +++ b/linkerd/app/Cargo.toml @@ -25,6 +25,7 @@ linkerd-app-inbound = { path = "./inbound" } linkerd-app-outbound = { path = "./outbound" } linkerd-error = { path = "../error" } linkerd-opencensus = { path = "../opencensus" } +linkerd-opentelemetry = { path = "../opentelemetry" } linkerd-tonic-stream = { path = "../tonic-stream" } rangemap = "1" regex = "1" diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index 43e19a2816..fff82289f7 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -47,6 +47,7 @@ linkerd-io = { path = "../../io" } linkerd-meshtls = { path = "../../meshtls", default-features = false } linkerd-metrics = { path = "../../metrics", features = ["process", "stack"] } linkerd-opencensus = { path = "../../opencensus" } +linkerd-opentelemetry = { path = "../../opentelemetry" } linkerd-proxy-api-resolve = { path = "../../proxy/api-resolve" } linkerd-proxy-balance = { path = "../../proxy/balance" } linkerd-proxy-core = { path = "../../proxy/core" } diff --git a/linkerd/app/core/src/http_tracing.rs b/linkerd/app/core/src/http_tracing.rs index 6b24c599e3..b0ddb05ce5 100644 --- a/linkerd/app/core/src/http_tracing.rs +++ b/linkerd/app/core/src/http_tracing.rs @@ -1,139 +1,76 @@ use linkerd_error::Error; -use linkerd_opencensus::proto::trace::v1 as oc; use linkerd_stack::layer; -use linkerd_trace_context::{self as trace_context, TraceContext}; -use std::{collections::HashMap, sync::Arc}; -use thiserror::Error; +use linkerd_trace_context::{ + self as trace_context, + export::{ExportSpan, SpanKind, SpanLabels}, + Span, TraceContext, +}; +use std::{str::FromStr, sync::Arc}; use tokio::sync::mpsc; -pub type OpenCensusSink = Option>; -pub type Labels = Arc>; - -/// SpanConverter converts trace_context::Span objects into OpenCensus agent -/// protobuf span objects. SpanConverter receives trace_context::Span objects by -/// implmenting the SpanSink trait. For each span that it receives, it converts -/// it to an OpenCensus span and then sends it on the provided mpsc::Sender. -#[derive(Clone)] -pub struct SpanConverter { - kind: Kind, - sink: mpsc::Sender, - labels: Labels, +#[derive(Debug, Copy, Clone, Default)] +pub enum CollectorProtocol { + #[default] + OpenCensus, + OpenTelemetry, } -#[derive(Debug, Error)] -#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)] -pub struct IdLengthError { - id: Vec, - expected_size: usize, - actual_size: usize, +impl FromStr for CollectorProtocol { + type Err = (); + + fn from_str(s: &str) -> Result { + if s.eq_ignore_ascii_case("opencensus") { + Ok(Self::OpenCensus) + } else if s.eq_ignore_ascii_case("opentelemetry") { + Ok(Self::OpenTelemetry) + } else { + Err(()) + } + } } +pub type SpanSink = mpsc::Sender; + pub fn server( - sink: OpenCensusSink, - labels: impl Into, + sink: Option, + labels: impl Into, ) -> impl layer::Layer, S>> + Clone { - SpanConverter::layer(Kind::Server, sink, labels) + TraceContext::layer(sink.map(move |sink| SpanConverter { + kind: SpanKind::Server, + sink, + labels: labels.into(), + })) } pub fn client( - sink: OpenCensusSink, - labels: impl Into, + sink: Option, + labels: impl Into, ) -> impl layer::Layer, S>> + Clone { - SpanConverter::layer(Kind::Client, sink, labels) -} - -#[derive(Copy, Clone, Debug, PartialEq)] -enum Kind { - Server = 1, - Client = 2, + TraceContext::layer(sink.map(move |sink| SpanConverter { + kind: SpanKind::Client, + sink, + labels: labels.into(), + })) } -impl SpanConverter { - fn layer( - kind: Kind, - sink: OpenCensusSink, - labels: impl Into, - ) -> impl layer::Layer, S>> + Clone { - TraceContext::layer(sink.map(move |sink| Self { - kind, - sink, - labels: labels.into(), - })) - } - - fn mk_span(&self, mut span: trace_context::Span) -> Result { - let mut attributes = HashMap::::new(); - for (k, v) in self.labels.iter() { - attributes.insert( - k.clone(), - oc::AttributeValue { - value: Some(oc::attribute_value::Value::StringValue(truncatable( - v.clone(), - ))), - }, - ); - } - for (k, v) in span.labels.drain() { - attributes.insert( - k.to_string(), - oc::AttributeValue { - value: Some(oc::attribute_value::Value::StringValue(truncatable(v))), - }, - ); - } - Ok(oc::Span { - trace_id: into_bytes(span.trace_id, 16)?, - span_id: into_bytes(span.span_id, 8)?, - tracestate: None, - parent_span_id: into_bytes(span.parent_id, 8)?, - name: Some(truncatable(span.span_name)), - kind: self.kind as i32, - start_time: Some(span.start.into()), - end_time: Some(span.end.into()), - attributes: Some(oc::span::Attributes { - attribute_map: attributes, - dropped_attributes_count: 0, - }), - stack_trace: None, - time_events: None, - links: None, - status: None, // TODO: this is gRPC status; we must read response trailers to populate this - resource: None, - same_process_as_parent_span: Some(self.kind == Kind::Client), - child_span_count: None, - }) - } +#[derive(Clone)] +pub struct SpanConverter { + kind: SpanKind, + sink: SpanSink, + labels: SpanLabels, } impl trace_context::SpanSink for SpanConverter { - #[inline] fn is_enabled(&self) -> bool { true } - fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> { - let span = self.mk_span(span)?; - self.sink.try_send(span).map_err(Into::into) - } -} - -fn into_bytes(id: trace_context::Id, size: usize) -> Result, IdLengthError> { - let bytes: Vec = id.into(); - if bytes.len() == size { - Ok(bytes) - } else { - let actual_size = bytes.len(); - Err(IdLengthError { - id: bytes, - expected_size: size, - actual_size, - }) - } -} - -fn truncatable(value: String) -> oc::TruncatableString { - oc::TruncatableString { - value, - truncated_byte_count: 0, + fn try_send(&mut self, span: Span) -> Result<(), Error> { + self.sink.try_send(ExportSpan { + span, + kind: self.kind, + labels: Arc::clone(&self.labels), + })?; + Ok(()) } } diff --git a/linkerd/app/core/src/lib.rs b/linkerd/app/core/src/lib.rs index fc98a3b621..8d3a433791 100644 --- a/linkerd/app/core/src/lib.rs +++ b/linkerd/app/core/src/lib.rs @@ -40,6 +40,7 @@ pub use linkerd_http_metrics as http_metrics; pub use linkerd_idle_cache as idle_cache; pub use linkerd_io as io; pub use linkerd_opencensus as opencensus; +pub use linkerd_opentelemetry as opentelemetry; pub use linkerd_service_profiles as profiles; pub use linkerd_stack_metrics as stack_metrics; pub use linkerd_stack_tracing as stack_tracing; @@ -65,7 +66,7 @@ pub struct ProxyRuntime { pub identity: identity::creds::Receiver, pub metrics: metrics::Proxy, pub tap: proxy::tap::Registry, - pub span_sink: http_tracing::OpenCensusSink, + pub span_sink: Option, pub drain: drain::Watch, } diff --git a/linkerd/app/core/src/metrics.rs b/linkerd/app/core/src/metrics.rs index 3fd374031b..084403e365 100644 --- a/linkerd/app/core/src/metrics.rs +++ b/linkerd/app/core/src/metrics.rs @@ -9,7 +9,7 @@ pub use crate::transport::labels::{TargetAddr, TlsAccept}; use crate::{ classify::Class, - control, http_metrics, opencensus, profiles, stack_metrics, + control, http_metrics, opencensus, opentelemetry, profiles, stack_metrics, svc::Param, tls, transport::{self, labels::TlsConnect}, @@ -39,6 +39,7 @@ pub struct Metrics { pub proxy: Proxy, pub control: ControlHttp, pub opencensus: opencensus::metrics::Registry, + pub opentelemetry: opentelemetry::metrics::Registry, } #[derive(Clone, Debug)] @@ -191,11 +192,13 @@ impl Metrics { }; let (opencensus, opencensus_report) = opencensus::metrics::new(); + let (opentelemetry, opentelemetry_report) = opentelemetry::metrics::new(); let metrics = Metrics { proxy, control, opencensus, + opentelemetry, }; let report = endpoint_report @@ -205,6 +208,7 @@ impl Metrics { .and_report(control_report) .and_report(transport_report) .and_report(opencensus_report) + .and_report(opentelemetry_report) .and_report(stack); (metrics, report) diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index 88ec18afb8..94f13701f6 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -18,11 +18,13 @@ mod server; #[cfg(any(test, feature = "test-util", fuzzing))] pub mod test_util; +#[cfg(fuzzing)] +pub use self::http::fuzz as http_fuzz; pub use self::{metrics::InboundMetrics, policy::DefaultPolicy}; use linkerd_app_core::{ config::{ConnectConfig, ProxyConfig, QueueConfig}, drain, - http_tracing::OpenCensusSink, + http_tracing::SpanSink, identity, io, proxy::{tap, tcp}, svc, @@ -33,9 +35,6 @@ use std::{fmt::Debug, time::Duration}; use thiserror::Error; use tracing::debug_span; -#[cfg(fuzzing)] -pub use self::http::fuzz as http_fuzz; - #[derive(Clone, Debug)] pub struct Config { pub allow_discovery: NameMatch, @@ -67,7 +66,7 @@ struct Runtime { metrics: InboundMetrics, identity: identity::creds::Receiver, tap: tap::Registry, - span_sink: OpenCensusSink, + span_sink: Option, drain: drain::Watch, } diff --git a/linkerd/app/outbound/src/http/server.rs b/linkerd/app/outbound/src/http/server.rs index dc8f77cb1c..0fb34ded58 100644 --- a/linkerd/app/outbound/src/http/server.rs +++ b/linkerd/app/outbound/src/http/server.rs @@ -47,7 +47,7 @@ impl Outbound> { .check_new_service::>() .push(ServerRescue::layer(config.emit_headers)) .check_new_service::>() - // Initiates OpenCensus tracing. + // Initiates OpenTelemetry tracing. .push_on_service(http_tracing::server(rt.span_sink.clone(), trace_labels())) .push_on_service(http::BoxResponse::layer()) // Convert origin form HTTP/1 URIs to absolute form for Hyper's diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 75ee6c70d5..2cc8fccecc 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -6,11 +6,11 @@ #![allow(opaque_hidden_inferred_bound)] #![forbid(unsafe_code)] +use linkerd_app_core::http_tracing::SpanSink; use linkerd_app_core::{ config::{ProxyConfig, QueueConfig}, drain, exp_backoff::ExponentialBackoff, - http_tracing::OpenCensusSink, identity, io, metrics::prom, profiles, @@ -95,7 +95,7 @@ struct Runtime { metrics: OutboundMetrics, identity: identity::NewClient, tap: tap::Registry, - span_sink: OpenCensusSink, + span_sink: Option, drain: drain::Watch, } diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 49e00a88ba..c4ca612596 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -1,8 +1,9 @@ -use crate::{dns, gateway, identity, inbound, oc_collector, outbound, policy, spire}; +use crate::{dns, gateway, identity, inbound, outbound, policy, spire, trace_collector}; use linkerd_app_core::{ addr, config::*, control::{Config as ControlConfig, ControlAddr}, + http_tracing::CollectorProtocol, proxy::http::{h1, h2}, tls, transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout}, @@ -19,7 +20,7 @@ use tracing::{debug, error, info, warn}; mod control; mod http2; -mod opencensus; +mod trace; mod types; use self::types::*; @@ -145,7 +146,8 @@ pub const ENV_OUTBOUND_MAX_IN_FLIGHT: &str = "LINKERD2_PROXY_OUTBOUND_MAX_IN_FLI const ENV_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS: &str = "LINKERD2_PROXY_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS"; -pub const ENV_TRACE_ATTRIBUTES_PATH: &str = "LINKERD2_PROXY_TRACE_ATTRIBUTES_PATH"; +const ENV_TRACE_ATTRIBUTES_PATH: &str = "LINKERD2_PROXY_TRACE_ATTRIBUTES_PATH"; +const ENV_TRACE_PROTOCOL: &str = "LINKERD2_PROXY_TRACE_PROTOCOL"; /// Constrains which destination names may be used for profile/route discovery. /// @@ -428,7 +430,8 @@ pub fn parse_config(strings: &S) -> Result let hostname = strings.get(ENV_HOSTNAME); - let oc_attributes_file_path = strings.get(ENV_TRACE_ATTRIBUTES_PATH); + let trace_attributes_file_path = strings.get(ENV_TRACE_ATTRIBUTES_PATH); + let trace_protocol = strings.get(ENV_TRACE_PROTOCOL); let trace_collector_addr = parse_control_addr(strings, ENV_TRACE_COLLECTOR_SVC_BASE); @@ -813,8 +816,8 @@ pub fn parse_config(strings: &S) -> Result .into(), }; - let oc_collector = match trace_collector_addr? { - None => oc_collector::Config::Disabled, + let trace_collector = match trace_collector_addr? { + None => trace_collector::Config::Disabled, Some(addr) => { let connect = if addr.addr.is_loopback() { inbound.proxy.connect.clone() @@ -826,14 +829,20 @@ pub fn parse_config(strings: &S) -> Result } else { outbound.http_request_queue.failfast_timeout }; - let attributes = oc_attributes_file_path + let attributes = trace_attributes_file_path .map(|path| match path.and_then(|p| p.parse::().ok()) { - Some(path) => opencensus::read_trace_attributes(&path), + Some(path) => trace::read_trace_attributes(&path), None => HashMap::new(), }) .unwrap_or_default(); - oc_collector::Config::Enabled(Box::new(oc_collector::EnabledConfig { + let trace_protocol = trace_protocol + .map(|proto| proto.and_then(|p| p.parse::().ok())) + .ok() + .flatten() + .unwrap_or_default(); + + trace_collector::Config::Enabled(Box::new(trace_collector::EnabledConfig { attributes, hostname: hostname?, control: ControlConfig { @@ -844,6 +853,7 @@ pub fn parse_config(strings: &S) -> Result failfast_timeout, }, }, + kind: trace_protocol, })) } }; @@ -923,7 +933,7 @@ pub fn parse_config(strings: &S) -> Result dns, dst, tap, - oc_collector, + trace_collector, policy, identity, outbound, diff --git a/linkerd/app/src/env/opencensus.rs b/linkerd/app/src/env/trace.rs similarity index 100% rename from linkerd/app/src/env/opencensus.rs rename to linkerd/app/src/env/trace.rs diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index aa73893851..4fceec08dd 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -7,10 +7,10 @@ pub mod dst; pub mod env; pub mod identity; -pub mod oc_collector; pub mod policy; pub mod spire; pub mod tap; +pub mod trace_collector; pub use self::metrics::Metrics; use futures::{future, Future, FutureExt}; @@ -60,7 +60,7 @@ pub struct Config { pub policy: policy::Config, pub admin: admin::Config, pub tap: tap::Config, - pub oc_collector: oc_collector::Config, + pub trace_collector: trace_collector::Config, /// Grace period for graceful shutdowns. /// @@ -75,7 +75,7 @@ pub struct App { dst: ControlAddr, identity: identity::Identity, inbound_addr: Local, - oc_collector: oc_collector::OcCollector, + trace_collector: trace_collector::TraceCollector, outbound_addr: Local, outbound_addr_additional: Option>, start_proxy: Pin + Send + 'static>>, @@ -123,7 +123,7 @@ impl Config { policy, identity, inbound, - oc_collector, + trace_collector, outbound, gateway, tap, @@ -188,16 +188,27 @@ impl Config { }) }?; - debug!(config = ?oc_collector, "Building client"); - let oc_collector = { - let control_metrics = - ControlMetrics::register(registry.sub_registry_with_prefix("opencensus")); + debug!(config = ?trace_collector, "Building client"); + let trace_collector = { + let control_metrics = if let Some(prefix) = trace_collector.metrics_prefix() { + ControlMetrics::register(registry.sub_registry_with_prefix(prefix)) + } else { + ControlMetrics::register(&mut prom::Registry::default()) + }; let identity = identity.receiver().new_client(); let dns = dns.resolver; let client_metrics = metrics.control.clone(); - let metrics = metrics.opencensus; - info_span!("opencensus").in_scope(|| { - oc_collector.build(identity, dns, metrics, control_metrics, client_metrics) + let otel_metrics = metrics.opentelemetry; + let oc_metrics = metrics.opencensus; + info_span!("tracing").in_scope(|| { + trace_collector.build( + identity, + dns, + oc_metrics, + otel_metrics, + control_metrics, + client_metrics, + ) }) }?; @@ -205,7 +216,7 @@ impl Config { identity: identity.receiver(), metrics: metrics.proxy, tap: tap.registry(), - span_sink: oc_collector.span_sink(), + span_sink: trace_collector.span_sink(), drain: drain_rx.clone(), }; let inbound = Inbound::new(inbound, runtime.clone()); @@ -309,7 +320,7 @@ impl Config { drain: drain_tx, identity, inbound_addr, - oc_collector, + trace_collector, outbound_addr, outbound_addr_additional, start_proxy, @@ -369,10 +380,10 @@ impl App { self.identity.receiver().local_id().clone() } - pub fn opencensus_addr(&self) -> Option<&ControlAddr> { - match self.oc_collector { - oc_collector::OcCollector::Disabled { .. } => None, - oc_collector::OcCollector::Enabled(ref oc) => Some(&oc.addr), + pub fn tracing_addr(&self) -> Option<&ControlAddr> { + match self.trace_collector { + trace_collector::TraceCollector::Disabled { .. } => None, + crate::trace_collector::TraceCollector::Enabled(ref oc) => Some(&oc.addr), } } @@ -381,7 +392,7 @@ impl App { admin, drain, identity, - oc_collector, + trace_collector: collector, start_proxy, tap, .. @@ -446,8 +457,8 @@ impl App { tokio::spawn(serve.instrument(info_span!("tap").or_current())); } - if let oc_collector::OcCollector::Enabled(oc) = oc_collector { - tokio::spawn(oc.task.instrument(info_span!("opencensus").or_current())); + if let trace_collector::TraceCollector::Enabled(collector) = collector { + tokio::spawn(collector.task.instrument(info_span!("tracing"))); } // we don't care if the admin shutdown channel is diff --git a/linkerd/app/src/oc_collector.rs b/linkerd/app/src/oc_collector.rs deleted file mode 100644 index e8f25e23c5..0000000000 --- a/linkerd/app/src/oc_collector.rs +++ /dev/null @@ -1,103 +0,0 @@ -use linkerd_app_core::{ - control, dns, identity, metrics::ControlHttp as HttpMetrics, svc::NewService, Error, -}; -use linkerd_opencensus::{self as opencensus, metrics, proto}; -use std::{collections::HashMap, future::Future, pin::Pin, time::SystemTime}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tracing::Instrument; - -#[derive(Clone, Debug)] -pub enum Config { - Disabled, - Enabled(Box), -} - -#[derive(Clone, Debug)] -pub struct EnabledConfig { - pub control: control::Config, - pub attributes: HashMap, - pub hostname: Option, -} - -pub type Task = Pin + Send + 'static>>; - -pub type SpanSink = mpsc::Sender; - -pub enum OcCollector { - Disabled, - Enabled(Box), -} - -pub struct EnabledCollector { - pub addr: control::ControlAddr, - pub span_sink: SpanSink, - pub task: Task, -} - -impl Config { - const SPAN_BUFFER_CAPACITY: usize = 100; - const SERVICE_NAME: &'static str = "linkerd-proxy"; - - pub fn build( - self, - identity: identity::NewClient, - dns: dns::Resolver, - legacy_metrics: metrics::Registry, - control_metrics: control::Metrics, - client_metrics: HttpMetrics, - ) -> Result { - match self { - Config::Disabled => Ok(OcCollector::Disabled), - Config::Enabled(inner) => { - let addr = inner.control.addr.clone(); - let svc = inner - .control - .build(dns, client_metrics, control_metrics, identity) - .new_service(()); - - let (span_sink, spans_rx) = mpsc::channel(Self::SPAN_BUFFER_CAPACITY); - let spans_rx = ReceiverStream::new(spans_rx); - - let task = { - use self::proto::agent::common::v1 as oc; - - let node = oc::Node { - identifier: Some(oc::ProcessIdentifier { - host_name: inner.hostname.unwrap_or_default(), - pid: std::process::id(), - start_timestamp: Some(SystemTime::now().into()), - }), - service_info: Some(oc::ServiceInfo { - name: Self::SERVICE_NAME.to_string(), - }), - attributes: inner.attributes, - ..oc::Node::default() - }; - - let addr = addr.clone(); - Box::pin( - opencensus::export_spans(svc, node, spans_rx, legacy_metrics).instrument( - tracing::debug_span!("opencensus", peer.addr = %addr).or_current(), - ), - ) - }; - - Ok(OcCollector::Enabled(Box::new(EnabledCollector { - addr, - task, - span_sink, - }))) - } - } - } -} - -impl OcCollector { - pub fn span_sink(&self) -> Option { - match self { - OcCollector::Disabled => None, - OcCollector::Enabled(inner) => Some(inner.span_sink.clone()), - } - } -} diff --git a/linkerd/app/src/trace_collector.rs b/linkerd/app/src/trace_collector.rs new file mode 100644 index 0000000000..2e93732179 --- /dev/null +++ b/linkerd/app/src/trace_collector.rs @@ -0,0 +1,103 @@ +use linkerd_app_core::http_tracing::{CollectorProtocol, SpanSink}; +use linkerd_app_core::metrics::ControlHttp as HttpMetrics; +use linkerd_app_core::svc::NewService; +use linkerd_app_core::{control, dns, identity, opencensus, opentelemetry}; +use linkerd_error::Error; +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; + +pub mod oc_collector; +pub mod otel_collector; + +const SPAN_BUFFER_CAPACITY: usize = 100; +const SERVICE_NAME: &str = "linkerd-proxy"; + +#[derive(Clone, Debug)] +pub enum Config { + Disabled, + Enabled(Box), +} + +#[derive(Clone, Debug)] +pub struct EnabledConfig { + pub control: control::Config, + pub attributes: HashMap, + pub hostname: Option, + pub kind: CollectorProtocol, +} + +pub type Task = Pin + Send + 'static>>; + +pub enum TraceCollector { + Disabled, + Enabled(Box), +} + +pub struct EnabledCollector { + pub addr: control::ControlAddr, + pub kind: CollectorProtocol, + pub span_sink: SpanSink, + pub task: Task, +} + +impl TraceCollector { + pub fn span_sink(&self) -> Option { + match self { + TraceCollector::Disabled => None, + TraceCollector::Enabled(inner) => Some(inner.span_sink.clone()), + } + } +} + +impl Config { + pub fn metrics_prefix(&self) -> Option<&'static str> { + match self { + Config::Disabled => None, + Config::Enabled(config) => match config.kind { + CollectorProtocol::OpenCensus => Some("opencensus"), + CollectorProtocol::OpenTelemetry => Some("opentelemetry"), + }, + } + } + + pub fn build( + self, + identity: identity::NewClient, + dns: dns::Resolver, + legacy_oc_metrics: opencensus::metrics::Registry, + legacy_otel_metrics: opentelemetry::metrics::Registry, + control_metrics: control::Metrics, + client_metrics: HttpMetrics, + ) -> Result { + match self { + Config::Disabled => Ok(TraceCollector::Disabled), + Config::Enabled(inner) => { + let addr = inner.control.addr.clone(); + let svc = inner + .control + .build(dns, client_metrics, control_metrics, identity) + .new_service(()); + + let collector = match inner.kind { + CollectorProtocol::OpenCensus => oc_collector::create_collector( + addr.clone(), + inner.hostname, + inner.attributes, + svc, + legacy_oc_metrics, + ), + CollectorProtocol::OpenTelemetry => otel_collector::create_collector( + addr.clone(), + inner.hostname, + inner.attributes, + svc, + legacy_otel_metrics, + ), + }; + + Ok(TraceCollector::Enabled(Box::new(collector))) + } + } + } +} diff --git a/linkerd/app/src/trace_collector/oc_collector.rs b/linkerd/app/src/trace_collector/oc_collector.rs new file mode 100644 index 0000000000..ee80db652e --- /dev/null +++ b/linkerd/app/src/trace_collector/oc_collector.rs @@ -0,0 +1,58 @@ +use crate::trace_collector::EnabledCollector; +use linkerd_app_core::{ + control::ControlAddr, http_tracing::CollectorProtocol, proxy::http::HttpBody, Error, +}; +use linkerd_opencensus::{self as opencensus, metrics, proto}; +use std::{collections::HashMap, time::SystemTime}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{body::BoxBody, client::GrpcService}; +use tracing::Instrument; + +pub(super) fn create_collector( + addr: ControlAddr, + hostname: Option, + attributes: HashMap, + svc: S, + legacy_metrics: metrics::Registry, +) -> EnabledCollector +where + S: GrpcService + Clone + Send + 'static, + S::Error: Into, + S::Future: Send, + S::ResponseBody: Default + HttpBody + Send + 'static, + ::Error: Into + Send, +{ + let (span_sink, spans_rx) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY); + let spans_rx = ReceiverStream::new(spans_rx); + + let task = { + use self::proto::agent::common::v1 as oc; + + let node = oc::Node { + identifier: Some(oc::ProcessIdentifier { + host_name: hostname.unwrap_or_default(), + pid: std::process::id(), + start_timestamp: Some(SystemTime::now().into()), + }), + service_info: Some(oc::ServiceInfo { + name: crate::trace_collector::SERVICE_NAME.to_string(), + }), + attributes, + ..oc::Node::default() + }; + + let addr = addr.clone(); + Box::pin( + opencensus::export_spans(svc, node, spans_rx, legacy_metrics) + .instrument(tracing::debug_span!("opencensus", peer.addr = %addr).or_current()), + ) + }; + + EnabledCollector { + addr, + task, + span_sink, + kind: CollectorProtocol::OpenCensus, + } +} diff --git a/linkerd/app/src/trace_collector/otel_collector.rs b/linkerd/app/src/trace_collector/otel_collector.rs new file mode 100644 index 0000000000..f933389b8e --- /dev/null +++ b/linkerd/app/src/trace_collector/otel_collector.rs @@ -0,0 +1,112 @@ +use super::EnabledCollector; +use linkerd_app_core::{ + control::ControlAddr, http_tracing::CollectorProtocol, proxy::http::HttpBody, Error, +}; +use linkerd_opentelemetry::{ + self as opentelemetry, metrics, + proto::proto::common::v1::{any_value, AnyValue, KeyValue}, + proto::transform::common::ResourceAttributesWithSchema, +}; +use std::{collections::HashMap, time::SystemTime, time::UNIX_EPOCH}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{body::BoxBody, client::GrpcService}; +use tracing::Instrument; + +pub(super) fn create_collector( + addr: ControlAddr, + hostname: Option, + attributes: HashMap, + svc: S, + legacy_metrics: metrics::Registry, +) -> EnabledCollector +where + S: GrpcService + Clone + Send + 'static, + S::Error: Into, + S::Future: Send, + S::ResponseBody: Default + HttpBody + Send + 'static, + ::Error: Into + Send, +{ + let (span_sink, spans_rx) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY); + let spans_rx = ReceiverStream::new(spans_rx); + + let mut resources = ResourceAttributesWithSchema::default(); + + resources + .attributes + .0 + .push(crate::trace_collector::SERVICE_NAME.with_key("service.name")); + resources + .attributes + .0 + .push((std::process::id() as i64).with_key("process.pid")); + + resources.attributes.0.push( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or_else(|e| -(e.duration().as_secs() as i64)) + .with_key("process.start_timestamp"), + ); + resources + .attributes + .0 + .push(hostname.unwrap_or_default().with_key("host.name")); + + resources.attributes.0.extend( + attributes + .into_iter() + .map(|(key, value)| value.with_key(&key)), + ); + + let addr = addr.clone(); + let task = Box::pin( + opentelemetry::export_spans(svc, spans_rx, resources, legacy_metrics) + .instrument(tracing::debug_span!("opentelemetry", peer.addr = %addr).or_current()), + ); + + EnabledCollector { + addr, + task, + span_sink, + kind: CollectorProtocol::OpenTelemetry, + } +} + +trait IntoAnyValue +where + Self: Sized, +{ + fn into_any_value(self) -> AnyValue; + + fn with_key(self, key: &str) -> KeyValue { + KeyValue { + key: key.to_string(), + value: Some(self.into_any_value()), + } + } +} + +impl IntoAnyValue for String { + fn into_any_value(self) -> AnyValue { + AnyValue { + value: Some(any_value::Value::StringValue(self)), + } + } +} + +impl IntoAnyValue for &str { + fn into_any_value(self) -> AnyValue { + AnyValue { + value: Some(any_value::Value::StringValue(self.to_string())), + } + } +} + +impl IntoAnyValue for i64 { + fn into_any_value(self) -> AnyValue { + AnyValue { + value: Some(any_value::Value::IntValue(self)), + } + } +} diff --git a/linkerd/opencensus/Cargo.toml b/linkerd/opencensus/Cargo.toml index 65d1b45eed..87699ec356 100644 --- a/linkerd/opencensus/Cargo.toml +++ b/linkerd/opencensus/Cargo.toml @@ -12,6 +12,7 @@ http = "0.2" http-body = "0.4" linkerd-error = { path = "../error" } linkerd-metrics = { path = "../metrics" } +linkerd-trace-context = { path = "../trace-context" } opencensus-proto = { path = "../../opencensus-proto" } tonic = { version = "0.10", default-features = false, features = [ "prost", diff --git a/linkerd/opencensus/src/lib.rs b/linkerd/opencensus/src/lib.rs index dd7d0a156c..65fbf2c4b0 100644 --- a/linkerd/opencensus/src/lib.rs +++ b/linkerd/opencensus/src/lib.rs @@ -6,17 +6,19 @@ pub mod metrics; use futures::stream::{Stream, StreamExt}; use http_body::Body as HttpBody; use linkerd_error::Error; +use linkerd_trace_context::export::{ExportSpan, SpanKind}; use metrics::Registry; pub use opencensus_proto as proto; use opencensus_proto::agent::common::v1::Node; use opencensus_proto::agent::trace::v1::{ trace_service_client::TraceServiceClient, ExportTraceServiceRequest, }; -use opencensus_proto::trace::v1::Span; +use opencensus_proto::trace::v1::{Span, TruncatableString}; +use std::collections::HashMap; use tokio::{sync::mpsc, time}; use tokio_stream::wrappers::ReceiverStream; use tonic::{self as grpc, body::BoxBody, client::GrpcService}; -use tracing::{debug, trace}; +use tracing::{debug, info, trace}; pub async fn export_spans(client: T, node: Node, spans: S, metrics: Registry) where @@ -24,7 +26,7 @@ where T::Error: Into, T::ResponseBody: Default + HttpBody + Send + 'static, ::Error: Into + Send, - S: Stream + Unpin, + S: Stream + Unpin, { debug!("Span exporter running"); SpanExporter::new(client, node, spans, metrics).run().await @@ -49,7 +51,7 @@ where T::Error: Into, T::ResponseBody: Default + HttpBody + Send + 'static, ::Error: Into + Send, - S: Stream + Unpin, + S: Stream + Unpin, { const MAX_BATCH_SIZE: usize = 1000; const MAX_BATCH_IDLE: time::Duration = time::Duration::from_secs(10); @@ -175,6 +177,13 @@ where res = spans.next() => match res { Some(span) => { trace!(?span, "Adding to batch"); + let span = match convert_span(span) { + Ok(span) => span, + Err(error) => { + info!(%error, "Span dropped"); + continue; + } + }; accum.push(span); } None => return Err(SpanRxClosed), @@ -192,3 +201,61 @@ where } } } + +fn convert_span(span: ExportSpan) -> Result { + use proto::trace::v1 as oc; + + let ExportSpan { + mut span, + kind, + labels, + } = span; + + let mut attributes = HashMap::::new(); + for (k, v) in labels.iter() { + attributes.insert( + k.clone(), + oc::AttributeValue { + value: Some(oc::attribute_value::Value::StringValue(truncatable( + v.clone(), + ))), + }, + ); + } + for (k, v) in span.labels.drain() { + attributes.insert( + k.to_string(), + oc::AttributeValue { + value: Some(oc::attribute_value::Value::StringValue(truncatable(v))), + }, + ); + } + Ok(Span { + trace_id: span.trace_id.into_bytes::<16>()?.to_vec(), + span_id: span.span_id.into_bytes::<8>()?.to_vec(), + tracestate: None, + parent_span_id: span.parent_id.into_bytes::<8>()?.to_vec(), + name: Some(truncatable(span.span_name)), + kind: kind as i32, + start_time: Some(span.start.into()), + end_time: Some(span.end.into()), + attributes: Some(oc::span::Attributes { + attribute_map: attributes, + dropped_attributes_count: 0, + }), + stack_trace: None, + time_events: None, + links: None, + status: None, // TODO: this is gRPC status; we must read response trailers to populate this + resource: None, + same_process_as_parent_span: Some(kind == SpanKind::Client), + child_span_count: None, + }) +} + +fn truncatable(value: String) -> TruncatableString { + TruncatableString { + value, + truncated_byte_count: 0, + } +} diff --git a/linkerd/opentelemetry/Cargo.toml b/linkerd/opentelemetry/Cargo.toml index 8ae7509f05..4322449bca 100644 --- a/linkerd/opentelemetry/Cargo.toml +++ b/linkerd/opentelemetry/Cargo.toml @@ -12,6 +12,7 @@ http = "0.2" http-body = "0.4" linkerd-error = { path = "../error" } linkerd-metrics = { path = "../metrics" } +linkerd-trace-context = { path = "../trace-context" } opentelemetry = { version = "0.25", default-features = false, features = ["trace"] } opentelemetry_sdk = { version = "0.25", default-features = false, features = ["trace"] } opentelemetry-proto = { path = "../../opentelemetry-proto" } diff --git a/linkerd/opentelemetry/src/lib.rs b/linkerd/opentelemetry/src/lib.rs index 2f983cade7..10bcc6ad42 100644 --- a/linkerd/opentelemetry/src/lib.rs +++ b/linkerd/opentelemetry/src/lib.rs @@ -6,17 +6,26 @@ pub mod metrics; use futures::stream::{Stream, StreamExt}; use http_body::Body as HttpBody; use linkerd_error::Error; +use linkerd_trace_context as trace_context; use metrics::Registry; +pub use opentelemetry as otel; +use opentelemetry::trace::{ + SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, +}; +use opentelemetry::KeyValue; pub use opentelemetry_proto as proto; use opentelemetry_proto::proto::collector::trace::v1::trace_service_client::TraceServiceClient; use opentelemetry_proto::proto::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::proto::trace::v1::ResourceSpans; use opentelemetry_proto::transform::common::ResourceAttributesWithSchema; use opentelemetry_proto::transform::trace::group_spans_by_resource_and_scope; +pub use opentelemetry_sdk as sdk; pub use opentelemetry_sdk::export::trace::SpanData; +use opentelemetry_sdk::trace::SpanLinks; use tokio::{sync::mpsc, time}; use tonic::{self as grpc, body::BoxBody, client::GrpcService}; -use tracing::{debug, trace}; +use trace_context::export::ExportSpan; +use tracing::{debug, info, trace}; pub async fn export_spans( client: T, @@ -28,7 +37,7 @@ pub async fn export_spans( T::Error: Into, T::ResponseBody: Default + HttpBody + Send + 'static, ::Error: Into + Send, - S: Stream + Unpin, + S: Stream + Unpin, { debug!("Span exporter running"); SpanExporter::new(client, spans, resource, metrics) @@ -55,7 +64,7 @@ where T::Error: Into, T::ResponseBody: Default + HttpBody + Send + 'static, ::Error: Into + Send, - S: Stream + Unpin, + S: Stream + Unpin, { const MAX_BATCH_SIZE: usize = 1000; const MAX_BATCH_IDLE: time::Duration = time::Duration::from_secs(10); @@ -189,6 +198,14 @@ where res = span_stream.next() => match res { Some(span) => { trace!(?span, "Adding to batch"); + let span = match convert_span(span) { + Ok(span) => span, + Err(error) => { + info!(%error, "Span dropped"); + continue; + } + }; + input_accum.push(span); } None => break Err(SpanRxClosed), @@ -210,3 +227,36 @@ where res } } + +fn convert_span(span: ExportSpan) -> Result { + let ExportSpan { span, kind, labels } = span; + + let mut attributes = Vec::::new(); + for (k, v) in labels.iter() { + attributes.push(KeyValue::new(k.clone(), v.clone())); + } + let is_remote = kind != trace_context::export::SpanKind::Client; + Ok(SpanData { + parent_span_id: SpanId::from_bytes(span.parent_id.into_bytes()?), + span_kind: match kind { + trace_context::export::SpanKind::Server => SpanKind::Server, + trace_context::export::SpanKind::Client => SpanKind::Client, + }, + name: span.span_name.into(), + start_time: span.start, + end_time: span.end, + attributes, + dropped_attributes_count: 0, + links: SpanLinks::default(), + status: Status::Unset, // TODO: this is gRPC status; we must read response trailers to populate this + span_context: SpanContext::new( + TraceId::from_bytes(span.trace_id.into_bytes()?), + SpanId::from_bytes(span.span_id.into_bytes()?), + TraceFlags::default(), + is_remote, + TraceState::NONE, + ), + events: Default::default(), + instrumentation_lib: Default::default(), + }) +} diff --git a/linkerd/trace-context/src/export.rs b/linkerd/trace-context/src/export.rs new file mode 100644 index 0000000000..262c98ee7f --- /dev/null +++ b/linkerd/trace-context/src/export.rs @@ -0,0 +1,18 @@ +use crate::Span; +use std::collections::HashMap; +use std::sync::Arc; + +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum SpanKind { + Server = 1, + Client = 2, +} + +pub type SpanLabels = Arc>; + +#[derive(Debug)] +pub struct ExportSpan { + pub span: Span, + pub kind: SpanKind, + pub labels: SpanLabels, +} diff --git a/linkerd/trace-context/src/lib.rs b/linkerd/trace-context/src/lib.rs index ed8c36017c..f88b04bca4 100644 --- a/linkerd/trace-context/src/lib.rs +++ b/linkerd/trace-context/src/lib.rs @@ -1,6 +1,7 @@ #![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] #![forbid(unsafe_code)] +pub mod export; mod propagation; mod service; @@ -18,6 +19,27 @@ const SPAN_ID_LEN: usize = 8; #[derive(Debug, Default)] pub struct Id(Vec); +#[derive(Debug, Error)] +#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)] +pub struct IdLengthError { + id: Vec, + expected_size: usize, + actual_size: usize, +} + +impl Id { + pub fn into_bytes(self) -> Result<[u8; N], IdLengthError> { + self.as_ref().try_into().map_err(|_| { + let bytes: Vec = self.into(); + IdLengthError { + expected_size: N, + actual_size: bytes.len(), + id: bytes, + } + }) + } +} + #[derive(Debug, Default)] pub struct Flags(u8); diff --git a/linkerd2-proxy/src/main.rs b/linkerd2-proxy/src/main.rs index 61c54102df..f198d135e2 100644 --- a/linkerd2-proxy/src/main.rs +++ b/linkerd2-proxy/src/main.rs @@ -108,14 +108,11 @@ fn main() { ), } - if let Some(oc) = app.opencensus_addr() { - match oc.identity.value() { - None => info!("OpenCensus tracing collector at {}", oc.addr), + if let Some(tracing) = app.tracing_addr() { + match tracing.identity.value() { + None => info!("Tracing collector at {}", tracing.addr), Some(tls) => { - info!( - "OpenCensus tracing collector at {} ({})", - oc.addr, tls.server_id - ) + info!("Tracing collector at {} ({})", tracing.addr, tls.server_id) } } } diff --git a/tools/src/bin/gen-protos.rs b/tools/src/bin/gen-protos.rs index 4e85f3bffe..7100d21633 100644 --- a/tools/src/bin/gen-protos.rs +++ b/tools/src/bin/gen-protos.rs @@ -1,8 +1,12 @@ +use std::path::{Path, PathBuf}; + fn main() { - let opencensus_dir = { - let manifest_dir = std::path::PathBuf::from(std::env!("CARGO_MANIFEST_DIR")); - manifest_dir.parent().unwrap().join("opencensus-proto") - }; + generate_opentelemetry_protos(); + generate_opencensus_protos(); +} + +fn generate_opencensus_protos() { + let opencensus_dir = get_proto_dir("opencensus-proto"); let out_dir = opencensus_dir.join("src").join("gen"); @@ -17,12 +21,39 @@ fn main() { ] }; + generate_protos(&out_dir, iface_files, &opencensus_dir); +} + +fn generate_opentelemetry_protos() { + let opentelemetry_dir = get_proto_dir("opentelemetry-proto"); + + let out_dir = opentelemetry_dir.join("src").join("gen"); + + let iface_files = { + let proto_dir = opentelemetry_dir.join("opentelemetry").join("proto"); + &[ + proto_dir.join("collector/trace/v1/trace_service.proto"), + proto_dir.join("common/v1/common.proto"), + proto_dir.join("resource/v1/resource.proto"), + proto_dir.join("trace/v1/trace.proto"), + ] + }; + + generate_protos(&out_dir, iface_files, &opentelemetry_dir); +} + +fn get_proto_dir(name: &str) -> PathBuf { + let manifest_dir = std::path::PathBuf::from(std::env!("CARGO_MANIFEST_DIR")); + manifest_dir.parent().unwrap().join(name) +} + +fn generate_protos(out_dir: &Path, iface_files: &[PathBuf], includes: &Path) { if let Err(error) = tonic_build::configure() .build_client(true) .build_server(false) .emit_rerun_if_changed(false) .out_dir(out_dir) - .compile(iface_files, &[opencensus_dir]) + .compile(iface_files, &[includes]) { eprintln!("\nfailed to compile protos: {}", error); }