Skip to content

Commit 0c5e482

Browse files
committed
Implements logging using OpenTelemetry
1 parent 9aa73b5 commit 0c5e482

File tree

10 files changed

+434
-17
lines changed

10 files changed

+434
-17
lines changed

Cargo.lock

Lines changed: 290 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ axum = "0.8.1"
1010
chrono = { version = "0.4.38", features = ["clock"] }
1111
serde = { version = "1.0.188", features = ["derive"] }
1212
sqlx = { version = "0.8.3", features = ["chrono", "postgres", "runtime-tokio"] }
13-
tokio = { version = "1.28.2", features = ["default", "macros", "rt-multi-thread"] } # For async tests
13+
tokio = { version = "1.28.2", features = ["default", "macros", "rt-multi-thread", "signal"] } # For async tests
1414
hmac = "0.12.1"
1515
sha2 = "0.10.8"
1616
hex = "0.4.3"
@@ -24,3 +24,10 @@ tracing = "0.1.41"
2424
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "time", "fmt", "std"] }
2525
dotenv = "0.15.0"
2626
time = { version = "0.3.37", features = ["formatting"] }
27+
opentelemetry = "0.29.1"
28+
opentelemetry_sdk = "0.29.0"
29+
opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] }
30+
tracing-core = "0.1.33"
31+
tracing-opentelemetry = "0.30.0"
32+
opentelemetry-otlp = { version = "0.29.0", features = ["grpc-tonic"] }
33+
opentelemetry-stdout = "0.29.0"

src/daily_task/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use tracing::{debug, error, info};
77

88
use crate::models::member::Member;
99

10+
#[tracing::instrument]
1011
pub async fn run_daily_task_at_midnight(pool: Arc<PgPool>) {
1112
loop {
1213
let now = chrono::Utc::now().with_timezone(&Kolkata);
@@ -37,6 +38,7 @@ pub async fn run_daily_task_at_midnight(pool: Arc<PgPool>) {
3738
/// This function does a number of things, including:
3839
/// * Insert new attendance records everyday for [`presense`](https://www.github.com/amfoss/presense) to update them later in the day.
3940
/// * Update the AttendanceSummary table
41+
#[tracing::instrument]
4042
async fn execute_daily_task(pool: Arc<PgPool>) {
4143
// Members is queried outside of each function to avoid repetition
4244
let members = sqlx::query_as::<_, Member>("SELECT * FROM Member")
@@ -50,6 +52,7 @@ async fn execute_daily_task(pool: Arc<PgPool>) {
5052
};
5153
}
5254

55+
#[tracing::instrument]
5356
async fn update_attendance(members: Vec<Member>, pool: &PgPool) {
5457
#[allow(deprecated)]
5558
let today = chrono::Utc::now()
@@ -92,6 +95,7 @@ async fn update_attendance(members: Vec<Member>, pool: &PgPool) {
9295
}
9396
}
9497

98+
#[tracing::instrument]
9599
async fn update_attendance_summary(member_id: i32, pool: &PgPool) {
96100
debug!("Updating summary for member #{}", member_id);
97101
#[allow(deprecated)]
@@ -129,6 +133,7 @@ async fn update_attendance_summary(member_id: i32, pool: &PgPool) {
129133
}
130134
}
131135

136+
#[tracing::instrument]
132137
async fn update_days_attended(member_id: i32, today: NaiveDate, pool: &PgPool) {
133138
// Convert year and month into i32 cause SQLx cannot encode u32 into database types
134139
let month: i32 = (today.month0() + 1) as i32;

src/graphql/mutations/attendance_mutations.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ use crate::models::attendance::{Attendance, MarkAttendanceInput};
1111

1212
type HmacSha256 = Hmac<Sha256>;
1313

14-
#[derive(Default)]
14+
#[derive(Default, Debug)]
1515
pub struct AttendanceMutations;
1616

1717
#[Object]
1818
impl AttendanceMutations {
19+
#[tracing::instrument(skip(ctx))]
1920
#[graphql(name = "markAttendance")]
2021
async fn mark_attendance(
2122
&self,

src/graphql/queries/attendance_queries.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@ use async_graphql::{Context, Object, Result};
55
use chrono::NaiveDate;
66
use sqlx::PgPool;
77

8-
#[derive(Default)]
8+
#[derive(Default, Debug)]
99
pub struct AttendanceQueries;
1010

1111
#[Object]
1212
impl AttendanceQueries {
13+
#[tracing::instrument(skip(ctx))]
1314
async fn attendance(&self, ctx: &Context<'_>, member_id: i32) -> Result<Vec<Attendance>> {
1415
let pool = ctx.data::<Arc<PgPool>>().expect("Pool must be in context.");
1516

17+
tracing::info!("Fetching attendance for member ID: {}", member_id);
18+
1619
Ok(
1720
sqlx::query_as::<_, Attendance>("SELECT * FROM Attendance WHERE member_id = $1")
1821
.bind(member_id)

src/graphql/queries/member_queries.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ use crate::models::{
99
status_update_streak::StatusUpdateStreakInfo,
1010
};
1111

12-
#[derive(Default)]
12+
#[derive(Default, Debug)]
1313
pub struct MemberQueries;
1414

1515
#[Object]
1616
impl MemberQueries {
17+
#[tracing::instrument(skip(ctx))]
1718
pub async fn members(
1819
&self,
1920
ctx: &Context<'_>,
@@ -45,9 +46,12 @@ impl MemberQueries {
4546

4647
#[ComplexObject]
4748
impl Member {
49+
#[tracing::instrument(skip(ctx))]
4850
async fn attendance(&self, ctx: &Context<'_>) -> Vec<AttendanceInfo> {
4951
let pool = ctx.data::<Arc<PgPool>>().expect("Pool must be in context.");
5052

53+
tracing::info!("Fetching attendance for member ID: {}", self.member_id);
54+
5155
sqlx::query_as::<_, AttendanceInfo>(
5256
"SELECT date, is_present, time_in, time_out FROM Attendance WHERE member_id = $1",
5357
)

src/main.rs

Lines changed: 116 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,18 @@ use tower_http::cors::CorsLayer;
77
use tracing::info;
88
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
99

10+
use opentelemetry::{global, trace::TracerProvider as _, KeyValue};
11+
use opentelemetry_sdk::{
12+
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
13+
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
14+
Resource,
15+
};
16+
use opentelemetry_semantic_conventions::{
17+
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_NAME, SERVICE_VERSION},
18+
SCHEMA_URL,
19+
};
20+
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
21+
1022
use daily_task::run_daily_task_at_midnight;
1123
use graphql::{Mutation, Query};
1224
use routes::setup_router;
@@ -37,10 +49,27 @@ impl Config {
3749
}
3850
}
3951

52+
struct OtelGuard {
53+
tracer_provider: SdkTracerProvider,
54+
meter_provider: SdkMeterProvider,
55+
}
56+
57+
impl Drop for OtelGuard {
58+
fn drop(&mut self) {
59+
if let Err(err) = self.tracer_provider.shutdown() {
60+
eprintln!("{err:?}");
61+
}
62+
if let Err(err) = self.meter_provider.shutdown() {
63+
eprintln!("{err:?}");
64+
}
65+
}
66+
}
67+
4068
#[tokio::main]
69+
#[tracing::instrument]
4170
async fn main() {
4271
let config = Config::from_env();
43-
setup_tracing(&config.env);
72+
let guard = setup_tracing(&config.env);
4473

4574
let pool = setup_database(&config.database_url).await;
4675
let schema = build_graphql_schema(pool.clone(), config.secret_key);
@@ -56,10 +85,85 @@ async fn main() {
5685
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", config.port))
5786
.await
5887
.unwrap();
59-
axum::serve(listener, router).await.unwrap();
88+
89+
axum::serve(listener, router)
90+
.with_graceful_shutdown(shutdown_signal())
91+
.await
92+
.unwrap();
93+
94+
drop(guard);
95+
}
96+
97+
#[tracing::instrument]
98+
async fn shutdown_signal() {
99+
// Wait for Ctrl-C
100+
tokio::signal::ctrl_c()
101+
.await
102+
.expect("failed to install Ctrl+C handler");
103+
104+
tracing::info!("Shutdown signal received. Flushing telemetry...");
105+
106+
// Flush traces and metrics
107+
// guard.tracer_provider.shutdown().unwrap();
108+
}
109+
110+
fn resource() -> Resource {
111+
Resource::builder()
112+
.with_attributes(vec![
113+
KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")),
114+
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
115+
KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "develop"),
116+
])
117+
.with_schema_url(Vec::new(), SCHEMA_URL)
118+
.build()
119+
}
120+
121+
fn init_meter_provider() -> SdkMeterProvider {
122+
let exporter = opentelemetry_otlp::MetricExporter::builder()
123+
.with_tonic()
124+
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
125+
.build()
126+
.unwrap();
127+
128+
let reader = PeriodicReader::builder(exporter)
129+
.with_interval(std::time::Duration::from_secs(30))
130+
.build();
131+
132+
let stdout_reader =
133+
PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()).build();
134+
135+
let meter_provider = MeterProviderBuilder::default()
136+
.with_resource(resource())
137+
.with_reader(reader)
138+
.with_reader(stdout_reader)
139+
.build();
140+
141+
global::set_meter_provider(meter_provider.clone());
142+
143+
meter_provider
144+
}
145+
146+
fn init_tracer_provider() -> SdkTracerProvider {
147+
let exporter = opentelemetry_otlp::SpanExporter::builder()
148+
.with_tonic()
149+
.build()
150+
.unwrap();
151+
152+
SdkTracerProvider::builder()
153+
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
154+
1.0,
155+
))))
156+
.with_id_generator(RandomIdGenerator::default())
157+
.with_resource(resource())
158+
.with_batch_exporter(exporter)
159+
.build()
60160
}
61161

62-
fn setup_tracing(env: &str) {
162+
fn setup_tracing(env: &str) -> OtelGuard {
163+
let tracer_provider = init_tracer_provider();
164+
let meter_provider = init_meter_provider();
165+
let tracer = tracer_provider.tracer("tracing-otel-subscriber");
166+
63167
let kolkata_offset = UtcOffset::from_hms(5, 30, 0).expect("Hardcoded offset must be correct");
64168
let timer = fmt::time::OffsetTime::new(
65169
kolkata_offset,
@@ -75,6 +179,8 @@ fn setup_tracing(env: &str) {
75179
.with_ansi(false) // ANSI encodings are unreadable in the raw file.
76180
.with_writer(std::fs::File::create("root.log").unwrap()),
77181
)
182+
.with(MetricsLayer::new(meter_provider.clone()))
183+
.with(OpenTelemetryLayer::new(tracer))
78184
.with(EnvFilter::new("info"))
79185
.init();
80186
info!("Running in production mode.")
@@ -93,10 +199,17 @@ fn setup_tracing(env: &str) {
93199
.with_ansi(false)
94200
.with_writer(std::fs::File::create("root.log").unwrap()),
95201
)
202+
.with(MetricsLayer::new(meter_provider.clone()))
203+
.with(OpenTelemetryLayer::new(tracer))
96204
.with(EnvFilter::new("trace"))
97205
.init();
98206
info!("Running in development mode.");
99207
}
208+
209+
OtelGuard {
210+
tracer_provider,
211+
meter_provider,
212+
}
100213
}
101214

102215
async fn setup_database(database_url: &str) -> Arc<PgPool> {

src/models/attendance.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub struct AttendanceSummaryInfo {
3939
pub days_attended: i32,
4040
}
4141

42-
#[derive(InputObject)]
42+
#[derive(InputObject, Debug)]
4343
pub struct MarkAttendanceInput {
4444
pub member_id: i32,
4545
pub date: NaiveDate,

src/models/member.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use sqlx::FromRow;
44

55
#[derive(Enum, Copy, Clone, Eq, PartialEq, sqlx::Type)]
66
#[sqlx(type_name = "sex_type")]
7+
#[derive(Debug)]
78
pub enum Sex {
89
M,
910
F,
@@ -12,6 +13,7 @@ pub enum Sex {
1213

1314
#[derive(SimpleObject, FromRow)]
1415
#[graphql(complex)]
16+
#[derive(Debug)]
1517
pub struct Member {
1618
pub member_id: i32,
1719
pub roll_no: String,

src/routes.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use tower_http::cors::CorsLayer;
99

1010
use crate::graphql::{Mutation, Query};
1111

12+
#[tracing::instrument(skip(schema, cors))]
1213
pub fn setup_router(
1314
schema: Schema<Query, Mutation, EmptySubscription>,
1415
cors: CorsLayer,

0 commit comments

Comments
 (0)