Skip to content

Commit b32aa56

Browse files
fix: parallelise datasets API (#1276)
* fix: parallelise datasets API — process each stream in parallel; process distinct counts in parallel. * refactor based on CodeRabbit suggestions.
1 parent b8b0b18 commit b32aa56

File tree

1 file changed

+129
-65
lines changed

1 file changed

+129
-65
lines changed

src/prism/logstream/mod.rs

+129-65
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use chrono::Utc;
2424
use http::StatusCode;
2525
use serde::{Deserialize, Serialize};
2626
use serde_json::{json, Value};
27-
use tracing::{debug, error, warn};
27+
use tracing::warn;
2828

2929
use crate::{
3030
handlers::http::{
@@ -247,79 +247,143 @@ impl PrismDatasetRequest {
247247
mut self,
248248
key: SessionKey,
249249
) -> Result<Vec<PrismDatasetResponse>, PrismLogstreamError> {
250-
let is_empty = self.streams.is_empty();
251-
if is_empty {
250+
if self.streams.is_empty() {
252251
self.streams = PARSEABLE.streams.list();
253252
}
254253

255-
let mut responses = vec![];
256-
for stream in self.streams.iter() {
257-
if Users.authorize(key.clone(), Action::ListStream, Some(stream), None)
258-
!= crate::rbac::Response::Authorized
259-
{
260-
// Don't warn if listed from Parseable
261-
if !is_empty {
262-
warn!("Unauthorized access requested for stream: {stream}");
254+
// Process streams concurrently
255+
let results = futures::future::join_all(
256+
self.streams
257+
.iter()
258+
.map(|stream| self.process_stream(stream.clone(), key.clone())),
259+
)
260+
.await;
261+
262+
// Collect successful responses and handle errors
263+
let mut responses = Vec::new();
264+
for result in results {
265+
match result {
266+
Ok(Some(response)) => responses.push(response),
267+
Ok(None) => {
268+
warn!("Stream not found or unauthorized access");
269+
continue;
270+
}
271+
Err(err) => {
272+
warn!("error: {err}");
273+
continue;
263274
}
264-
continue;
265275
}
276+
}
266277

267-
if PARSEABLE.check_or_load_stream(stream).await {
268-
debug!("Stream not found: {stream}");
269-
continue;
270-
}
278+
Ok(responses)
279+
}
271280

272-
let PrismLogstreamInfo {
273-
info,
274-
schema,
275-
stats,
276-
retention,
277-
} = get_prism_logstream_info(stream).await?;
278-
279-
let hottier = match HotTierManager::global() {
280-
Some(manager) => match manager.get_hot_tier(stream).await {
281-
Ok(stats) => Some(stats),
282-
Err(HotTierError::HotTierValidationError(
283-
HotTierValidationError::NotFound(_),
284-
)) => None,
285-
Err(err) => return Err(err.into()),
286-
},
287-
_ => None,
288-
};
289-
let records = CountsRequest {
290-
stream: stream.clone(),
291-
start_time: "1h".to_owned(),
292-
end_time: "now".to_owned(),
293-
num_bins: 10,
294-
}
295-
.get_bin_density()
296-
.await?;
297-
let counts = CountsResponse {
298-
fields: vec!["start_time".into(), "end_time".into(), "count".into()],
299-
records,
300-
};
301-
302-
// Retrieve distinct values for source identifiers
303-
// Returns None if fields aren't present or if query fails
304-
let ips = self.get_distinct_entries(stream, "p_src_ip").await.ok();
305-
let user_agents = self.get_distinct_entries(stream, "p_user_agent").await.ok();
306-
307-
responses.push(PrismDatasetResponse {
308-
stream: stream.clone(),
309-
info,
310-
schema,
311-
stats,
312-
retention,
313-
hottier,
314-
counts,
315-
distinct_sources: json!({
316-
"ips": ips,
317-
"user_agents": user_agents
318-
}),
319-
})
281+
async fn process_stream(
282+
&self,
283+
stream: String,
284+
key: SessionKey,
285+
) -> Result<Option<PrismDatasetResponse>, PrismLogstreamError> {
286+
// Skip unauthorized streams
287+
if !self.is_authorized(&stream, &key) {
288+
return Ok(None);
320289
}
321290

322-
Ok(responses)
291+
// Skip streams that don't exist
292+
if !self.stream_exists(&stream).await {
293+
return Ok(None);
294+
}
295+
296+
// Process stream data
297+
match get_prism_logstream_info(&stream).await {
298+
Ok(info) => Ok(Some(self.build_dataset_response(stream, info).await?)),
299+
Err(err) => Err(err),
300+
}
301+
}
302+
303+
fn is_authorized(&self, stream: &str, key: &SessionKey) -> bool {
304+
if Users.authorize(key.clone(), Action::ListStream, Some(stream), None)
305+
!= crate::rbac::Response::Authorized
306+
{
307+
warn!("Unauthorized access requested for stream: {stream}");
308+
false
309+
} else {
310+
true
311+
}
312+
}
313+
314+
async fn stream_exists(&self, stream: &str) -> bool {
315+
if PARSEABLE.check_or_load_stream(stream).await {
316+
warn!("Stream not found: {stream}");
317+
false
318+
} else {
319+
true
320+
}
321+
}
322+
323+
async fn build_dataset_response(
324+
&self,
325+
stream: String,
326+
info: PrismLogstreamInfo,
327+
) -> Result<PrismDatasetResponse, PrismLogstreamError> {
328+
// Get hot tier info
329+
let hottier = self.get_hot_tier_info(&stream).await?;
330+
331+
// Get counts
332+
let counts = self.get_counts(&stream).await?;
333+
334+
// Get distinct entries concurrently
335+
let (ips_result, user_agents_result) = futures::join!(
336+
self.get_distinct_entries(&stream, "p_src_ip"),
337+
self.get_distinct_entries(&stream, "p_user_agent")
338+
);
339+
340+
let ips = ips_result.ok();
341+
let user_agents = user_agents_result.ok();
342+
343+
Ok(PrismDatasetResponse {
344+
stream,
345+
info: info.info,
346+
schema: info.schema,
347+
stats: info.stats,
348+
retention: info.retention,
349+
hottier,
350+
counts,
351+
distinct_sources: json!({
352+
"ips": ips,
353+
"user_agents": user_agents
354+
}),
355+
})
356+
}
357+
358+
async fn get_hot_tier_info(
359+
&self,
360+
stream: &str,
361+
) -> Result<Option<StreamHotTier>, PrismLogstreamError> {
362+
match HotTierManager::global() {
363+
Some(manager) => match manager.get_hot_tier(stream).await {
364+
Ok(stats) => Ok(Some(stats)),
365+
Err(HotTierError::HotTierValidationError(HotTierValidationError::NotFound(_))) => {
366+
Ok(None)
367+
}
368+
Err(err) => Err(err.into()),
369+
},
370+
None => Ok(None),
371+
}
372+
}
373+
374+
async fn get_counts(&self, stream: &str) -> Result<CountsResponse, PrismLogstreamError> {
375+
let count_request = CountsRequest {
376+
stream: stream.to_owned(),
377+
start_time: "1h".to_owned(),
378+
end_time: "now".to_owned(),
379+
num_bins: 10,
380+
};
381+
382+
let records = count_request.get_bin_density().await?;
383+
Ok(CountsResponse {
384+
fields: vec!["start_time".into(), "end_time".into(), "count".into()],
385+
records,
386+
})
323387
}
324388

325389
/// Retrieves distinct values for a specific field in a stream.

0 commit comments

Comments
 (0)