Skip to content

Commit be9fb1e

Browse files
nikhilsinhaparseable
and
Devdutt Shenoi
authored
Ingestor querier sync (#1003)
The following changes are made in this PR: 1. removed the querier endpoint and token from parseable.json; 2. added migration steps to update parseable.json for the latest release; 3. removed the ingestor-to-querier sync for ingestion with new stream creation; 4. updated the logic to list streams from storage; 5. updated the logic in stream migration at server start; 6. updated the logic in places where the querier/ingestors need to check whether a stream has been created in S3 before confirming the existence of the stream. --------- Co-authored-by: Devdutt Shenoi <[email protected]>
1 parent da78e06 commit be9fb1e

27 files changed

+867
-369
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 1 addition & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use crate::metrics::prom_utils::Metrics;
3030
use crate::rbac::role::model::DefaultPrivilege;
3131
use crate::rbac::user::User;
3232
use crate::stats::Stats;
33-
use crate::storage::get_staging_metadata;
3433
use crate::storage::object_storage::ingestor_metadata_path;
3534
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
3635
use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
@@ -65,7 +64,6 @@ pub async fn sync_streams_with_ingestors(
6564
headers: HeaderMap,
6665
body: Bytes,
6766
stream_name: &str,
68-
skip_ingestor: Option<String>,
6967
) -> Result<(), StreamError> {
7068
let mut reqwest_headers = http_header::HeaderMap::new();
7169

@@ -79,15 +77,7 @@ pub async fn sync_streams_with_ingestors(
7977

8078
let client = reqwest::Client::new();
8179

82-
let final_ingestor_infos = match skip_ingestor {
83-
None => ingestor_infos,
84-
Some(skip_ingestor) => ingestor_infos
85-
.into_iter()
86-
.filter(|ingestor| ingestor.domain_name != to_url_string(skip_ingestor.clone()))
87-
.collect::<Vec<IngestorMetadata>>(),
88-
};
89-
90-
for ingestor in final_ingestor_infos {
80+
for ingestor in ingestor_infos {
9181
if !utils::check_liveness(&ingestor.domain_name).await {
9282
log::warn!("Ingestor {} is not live", ingestor.domain_name);
9383
continue;
@@ -852,62 +842,3 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
852842

853843
Ok(())
854844
}
855-
856-
pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), StreamError> {
857-
let client = reqwest::Client::new();
858-
859-
let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| {
860-
StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata"))
861-
})?;
862-
let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.unwrap());
863-
let token = staging_metadata.querier_auth_token.unwrap();
864-
865-
if !check_liveness(&querier_endpoint).await {
866-
log::warn!("Querier {} is not live", querier_endpoint);
867-
return Err(StreamError::Anyhow(anyhow::anyhow!("Querier is not live")));
868-
}
869-
870-
let url = format!(
871-
"{}{}/logstream/{}?skip_ingestors={}",
872-
querier_endpoint,
873-
base_path_without_preceding_slash(),
874-
stream_name,
875-
CONFIG.parseable.ingestor_endpoint,
876-
);
877-
878-
let response = client
879-
.put(&url)
880-
.header(header::AUTHORIZATION, &token)
881-
.send()
882-
.await
883-
.map_err(|err| {
884-
log::error!(
885-
"Fatal: failed to forward create stream request to querier: {}\n Error: {:?}",
886-
&url,
887-
err
888-
);
889-
StreamError::Network(err)
890-
})?;
891-
892-
let status = response.status();
893-
894-
if !status.is_success() {
895-
let response_text = response.text().await.map_err(|err| {
896-
log::error!("Failed to read response text from querier: {}", &url);
897-
StreamError::Network(err)
898-
})?;
899-
900-
log::error!(
901-
"Failed to forward create stream request to querier: {}\nResponse Returned: {:?}",
902-
&url,
903-
response_text
904-
);
905-
906-
return Err(StreamError::Anyhow(anyhow::anyhow!(
907-
"Request failed with status: {}",
908-
status,
909-
)));
910-
}
911-
912-
Ok(())
913-
}

server/src/handlers/http/ingest.rs

Lines changed: 32 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ use crate::event::{
2626
error::EventError,
2727
format::{self, EventFormat},
2828
};
29-
use crate::handlers::http::cluster::forward_create_stream_request;
29+
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
3030
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
3131
use crate::localcache::CacheError;
3232
use crate::metadata::error::stream_info::MetadataError;
33-
use crate::metadata::{self, STREAM_INFO};
33+
use crate::metadata::STREAM_INFO;
3434
use crate::option::{Mode, CONFIG};
35-
use crate::storage::{LogStream, ObjectStorageError, StreamType};
35+
use crate::storage::{ObjectStorageError, StreamType};
3636
use crate::utils::header_parsing::ParseHeaderError;
3737
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
3838
use arrow_array::RecordBatch;
@@ -153,7 +153,17 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
153153
)));
154154
}
155155
if !STREAM_INFO.stream_exists(&stream_name) {
156-
return Err(PostError::StreamNotFound(stream_name));
156+
// For distributed deployments, if the stream not found in memory map,
157+
//check if it exists in the storage
158+
//create stream and schema from storage
159+
if CONFIG.parseable.mode != Mode::All {
160+
match create_stream_and_schema_from_storage(&stream_name).await {
161+
Ok(true) => {}
162+
Ok(false) | Err(_) => return Err(PostError::StreamNotFound(stream_name.clone())),
163+
}
164+
} else {
165+
return Err(PostError::StreamNotFound(stream_name.clone()));
166+
}
157167
}
158168

159169
flatten_and_push_logs(req, body, stream_name).await?;
@@ -190,49 +200,25 @@ pub async fn create_stream_if_not_exists(
190200
stream_exists = true;
191201
return Ok(stream_exists);
192202
}
193-
match &CONFIG.parseable.mode {
194-
Mode::All | Mode::Query => {
195-
super::logstream::create_stream(
196-
stream_name.to_string(),
197-
"",
198-
"",
199-
"",
200-
"",
201-
Arc::new(Schema::empty()),
202-
stream_type,
203-
)
204-
.await?;
205-
}
206-
Mode::Ingest => {
207-
// here the ingest server has not found the stream
208-
// so it should check if the stream exists in storage
209-
let store = CONFIG.storage().get_object_store();
210-
let streams = store.list_streams().await?;
211-
if !streams.contains(&LogStream {
212-
name: stream_name.to_owned(),
213-
}) {
214-
match forward_create_stream_request(stream_name).await {
215-
Ok(()) => log::info!("Stream {} created", stream_name),
216-
Err(e) => {
217-
return Err(PostError::Invalid(anyhow::anyhow!(
218-
"Unable to create stream: {} using query server. Error: {}",
219-
stream_name,
220-
e.to_string(),
221-
)))
222-
}
223-
};
224-
}
225-
metadata::STREAM_INFO
226-
.upsert_stream_info(
227-
&*store,
228-
LogStream {
229-
name: stream_name.to_owned(),
230-
},
231-
)
232-
.await
233-
.map_err(|_| PostError::StreamNotFound(stream_name.to_owned()))?;
234-
}
203+
204+
// For distributed deployments, if the stream not found in memory map,
205+
//check if it exists in the storage
206+
//create stream and schema from storage
207+
if CONFIG.parseable.mode != Mode::All {
208+
return Ok(create_stream_and_schema_from_storage(stream_name).await?);
235209
}
210+
211+
super::logstream::create_stream(
212+
stream_name.to_string(),
213+
"",
214+
"",
215+
"",
216+
"",
217+
Arc::new(Schema::empty()),
218+
stream_type,
219+
)
220+
.await?;
221+
236222
Ok(stream_exists)
237223
}
238224

0 commit comments

Comments
 (0)