Skip to content

Commit

Permalink
feat: Add OpenTelemetry setup function
Browse files Browse the repository at this point in the history
  • Loading branch information
mzaniolo committed Jan 6, 2025
1 parent 7762eab commit 7eb6574
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 0 deletions.
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,18 @@ resolver = "2"
publish = ["famedly"]

[dependencies]
once_cell = "1.20.2"
opentelemetry = "0.27.1"
opentelemetry-appender-tracing = "0.27.0"
opentelemetry-otlp = "0.27.0"
opentelemetry-semantic-conventions = "0.27.0"
opentelemetry_sdk = { version = "0.27.1", features = ["rt-tokio"] }
serde = { version = "1.0.210", features = ["derive"] }
thiserror = "1.0.64"
time = { version = "0.3.36", optional = true }
tracing = "0.1.40"
tracing-opentelemetry = "0.28.0"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
url = { version = "2.5.2", features = ["serde"] }

[dev-dependencies]
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ mod base_url;
pub mod duration;
/// [serde::Deserialize] impl for [tracing::level_filters::LevelFilter]
mod level_filter;
/// Function to setup the telemetry tools
mod telemetry;

pub use base_url::{BaseUrl, BaseUrlParseError};
pub use level_filter::LevelFilter;
pub use telemetry::{init_otel, OtelConfig, OtelInitError, ProviderConfig};

/// Generic combinators
pub trait GenericCombinators {
Expand Down
243 changes: 243 additions & 0 deletions src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
//! OpenTelemetry initialization
//!
//! Lib containing the definitions and initializations of the OpenTelemetry
//! tools
use std::str::FromStr;

use once_cell::sync::Lazy;
use opentelemetry::{
trace::{TraceError, TracerProvider as _},
KeyValue,
};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{LogExporter, SpanExporter, WithExportConfig as _};
use opentelemetry_sdk::{
logs::{LogError, LoggerProvider},
metrics::{MeterProviderBuilder, MetricError, PeriodicReader, SdkMeterProvider},
propagation::TraceContextPropagator,
runtime,
trace::{RandomIdGenerator, TracerProvider},
Resource,
};
use opentelemetry_semantic_conventions::{
resource::{SERVICE_NAME, SERVICE_VERSION},
SCHEMA_URL,
};
use serde::Deserialize;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{
layer::SubscriberExt as _, util::SubscriberInitExt as _, EnvFilter, Layer,
};
use url::Url;

use crate::LevelFilter;

const DEFAULT_FILTER: &str = "opentelemetry=off,tonic=off,h2=off,reqwest=off,axum=info,hyper=info,hyper-tls=info,tokio=info,tower=info,josekit=info,openssl=info";
const DEFAULT_LEVEL: &str = "info";
const DEFAULT_ENDPOINT: &str = "http://localhost:4317";

static RESOURCE: Lazy<Resource> = Lazy::new(|| {
Resource::from_schema_url(
[
KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")),
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
],
SCHEMA_URL,
)
});

fn init_traces(endpoint: Url) -> Result<TracerProvider, TraceError> {
let exporter = SpanExporter::builder().with_tonic().with_endpoint(endpoint).build()?;
let tracer_provider = TracerProvider::builder()
.with_id_generator(RandomIdGenerator::default())
.with_resource(RESOURCE.clone())
.with_batch_exporter(exporter, runtime::Tokio)
.build();

opentelemetry::global::set_tracer_provider(tracer_provider.clone());
Ok(tracer_provider)
}

fn init_metrics(endpoint: Url) -> Result<SdkMeterProvider, MetricError> {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.unwrap();

let reader = PeriodicReader::builder(exporter, runtime::Tokio)
// TODO: Should this be configurable or not?
.with_interval(std::time::Duration::from_secs(1))
.build();

let meter_provider =
MeterProviderBuilder::default().with_resource(RESOURCE.clone()).with_reader(reader).build();

Ok(meter_provider)
}

fn init_logs(endpoint: Url) -> Result<LoggerProvider, LogError> {
let exporter = LogExporter::builder().with_tonic().with_endpoint(endpoint).build()?;

Ok(LoggerProvider::builder()
.with_resource(RESOURCE.clone())
.with_batch_exporter(exporter, runtime::Tokio)
.build())
}

/// Initializes the OpenTelemetry
pub fn init_otel(config: OtelConfig) -> Result<ProvidersGuard, OtelInitError> {
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::default());

let stdout_layer = if config.stdout_enable {
let logger_config = config.logger.clone().unwrap_or_default();
let filter_fmt = EnvFilter::from_str(&logger_config.get_filter())?;

Some(tracing_subscriber::fmt::layer().with_thread_names(true).with_filter(filter_fmt))
} else {
None
};

let (logger_provider, logs_layer) = if config.logs_enable {
let logger_config = config.logger.unwrap_or_default();
let filter_otel = EnvFilter::from_str(&logger_config.get_filter())?;
let logger_provider = init_logs(logger_config.endpoint)?;

// Create a new OpenTelemetryTracingBridge using the above LoggerProvider.
let logs_layer = OpenTelemetryTracingBridge::new(&logger_provider);
let logs_layer = logs_layer.with_filter(filter_otel);

(Some(logger_provider), Some(logs_layer))
} else {
(None, None)
};

let (tracer_provider, tracer_layer) = if config.trace_enable {
let tracer_config = config.tracer.unwrap_or_default();

let trace_filter = EnvFilter::from_str(&tracer_config.get_filter())?;
let tracer_provider = init_traces(tracer_config.endpoint)?;
let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME"));
let tracer_layer = OpenTelemetryLayer::new(tracer).with_filter(trace_filter);

(Some(tracer_provider), Some(tracer_layer))
} else {
(None, None)
};

let (meter_provider, meter_layer) = if config.metrics_enable {
let meter_config = config.meter.unwrap_or_default();

let metrics_filter = EnvFilter::from_str(&meter_config.get_filter())?;
let meter_provider = init_metrics(meter_config.endpoint)?;
let meter_layer = MetricsLayer::new(meter_provider.clone()).with_filter(metrics_filter);

(Some(meter_provider), Some(meter_layer))
} else {
(None, None)
};

// Initialize the tracing subscriber with the OpenTelemetry layer, the
// stdout layer, traces and metrics.
tracing_subscriber::registry()
.with(logs_layer)
.with(stdout_layer)
.with(meter_layer)
.with(tracer_layer)
.init();

Ok(ProvidersGuard { logger_provider, tracer_provider, meter_provider })
}

/// Guarding object to make sure the providers are properly shutdown
#[derive(Debug)]
pub struct ProvidersGuard {
logger_provider: Option<LoggerProvider>,
tracer_provider: Option<TracerProvider>,
meter_provider: Option<SdkMeterProvider>,
}

impl Drop for ProvidersGuard {
fn drop(&mut self) {
self.logger_provider.as_ref().inspect(|logger_provider| {
if let Err(err) = logger_provider.shutdown() {
eprintln!("{err:?}");
}
});
self.tracer_provider.as_ref().inspect(|tracer_provider| {
if let Err(err) = tracer_provider.shutdown() {
eprintln!("{err:?}");
}
});
self.meter_provider.as_ref().inspect(|meter_provider| {
if let Err(err) = meter_provider.shutdown() {
eprintln!("{err:?}");
}
});
}
}

/// OpenTelemetry setup errors
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
pub enum OtelInitError {
#[error("Logger initialization error: {0}")]
LoggerInitError(#[from] LogError),
#[error("Tracer initialization error: {0}")]
TracerInitError(#[from] TraceError),
#[error("Meter initialization error: {0}")]
MeterInitError(#[from] MetricError),
#[error("Parsing EnvFilter directives error: {0}")]
EnvFilterError(#[from] tracing_subscriber::filter::ParseError),
#[error("Otel endpoint configuration is missing")]
MissingOtelEndpoint,
#[error("Otel level configuration is missing")]
MissingOtelLevel,
#[error("Otel filter configuration is missing")]
MissingOtelFilter,
}

/// OpenTelemetry configuration
#[derive(Debug, Deserialize, Clone)]
pub struct OtelConfig {
/// Enables trace exporting
pub trace_enable: bool,
/// Enables metrics exporting
pub metrics_enable: bool,
/// Enables logs exporting
pub logs_enable: bool,
/// Enables logs on stdout
pub stdout_enable: bool,
/// Logs exporting config
pub logger: Option<ProviderConfig>,
/// Traces exporting config
pub tracer: Option<ProviderConfig>,
/// Metrics exporting config
pub meter: Option<ProviderConfig>,
}

/// Provider configuration for OpenTelemetry export
#[derive(Debug, Deserialize, Clone)]
pub struct ProviderConfig {
level: LevelFilter,
filter_directives: String,
endpoint: Url,
}

impl Default for ProviderConfig {
fn default() -> Self {
Self {
level: LevelFilter::from_str(DEFAULT_LEVEL).expect("Error parsing default level"),
filter_directives: DEFAULT_FILTER.to_owned(),
endpoint: Url::from_str(DEFAULT_ENDPOINT).expect("Error parsing default endpoint"),
}
}
}

impl ProviderConfig {
fn get_filter(&self) -> String {
format!("{},{}", self.level, self.filter_directives)
}
}

0 comments on commit 7eb6574

Please sign in to comment.