diff --git a/Cargo.toml b/Cargo.toml index ea9f105..02f6dcf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,10 +7,18 @@ resolver = "2" publish = ["famedly"] [dependencies] +once_cell = "1.20.2" +opentelemetry = "0.27.1" +opentelemetry-appender-tracing = "0.27.0" +opentelemetry-otlp = "0.27.0" +opentelemetry-semantic-conventions = "0.27.0" +opentelemetry_sdk = { version = "0.27.1", features = ["rt-tokio"] } serde = { version = "1.0.210", features = ["derive"] } thiserror = "1.0.64" time = { version = "0.3.36", optional = true } tracing = "0.1.40" +tracing-opentelemetry = "0.28.0" +tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } url = { version = "2.5.2", features = ["serde"] } [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 84a45f9..5ba381f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,9 +5,12 @@ mod base_url; pub mod duration; /// [serde::Deserialize] impl for [tracing::level_filters::LevelFilter] mod level_filter; +/// Function to setup the telemetry tools +mod telemetry; pub use base_url::{BaseUrl, BaseUrlParseError}; pub use level_filter::LevelFilter; +pub use telemetry::{init_otel, ExporterConfig, OtelConfig, OtelInitError, ProviderConfig}; /// Generic combinators pub trait GenericCombinators { diff --git a/src/telemetry.rs b/src/telemetry.rs new file mode 100644 index 0000000..c0bba03 --- /dev/null +++ b/src/telemetry.rs @@ -0,0 +1,354 @@ +//! OpenTelemetry initialization +//! +//! Lib containing the definitions and initializations of the OpenTelemetry +//! tools + +use std::str::FromStr; + +use opentelemetry::{ + trace::{TraceError, TracerProvider as _}, + KeyValue, +}; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_otlp::{LogExporter, SpanExporter, WithExportConfig as _}; +use opentelemetry_sdk::{ + logs::{LogError, LoggerProvider}, + metrics::{MeterProviderBuilder, MetricError, PeriodicReader, SdkMeterProvider}, + propagation::TraceContextPropagator, + runtime, + trace::{RandomIdGenerator, TracerProvider}, + Resource, +}; +use opentelemetry_semantic_conventions::{ + resource::{SERVICE_NAME, SERVICE_VERSION}, + SCHEMA_URL, +}; +use serde::Deserialize; +use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; +use tracing_subscriber::{ + layer::SubscriberExt as _, util::SubscriberInitExt as _, EnvFilter, Layer, +}; +use url::Url; + +use crate::LevelFilter; + +const DEFAULT_FILTER: &str = "opentelemetry=off,tonic=off,h2=off,reqwest=off,axum=info,hyper=info,hyper-tls=info,tokio=info,tower=info,josekit=info,openssl=info"; +const DEFAULT_LEVEL: &str = "info"; +const DEFAULT_ENDPOINT: &str = "http://localhost:4317"; + +fn resource(service_name: String, version: String) -> Resource { + Resource::from_schema_url( + [KeyValue::new(SERVICE_NAME, service_name), KeyValue::new(SERVICE_VERSION, version)], + SCHEMA_URL, + ) +} + +fn init_traces( + endpoint: Url, + service_name: String, + version: String, +) -> Result { + let exporter = SpanExporter::builder().with_tonic().with_endpoint(endpoint).build()?; + let tracer_provider = TracerProvider::builder() + .with_id_generator(RandomIdGenerator::default()) + .with_resource(resource(service_name, version)) + .with_batch_exporter(exporter, runtime::Tokio) + .build(); + + opentelemetry::global::set_tracer_provider(tracer_provider.clone()); + Ok(tracer_provider) +} + +fn init_metrics( + endpoint: Url, + service_name: String, + version: String, +) -> Result { + let exporter = opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .with_temporality(opentelemetry_sdk::metrics::Temporality::default()) + .build()?; + + let reader = PeriodicReader::builder(exporter, runtime::Tokio) + // TODO: Should this be configurable or not? + .with_interval(std::time::Duration::from_secs(1)) + .build(); + + let meter_provider = MeterProviderBuilder::default() + .with_resource(resource(service_name, version)) + .with_reader(reader) + .build(); + + Ok(meter_provider) +} + +fn init_logs( + endpoint: Url, + service_name: String, + version: String, +) -> Result { + let exporter = LogExporter::builder().with_tonic().with_endpoint(endpoint).build()?; + + Ok(LoggerProvider::builder() + .with_resource(resource(service_name, version)) + .with_batch_exporter(exporter, runtime::Tokio) + .build()) +} + +/// Initializes the OpenTelemetry +pub fn init_otel(config: OtelConfig) -> Result { + opentelemetry::global::set_text_map_propagator(TraceContextPropagator::default()); + + let stdout_layer = if config.stdout_enable() { + let logger_config = config.get_stdout_config(); + let filter_fmt = EnvFilter::from_str(&logger_config.get_filter())?; + + Some(tracing_subscriber::fmt::layer().with_thread_names(true).with_filter(filter_fmt)) + } else { + None + }; + + let (logger_provider, logs_layer) = if config.logs_enable() { + let logger_config = config.get_logs_config()?; + let filter_otel = EnvFilter::from_str(&logger_config.get_filter())?; + let logger_provider = + init_logs(config.get_endpoint(), config.get_service_name(), config.get_version())?; + + // Create a new OpenTelemetryTracingBridge using the above LoggerProvider. + let logs_layer = OpenTelemetryTracingBridge::new(&logger_provider); + let logs_layer = logs_layer.with_filter(filter_otel); + + (Some(logger_provider), Some(logs_layer)) + } else { + (None, None) + }; + + let (tracer_provider, tracer_layer) = if config.traces_enable() { + let tracer_config = config.get_traces_config()?; + + let trace_filter = EnvFilter::from_str(&tracer_config.get_filter())?; + let tracer_provider = + init_traces(config.get_endpoint(), config.get_service_name(), config.get_version())?; + let tracer = tracer_provider.tracer(config.get_service_name()); + let tracer_layer = OpenTelemetryLayer::new(tracer).with_filter(trace_filter); + + (Some(tracer_provider), Some(tracer_layer)) + } else { + (None, None) + }; + + let (meter_provider, meter_layer) = if config.metrics_enable() { + let meter_config = config.get_metrics_config()?; + + let metrics_filter = EnvFilter::from_str(&meter_config.get_filter())?; + let meter_provider = + init_metrics(config.get_endpoint(), config.get_service_name(), config.get_version())?; + let meter_layer = MetricsLayer::new(meter_provider.clone()).with_filter(metrics_filter); + + (Some(meter_provider), Some(meter_layer)) + } else { + (None, None) + }; + + // Initialize the tracing subscriber with the OpenTelemetry layer, the + // stdout layer, traces and metrics. + tracing_subscriber::registry() + .with(logs_layer) + .with(stdout_layer) + .with(meter_layer) + .with(tracer_layer) + .init(); + + Ok(ProvidersGuard { logger_provider, tracer_provider, meter_provider }) +} + +/// Guarding object to make sure the providers are properly shutdown +#[derive(Debug)] +pub struct ProvidersGuard { + logger_provider: Option, + tracer_provider: Option, + meter_provider: Option, +} + +impl Drop for ProvidersGuard { + #[allow(clippy::print_stderr)] + fn drop(&mut self) { + self.logger_provider.as_ref().inspect(|logger_provider| { + if let Err(err) = logger_provider.shutdown() { + eprintln!("{err:?}"); + } + }); + self.tracer_provider.as_ref().inspect(|tracer_provider| { + if let Err(err) = tracer_provider.shutdown() { + eprintln!("{err:?}"); + } + }); + self.meter_provider.as_ref().inspect(|meter_provider| { + if let Err(err) = meter_provider.shutdown() { + eprintln!("{err:?}"); + } + }); + } +} + +/// OpenTelemetry setup errors +#[allow(missing_docs)] +#[derive(Debug, thiserror::Error)] +pub enum OtelInitError { + #[error("Logger initialization error: {0}")] + LoggerInitError(#[from] LogError), + #[error("Tracer initialization error: {0}")] + TracerInitError(#[from] TraceError), + #[error("Meter initialization error: {0}")] + MeterInitError(#[from] MetricError), + #[error("Parsing EnvFilter directives error: {0}")] + EnvFilterError(#[from] tracing_subscriber::filter::ParseError), + #[error("Otel configuration is missing: {0}")] + MissingConfig(#[from] MissingConfigError), +} + +/// OpenTelemetry configuration +#[derive(Debug, Deserialize, Clone)] +pub struct OtelConfig { + /// Enables logs on stdout + pub stdout: Option, + /// Configurations for exporting traces, metrics and logs + pub exporter: Option, +} + +/// Configuration for exporting OpenTelemetry data +#[derive(Debug, Deserialize, Clone)] +pub struct ExporterConfig { + /// gRPC endpoint for exporting using OTELP + pub endpoint: Option, + /// Application service name + pub service_name: String, + /// Application version + pub version: String, + + /// Logs exporting config + pub logger: Option, + /// Traces exporting config + pub tracer: Option, + /// Metrics exporting config + pub meter: Option, +} + +#[derive(Debug, Deserialize, Clone)] +pub struct StdoutLogsConfig { + enable: bool, + level: Option, + filter_directives: Option, +} + +/// Provider configuration for OpenTelemetry export +#[derive(Debug, Deserialize, Clone)] +#[allow(missing_docs)] +pub struct ProviderConfig { + pub enable: bool, + pub level: Option, + pub filter_directives: Option, +} + +impl ProviderConfig { + #[allow(clippy::expect_used)] + fn get_filter(&self) -> String { + format!( + "{},{}", + self.level.unwrap_or( + LevelFilter::from_str(DEFAULT_LEVEL).expect("Error parsing default level") + ), + self.filter_directives.as_ref().unwrap_or(&DEFAULT_FILTER.to_owned()) + ) + } +} + +impl StdoutLogsConfig { + #[allow(clippy::expect_used)] + fn get_filter(&self) -> String { + format!( + "{},{}", + self.level.unwrap_or( + LevelFilter::from_str(DEFAULT_LEVEL).expect("Error parsing default level") + ), + self.filter_directives.as_ref().unwrap_or(&DEFAULT_FILTER.to_owned()) + ) + } +} + +impl Default for StdoutLogsConfig { + fn default() -> Self { + Self { enable: true, level: None, filter_directives: None } + } +} + +impl OtelConfig { + fn stdout_enable(&self) -> bool { + self.stdout.as_ref().is_none_or(|config| config.enable) + } + fn traces_enable(&self) -> bool { + self.exporter + .as_ref() + .is_some_and(|config| config.tracer.as_ref().is_some_and(|tracer| tracer.enable)) + } + fn metrics_enable(&self) -> bool { + self.exporter + .as_ref() + .is_some_and(|config| config.meter.as_ref().is_some_and(|meter| meter.enable)) + } + fn logs_enable(&self) -> bool { + self.exporter + .as_ref() + .is_some_and(|config| config.logger.as_ref().is_some_and(|logger| logger.enable)) + } + fn get_traces_config(&self) -> Result { + self.exporter + .as_ref() + .and_then(|exporter| exporter.tracer.clone()) + .ok_or(MissingConfigError::Traces) + } + fn get_metrics_config(&self) -> Result { + self.exporter + .as_ref() + .and_then(|exporter| exporter.meter.clone()) + .ok_or(MissingConfigError::Metrics) + } + fn get_logs_config(&self) -> Result { + self.exporter + .as_ref() + .and_then(|exporter| exporter.logger.clone()) + .ok_or(MissingConfigError::Logs) + } + fn get_stdout_config(&self) -> StdoutLogsConfig { + self.stdout.clone().unwrap_or_default() + } + #[allow(clippy::expect_used)] + fn get_endpoint(&self) -> Url { + self.exporter + .as_ref() + .and_then(|exporter| exporter.clone().endpoint) + .unwrap_or(Url::from_str(DEFAULT_ENDPOINT).expect("Error parsing default endpoint")) + } + fn get_service_name(&self) -> String { + self.exporter + .as_ref() + .map_or(env!("CARGO_PKG_NAME").to_owned(), |exporter| exporter.service_name.clone()) + } + fn get_version(&self) -> String { + self.exporter + .as_ref() + .map_or(env!("CARGO_PKG_VERSION").to_owned(), |exporter| exporter.service_name.clone()) + } +} + +/// Missing configurations errors +#[derive(Debug, thiserror::Error)] +pub enum MissingConfigError { + #[error("Traces export configuration is missing")] + Traces, + #[error("Metrics export configuration is missing")] + Metrics, + #[error("Logs export configuration is missing")] + Logs, +}