Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate OpenTelemetry into the proxy #3221

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,7 @@ dependencies = [
"linkerd-app-outbound",
"linkerd-error",
"linkerd-opencensus",
"linkerd-opentelemetry",
"linkerd-tonic-stream",
"rangemap",
"regex",
Expand Down Expand Up @@ -1184,6 +1185,7 @@ dependencies = [
"linkerd-meshtls",
"linkerd-metrics",
"linkerd-opencensus",
"linkerd-opentelemetry",
"linkerd-proxy-api-resolve",
"linkerd-proxy-balance",
"linkerd-proxy-client-policy",
Expand Down Expand Up @@ -1743,6 +1745,7 @@ dependencies = [
"http-body",
"linkerd-error",
"linkerd-metrics",
"linkerd-trace-context",
"opencensus-proto",
"tokio",
"tokio-stream",
Expand All @@ -1759,6 +1762,7 @@ dependencies = [
"http-body",
"linkerd-error",
"linkerd-metrics",
"linkerd-trace-context",
"opentelemetry",
"opentelemetry-proto",
"opentelemetry_sdk",
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ linkerd-app-inbound = { path = "./inbound" }
linkerd-app-outbound = { path = "./outbound" }
linkerd-error = { path = "../error" }
linkerd-opencensus = { path = "../opencensus" }
linkerd-opentelemetry = { path = "../opentelemetry" }
linkerd-tonic-stream = { path = "../tonic-stream" }
rangemap = "1"
regex = "1"
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ linkerd-io = { path = "../../io" }
linkerd-meshtls = { path = "../../meshtls", default-features = false }
linkerd-metrics = { path = "../../metrics", features = ["process", "stack"] }
linkerd-opencensus = { path = "../../opencensus" }
linkerd-opentelemetry = { path = "../../opentelemetry" }
linkerd-proxy-api-resolve = { path = "../../proxy/api-resolve" }
linkerd-proxy-balance = { path = "../../proxy/balance" }
linkerd-proxy-core = { path = "../../proxy/core" }
Expand Down
165 changes: 51 additions & 114 deletions linkerd/app/core/src/http_tracing.rs
Original file line number Diff line number Diff line change
@@ -1,139 +1,76 @@
use linkerd_error::Error;
use linkerd_opencensus::proto::trace::v1 as oc;
use linkerd_stack::layer;
use linkerd_trace_context::{self as trace_context, TraceContext};
use std::{collections::HashMap, sync::Arc};
use thiserror::Error;
use linkerd_trace_context::{
self as trace_context,
export::{ExportSpan, SpanKind, SpanLabels},
Span, TraceContext,
};
use std::{str::FromStr, sync::Arc};
use tokio::sync::mpsc;

pub type OpenCensusSink = Option<mpsc::Sender<oc::Span>>;
pub type Labels = Arc<HashMap<String, String>>;

/// SpanConverter converts trace_context::Span objects into OpenCensus agent
/// protobuf span objects. SpanConverter receives trace_context::Span objects by
/// implmenting the SpanSink trait. For each span that it receives, it converts
/// it to an OpenCensus span and then sends it on the provided mpsc::Sender.
#[derive(Clone)]
pub struct SpanConverter {
kind: Kind,
sink: mpsc::Sender<oc::Span>,
labels: Labels,
#[derive(Debug, Copy, Clone, Default)]
pub enum CollectorProtocol {
#[default]
OpenCensus,
OpenTelemetry,
}

#[derive(Debug, Error)]
#[error("ID '{:?} should have {} bytes, but it has {}", self.id, self.expected_size, self.actual_size)]
pub struct IdLengthError {
id: Vec<u8>,
expected_size: usize,
actual_size: usize,
impl FromStr for CollectorProtocol {
type Err = ();

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.eq_ignore_ascii_case("opencensus") {
Ok(Self::OpenCensus)
} else if s.eq_ignore_ascii_case("opentelemetry") {
Ok(Self::OpenTelemetry)

Check warning on line 25 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L21-L25

Added lines #L21 - L25 were not covered by tests
} else {
Err(())

Check warning on line 27 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L27

Added line #L27 was not covered by tests
}
}
}

pub type SpanSink = mpsc::Sender<ExportSpan>;

pub fn server<S>(
sink: OpenCensusSink,
labels: impl Into<Labels>,
sink: Option<SpanSink>,
labels: impl Into<SpanLabels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
SpanConverter::layer(Kind::Server, sink, labels)
TraceContext::layer(sink.map(move |sink| SpanConverter {
kind: SpanKind::Server,
sink,
labels: labels.into(),

Check warning on line 41 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L39-L41

Added lines #L39 - L41 were not covered by tests
}))
}

pub fn client<S>(
sink: OpenCensusSink,
labels: impl Into<Labels>,
sink: Option<SpanSink>,
labels: impl Into<SpanLabels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<SpanConverter>, S>> + Clone {
SpanConverter::layer(Kind::Client, sink, labels)
}

#[derive(Copy, Clone, Debug, PartialEq)]
enum Kind {
Server = 1,
Client = 2,
TraceContext::layer(sink.map(move |sink| SpanConverter {
kind: SpanKind::Client,
sink,
labels: labels.into(),

Check warning on line 52 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L50-L52

Added lines #L50 - L52 were not covered by tests
}))
}

impl SpanConverter {
fn layer<S>(
kind: Kind,
sink: OpenCensusSink,
labels: impl Into<Labels>,
) -> impl layer::Layer<S, Service = TraceContext<Option<Self>, S>> + Clone {
TraceContext::layer(sink.map(move |sink| Self {
kind,
sink,
labels: labels.into(),
}))
}

fn mk_span(&self, mut span: trace_context::Span) -> Result<oc::Span, IdLengthError> {
let mut attributes = HashMap::<String, oc::AttributeValue>::new();
for (k, v) in self.labels.iter() {
attributes.insert(
k.clone(),
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(
v.clone(),
))),
},
);
}
for (k, v) in span.labels.drain() {
attributes.insert(
k.to_string(),
oc::AttributeValue {
value: Some(oc::attribute_value::Value::StringValue(truncatable(v))),
},
);
}
Ok(oc::Span {
trace_id: into_bytes(span.trace_id, 16)?,
span_id: into_bytes(span.span_id, 8)?,
tracestate: None,
parent_span_id: into_bytes(span.parent_id, 8)?,
name: Some(truncatable(span.span_name)),
kind: self.kind as i32,
start_time: Some(span.start.into()),
end_time: Some(span.end.into()),
attributes: Some(oc::span::Attributes {
attribute_map: attributes,
dropped_attributes_count: 0,
}),
stack_trace: None,
time_events: None,
links: None,
status: None, // TODO: this is gRPC status; we must read response trailers to populate this
resource: None,
same_process_as_parent_span: Some(self.kind == Kind::Client),
child_span_count: None,
})
}
#[derive(Clone)]
pub struct SpanConverter {
kind: SpanKind,
sink: SpanSink,
labels: SpanLabels,
}

impl trace_context::SpanSink for SpanConverter {
#[inline]
fn is_enabled(&self) -> bool {
true
}

fn try_send(&mut self, span: trace_context::Span) -> Result<(), Error> {
let span = self.mk_span(span)?;
self.sink.try_send(span).map_err(Into::into)
}
}

fn into_bytes(id: trace_context::Id, size: usize) -> Result<Vec<u8>, IdLengthError> {
let bytes: Vec<u8> = id.into();
if bytes.len() == size {
Ok(bytes)
} else {
let actual_size = bytes.len();
Err(IdLengthError {
id: bytes,
expected_size: size,
actual_size,
})
}
}

fn truncatable(value: String) -> oc::TruncatableString {
oc::TruncatableString {
value,
truncated_byte_count: 0,
fn try_send(&mut self, span: Span) -> Result<(), Error> {
self.sink.try_send(ExportSpan {
span,
kind: self.kind,
labels: Arc::clone(&self.labels),

Check warning on line 72 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L68-L72

Added lines #L68 - L72 were not covered by tests
})?;
Ok(())

Check warning on line 74 in linkerd/app/core/src/http_tracing.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/http_tracing.rs#L74

Added line #L74 was not covered by tests
}
}
3 changes: 2 additions & 1 deletion linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub use linkerd_http_metrics as http_metrics;
pub use linkerd_idle_cache as idle_cache;
pub use linkerd_io as io;
pub use linkerd_opencensus as opencensus;
pub use linkerd_opentelemetry as opentelemetry;
pub use linkerd_service_profiles as profiles;
pub use linkerd_stack_metrics as stack_metrics;
pub use linkerd_stack_tracing as stack_tracing;
Expand All @@ -65,7 +66,7 @@ pub struct ProxyRuntime {
pub identity: identity::creds::Receiver,
pub metrics: metrics::Proxy,
pub tap: proxy::tap::Registry,
pub span_sink: http_tracing::OpenCensusSink,
pub span_sink: Option<http_tracing::SpanSink>,
pub drain: drain::Watch,
}

Expand Down
6 changes: 5 additions & 1 deletion linkerd/app/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
pub use crate::transport::labels::{TargetAddr, TlsAccept};
use crate::{
classify::Class,
control, http_metrics, opencensus, profiles, stack_metrics,
control, http_metrics, opencensus, opentelemetry, profiles, stack_metrics,
svc::Param,
tls,
transport::{self, labels::TlsConnect},
Expand Down Expand Up @@ -39,6 +39,7 @@ pub struct Metrics {
pub proxy: Proxy,
pub control: ControlHttp,
pub opencensus: opencensus::metrics::Registry,
pub opentelemetry: opentelemetry::metrics::Registry,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -191,11 +192,13 @@ impl Metrics {
};

let (opencensus, opencensus_report) = opencensus::metrics::new();
let (opentelemetry, opentelemetry_report) = opentelemetry::metrics::new();

let metrics = Metrics {
proxy,
control,
opencensus,
opentelemetry,
};

let report = endpoint_report
Expand All @@ -205,6 +208,7 @@ impl Metrics {
.and_report(control_report)
.and_report(transport_report)
.and_report(opencensus_report)
.and_report(opentelemetry_report)
.and_report(stack);

(metrics, report)
Expand Down
9 changes: 4 additions & 5 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ mod server;
#[cfg(any(test, feature = "test-util", fuzzing))]
pub mod test_util;

#[cfg(fuzzing)]
pub use self::http::fuzz as http_fuzz;
pub use self::{metrics::InboundMetrics, policy::DefaultPolicy};
use linkerd_app_core::{
config::{ConnectConfig, ProxyConfig, QueueConfig},
drain,
http_tracing::OpenCensusSink,
http_tracing::SpanSink,
identity, io,
proxy::{tap, tcp},
svc,
Expand All @@ -33,9 +35,6 @@ use std::{fmt::Debug, time::Duration};
use thiserror::Error;
use tracing::debug_span;

#[cfg(fuzzing)]
pub use self::http::fuzz as http_fuzz;

#[derive(Clone, Debug)]
pub struct Config {
pub allow_discovery: NameMatch,
Expand Down Expand Up @@ -67,7 +66,7 @@ struct Runtime {
metrics: InboundMetrics,
identity: identity::creds::Receiver,
tap: tap::Registry,
span_sink: OpenCensusSink,
span_sink: Option<SpanSink>,
drain: drain::Watch,
}

Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<T> Outbound<svc::ArcNewCloneHttp<T>> {
.check_new_service::<T, http::Request<_>>()
.push(ServerRescue::layer(config.emit_headers))
.check_new_service::<T, http::Request<_>>()
// Initiates OpenCensus tracing.
// Initiates OpenTelemetry tracing.
.push_on_service(http_tracing::server(rt.span_sink.clone(), trace_labels()))
.push_on_service(http::BoxResponse::layer())
// Convert origin form HTTP/1 URIs to absolute form for Hyper's
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
#![allow(opaque_hidden_inferred_bound)]
#![forbid(unsafe_code)]

use linkerd_app_core::http_tracing::SpanSink;
use linkerd_app_core::{
config::{ProxyConfig, QueueConfig},
drain,
exp_backoff::ExponentialBackoff,
http_tracing::OpenCensusSink,
identity, io,
metrics::prom,
profiles,
Expand Down Expand Up @@ -95,7 +95,7 @@ struct Runtime {
metrics: OutboundMetrics,
identity: identity::NewClient,
tap: tap::Registry,
span_sink: OpenCensusSink,
span_sink: Option<SpanSink>,
drain: drain::Watch,
}

Expand Down
Loading
Loading