diff --git a/Cargo.toml b/Cargo.toml index ea9f105..02f6dcf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,10 +7,18 @@ resolver = "2" publish = ["famedly"] [dependencies] +once_cell = "1.20.2" +opentelemetry = "0.27.1" +opentelemetry-appender-tracing = "0.27.0" +opentelemetry-otlp = "0.27.0" +opentelemetry-semantic-conventions = "0.27.0" +opentelemetry_sdk = { version = "0.27.1", features = ["rt-tokio"] } serde = { version = "1.0.210", features = ["derive"] } thiserror = "1.0.64" time = { version = "0.3.36", optional = true } tracing = "0.1.40" +tracing-opentelemetry = "0.28.0" +tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } url = { version = "2.5.2", features = ["serde"] } [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 84a45f9..67cc436 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,9 +5,12 @@ mod base_url; pub mod duration; /// [serde::Deserialize] impl for [tracing::level_filters::LevelFilter] mod level_filter; +/// Function to setup the telemetry tools +mod telemetry; pub use base_url::{BaseUrl, BaseUrlParseError}; pub use level_filter::LevelFilter; +pub use telemetry::{init_otel, OtelConfig, OtelInitError, ProviderConfig}; /// Generic combinators pub trait GenericCombinators { diff --git a/src/telemetry.rs b/src/telemetry.rs new file mode 100644 index 0000000..9f80536 --- /dev/null +++ b/src/telemetry.rs @@ -0,0 +1,243 @@ +//! OpenTelemetry initialization +//! +//! Lib containing the definitions and initializations of the OpenTelemetry +//! tools + +use std::str::FromStr; + +use opentelemetry::{ + trace::{TraceError, TracerProvider as _}, + KeyValue, +}; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_otlp::{LogExporter, SpanExporter, WithExportConfig as _}; +use opentelemetry_sdk::{ + logs::{LogError, LoggerProvider}, + metrics::{MeterProviderBuilder, MetricError, PeriodicReader, SdkMeterProvider}, + propagation::TraceContextPropagator, + runtime, + trace::{RandomIdGenerator, TracerProvider}, + Resource, +}; +use opentelemetry_semantic_conventions::{ + resource::{SERVICE_NAME, SERVICE_VERSION}, + SCHEMA_URL, +}; +use serde::Deserialize; +use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; +use tracing_subscriber::{ + layer::SubscriberExt as _, util::SubscriberInitExt as _, EnvFilter, Layer, +}; +use url::Url; + +use crate::LevelFilter; + +const DEFAULT_FILTER: &str = "opentelemetry=off,tonic=off,h2=off,reqwest=off,axum=info,hyper=info,hyper-tls=info,tokio=info,tower=info,josekit=info,openssl=info"; +const DEFAULT_LEVEL: &str = "info"; +const DEFAULT_ENDPOINT: &str = "http://localhost:4317"; + +fn resource() -> Resource { + Resource::from_schema_url( + [ + KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")), + KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")), + ], + SCHEMA_URL, + ) +} + +fn init_traces(endpoint: Url) -> Result { + let exporter = SpanExporter::builder().with_tonic().with_endpoint(endpoint).build()?; + let tracer_provider = TracerProvider::builder() + .with_id_generator(RandomIdGenerator::default()) + .with_resource(resource()) + .with_batch_exporter(exporter, runtime::Tokio) + .build(); + + opentelemetry::global::set_tracer_provider(tracer_provider.clone()); + Ok(tracer_provider) +} + +fn init_metrics(endpoint: Url) -> Result { + let exporter = opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .with_temporality(opentelemetry_sdk::metrics::Temporality::default()) + .build()?; + + let reader = PeriodicReader::builder(exporter, runtime::Tokio) + // TODO: Should this be configurable or not? + .with_interval(std::time::Duration::from_secs(1)) + .build(); + + let meter_provider = + MeterProviderBuilder::default().with_resource(resource()).with_reader(reader).build(); + + Ok(meter_provider) +} + +fn init_logs(endpoint: Url) -> Result { + let exporter = LogExporter::builder().with_tonic().with_endpoint(endpoint).build()?; + + Ok(LoggerProvider::builder() + .with_resource(resource()) + .with_batch_exporter(exporter, runtime::Tokio) + .build()) +} + +/// Initializes the OpenTelemetry +pub fn init_otel(config: OtelConfig) -> Result { + opentelemetry::global::set_text_map_propagator(TraceContextPropagator::default()); + + let stdout_layer = if config.stdout_enable { + let logger_config = config.logger.clone().unwrap_or_default(); + let filter_fmt = EnvFilter::from_str(&logger_config.get_filter())?; + + Some(tracing_subscriber::fmt::layer().with_thread_names(true).with_filter(filter_fmt)) + } else { + None + }; + + let (logger_provider, logs_layer) = if config.logs_enable { + let logger_config = config.logger.unwrap_or_default(); + let filter_otel = EnvFilter::from_str(&logger_config.get_filter())?; + let logger_provider = init_logs(logger_config.endpoint)?; + + // Create a new OpenTelemetryTracingBridge using the above LoggerProvider. + let logs_layer = OpenTelemetryTracingBridge::new(&logger_provider); + let logs_layer = logs_layer.with_filter(filter_otel); + + (Some(logger_provider), Some(logs_layer)) + } else { + (None, None) + }; + + let (tracer_provider, tracer_layer) = if config.trace_enable { + let tracer_config = config.tracer.unwrap_or_default(); + + let trace_filter = EnvFilter::from_str(&tracer_config.get_filter())?; + let tracer_provider = init_traces(tracer_config.endpoint)?; + let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME")); + let tracer_layer = OpenTelemetryLayer::new(tracer).with_filter(trace_filter); + + (Some(tracer_provider), Some(tracer_layer)) + } else { + (None, None) + }; + + let (meter_provider, meter_layer) = if config.metrics_enable { + let meter_config = config.meter.unwrap_or_default(); + + let metrics_filter = EnvFilter::from_str(&meter_config.get_filter())?; + let meter_provider = init_metrics(meter_config.endpoint)?; + let meter_layer = MetricsLayer::new(meter_provider.clone()).with_filter(metrics_filter); + + (Some(meter_provider), Some(meter_layer)) + } else { + (None, None) + }; + + // Initialize the tracing subscriber with the OpenTelemetry layer, the + // stdout layer, traces and metrics. + tracing_subscriber::registry() + .with(logs_layer) + .with(stdout_layer) + .with(meter_layer) + .with(tracer_layer) + .init(); + + Ok(ProvidersGuard { logger_provider, tracer_provider, meter_provider }) +} + +/// Guarding object to make sure the providers are properly shutdown +#[derive(Debug)] +pub struct ProvidersGuard { + logger_provider: Option, + tracer_provider: Option, + meter_provider: Option, +} + +impl Drop for ProvidersGuard { + #[allow(clippy::print_stderr)] + fn drop(&mut self) { + self.logger_provider.as_ref().inspect(|logger_provider| { + if let Err(err) = logger_provider.shutdown() { + eprintln!("{err:?}"); + } + }); + self.tracer_provider.as_ref().inspect(|tracer_provider| { + if let Err(err) = tracer_provider.shutdown() { + eprintln!("{err:?}"); + } + }); + self.meter_provider.as_ref().inspect(|meter_provider| { + if let Err(err) = meter_provider.shutdown() { + eprintln!("{err:?}"); + } + }); + } +} + +/// OpenTelemetry setup errors +#[allow(missing_docs)] +#[derive(Debug, thiserror::Error)] +pub enum OtelInitError { + #[error("Logger initialization error: {0}")] + LoggerInitError(#[from] LogError), + #[error("Tracer initialization error: {0}")] + TracerInitError(#[from] TraceError), + #[error("Meter initialization error: {0}")] + MeterInitError(#[from] MetricError), + #[error("Parsing EnvFilter directives error: {0}")] + EnvFilterError(#[from] tracing_subscriber::filter::ParseError), + #[error("Otel endpoint configuration is missing")] + MissingOtelEndpoint, + #[error("Otel level configuration is missing")] + MissingOtelLevel, + #[error("Otel filter configuration is missing")] + MissingOtelFilter, +} + +/// OpenTelemetry configuration +#[derive(Debug, Deserialize, Clone)] +pub struct OtelConfig { + /// Enables trace exporting + pub trace_enable: bool, + /// Enables metrics exporting + pub metrics_enable: bool, + /// Enables logs exporting + pub logs_enable: bool, + /// Enables logs on stdout + pub stdout_enable: bool, + /// Logs exporting config + pub logger: Option, + /// Traces exporting config + pub tracer: Option, + /// Metrics exporting config + pub meter: Option, +} + +/// Provider configuration for OpenTelemetry export +#[derive(Debug, Deserialize, Clone)] +pub struct ProviderConfig { + level: LevelFilter, + filter_directives: String, + endpoint: Url, +} + +impl Default for ProviderConfig { + #[allow(clippy::expect_used)] + fn default() -> Self { + Self { + level: LevelFilter::from_str(DEFAULT_LEVEL).expect("Error parsing default level"), + filter_directives: DEFAULT_FILTER.to_owned(), + endpoint: Url::from_str(DEFAULT_ENDPOINT).expect("Error parsing default endpoint"), + } + } +} + +impl ProviderConfig { + fn get_filter(&self) -> String { + format!("{},{}", self.level, self.filter_directives) + } +}