Skip to content

Commit

Permalink
feat: Add OpenTelemetry setup function
Browse files Browse the repository at this point in the history
  • Loading branch information
mzaniolo committed Jan 7, 2025
1 parent 7762eab commit 9f45534
Show file tree
Hide file tree
Showing 3 changed files with 365 additions and 0 deletions.
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,18 @@ resolver = "2"
publish = ["famedly"]

[dependencies]
once_cell = "1.20.2"
opentelemetry = "0.27.1"
opentelemetry-appender-tracing = "0.27.0"
opentelemetry-otlp = "0.27.0"
opentelemetry-semantic-conventions = "0.27.0"
opentelemetry_sdk = { version = "0.27.1", features = ["rt-tokio"] }
serde = { version = "1.0.210", features = ["derive"] }
thiserror = "1.0.64"
time = { version = "0.3.36", optional = true }
tracing = "0.1.40"
tracing-opentelemetry = "0.28.0"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
url = { version = "2.5.2", features = ["serde"] }

[dev-dependencies]
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ mod base_url;
pub mod duration;
/// [serde::Deserialize] impl for [tracing::level_filters::LevelFilter]
mod level_filter;
/// Function to setup the telemetry tools
mod telemetry;

pub use base_url::{BaseUrl, BaseUrlParseError};
pub use level_filter::LevelFilter;
pub use telemetry::{init_otel, ExporterConfig, OtelConfig, OtelInitError, ProviderConfig};

/// Generic combinators
pub trait GenericCombinators {
Expand Down
354 changes: 354 additions & 0 deletions src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,354 @@
//! OpenTelemetry initialization
//!
//! Lib containing the definitions and initializations of the OpenTelemetry
//! tools
use std::str::FromStr;

use opentelemetry::{
trace::{TraceError, TracerProvider as _},
KeyValue,
};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{LogExporter, SpanExporter, WithExportConfig as _};
use opentelemetry_sdk::{
logs::{LogError, LoggerProvider},
metrics::{MeterProviderBuilder, MetricError, PeriodicReader, SdkMeterProvider},
propagation::TraceContextPropagator,
runtime,
trace::{RandomIdGenerator, TracerProvider},
Resource,
};
use opentelemetry_semantic_conventions::{
resource::{SERVICE_NAME, SERVICE_VERSION},
SCHEMA_URL,
};
use serde::Deserialize;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{
layer::SubscriberExt as _, util::SubscriberInitExt as _, EnvFilter, Layer,
};
use url::Url;

use crate::LevelFilter;

const DEFAULT_FILTER: &str = "opentelemetry=off,tonic=off,h2=off,reqwest=off,axum=info,hyper=info,hyper-tls=info,tokio=info,tower=info,josekit=info,openssl=info";
const DEFAULT_LEVEL: &str = "info";
const DEFAULT_ENDPOINT: &str = "http://localhost:4317";

fn resource(service_name: String, version: String) -> Resource {
Resource::from_schema_url(
[KeyValue::new(SERVICE_NAME, service_name), KeyValue::new(SERVICE_VERSION, version)],
SCHEMA_URL,
)
}

Check warning on line 44 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L39-L44

Added lines #L39 - L44 were not covered by tests

fn init_traces(
endpoint: Url,
service_name: String,
version: String,
) -> Result<TracerProvider, TraceError> {
let exporter = SpanExporter::builder().with_tonic().with_endpoint(endpoint).build()?;
let tracer_provider = TracerProvider::builder()
.with_id_generator(RandomIdGenerator::default())
.with_resource(resource(service_name, version))
.with_batch_exporter(exporter, runtime::Tokio)
.build();

opentelemetry::global::set_tracer_provider(tracer_provider.clone());
Ok(tracer_provider)
}

Check warning on line 60 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L46-L60

Added lines #L46 - L60 were not covered by tests

fn init_metrics(
endpoint: Url,
service_name: String,
version: String,
) -> Result<SdkMeterProvider, MetricError> {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()?;

Check warning on line 71 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L62-L71

Added lines #L62 - L71 were not covered by tests

let reader = PeriodicReader::builder(exporter, runtime::Tokio)
// TODO: Should this be configurable or not?
.with_interval(std::time::Duration::from_secs(1))
.build();

let meter_provider = MeterProviderBuilder::default()
.with_resource(resource(service_name, version))
.with_reader(reader)
.build();

Ok(meter_provider)
}

Check warning on line 84 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L73-L84

Added lines #L73 - L84 were not covered by tests

fn init_logs(
endpoint: Url,
service_name: String,
version: String,
) -> Result<LoggerProvider, LogError> {
let exporter = LogExporter::builder().with_tonic().with_endpoint(endpoint).build()?;

Check warning on line 91 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L86-L91

Added lines #L86 - L91 were not covered by tests

Ok(LoggerProvider::builder()
.with_resource(resource(service_name, version))
.with_batch_exporter(exporter, runtime::Tokio)
.build())
}

Check warning on line 97 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L93-L97

Added lines #L93 - L97 were not covered by tests

/// Initializes the OpenTelemetry
pub fn init_otel(config: OtelConfig) -> Result<ProvidersGuard, OtelInitError> {
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::default());

Check warning on line 101 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L100-L101

Added lines #L100 - L101 were not covered by tests

let stdout_layer = if config.stdout_enable() {
let logger_config = config.get_stdout_config();
let filter_fmt = EnvFilter::from_str(&logger_config.get_filter())?;

Check warning on line 105 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L103-L105

Added lines #L103 - L105 were not covered by tests

Some(tracing_subscriber::fmt::layer().with_thread_names(true).with_filter(filter_fmt))

Check warning on line 107 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L107

Added line #L107 was not covered by tests
} else {
None

Check warning on line 109 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L109

Added line #L109 was not covered by tests
};

let (logger_provider, logs_layer) = if config.logs_enable() {
let logger_config = config.get_logs_config()?;
let filter_otel = EnvFilter::from_str(&logger_config.get_filter())?;
let logger_provider =
init_logs(config.get_endpoint(), config.get_service_name(), config.get_version())?;

Check warning on line 116 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L112-L116

Added lines #L112 - L116 were not covered by tests

// Create a new OpenTelemetryTracingBridge using the above LoggerProvider.
let logs_layer = OpenTelemetryTracingBridge::new(&logger_provider);
let logs_layer = logs_layer.with_filter(filter_otel);

(Some(logger_provider), Some(logs_layer))

Check warning on line 122 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L119-L122

Added lines #L119 - L122 were not covered by tests
} else {
(None, None)

Check warning on line 124 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L124

Added line #L124 was not covered by tests
};

let (tracer_provider, tracer_layer) = if config.traces_enable() {
let tracer_config = config.get_traces_config()?;

Check warning on line 128 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L127-L128

Added lines #L127 - L128 were not covered by tests

let trace_filter = EnvFilter::from_str(&tracer_config.get_filter())?;
let tracer_provider =
init_traces(config.get_endpoint(), config.get_service_name(), config.get_version())?;
let tracer = tracer_provider.tracer(config.get_service_name());
let tracer_layer = OpenTelemetryLayer::new(tracer).with_filter(trace_filter);

(Some(tracer_provider), Some(tracer_layer))

Check warning on line 136 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L130-L136

Added lines #L130 - L136 were not covered by tests
} else {
(None, None)

Check warning on line 138 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L138

Added line #L138 was not covered by tests
};

let (meter_provider, meter_layer) = if config.metrics_enable() {
let meter_config = config.get_metrics_config()?;

Check warning on line 142 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L141-L142

Added lines #L141 - L142 were not covered by tests

let metrics_filter = EnvFilter::from_str(&meter_config.get_filter())?;
let meter_provider =
init_metrics(config.get_endpoint(), config.get_service_name(), config.get_version())?;
let meter_layer = MetricsLayer::new(meter_provider.clone()).with_filter(metrics_filter);

(Some(meter_provider), Some(meter_layer))

Check warning on line 149 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L144-L149

Added lines #L144 - L149 were not covered by tests
} else {
(None, None)

Check warning on line 151 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L151

Added line #L151 was not covered by tests
};

// Initialize the tracing subscriber with the OpenTelemetry layer, the
// stdout layer, traces and metrics.
tracing_subscriber::registry()
.with(logs_layer)
.with(stdout_layer)
.with(meter_layer)
.with(tracer_layer)
.init();

Ok(ProvidersGuard { logger_provider, tracer_provider, meter_provider })
}

Check warning on line 164 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L156-L164

Added lines #L156 - L164 were not covered by tests

/// Guarding object to make sure the providers are properly shutdown
#[derive(Debug)]
pub struct ProvidersGuard {
logger_provider: Option<LoggerProvider>,
tracer_provider: Option<TracerProvider>,
meter_provider: Option<SdkMeterProvider>,
}

impl Drop for ProvidersGuard {
#[allow(clippy::print_stderr)]
fn drop(&mut self) {
self.logger_provider.as_ref().inspect(|logger_provider| {
if let Err(err) = logger_provider.shutdown() {
eprintln!("{err:?}");
}
});
self.tracer_provider.as_ref().inspect(|tracer_provider| {
if let Err(err) = tracer_provider.shutdown() {
eprintln!("{err:?}");
}
});
self.meter_provider.as_ref().inspect(|meter_provider| {
if let Err(err) = meter_provider.shutdown() {
eprintln!("{err:?}");
}
});
}

Check warning on line 192 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L176-L192

Added lines #L176 - L192 were not covered by tests
}

/// OpenTelemetry setup errors
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
pub enum OtelInitError {
#[error("Logger initialization error: {0}")]
LoggerInitError(#[from] LogError),
#[error("Tracer initialization error: {0}")]
TracerInitError(#[from] TraceError),
#[error("Meter initialization error: {0}")]
MeterInitError(#[from] MetricError),
#[error("Parsing EnvFilter directives error: {0}")]
EnvFilterError(#[from] tracing_subscriber::filter::ParseError),
#[error("Otel configuration is missing: {0}")]
MissingConfig(#[from] MissingConfigError),
}

/// OpenTelemetry configuration
#[derive(Debug, Deserialize, Clone)]
pub struct OtelConfig {
/// Enables logs on stdout
pub stdout: Option<StdoutLogsConfig>,
/// Configurations for exporting traces, metrics and logs
pub exporter: Option<ExporterConfig>,
}

/// Configuration for exporting OpenTelemetry data
#[derive(Debug, Deserialize, Clone)]
pub struct ExporterConfig {
/// gRPC endpoint for exporting using OTELP
pub endpoint: Option<Url>,
/// Application service name
pub service_name: String,
/// Application version
pub version: String,

/// Logs exporting config
pub logger: Option<ProviderConfig>,
/// Traces exporting config
pub tracer: Option<ProviderConfig>,
/// Metrics exporting config
pub meter: Option<ProviderConfig>,
}

#[derive(Debug, Deserialize, Clone)]
pub struct StdoutLogsConfig {
enable: bool,
level: Option<LevelFilter>,
filter_directives: Option<String>,
}

/// Provider configuration for OpenTelemetry export
#[derive(Debug, Deserialize, Clone)]
#[allow(missing_docs)]
pub struct ProviderConfig {
pub enable: bool,
pub level: Option<LevelFilter>,
pub filter_directives: Option<String>,
}

impl ProviderConfig {
#[allow(clippy::expect_used)]
fn get_filter(&self) -> String {
format!(
"{},{}",
self.level.unwrap_or(
LevelFilter::from_str(DEFAULT_LEVEL).expect("Error parsing default level")
),
self.filter_directives.as_ref().unwrap_or(&DEFAULT_FILTER.to_owned())
)
}

Check warning on line 264 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L256-L264

Added lines #L256 - L264 were not covered by tests
}

impl StdoutLogsConfig {
#[allow(clippy::expect_used)]
fn get_filter(&self) -> String {
format!(
"{},{}",
self.level.unwrap_or(
LevelFilter::from_str(DEFAULT_LEVEL).expect("Error parsing default level")
),
self.filter_directives.as_ref().unwrap_or(&DEFAULT_FILTER.to_owned())
)
}

Check warning on line 277 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L269-L277

Added lines #L269 - L277 were not covered by tests
}

impl Default for StdoutLogsConfig {
fn default() -> Self {
Self { enable: true, level: None, filter_directives: None }
}

Check warning on line 283 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L281-L283

Added lines #L281 - L283 were not covered by tests
}

impl OtelConfig {
fn stdout_enable(&self) -> bool {
self.stdout.as_ref().is_none_or(|config| config.enable)
}
fn traces_enable(&self) -> bool {
self.exporter
.as_ref()
.is_some_and(|config| config.tracer.as_ref().is_some_and(|tracer| tracer.enable))
}
fn metrics_enable(&self) -> bool {
self.exporter
.as_ref()
.is_some_and(|config| config.meter.as_ref().is_some_and(|meter| meter.enable))
}
fn logs_enable(&self) -> bool {
self.exporter
.as_ref()
.is_some_and(|config| config.logger.as_ref().is_some_and(|logger| logger.enable))
}
fn get_traces_config(&self) -> Result<ProviderConfig, MissingConfigError> {
self.exporter
.as_ref()
.and_then(|exporter| exporter.tracer.clone())
.ok_or(MissingConfigError::Traces)
}
fn get_metrics_config(&self) -> Result<ProviderConfig, MissingConfigError> {
self.exporter
.as_ref()
.and_then(|exporter| exporter.meter.clone())
.ok_or(MissingConfigError::Metrics)
}
fn get_logs_config(&self) -> Result<ProviderConfig, MissingConfigError> {
self.exporter
.as_ref()
.and_then(|exporter| exporter.logger.clone())
.ok_or(MissingConfigError::Logs)
}
fn get_stdout_config(&self) -> StdoutLogsConfig {
self.stdout.clone().unwrap_or_default()
}

Check warning on line 325 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L287-L325

Added lines #L287 - L325 were not covered by tests
#[allow(clippy::expect_used)]
fn get_endpoint(&self) -> Url {
self.exporter
.as_ref()
.and_then(|exporter| exporter.clone().endpoint)
.unwrap_or(Url::from_str(DEFAULT_ENDPOINT).expect("Error parsing default endpoint"))
}
fn get_service_name(&self) -> String {
self.exporter
.as_ref()
.map_or(env!("CARGO_PKG_NAME").to_owned(), |exporter| exporter.service_name.clone())
}
fn get_version(&self) -> String {
self.exporter
.as_ref()
.map_or(env!("CARGO_PKG_VERSION").to_owned(), |exporter| exporter.service_name.clone())
}

Check warning on line 342 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L327-L342

Added lines #L327 - L342 were not covered by tests
}

/// Missing configurations errors
#[derive(Debug, thiserror::Error)]
pub enum MissingConfigError {
#[error("Traces export configuration is missing")]
Traces,
#[error("Metrics export configuration is missing")]
Metrics,
#[error("Logs export configuration is missing")]
Logs,
}

0 comments on commit 9f45534

Please sign in to comment.