Skip to content

Commit a279822

Browse files
fix for stats for each date (#805)
update stats in manifest list in snapshot for each date load daily stats on server start update GET /stats endpoint to accept query param if given /stats/date={date in yyyy-mm-dd} format, date level stats is returned
1 parent ad39f57 commit a279822

File tree

11 files changed

+205
-186
lines changed

11 files changed

+205
-186
lines changed

server/src/catalog.rs

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ use std::{io::ErrorKind, sync::Arc};
2020

2121
use self::{column::Column, snapshot::ManifestItem};
2222
use crate::handlers::http::base_path_without_preceding_slash;
23-
use crate::metrics::{EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY, STORAGE_SIZE_TODAY};
23+
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
2424
use crate::option::CONFIG;
25-
use crate::stats::{event_labels, storage_size_labels, update_deleted_stats};
25+
use crate::stats::{event_labels_date, storage_size_labels_date, update_deleted_stats};
2626
use crate::{
2727
catalog::manifest::Manifest,
2828
event::DEFAULT_TIMESTAMP_KEY,
@@ -102,22 +102,6 @@ pub async fn update_snapshot(
102102
stream_name: &str,
103103
change: manifest::File,
104104
) -> Result<(), ObjectStorageError> {
105-
// get current snapshot
106-
let event_labels = event_labels(stream_name, "json");
107-
let storage_size_labels = storage_size_labels(stream_name);
108-
let events_ingested = EVENTS_INGESTED_TODAY
109-
.get_metric_with_label_values(&event_labels)
110-
.unwrap()
111-
.get() as u64;
112-
let ingestion_size = EVENTS_INGESTED_SIZE_TODAY
113-
.get_metric_with_label_values(&event_labels)
114-
.unwrap()
115-
.get() as u64;
116-
let storage_size = STORAGE_SIZE_TODAY
117-
.get_metric_with_label_values(&storage_size_labels)
118-
.unwrap()
119-
.get() as u64;
120-
121105
let mut meta = storage.get_object_store_format(stream_name).await?;
122106
let meta_clone = meta.clone();
123107
let manifests = &mut meta.snapshot.manifest_list;
@@ -132,6 +116,21 @@ pub async fn update_snapshot(
132116
lower_bound
133117
}
134118
};
119+
let date = lower_bound.date_naive().format("%Y-%m-%d").to_string();
120+
let event_labels = event_labels_date(stream_name, "json", &date);
121+
let storage_size_labels = storage_size_labels_date(stream_name, &date);
122+
let events_ingested = EVENTS_INGESTED_DATE
123+
.get_metric_with_label_values(&event_labels)
124+
.unwrap()
125+
.get() as u64;
126+
let ingestion_size = EVENTS_INGESTED_SIZE_DATE
127+
.get_metric_with_label_values(&event_labels)
128+
.unwrap()
129+
.get() as u64;
130+
let storage_size = EVENTS_STORAGE_SIZE_DATE
131+
.get_metric_with_label_values(&storage_size_labels)
132+
.unwrap()
133+
.get() as u64;
135134
let pos = manifests.iter().position(|item| {
136135
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
137136
});
@@ -149,6 +148,25 @@ pub async fn update_snapshot(
149148
for m in manifests.iter_mut() {
150149
let p = manifest_path("").to_string();
151150
if m.manifest_path.contains(&p) {
151+
let date = m
152+
.time_lower_bound
153+
.date_naive()
154+
.format("%Y-%m-%d")
155+
.to_string();
156+
let event_labels = event_labels_date(stream_name, "json", &date);
157+
let storage_size_labels = storage_size_labels_date(stream_name, &date);
158+
let events_ingested = EVENTS_INGESTED_DATE
159+
.get_metric_with_label_values(&event_labels)
160+
.unwrap()
161+
.get() as u64;
162+
let ingestion_size = EVENTS_INGESTED_SIZE_DATE
163+
.get_metric_with_label_values(&event_labels)
164+
.unwrap()
165+
.get() as u64;
166+
let storage_size = EVENTS_STORAGE_SIZE_DATE
167+
.get_metric_with_label_values(&storage_size_labels)
168+
.unwrap()
169+
.get() as u64;
152170
ch = true;
153171
m.events_ingested = events_ingested;
154172
m.ingestion_size = ingestion_size;

server/src/event.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ impl Event {
8282
self.origin_format,
8383
self.origin_size,
8484
num_rows,
85+
self.parsed_timestamp,
8586
)?;
8687

8788
crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);

server/src/handlers/http/logstream.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ use crate::handlers::{
2525
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
2626
};
2727
use crate::metadata::STREAM_INFO;
28+
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
2829
use crate::option::{Mode, CONFIG};
2930
use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
31+
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
3032
use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
3133
use crate::{
3234
catalog::{self, remove_manifest_from_snapshot},
@@ -482,6 +484,29 @@ pub async fn put_enable_cache(
482484
StatusCode::OK,
483485
))
484486
}
487+
pub async fn get_stats_date(stream_name: &str, date: &str) -> Result<Stats, StreamError> {
488+
let event_labels = event_labels_date(stream_name, "json", date);
489+
let storage_size_labels = storage_size_labels_date(stream_name, date);
490+
let events_ingested = EVENTS_INGESTED_DATE
491+
.get_metric_with_label_values(&event_labels)
492+
.unwrap()
493+
.get() as u64;
494+
let ingestion_size = EVENTS_INGESTED_SIZE_DATE
495+
.get_metric_with_label_values(&event_labels)
496+
.unwrap()
497+
.get() as u64;
498+
let storage_size = EVENTS_STORAGE_SIZE_DATE
499+
.get_metric_with_label_values(&storage_size_labels)
500+
.unwrap()
501+
.get() as u64;
502+
503+
let stats = Stats {
504+
events: events_ingested,
505+
ingestion: ingestion_size,
506+
storage: storage_size,
507+
};
508+
Ok(stats)
509+
}
485510

486511
pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
487512
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
@@ -490,6 +515,25 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
490515
return Err(StreamError::StreamNotFound(stream_name));
491516
}
492517

518+
let query_string = req.query_string();
519+
if !query_string.is_empty() {
520+
let date_key = query_string.split('=').collect::<Vec<&str>>()[0];
521+
let date_value = query_string.split('=').collect::<Vec<&str>>()[1];
522+
if date_key != "date" {
523+
return Err(StreamError::Custom {
524+
msg: "Invalid query parameter".to_string(),
525+
status: StatusCode::BAD_REQUEST,
526+
});
527+
}
528+
529+
if !date_value.is_empty() {
530+
let stats = get_stats_date(&stream_name, date_value).await?;
531+
let stats = serde_json::to_value(stats)?;
532+
533+
return Ok((web::Json(stats), StatusCode::OK));
534+
}
535+
}
536+
493537
let stats = stats::get_current_stats(&stream_name, "json")
494538
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
495539

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,6 @@ impl IngestServer {
339339
}
340340

341341
metrics::fetch_stats_from_storage().await;
342-
metrics::reset_daily_metric_from_global();
343342

344343
let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
345344
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =

server/src/handlers/http/modal/query_server.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,6 @@ impl QueryServer {
184184

185185
// load data from stats back to prometheus metrics
186186
metrics::fetch_stats_from_storage().await;
187-
metrics::reset_daily_metric_from_global();
188187
// track all parquet files already in the data directory
189188
storage::retention::load_retention_from_global();
190189

server/src/handlers/http/modal/server.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,6 @@ impl Server {
508508
DASHBOARDS.load().await?;
509509

510510
metrics::fetch_stats_from_storage().await;
511-
metrics::reset_daily_metric_from_global();
512511
storage::retention::load_retention_from_global();
513512

514513
let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();

server/src/metadata.rs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
use arrow_array::RecordBatch;
2020
use arrow_schema::{Field, Fields, Schema};
21-
use chrono::Local;
21+
use chrono::{Local, NaiveDateTime};
2222
use itertools::Itertools;
2323
use once_cell::sync::Lazy;
2424
use std::collections::HashMap;
@@ -27,10 +27,10 @@ use std::sync::{Arc, RwLock};
2727
use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
2828
use crate::alerts::Alerts;
2929
use crate::metrics::{
30-
EVENTS_INGESTED, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY,
31-
LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
30+
EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
31+
EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
3232
};
33-
use crate::storage::{LogStream, ObjectStorage, StorageDir};
33+
use crate::storage::{LogStream, ObjectStorage, ObjectStoreFormat, StorageDir};
3434
use crate::utils::arrow::MergedRecordReader;
3535
use derive_more::{Deref, DerefMut};
3636

@@ -244,7 +244,8 @@ impl StreamInfo {
244244
let alerts = storage.get_alerts(&stream.name).await?;
245245
let schema = storage.get_schema_on_server_start(&stream.name).await?;
246246
let meta = storage.get_stream_metadata(&stream.name).await?;
247-
247+
let meta_clone = meta.clone();
248+
let stream_name = stream.name.clone();
248249
let schema = update_schema_from_staging(&stream.name, schema);
249250
let schema = HashMap::from_iter(
250251
schema
@@ -268,10 +269,30 @@ impl StreamInfo {
268269
let mut map = self.write().expect(LOCK_EXPECT);
269270

270271
map.insert(stream.name, metadata);
272+
Self::load_daily_metrics(meta_clone, &stream_name);
271273

272274
Ok(())
273275
}
274276

277+
fn load_daily_metrics(meta: ObjectStoreFormat, stream_name: &str) {
278+
let manifests = meta.snapshot.manifest_list;
279+
for manifest in manifests {
280+
let manifest_date = manifest.time_lower_bound.date_naive().to_string();
281+
let events_ingested = manifest.events_ingested;
282+
let ingestion_size = manifest.ingestion_size;
283+
let storage_size = manifest.storage_size;
284+
EVENTS_INGESTED_DATE
285+
.with_label_values(&[stream_name, "json", &manifest_date])
286+
.set(events_ingested as i64);
287+
EVENTS_INGESTED_SIZE_DATE
288+
.with_label_values(&[stream_name, "json", &manifest_date])
289+
.set(ingestion_size as i64);
290+
EVENTS_STORAGE_SIZE_DATE
291+
.with_label_values(&["data", stream_name, "parquet", &manifest_date])
292+
.set(storage_size as i64);
293+
}
294+
}
295+
275296
pub fn list_streams(&self) -> Vec<String> {
276297
self.read()
277298
.expect(LOCK_EXPECT)
@@ -286,18 +307,20 @@ impl StreamInfo {
286307
origin: &'static str,
287308
size: u64,
288309
num_rows: u64,
310+
parsed_timestamp: NaiveDateTime,
289311
) -> Result<(), MetadataError> {
312+
let parsed_date = parsed_timestamp.date().to_string();
290313
EVENTS_INGESTED
291314
.with_label_values(&[stream_name, origin])
292315
.add(num_rows as i64);
293-
EVENTS_INGESTED_TODAY
294-
.with_label_values(&[stream_name, origin])
316+
EVENTS_INGESTED_DATE
317+
.with_label_values(&[stream_name, origin, parsed_date.as_str()])
295318
.add(num_rows as i64);
296319
EVENTS_INGESTED_SIZE
297320
.with_label_values(&[stream_name, origin])
298321
.add(size as i64);
299-
EVENTS_INGESTED_SIZE_TODAY
300-
.with_label_values(&[stream_name, origin])
322+
EVENTS_INGESTED_SIZE_DATE
323+
.with_label_values(&[stream_name, origin, parsed_date.as_str()])
301324
.add(size as i64);
302325
LIFETIME_EVENTS_INGESTED
303326
.with_label_values(&[stream_name, origin])

0 commit comments

Comments
 (0)