Skip to content

Commit 59ba6a8

Browse files
parmesantde-sh
andauthored
Feat: Add Prism API (#1188)
- init commit for Prism GET /home - refactored code for clippy - modified the base path for prism - added logstream/info API - added rbac, modified listing of streams - updates to prism endpoints - /logstream/{logstream}/info added stats and retention to the response body - GET /users - GET /users/{username} - added GET /roles endpoint Co-authored-by: Devdutt Shenoi <[email protected]> Signed-off-by: parmesant <[email protected]>
1 parent afef093 commit 59ba6a8

File tree

21 files changed

+889
-116
lines changed

21 files changed

+889
-116
lines changed

src/alerts/mod.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use datafusion::common::tree_node::TreeNode;
2424
use http::StatusCode;
2525
use itertools::Itertools;
2626
use once_cell::sync::Lazy;
27+
use serde::Serialize;
2728
use serde_json::Error as SerdeError;
2829
use std::collections::{HashMap, HashSet};
2930
use std::fmt::{self, Display};
@@ -873,3 +874,52 @@ impl Alerts {
873874
Ok(())
874875
}
875876
}
877+
878+
#[derive(Debug, Serialize)]
879+
pub struct AlertsInfo {
880+
total: u64,
881+
silenced: u64,
882+
resolved: u64,
883+
triggered: u64,
884+
low: u64,
885+
medium: u64,
886+
high: u64,
887+
}
888+
889+
// TODO: add RBAC
890+
pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
891+
let alerts = ALERTS.alerts.read().await;
892+
let mut total = 0;
893+
let mut silenced = 0;
894+
let mut resolved = 0;
895+
let mut triggered = 0;
896+
let mut low = 0;
897+
let mut medium = 0;
898+
let mut high = 0;
899+
900+
for (_, alert) in alerts.iter() {
901+
total += 1;
902+
match alert.state {
903+
AlertState::Silenced => silenced += 1,
904+
AlertState::Resolved => resolved += 1,
905+
AlertState::Triggered => triggered += 1,
906+
}
907+
908+
match alert.severity {
909+
Severity::Low => low += 1,
910+
Severity::Medium => medium += 1,
911+
Severity::High => high += 1,
912+
_ => {}
913+
}
914+
}
915+
916+
Ok(AlertsInfo {
917+
total,
918+
silenced,
919+
resolved,
920+
triggered,
921+
low,
922+
medium,
923+
high,
924+
})
925+
}

src/handlers/http/logstream.rs

Lines changed: 19 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use self::error::StreamError;
20-
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
20+
use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
2121
use super::query::update_schema_when_distributed;
2222
use crate::event::format::override_data_type;
2323
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
@@ -257,64 +257,26 @@ pub async fn get_stats(
257257
let stats = stats::get_current_stats(&stream_name, "json")
258258
.ok_or_else(|| StreamNotFound(stream_name.clone()))?;
259259

260-
let ingestor_stats: Option<Vec<QueriedStats>> = None;
261-
262-
let hash_map = PARSEABLE.streams.read().expect("Readable");
263-
let stream_meta = &hash_map
264-
.get(&stream_name)
265-
.ok_or_else(|| StreamNotFound(stream_name.clone()))?
266-
.metadata
267-
.read()
268-
.expect(LOCK_EXPECT);
269-
270260
let time = Utc::now();
271261

272-
let stats = match &stream_meta.first_event_at {
273-
Some(_) => {
274-
let ingestion_stats = IngestionStats::new(
275-
stats.current_stats.events,
276-
format!("{} {}", stats.current_stats.ingestion, "Bytes"),
277-
stats.lifetime_stats.events,
278-
format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
279-
stats.deleted_stats.events,
280-
format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
281-
"json",
282-
);
283-
let storage_stats = StorageStats::new(
284-
format!("{} {}", stats.current_stats.storage, "Bytes"),
285-
format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
286-
format!("{} {}", stats.deleted_stats.storage, "Bytes"),
287-
"parquet",
288-
);
289-
290-
QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
291-
}
292-
293-
None => {
294-
let ingestion_stats = IngestionStats::new(
295-
stats.current_stats.events,
296-
format!("{} {}", stats.current_stats.ingestion, "Bytes"),
297-
stats.lifetime_stats.events,
298-
format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
299-
stats.deleted_stats.events,
300-
format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
301-
"json",
302-
);
303-
let storage_stats = StorageStats::new(
304-
format!("{} {}", stats.current_stats.storage, "Bytes"),
305-
format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
306-
format!("{} {}", stats.deleted_stats.storage, "Bytes"),
307-
"parquet",
308-
);
309-
310-
QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
311-
}
312-
};
313-
let stats = if let Some(mut ingestor_stats) = ingestor_stats {
314-
ingestor_stats.push(stats);
315-
merge_quried_stats(ingestor_stats)
316-
} else {
317-
stats
262+
let stats = {
263+
let ingestion_stats = IngestionStats::new(
264+
stats.current_stats.events,
265+
format!("{} Bytes", stats.current_stats.ingestion),
266+
stats.lifetime_stats.events,
267+
format!("{} Bytes", stats.lifetime_stats.ingestion),
268+
stats.deleted_stats.events,
269+
format!("{} Bytes", stats.deleted_stats.ingestion),
270+
"json",
271+
);
272+
let storage_stats = StorageStats::new(
273+
format!("{} Bytes", stats.current_stats.storage),
274+
format!("{} Bytes", stats.lifetime_stats.storage),
275+
format!("{} Bytes", stats.deleted_stats.storage),
276+
"parquet",
277+
);
278+
279+
QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
318280
};
319281

320282
let stats = serde_json::to_value(stats)?;

src/handlers/http/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,25 @@ pub mod logstream;
4040
pub mod middleware;
4141
pub mod modal;
4242
pub mod oidc;
43+
pub mod prism_home;
44+
pub mod prism_logstream;
4345
pub mod query;
4446
pub mod rbac;
4547
pub mod role;
4648
pub mod users;
4749
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
4850
pub const API_BASE_PATH: &str = "api";
4951
pub const API_VERSION: &str = "v1";
52+
pub const PRISM_BASE_PATH: &str = "prism";
5053

5154
pub fn base_path() -> String {
5255
format!("/{API_BASE_PATH}/{API_VERSION}")
5356
}
5457

58+
pub fn prism_base_path() -> String {
59+
format!("/{API_BASE_PATH}/{PRISM_BASE_PATH}/{API_VERSION}")
60+
}
61+
5562
pub fn metrics_path() -> String {
5663
format!("{}/metrics", base_path())
5764
}

src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 18 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use crate::{
4545
parseable::{StreamNotFound, PARSEABLE},
4646
stats::{self, Stats},
4747
storage::StreamType,
48-
LOCK_EXPECT,
4948
};
5049

5150
pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
@@ -176,57 +175,28 @@ pub async fn get_stats(
176175
None
177176
};
178177

179-
let hash_map = PARSEABLE.streams.read().expect(LOCK_EXPECT);
180-
let stream_meta = hash_map
181-
.get(&stream_name)
182-
.ok_or_else(|| StreamNotFound(stream_name.clone()))?
183-
.metadata
184-
.read()
185-
.expect(LOCK_EXPECT);
186-
187178
let time = Utc::now();
188179

189-
let stats = match &stream_meta.first_event_at {
190-
Some(_) => {
191-
let ingestion_stats = IngestionStats::new(
192-
stats.current_stats.events,
193-
format!("{} {}", stats.current_stats.ingestion, "Bytes"),
194-
stats.lifetime_stats.events,
195-
format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
196-
stats.deleted_stats.events,
197-
format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
198-
"json",
199-
);
200-
let storage_stats = StorageStats::new(
201-
format!("{} {}", stats.current_stats.storage, "Bytes"),
202-
format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
203-
format!("{} {}", stats.deleted_stats.storage, "Bytes"),
204-
"parquet",
205-
);
206-
207-
QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
208-
}
180+
let stats = {
181+
let ingestion_stats = IngestionStats::new(
182+
stats.current_stats.events,
183+
format!("{} Bytes", stats.current_stats.ingestion),
184+
stats.lifetime_stats.events,
185+
format!("{} Bytes", stats.lifetime_stats.ingestion),
186+
stats.deleted_stats.events,
187+
format!("{} Bytes", stats.deleted_stats.ingestion),
188+
"json",
189+
);
190+
let storage_stats = StorageStats::new(
191+
format!("{} Bytes", stats.current_stats.storage),
192+
format!("{} Bytes", stats.lifetime_stats.storage),
193+
format!("{} Bytes", stats.deleted_stats.storage),
194+
"parquet",
195+
);
209196

210-
None => {
211-
let ingestion_stats = IngestionStats::new(
212-
stats.current_stats.events,
213-
format!("{} {}", stats.current_stats.ingestion, "Bytes"),
214-
stats.lifetime_stats.events,
215-
format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
216-
stats.deleted_stats.events,
217-
format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
218-
"json",
219-
);
220-
let storage_stats = StorageStats::new(
221-
format!("{} {}", stats.current_stats.storage, "Bytes"),
222-
format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
223-
format!("{} {}", stats.deleted_stats.storage, "Bytes"),
224-
"parquet",
225-
);
226-
227-
QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
228-
}
197+
QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
229198
};
199+
230200
let stats = if let Some(mut ingestor_stats) = ingestor_stats {
231201
ingestor_stats.push(stats);
232202
merge_quried_stats(ingestor_stats)

src/handlers/http/modal/query_server.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ use std::thread;
2121
use crate::alerts::ALERTS;
2222
use crate::correlation::CORRELATIONS;
2323
use crate::handlers::airplane;
24-
use crate::handlers::http::base_path;
2524
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
2625
use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
26+
use crate::handlers::http::{base_path, prism_base_path};
2727
use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
2828
use crate::handlers::http::{rbac, role};
2929
use crate::hottier::HotTierManager;
@@ -61,16 +61,23 @@ impl ParseableServer for QueryServer {
6161
.service(Server::get_about_factory())
6262
.service(Self::get_logstream_webscope())
6363
.service(Self::get_user_webscope())
64+
.service(Server::get_users_webscope())
6465
.service(Server::get_dashboards_webscope())
6566
.service(Server::get_filters_webscope())
6667
.service(Server::get_llm_webscope())
6768
.service(Server::get_oauth_webscope(oidc_client))
6869
.service(Self::get_user_role_webscope())
70+
.service(Server::get_roles_webscope())
6971
.service(Server::get_counts_webscope())
7072
.service(Server::get_metrics_webscope())
7173
.service(Server::get_alerts_webscope())
7274
.service(Self::get_cluster_web_scope()),
7375
)
76+
.service(
77+
web::scope(&prism_base_path())
78+
.service(Server::get_prism_home())
79+
.service(Server::get_prism_logstream()),
80+
)
7481
.service(Server::get_generated());
7582
}
7683

src/handlers/http/modal/server.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::handlers::http::about;
2626
use crate::handlers::http::alerts;
2727
use crate::handlers::http::base_path;
2828
use crate::handlers::http::health_check;
29+
use crate::handlers::http::prism_base_path;
2930
use crate::handlers::http::query;
3031
use crate::handlers::http::users::dashboards;
3132
use crate::handlers::http::users::filters;
@@ -80,15 +81,22 @@ impl ParseableServer for Server {
8081
.service(Self::get_about_factory())
8182
.service(Self::get_logstream_webscope())
8283
.service(Self::get_user_webscope())
84+
.service(Self::get_users_webscope())
8385
.service(Self::get_dashboards_webscope())
8486
.service(Self::get_filters_webscope())
8587
.service(Self::get_llm_webscope())
8688
.service(Self::get_oauth_webscope(oidc_client))
8789
.service(Self::get_user_role_webscope())
90+
.service(Self::get_roles_webscope())
8891
.service(Self::get_counts_webscope())
8992
.service(Self::get_alerts_webscope())
9093
.service(Self::get_metrics_webscope()),
9194
)
95+
.service(
96+
web::scope(&prism_base_path())
97+
.service(Server::get_prism_home())
98+
.service(Server::get_prism_logstream()),
99+
)
92100
.service(Self::get_ingest_otel_factory())
93101
.service(Self::get_generated());
94102
}
@@ -154,6 +162,24 @@ impl ParseableServer for Server {
154162
}
155163

156164
impl Server {
165+
pub fn get_prism_home() -> Resource {
166+
web::resource("/home").route(web::get().to(http::prism_home::home_api))
167+
}
168+
169+
pub fn get_prism_logstream() -> Scope {
170+
web::scope("/logstream").service(
171+
web::scope("/{logstream}").service(
172+
web::resource("/info").route(
173+
web::get()
174+
.to(http::prism_logstream::get_info)
175+
.authorize_for_stream(Action::GetStreamInfo)
176+
.authorize_for_stream(Action::GetStats)
177+
.authorize_for_stream(Action::GetRetention),
178+
),
179+
),
180+
)
181+
}
182+
157183
pub fn get_metrics_webscope() -> Scope {
158184
web::scope("/metrics").service(
159185
web::resource("").route(web::get().to(metrics::get).authorize(Action::Metrics)),
@@ -455,6 +481,13 @@ impl Server {
455481
}
456482
}
457483

484+
// get list of roles
485+
pub fn get_roles_webscope() -> Scope {
486+
web::scope("/roles").service(
487+
web::resource("").route(web::get().to(role::list_roles).authorize(Action::ListRole)),
488+
)
489+
}
490+
458491
// get the role webscope
459492
pub fn get_user_role_webscope() -> Scope {
460493
web::scope("/role")
@@ -475,6 +508,27 @@ impl Server {
475508
)
476509
}
477510

511+
// get the users webscope (for Prism only)
512+
pub fn get_users_webscope() -> Scope {
513+
web::scope("/users")
514+
.service(
515+
web::resource("")
516+
// GET /users => List all users
517+
.route(
518+
web::get()
519+
.to(http::rbac::list_users_prism)
520+
.authorize(Action::ListUser),
521+
),
522+
)
523+
.service(
524+
web::resource("/{username}").route(
525+
web::get()
526+
.to(http::rbac::get_prism_user)
527+
.authorize_for_user(Action::GetUserRoles),
528+
),
529+
)
530+
}
531+
478532
// get the user webscope
479533
pub fn get_user_webscope() -> Scope {
480534
web::scope("/user")

0 commit comments

Comments
 (0)