Skip to content

Commit 749a16f

Browse files
fix: date level stats (#1312)
read all stream.json files for a dataset get sum of stats for a given date from the manifest list in snapshot
1 parent b4858fd commit 749a16f

File tree

4 files changed

+31
-50
lines changed

4 files changed

+31
-50
lines changed

src/handlers/http/cluster/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ pub async fn sync_role_update_with_ingestors(
393393
.await
394394
}
395395

396-
pub fn fetch_daily_stats_from_ingestors(
396+
pub fn fetch_daily_stats(
397397
date: &str,
398398
stream_meta_list: &[ObjectStoreFormat],
399399
) -> Result<Stats, StreamError> {

src/handlers/http/logstream.rs

+3
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,8 @@ pub mod error {
524524
HotTierValidation(#[from] HotTierValidationError),
525525
#[error("{0}")]
526526
HotTierError(#[from] HotTierError),
527+
#[error("Invalid query parameter: {0}")]
528+
InvalidQueryParameter(String),
527529
}
528530

529531
impl actix_web::ResponseError for StreamError {
@@ -559,6 +561,7 @@ pub mod error {
559561
StreamError::HotTierNotEnabled(_) => StatusCode::FORBIDDEN,
560562
StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST,
561563
StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR,
564+
StreamError::InvalidQueryParameter(_) => StatusCode::BAD_REQUEST,
562565
}
563566
}
564567

src/handlers/http/modal/query/querier_logstream.rs

+19-32
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use core::str;
20-
use std::fs;
20+
use std::{collections::HashMap, fs};
2121

2222
use actix_web::{
2323
web::{self, Path},
@@ -36,18 +36,18 @@ use crate::{
3636
handlers::http::{
3737
base_path_without_preceding_slash,
3838
cluster::{
39-
self, fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors,
40-
sync_streams_with_ingestors,
39+
self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors,
4140
utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
4241
},
43-
logstream::{error::StreamError, get_stats_date},
42+
logstream::error::StreamError,
4443
modal::{NodeMetadata, NodeType},
4544
},
4645
hottier::HotTierManager,
4746
parseable::{StreamNotFound, PARSEABLE},
48-
stats::{self, Stats},
47+
stats,
4948
storage::{ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
5049
};
50+
const STATS_DATE_QUERY_PARAM: &str = "date";
5151

5252
pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
5353
let stream_name = stream_name.into_inner();
@@ -142,34 +142,27 @@ pub async fn get_stats(
142142
return Err(StreamNotFound(stream_name.clone()).into());
143143
}
144144

145-
let query_string = req.query_string();
146-
if !query_string.is_empty() {
147-
let date_key = query_string.split('=').collect::<Vec<&str>>()[0];
148-
let date_value = query_string.split('=').collect::<Vec<&str>>()[1];
149-
if date_key != "date" {
150-
return Err(StreamError::Custom {
151-
msg: "Invalid query parameter".to_string(),
152-
status: StatusCode::BAD_REQUEST,
153-
});
154-
}
145+
let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
146+
.map_err(|_| StreamError::InvalidQueryParameter(STATS_DATE_QUERY_PARAM.to_string()))?;
155147

156-
if !date_value.is_empty() {
157-
let querier_stats = get_stats_date(&stream_name, date_value).await?;
148+
if !query_map.is_empty() {
149+
let date_value = query_map.get(STATS_DATE_QUERY_PARAM).ok_or_else(|| {
150+
StreamError::InvalidQueryParameter(STATS_DATE_QUERY_PARAM.to_string())
151+
})?;
158152

153+
if !date_value.is_empty() {
159154
// this function requires all the ingestor stream jsons
160155
let path = RelativePathBuf::from_iter([&stream_name, STREAM_ROOT_DIRECTORY]);
161156
let obs = PARSEABLE
162157
.storage
163158
.get_object_store()
164159
.get_objects(
165160
Some(&path),
166-
Box::new(|file_name| {
167-
file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
168-
}),
161+
Box::new(|file_name| file_name.ends_with("stream.json")),
169162
)
170163
.await?;
171164

172-
let mut ingestor_stream_jsons = Vec::new();
165+
let mut stream_jsons = Vec::new();
173166
for ob in obs {
174167
let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
175168
Ok(d) => d,
@@ -178,20 +171,14 @@ pub async fn get_stats(
178171
continue;
179172
}
180173
};
181-
ingestor_stream_jsons.push(stream_metadata);
174+
stream_jsons.push(stream_metadata);
182175
}
183176

184-
let ingestor_stats =
185-
fetch_daily_stats_from_ingestors(date_value, &ingestor_stream_jsons)?;
177+
let stats = fetch_daily_stats(date_value, &stream_jsons)?;
186178

187-
let total_stats = Stats {
188-
events: querier_stats.events + ingestor_stats.events,
189-
ingestion: querier_stats.ingestion + ingestor_stats.ingestion,
190-
storage: querier_stats.storage + ingestor_stats.storage,
191-
};
192-
let stats = serde_json::to_value(total_stats)?;
179+
let stats = serde_json::to_value(stats)?;
193180

194-
return Ok((web::Json(stats), StatusCode::OK));
181+
return Ok(web::Json(stats));
195182
}
196183
}
197184

@@ -238,5 +225,5 @@ pub async fn get_stats(
238225

239226
let stats = serde_json::to_value(stats)?;
240227

241-
Ok((web::Json(stats), StatusCode::OK))
228+
Ok(web::Json(stats))
242229
}

src/prism/home/mod.rs

+8-17
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,7 @@ use crate::{
3030
alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS},
3131
correlation::{CorrelationError, CORRELATIONS},
3232
event::format::LogSource,
33-
handlers::http::{
34-
cluster::fetch_daily_stats_from_ingestors,
35-
logstream::{error::StreamError, get_stats_date},
36-
},
33+
handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
3734
parseable::PARSEABLE,
3835
rbac::{map::SessionKey, role::Action, Users},
3936
storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
@@ -221,9 +218,9 @@ async fn stats_for_date(
221218
};
222219

223220
// Process each stream concurrently
224-
let stream_stats_futures = stream_wise_meta.iter().map(|(stream, meta)| {
225-
get_stream_stats_for_date(stream.clone(), date.clone(), meta.clone())
226-
});
221+
let stream_stats_futures = stream_wise_meta
222+
.values()
223+
.map(|meta| get_stream_stats_for_date(date.clone(), meta));
227224

228225
let stream_stats_results = futures::future::join_all(stream_stats_futures).await;
229226

@@ -246,18 +243,12 @@ async fn stats_for_date(
246243
}
247244

248245
async fn get_stream_stats_for_date(
249-
stream: String,
250246
date: String,
251-
meta: Vec<ObjectStoreFormat>,
247+
meta: &[ObjectStoreFormat],
252248
) -> Result<(u64, u64, u64), PrismHomeError> {
253-
let querier_stats = get_stats_date(&stream, &date).await?;
254-
let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?;
255-
256-
Ok((
257-
querier_stats.events + ingestor_stats.events,
258-
querier_stats.ingestion + ingestor_stats.ingestion,
259-
querier_stats.storage + ingestor_stats.storage,
260-
))
249+
let stats = fetch_daily_stats(&date, meta)?;
250+
251+
Ok((stats.events, stats.ingestion, stats.storage))
261252
}
262253

263254
pub async fn generate_home_search_response(

0 commit comments

Comments
 (0)