Skip to content

Commit

Permalink
Implement gRPC export for traces
Browse files Browse the repository at this point in the history
  • Loading branch information
inikulin committed May 24, 2024
1 parent 6dae45d commit 5642ad7
Show file tree
Hide file tree
Showing 13 changed files with 105 additions and 23 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ slog-async = "2.3"
slog-json = "2.3"
slog-term = "2.4"
tempfile = "3.7"
tokio = "1.31.0"
tokio = "1.37.0"
thread_local = "1.1"
tikv-jemallocator = "0.5"
tikv-jemalloc-ctl = "0.5"
yaml-merge-keys = "0.5"

# needed for minver
async-stream = "0.3.5"
local-ip-address = "0.5.7"
lock_api = "0.4.11"
log = "0.4.20"
Expand Down
5 changes: 2 additions & 3 deletions examples/http_server/example_conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ telemetry:
enabled: true
# The output for the collected traces.
output:
jaeger_thrift_udp:
server_addr: "127.0.0.1:6831"
reporter_bind_addr: ~
open_telemetry_grpc:
endpoint_url: "http://localhost:4317"
# Sampling ratio.
#
# This can be any fractional value between `0.0` and `1.0`.
Expand Down
3 changes: 2 additions & 1 deletion foundations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ telemetry-server = [
]

# Enables telemetry reporting over gRPC
telemetry-otlp-grpc = ["dep:tonic", "dep:tokio"]
telemetry-otlp-grpc = ["dep:tonic", "dep:tokio", "dep:hyper"]

# Enables experimental tokio runtime metrics
tokio-runtime-metrics = [
Expand Down Expand Up @@ -206,6 +206,7 @@ yaml-merge-keys = { workspace = true, optional = true, features = [
] }

# needed for minver purposes
async-stream = { workspace = true, optional = true }
local-ip-address = { workspace = true, optional = true }
lock_api = { workspace = true, optional = true }
log = { workspace = true, optional = true }
Expand Down
2 changes: 2 additions & 0 deletions foundations/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
//! [OpenTelemetry]: https://opentelemetry.io/
//! [gRPC]: https://grpc.io/

// NOTE: required to allow cfgs like `tokio_unstable` on nightly which is used in tests.
#![allow(unexpected_cfgs)]
#![warn(missing_docs)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![cfg_attr(docsrs, feature(doc_cfg))]
Expand Down
1 change: 1 addition & 0 deletions foundations/src/telemetry/log/log_volume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use slog::{Drain, Never, OwnedKVList, Record, SendSyncRefUnwindSafeDrain};

#[crate::telemetry::metrics::metrics(crate_path = "crate")]
mod foundations {
/// The number of produced log entries.
pub fn log_record_count(level: &'static str) -> Counter;
}

Expand Down
2 changes: 1 addition & 1 deletion foundations/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ mod scope;

mod driver;

#[cfg(feature = "tracing")]
#[cfg(all(feature = "tracing", feature = "telemetry-otlp-grpc"))]
mod otlp_conversion;

#[cfg(feature = "testing")]
Expand Down
20 changes: 19 additions & 1 deletion foundations/src/telemetry/otlp_conversion/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::ServiceInfo;
use opentelemetry_proto::tonic as otlp;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

pub(super) fn convert_service_info(
pub(super) fn convert_service_info_to_instrumentation_scope(
service_info: &ServiceInfo,
) -> otlp::common::v1::InstrumentationScope {
otlp::common::v1::InstrumentationScope {
Expand All @@ -12,6 +12,24 @@ pub(super) fn convert_service_info(
}
}

pub(super) fn convert_service_info_to_resource(
service_info: &ServiceInfo,
) -> otlp::resource::v1::Resource {
let service_name_attr = otlp::common::v1::KeyValue {
key: "service.name".to_string(),
value: Some(otlp::common::v1::AnyValue {
value: Some(otlp::common::v1::any_value::Value::StringValue(
service_info.name.to_string(),
)),
}),
};

otlp::resource::v1::Resource {
attributes: vec![service_name_attr],
dropped_attributes_count: 0,
}
}

pub(super) fn convert_time(time: SystemTime) -> u64 {
time.duration_since(UNIX_EPOCH)
.as_ref()
Expand Down
10 changes: 6 additions & 4 deletions foundations/src/telemetry/otlp_conversion/tracing.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::common::{convert_service_info, convert_time};
use super::common::{
convert_service_info_to_instrumentation_scope, convert_service_info_to_resource, convert_time,
};
use crate::ServiceInfo;
use cf_rustracing::log::Log;
use cf_rustracing::span::SpanReference;
Expand All @@ -13,7 +15,7 @@ fn convert_trace_id(span_state: &SpanContextState) -> Vec<u8> {
.high
.to_be_bytes()
.into_iter()
.chain(span_state.trace_id().low.to_be_bytes().into_iter())
.chain(span_state.trace_id().low.to_be_bytes())
.collect()
}

Expand Down Expand Up @@ -103,11 +105,11 @@ pub(crate) fn convert_span(
let (status_code, attributes) = convert_tags(&span);

otlp::trace::v1::ResourceSpans {
resource: None,
resource: Some(convert_service_info_to_resource(service_info)),
schema_url: Default::default(),
scope_spans: vec![otlp::trace::v1::ScopeSpans {
schema_url: Default::default(),
scope: Some(convert_service_info(service_info)),
scope: Some(convert_service_info_to_instrumentation_scope(service_info)),
spans: vec![otlp::trace::v1::Span {
trace_id: convert_trace_id(span_state),
span_id: span_state.span_id().to_be_bytes().to_vec(),
Expand Down
1 change: 1 addition & 0 deletions foundations/src/telemetry/settings/memory_profiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl MemoryProfilerSettings {
19
}

#[cfg(feature = "security")]
fn default_sandbox_profiling_syscalls() -> bool {
true
}
Expand Down
15 changes: 14 additions & 1 deletion foundations/src/telemetry/settings/otlp_grpc_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct OpenTelemetryGrpcOutputSettings {
///
/// # Default
/// Default value is the standard gRPC endpoints URL: `http://localhost:4317`.
/// See: https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#otel_exporter_otlp_endpoint
/// See: <https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#otel_exporter_otlp_endpoint>
#[serde(default = "OpenTelemetryGrpcOutputSettings::default_endpoint_url")]
pub endpoint_url: String,

Expand All @@ -24,6 +24,14 @@ pub struct OpenTelemetryGrpcOutputSettings {
/// Default value is `10` seconds.
#[serde(default = "OpenTelemetryGrpcOutputSettings::default_request_timeout_seconds")]
pub request_timeout_seconds: u64,

/// Maximum number of entries to be batched together and sent in one request.
///
/// # Default
///
/// Default value is `512`.
#[serde(default = "OpenTelemetryGrpcOutputSettings::default_max_batch_size")]
pub max_batch_size: usize,
}

#[cfg(not(feature = "settings"))]
Expand All @@ -33,6 +41,7 @@ impl Default for OpenTelemetryGrpcOutputSettings {
endpoint_url: OpenTelemetryGrpcOutputSettings::default_endpoint_url(),
request_timeout_seconds:
OpenTelemetryGrpcOutputSettings::default_request_timeout_seconds(),
max_batch_size: OpenTelemetryGrpcOutputSettings::default_max_batch_size(),
}
}
}
Expand All @@ -45,4 +54,8 @@ impl OpenTelemetryGrpcOutputSettings {
fn default_request_timeout_seconds() -> u64 {
10
}

fn default_max_batch_size() -> usize {
512
}
}
2 changes: 1 addition & 1 deletion foundations/src/telemetry/settings/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub struct JaegerThriftUdpOutputSettings {
/// The address of the Jaeger Thrift (UDP) agent.
///
/// The default value is the default Jaeger UDP server address.
/// See: https://www.jaegertracing.io/docs/1.31/getting-started/#all-in-one
/// See: <https://www.jaegertracing.io/docs/1.31/getting-started/#all-in-one>
#[serde(default = "JaegerThriftUdpOutputSettings::default_server_addr")]
pub server_addr: SocketAddr,

Expand Down
6 changes: 0 additions & 6 deletions foundations/src/telemetry/tracing/output_jaeger_thrift_udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use futures_util::future::{BoxFuture, FutureExt as _};
use std::net::SocketAddr;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[cfg(feature = "logging")]
use crate::telemetry::log;
Expand Down Expand Up @@ -63,8 +61,6 @@ fn get_reporter_bind_addr(settings: &JaegerThriftUdpOutputSettings) -> Bootstrap
}

async fn do_export(reporter: JaegerCompactReporter, mut span_rx: SpanReceiver) {
const REPORTER_COOLDOWN_PERIOD: Duration = Duration::from_secs(2);

let reporter = Arc::new(reporter);

while let Some(span) = span_rx.recv().await {
Expand All @@ -79,8 +75,6 @@ async fn do_export(reporter: JaegerCompactReporter, mut span_rx: SpanReceiver) {

#[cfg(not(feature = "logging"))]
drop(e);

thread::sleep(REPORTER_COOLDOWN_PERIOD);
}
}
});
Expand Down
58 changes: 54 additions & 4 deletions foundations/src/telemetry/tracing/output_otlp_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,27 @@ use crate::{BootstrapResult, ServiceInfo};
use anyhow::Context as _;
use cf_rustracing_jaeger::span::SpanReceiver;
use futures_util::future::{BoxFuture, FutureExt as _};
use hyper::http::uri::PathAndQuery;
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use opentelemetry_proto::tonic::trace::v1::ResourceSpans;
use std::time::Duration;
use tonic::client::Grpc;
use tonic::codec::ProstCodec;
use tonic::transport::Channel;
use tonic::{GrpcMethod, Request};

static COLLECTOR_PATH: &str = "/opentelemetry.proto.collector.trace.v1.TraceService/Export";
static TRACE_SERVICE: &str = "opentelemetry.proto.collector.trace.v1.TraceService";

pub(super) fn start(
service_info: ServiceInfo,
settings: &OpenTelemetryGrpcOutputSettings,
span_rx: SpanReceiver,
) -> BootstrapResult<BoxFuture<'static, BootstrapResult<()>>> {
let max_batch_size = settings.max_batch_size;

let grpc_channel = Channel::from_shared(format!("{}/v1/traces", settings.endpoint_url))?
.timeout(Duration::from_secs(settings.request_timeout_seconds));

Expand All @@ -25,17 +37,55 @@ pub(super) fn start(

let client = Grpc::new(grpc_channel);

do_export(client, service_info, span_rx).await
do_export(client, service_info, span_rx, max_batch_size).await
}
.boxed())
}

async fn do_export(
client: Grpc<Channel>,
mut client: Grpc<Channel>,
service_info: ServiceInfo,
span_rx: SpanReceiver,
mut span_rx: SpanReceiver,
max_batch_size: usize,
) -> BootstrapResult<()> {
todo!();
let mut batch = Vec::with_capacity(max_batch_size);

loop {
let received = span_rx.recv_many(&mut batch, max_batch_size).await;

if received == 0 {
break;
}

let resource_spans = batch
.drain(..)
.map(|span| convert_span(span, &service_info))
.collect();

client
.ready()
.await
.context("lost gRPC connection to the trace service")?;

client
.unary::<_, ExportTraceServiceResponse, _>(
create_request(resource_spans),
PathAndQuery::from_static(COLLECTOR_PATH),
ProstCodec::default(),
)
.await
.context("failed to send gRPC request to the trace service")?;
}

Ok(())
}

fn create_request(resource_spans: Vec<ResourceSpans>) -> Request<ExportTraceServiceRequest> {
let mut request = Request::new(ExportTraceServiceRequest { resource_spans });

request
.extensions_mut()
.insert(GrpcMethod::new(TRACE_SERVICE, "Export"));

request
}

0 comments on commit 5642ad7

Please sign in to comment.