Skip to content

Commit 63bbf9d

Browse files
chore: optimize server start (#1277)
parallelise stream migration load calls - correlations, filters, dashboards, alerts in parallel remove file migration - deprecate for migrations < 1.0.0
1 parent b32aa56 commit 63bbf9d

File tree

6 files changed

+108
-141
lines changed

6 files changed

+108
-141
lines changed

src/alerts/mod.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,8 @@ pub enum AlertError {
705705
InvalidStateChange(String),
706706
#[error("{0}")]
707707
StreamNotFound(#[from] StreamNotFound),
708+
#[error("{0}")]
709+
Anyhow(#[from] anyhow::Error),
708710
}
709711

710712
impl actix_web::ResponseError for AlertError {
@@ -719,6 +721,7 @@ impl actix_web::ResponseError for AlertError {
719721
Self::CustomError(_) => StatusCode::BAD_REQUEST,
720722
Self::InvalidStateChange(_) => StatusCode::BAD_REQUEST,
721723
Self::StreamNotFound(_) => StatusCode::NOT_FOUND,
724+
Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
722725
}
723726
}
724727

@@ -731,7 +734,7 @@ impl actix_web::ResponseError for AlertError {
731734

732735
impl Alerts {
733736
/// Loads alerts from disk, blocks
734-
pub async fn load(&self) -> Result<(), AlertError> {
737+
pub async fn load(&self) -> anyhow::Result<()> {
735738
let mut map = self.alerts.write().await;
736739
let store = PARSEABLE.storage.get_object_store();
737740

@@ -743,7 +746,8 @@ impl Alerts {
743746
alert.clone(),
744747
inbox_rx,
745748
outbox_tx,
746-
)?;
749+
)
750+
.map_err(|e| anyhow::Error::msg(e.to_string()))?;
747751

748752
self.update_task(alert.id, handle, outbox_rx, inbox_tx)
749753
.await;

src/handlers/http/cluster/mod.rs

+6
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,12 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
720720
PostError::Invalid(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
721721
})?;
722722

723+
// add a check to see if the ingestor is live
724+
if !check_liveness(&ingestor.domain_name).await {
725+
warn!("Ingestor {} is not live", ingestor.domain_name);
726+
continue;
727+
}
728+
723729
let res = HTTP_CLIENT
724730
.get(uri)
725731
.header(header::AUTHORIZATION, &ingestor.token)

src/handlers/http/modal/mod.rs

+40
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ use std::{path::Path, sync::Arc};
2020

2121
use actix_web::{middleware::from_fn, web::ServiceConfig, App, HttpServer};
2222
use actix_web_prometheus::PrometheusMetrics;
23+
use anyhow::Context;
2324
use async_trait::async_trait;
2425
use base64::{prelude::BASE64_STANDARD, Engine};
2526
use bytes::Bytes;
27+
use futures::future;
2628
use openid::Discovered;
2729
use relative_path::RelativePathBuf;
2830
use serde::{Deserialize, Serialize};
@@ -32,11 +34,14 @@ use tokio::sync::oneshot;
3234
use tracing::{error, info, warn};
3335

3436
use crate::{
37+
alerts::ALERTS,
3538
cli::Options,
39+
correlation::CORRELATIONS,
3640
oidc::Claims,
3741
option::Mode,
3842
parseable::PARSEABLE,
3943
storage::{ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY},
44+
users::{dashboards::DASHBOARDS, filters::FILTERS},
4045
utils::{get_indexer_id, get_ingestor_id},
4146
};
4247

@@ -159,6 +164,41 @@ pub trait ParseableServer {
159164
}
160165
}
161166

167+
pub async fn load_on_init() -> anyhow::Result<()> {
168+
// Run all loading operations concurrently
169+
let (correlations_result, filters_result, dashboards_result, alerts_result) = future::join4(
170+
async {
171+
CORRELATIONS
172+
.load()
173+
.await
174+
.context("Failed to load correlations")
175+
},
176+
async { FILTERS.load().await.context("Failed to load filters") },
177+
async { DASHBOARDS.load().await.context("Failed to load dashboards") },
178+
async { ALERTS.load().await.context("Failed to load alerts") },
179+
)
180+
.await;
181+
182+
// Handle errors from each operation
183+
if let Err(e) = correlations_result {
184+
error!("{e}");
185+
}
186+
187+
if let Err(err) = filters_result {
188+
error!("{err}");
189+
}
190+
191+
if let Err(err) = dashboards_result {
192+
error!("{err}");
193+
}
194+
195+
if let Err(err) = alerts_result {
196+
error!("{err}");
197+
}
198+
199+
Ok(())
200+
}
201+
162202
#[derive(Debug, Serialize, Deserialize, Default, Clone, Eq, PartialEq)]
163203
pub struct IngestorMetadata {
164204
pub version: String,

src/handlers/http/modal/query_server.rs

+6-25
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
use std::thread;
2020

21-
use crate::alerts::ALERTS;
22-
use crate::correlation::CORRELATIONS;
2321
use crate::handlers::airplane;
2422
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
2523
use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
@@ -28,22 +26,20 @@ use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
2826
use crate::handlers::http::{rbac, role};
2927
use crate::hottier::HotTierManager;
3028
use crate::rbac::role::Action;
31-
use crate::users::dashboards::DASHBOARDS;
32-
use crate::users::filters::FILTERS;
3329
use crate::{analytics, migration, storage, sync};
3430
use actix_web::web::{resource, ServiceConfig};
3531
use actix_web::{web, Scope};
3632
use actix_web_prometheus::PrometheusMetrics;
3733
use async_trait::async_trait;
3834
use bytes::Bytes;
3935
use tokio::sync::oneshot;
40-
use tracing::{error, info};
36+
use tracing::info;
4137

4238
use crate::parseable::PARSEABLE;
4339
use crate::Server;
4440

4541
use super::query::{querier_ingest, querier_logstream, querier_rbac, querier_role};
46-
use super::{OpenIdClient, ParseableServer};
42+
use super::{load_on_init, OpenIdClient, ParseableServer};
4743

4844
pub struct QueryServer;
4945

@@ -90,7 +86,6 @@ impl ParseableServer for QueryServer {
9086
));
9187
}
9288

93-
migration::run_file_migration(&PARSEABLE).await?;
9489
let parseable_json = PARSEABLE.validate_storage().await?;
9590
migration::run_metadata_migration(&PARSEABLE, &parseable_json).await?;
9691

@@ -109,22 +104,8 @@ impl ParseableServer for QueryServer {
109104

110105
//create internal stream at server start
111106
PARSEABLE.create_internal_stream_if_not_exists().await?;
112-
113-
if let Err(e) = CORRELATIONS.load().await {
114-
error!("{e}");
115-
}
116-
if let Err(err) = FILTERS.load().await {
117-
error!("{err}")
118-
};
119-
120-
if let Err(err) = DASHBOARDS.load().await {
121-
error!("{err}")
122-
};
123-
124-
if let Err(err) = ALERTS.load().await {
125-
error!("{err}")
126-
};
127-
107+
// load on init
108+
load_on_init().await?;
128109
// track all parquet files already in the data directory
129110
storage::retention::load_retention_from_global();
130111

@@ -150,11 +131,11 @@ impl ParseableServer for QueryServer {
150131

151132
let result = self
152133
.start(shutdown_rx, prometheus.clone(), PARSEABLE.options.openid())
153-
.await;
134+
.await?;
154135
// Cancel sync jobs
155136
cancel_tx.send(()).expect("Cancellation should not fail");
156137

157-
result
138+
Ok(result)
158139
}
159140
}
160141

src/handlers/http/modal/server.rs

+5-20
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818

1919
use std::thread;
2020

21-
use crate::alerts::ALERTS;
2221
use crate::analytics;
23-
use crate::correlation::CORRELATIONS;
2422
use crate::handlers;
2523
use crate::handlers::http::about;
2624
use crate::handlers::http::alerts;
@@ -35,8 +33,6 @@ use crate::metrics;
3533
use crate::migration;
3634
use crate::storage;
3735
use crate::sync;
38-
use crate::users::dashboards::DASHBOARDS;
39-
use crate::users::filters::FILTERS;
4036

4137
use actix_web::web;
4238
use actix_web::web::resource;
@@ -47,7 +43,6 @@ use actix_web_static_files::ResourceFiles;
4743
use async_trait::async_trait;
4844
use bytes::Bytes;
4945
use tokio::sync::oneshot;
50-
use tracing::error;
5146

5247
use crate::{
5348
handlers::http::{
@@ -61,6 +56,7 @@ use crate::{
6156

6257
// use super::generate;
6358
use super::generate;
59+
use super::load_on_init;
6460
use super::OpenIdClient;
6561
use super::ParseableServer;
6662

@@ -103,7 +99,8 @@ impl ParseableServer for Server {
10399
}
104100

105101
async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>> {
106-
migration::run_file_migration(&PARSEABLE).await?;
102+
//TODO: removed file migration
103+
//deprecated support for deployments < v1.0.0
107104
let parseable_json = PARSEABLE.validate_storage().await?;
108105
migration::run_metadata_migration(&PARSEABLE, &parseable_json).await?;
109106

@@ -120,20 +117,8 @@ impl ParseableServer for Server {
120117

121118
migration::run_migration(&PARSEABLE).await?;
122119

123-
if let Err(e) = CORRELATIONS.load().await {
124-
error!("{e}");
125-
}
126-
if let Err(err) = FILTERS.load().await {
127-
error!("{err}")
128-
};
129-
130-
if let Err(err) = DASHBOARDS.load().await {
131-
error!("{err}")
132-
};
133-
134-
if let Err(err) = ALERTS.load().await {
135-
error!("{err}")
136-
};
120+
// load on init
121+
load_on_init().await?;
137122

138123
storage::retention::load_retention_from_global();
139124

0 commit comments

Comments
 (0)