Skip to content

Commit

Permalink
feat: migrate to opentelemetry
Browse files (browse the repository at this point in the history)
  • Loading branch information
maschad committed Feb 11, 2025
1 parent d2f3d42 commit 57d546a
Show file tree
Hide file tree
Showing 8 changed files with 359 additions and 140 deletions.
93 changes: 93 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 9 additions & 43 deletions atoma-bin/atoma_daemon.rs
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
use std::{path::Path, str::FromStr, sync::Arc};
use std::{str::FromStr, sync::Arc};

use anyhow::{Context, Result};
use atoma_daemon::{
Expand All @@ -16,21 +16,8 @@ use tokio::{
try_join,
};
use tracing::info;
use tracing_appender::{
non_blocking,
rolling::{RollingFileAppender, Rotation},
};
use tracing_subscriber::{
fmt::{self, format::FmtSpan, time::UtcTime},
layer::SubscriberExt,
util::SubscriberInitExt,
EnvFilter, Registry,
};

/// The directory where the logs are stored.
const LOGS: &str = "./logs";
/// The log file name.
const LOG_FILE: &str = "atoma-daemon-service.log";
mod telemetry;

#[derive(Parser)]
struct DaemonArgs {
Expand All @@ -41,7 +28,10 @@ struct DaemonArgs {
#[tokio::main]
#[allow(clippy::redundant_pub_crate)]
async fn main() -> Result<()> {
setup_logging();
// Store both guards to keep logging active for the duration of the program
let (_file_guard, _stdout_guard) =
telemetry::setup_logging().context("Failed to setup logging")?;

let args = DaemonArgs::parse();
let daemon_config = AtomaDaemonConfig::from_file_path(args.config_path.clone());
let state_manager_config = AtomaStateManagerConfig::from_file_path(args.config_path.clone());
Expand Down Expand Up @@ -106,32 +96,8 @@ async fn main() -> Result<()> {

let (daemon_result, _) = try_join!(daemon_handle, ctrl_c)?;

daemon_result
}
// Before the program exits, ensure all spans are exported
telemetry::shutdown();

fn setup_logging() {
let log_dir = Path::new(LOGS);
let file_appender = RollingFileAppender::new(Rotation::DAILY, log_dir, LOG_FILE);
let (non_blocking_appender, _guard) = non_blocking(file_appender);
let file_layer = fmt::layer()
.json()
.with_timer(UtcTime::rfc_3339())
.with_current_span(true)
.with_writer(non_blocking_appender);

let console_layer = fmt::layer()
.pretty()
.with_target(true)
.with_line_number(true)
.with_file(true)
.with_span_events(FmtSpan::FULL);

let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info,atoma_daemon=debug"));

Registry::default()
.with(env_filter)
.with(console_layer)
.with(file_layer)
.init();
daemon_result
}
95 changes: 7 additions & 88 deletions atoma-bin/atoma_node.rs
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,4 @@
use std::{
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
};
use std::{path::PathBuf, str::FromStr, sync::Arc};

use anyhow::{Context, Result};
use atoma_confidential::AtomaConfidentialCompute;
Expand All @@ -24,25 +20,11 @@ use tokio::{
try_join,
};
use tracing::{error, info, instrument, warn};
use tracing_appender::{
non_blocking,
non_blocking::WorkerGuard,
rolling::{RollingFileAppender, Rotation},
};
use tracing_subscriber::{
fmt::{self, format::FmtSpan, time::UtcTime},
prelude::*,
EnvFilter, Registry,
};

mod telemetry;

/// The name of the environment variable for the Hugging Face token
const HF_TOKEN: &str = "HF_TOKEN";
/// The directory where the logs are stored.
const LOGS: &str = "./logs";
/// The log file name for the node service.
const NODE_LOG_FILE: &str = "atoma-node.log";
/// The log file name for the daemon service.
const DAEMON_LOG_FILE: &str = "atoma-daemon.log";

/// Command line arguments for the Atoma node
#[derive(Parser)]
Expand Down Expand Up @@ -171,7 +153,7 @@ async fn initialize_tokenizers(
#[allow(clippy::too_many_lines)]
#[allow(clippy::redundant_pub_crate)]
async fn main() -> Result<()> {
let _log_guards = setup_logging(LOGS).context("Failed to setup logging")?;
let _log_guards = telemetry::setup_logging().context("Failed to setup logging")?;

dotenvy::dotenv().ok();

Expand Down Expand Up @@ -445,73 +427,10 @@ async fn main() -> Result<()> {
event = "atoma_node_service_shutdown",
"Atoma node service shut down successfully"
);
Ok(())
}

/// Configure logging with JSON formatting, file output, and console output
fn setup_logging<P: AsRef<Path>>(log_dir: P) -> Result<(WorkerGuard, WorkerGuard)> {
// Create logs directory if it doesn't exist
std::fs::create_dir_all(&log_dir).context("Failed to create logs directory")?;

// Set up file appenders with rotation for both services
let node_appender = RollingFileAppender::new(Rotation::DAILY, log_dir.as_ref(), NODE_LOG_FILE);
let daemon_appender =
RollingFileAppender::new(Rotation::DAILY, log_dir.as_ref(), DAEMON_LOG_FILE);

// Create non-blocking writers and keep the guards
let (node_non_blocking, node_guard) = non_blocking(node_appender);
let (daemon_non_blocking, daemon_guard) = non_blocking(daemon_appender);

// Create JSON formatter for node service
let node_layer = fmt::layer()
.json()
.with_timer(UtcTime::rfc_3339())
.with_thread_ids(true)
.with_thread_names(true)
.with_target(true)
.with_line_number(true)
.with_file(true)
.with_current_span(true)
.with_span_list(true)
.with_writer(node_non_blocking)
.with_filter(EnvFilter::new("atoma_node=info"));

// Create JSON formatter for daemon service
let daemon_layer = fmt::layer()
.json()
.with_timer(UtcTime::rfc_3339())
.with_thread_ids(true)
.with_thread_names(true)
.with_target(true)
.with_line_number(true)
.with_file(true)
.with_current_span(true)
.with_span_list(true)
.with_writer(daemon_non_blocking)
.with_filter(EnvFilter::new("atoma_daemon=info"));

// Create console formatter for development
let console_layer = fmt::layer()
.pretty()
.with_target(true)
.with_thread_ids(true)
.with_line_number(true)
.with_file(true)
.with_span_events(FmtSpan::ENTER);

// Create filter from environment variable or default to info
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));

// Combine layers with filter
Registry::default()
.with(env_filter)
.with(console_layer)
.with(node_layer)
.with(daemon_layer)
.init();

// Return the guards so they can be stored in main
Ok((node_guard, daemon_guard))
telemetry::shutdown();

Ok(())
}

/// Handles the results of various tasks (subscriber, state manager, and server).
Expand Down
Loading

0 comments on commit 57d546a

Please sign in to comment.