Skip to content

Commit

Permalink
feat: Add OpenTelemetry setup function
Browse files Browse the repository at this point in the history
  • Loading branch information
mzaniolo committed Jan 8, 2025
1 parent 7762eab commit 4c1d1df
Show file tree
Hide file tree
Showing 5 changed files with 417 additions and 0 deletions.
14 changes: 14 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,24 @@ resolver = "2"
publish = ["famedly"]

[dependencies]
async-trait = "0.1.85"
http = "1.2.0"
once_cell = "1.20.2"
opentelemetry = "0.27.1"
opentelemetry-appender-tracing = "0.27.0"
opentelemetry-http = "0.27.0"
opentelemetry-otlp = "0.27.0"
opentelemetry-semantic-conventions = "0.27.0"
opentelemetry_sdk = { version = "0.27.1", features = ["rt-tokio"] }
reqwest = "0.12.12"
reqwest-middleware = "0.4.0"
serde = { version = "1.0.210", features = ["derive"] }
thiserror = "1.0.64"
time = { version = "0.3.36", optional = true }
tokio = { version = "1.42.0", features = ["full"] }
tracing = "0.1.40"
tracing-opentelemetry = "0.28.0"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
url = { version = "2.5.2", features = ["serde"] }

[dev-dependencies]
Expand Down
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ mod base_url;
pub mod duration;
/// [serde::Deserialize] impl for [tracing::level_filters::LevelFilter]
mod level_filter;
/// Function to setup the telemetry tools
mod telemetry;

pub use base_url::{BaseUrl, BaseUrlParseError};
pub use level_filter::LevelFilter;
pub use telemetry::{
config::{ExporterConfig, OtelConfig, ProviderConfig},
init_otel, OtelInitError, OtelMiddleware,
};

/// Generic combinators
pub trait GenericCombinators {
Expand Down
207 changes: 207 additions & 0 deletions src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
//! OpenTelemetry initialization
//!
//! Lib containing the definitions and initializations of the OpenTelemetry
//! tools
use std::str::FromStr as _;

use config::{MissingConfigError, OtelConfig};
use opentelemetry::{
trace::{TraceError, TracerProvider as _},
KeyValue,
};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{LogExporter, SpanExporter, WithExportConfig as _};
use opentelemetry_sdk::{
logs::{LogError, LoggerProvider},
metrics::{MeterProviderBuilder, MetricError, PeriodicReader, SdkMeterProvider},
propagation::TraceContextPropagator,
runtime,
trace::{RandomIdGenerator, TracerProvider},
Resource,
};
use opentelemetry_semantic_conventions::{
resource::{SERVICE_NAME, SERVICE_VERSION},
SCHEMA_URL,
};
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{
layer::SubscriberExt as _, util::SubscriberInitExt as _, EnvFilter, Layer,
};
use url::Url;

pub mod config;
mod reqwest_middleware;
pub use reqwest_middleware::OtelMiddleware;

fn resource(service_name: String, version: String) -> Resource {
Resource::from_schema_url(
[KeyValue::new(SERVICE_NAME, service_name), KeyValue::new(SERVICE_VERSION, version)],
SCHEMA_URL,
)
}

Check warning on line 41 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L36-L41

Added lines #L36 - L41 were not covered by tests

fn init_traces(
endpoint: Url,
service_name: String,
version: String,
) -> Result<TracerProvider, TraceError> {
let exporter = SpanExporter::builder().with_tonic().with_endpoint(endpoint).build()?;
let tracer_provider = TracerProvider::builder()
.with_id_generator(RandomIdGenerator::default())
.with_resource(resource(service_name, version))
// .with_simple_exporter(exporter)
.with_batch_exporter(exporter, runtime::Tokio)
.build();

opentelemetry::global::set_tracer_provider(tracer_provider.clone());
Ok(tracer_provider)
}

Check warning on line 58 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L43-L58

Added lines #L43 - L58 were not covered by tests

fn init_metrics(
endpoint: Url,
service_name: String,
version: String,
) -> Result<SdkMeterProvider, MetricError> {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()?;

Check warning on line 69 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L60-L69

Added lines #L60 - L69 were not covered by tests

let reader = PeriodicReader::builder(exporter, runtime::Tokio)
// TODO: Should this be configurable or not?
.with_interval(std::time::Duration::from_secs(1))
.build();

let meter_provider = MeterProviderBuilder::default()
.with_resource(resource(service_name, version))
.with_reader(reader)
.build();

Ok(meter_provider)
}

Check warning on line 82 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L71-L82

Added lines #L71 - L82 were not covered by tests

fn init_logs(
endpoint: Url,
service_name: String,
version: String,
) -> Result<LoggerProvider, LogError> {
let exporter = LogExporter::builder().with_tonic().with_endpoint(endpoint).build()?;

Check warning on line 89 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L84-L89

Added lines #L84 - L89 were not covered by tests

Ok(LoggerProvider::builder()
.with_resource(resource(service_name, version))
.with_batch_exporter(exporter, runtime::Tokio)
.build())
}

Check warning on line 95 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L91-L95

Added lines #L91 - L95 were not covered by tests

/// Initializes the OpenTelemetry
pub fn init_otel(config: OtelConfig) -> Result<ProvidersGuard, OtelInitError> {
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::default());

Check warning on line 99 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L98-L99

Added lines #L98 - L99 were not covered by tests

let stdout_layer = if config.stdout_enable() {
let logger_config = config.get_stdout_config();
let filter_fmt = EnvFilter::from_str(&logger_config.get_filter())?;

Check warning on line 103 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L101-L103

Added lines #L101 - L103 were not covered by tests

Some(tracing_subscriber::fmt::layer().with_thread_names(true).with_filter(filter_fmt))

Check warning on line 105 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L105

Added line #L105 was not covered by tests
} else {
None

Check warning on line 107 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L107

Added line #L107 was not covered by tests
};

let (logger_provider, logs_layer) = if config.logs_enable() {
let logger_config = config.get_logs_config()?;
let filter_otel = EnvFilter::from_str(&logger_config.get_filter())?;
let logger_provider =
init_logs(config.get_endpoint(), config.get_service_name(), config.get_version())?;

Check warning on line 114 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L110-L114

Added lines #L110 - L114 were not covered by tests

// Create a new OpenTelemetryTracingBridge using the above LoggerProvider.
let logs_layer = OpenTelemetryTracingBridge::new(&logger_provider);
let logs_layer = logs_layer.with_filter(filter_otel);

(Some(logger_provider), Some(logs_layer))

Check warning on line 120 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L117-L120

Added lines #L117 - L120 were not covered by tests
} else {
(None, None)

Check warning on line 122 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L122

Added line #L122 was not covered by tests
};

let (tracer_provider, tracer_layer) = if config.traces_enable() {
let tracer_config = config.get_traces_config()?;

Check warning on line 126 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L125-L126

Added lines #L125 - L126 were not covered by tests

let trace_filter = EnvFilter::from_str(&tracer_config.get_filter())?;
let tracer_provider =
init_traces(config.get_endpoint(), config.get_service_name(), config.get_version())?;
let tracer = tracer_provider.tracer(config.get_service_name());
let tracer_layer = OpenTelemetryLayer::new(tracer).with_filter(trace_filter);

(Some(tracer_provider), Some(tracer_layer))

Check warning on line 134 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L128-L134

Added lines #L128 - L134 were not covered by tests
} else {
(None, None)

Check warning on line 136 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L136

Added line #L136 was not covered by tests
};

let (meter_provider, meter_layer) = if config.metrics_enable() {
let meter_config = config.get_metrics_config()?;

Check warning on line 140 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L139-L140

Added lines #L139 - L140 were not covered by tests

let metrics_filter = EnvFilter::from_str(&meter_config.get_filter())?;
let meter_provider =
init_metrics(config.get_endpoint(), config.get_service_name(), config.get_version())?;
let meter_layer = MetricsLayer::new(meter_provider.clone()).with_filter(metrics_filter);

(Some(meter_provider), Some(meter_layer))

Check warning on line 147 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L142-L147

Added lines #L142 - L147 were not covered by tests
} else {
(None, None)

Check warning on line 149 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L149

Added line #L149 was not covered by tests
};

// Initialize the tracing subscriber with the OpenTelemetry layer, the
// stdout layer, traces and metrics.
tracing_subscriber::registry()
.with(logs_layer)
.with(stdout_layer)
.with(meter_layer)
.with(tracer_layer)
.init();

Ok(ProvidersGuard { logger_provider, tracer_provider, meter_provider })
}

Check warning on line 162 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L154-L162

Added lines #L154 - L162 were not covered by tests

/// Guarding object to make sure the providers are properly shutdown
#[derive(Debug)]
pub struct ProvidersGuard {
logger_provider: Option<LoggerProvider>,
tracer_provider: Option<TracerProvider>,
meter_provider: Option<SdkMeterProvider>,
}

impl Drop for ProvidersGuard {
#[allow(clippy::print_stderr)]
fn drop(&mut self) {
self.logger_provider.as_ref().inspect(|logger_provider| {
if let Err(err) = logger_provider.shutdown() {
eprintln!("{err:?}");
}
});
self.tracer_provider.as_ref().inspect(|tracer_provider| {
if let Err(err) = tracer_provider.shutdown() {
eprintln!("{err:?}");
}
});
self.meter_provider.as_ref().inspect(|meter_provider| {
if let Err(err) = meter_provider.shutdown() {
eprintln!("{err:?}");
}
});
}

Check warning on line 190 in src/telemetry.rs

View check run for this annotation

Codecov / codecov/patch

src/telemetry.rs#L174-L190

Added lines #L174 - L190 were not covered by tests
}

/// OpenTelemetry setup errors
#[allow(missing_docs)]
#[derive(Debug, thiserror::Error)]
pub enum OtelInitError {
#[error("Logger initialization error: {0}")]
LoggerInitError(#[from] LogError),
#[error("Tracer initialization error: {0}")]
TracerInitError(#[from] TraceError),
#[error("Meter initialization error: {0}")]
MeterInitError(#[from] MetricError),
#[error("Parsing EnvFilter directives error: {0}")]
EnvFilterError(#[from] tracing_subscriber::filter::ParseError),
#[error("Otel configuration is missing: {0}")]
MissingConfig(#[from] MissingConfigError),
}
Loading

0 comments on commit 4c1d1df

Please sign in to comment.