
Commit f8935f3

fix: stream sync for ingestors (#826)
Fixes stream sync for ingestors at stream create and update: the querier now syncs the request to all live ingestors, and stream and schema are synced at server start.
1 parent 26eafa7 commit f8935f3
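
For context, the heart of the change is a fan-out of the stream-definition PUT from the querier to every ingestor. Below is a minimal, hedged sketch of that pattern using only the reqwest and bytes crates; IngestorTarget, fan_out_put_stream, and the URL layout are illustrative names and assumptions, not this repository's API.

    use bytes::Bytes;

    /// Illustrative stand-in for an ingestor entry (the real code uses IngestorMetadata).
    struct IngestorTarget {
        domain_name: String, // base URL, assumed to end with '/', e.g. "http://ingestor-1:8000/"
        token: String,       // value sent in the Authorization header
    }

    /// Forward one stream-definition PUT, body included, to every ingestor.
    async fn fan_out_put_stream(
        targets: &[IngestorTarget],
        stream_name: &str,
        body: Bytes,
    ) -> Result<(), reqwest::Error> {
        let client = reqwest::Client::new();
        for target in targets {
            // Assumed URL shape; the real code builds it with base_path_without_preceding_slash().
            let url = format!("{}api/v1/logstream/{}", target.domain_name, stream_name);
            let res = client
                .put(url)
                .header(reqwest::header::AUTHORIZATION, &target.token)
                .body(body.clone()) // Bytes clones are cheap (shared buffer)
                .send()
                .await?;
            if !res.status().is_success() {
                // Mirror the commit's behaviour: log the failure and keep going rather than abort.
                eprintln!("ingestor {} returned {}", target.domain_name, res.status());
            }
        }
        Ok(())
    }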

File tree

6 files changed: +278 -258 lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 22 additions & 81 deletions
@@ -23,7 +23,6 @@ use crate::handlers::http::cluster::utils::{
 };
 use crate::handlers::http::ingest::{ingest_internal_stream, PostError};
 use crate::handlers::http::logstream::error::StreamError;
-use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
 use crate::option::CONFIG;

 use crate::metrics::prom_utils::Metrics;
@@ -96,62 +95,47 @@ pub async fn sync_cache_with_ingestors(
 }

 // forward the request to all ingestors to keep them in sync
-#[allow(dead_code)]
 pub async fn sync_streams_with_ingestors(
+    req: HttpRequest,
+    body: Bytes,
     stream_name: &str,
-    time_partition: &str,
-    static_schema: &str,
-    schema: Bytes,
 ) -> Result<(), StreamError> {
     let ingestor_infos = get_ingestor_info().await.map_err(|err| {
         log::error!("Fatal: failed to get ingestor info: {:?}", err);
         StreamError::Anyhow(err)
     })?;

-    let mut errored = false;
+    let client = reqwest::Client::new();
     for ingestor in ingestor_infos.iter() {
         let url = format!(
             "{}{}/logstream/{}",
             ingestor.domain_name,
             base_path_without_preceding_slash(),
             stream_name
         );
+        let res = client
+            .put(url)
+            .headers(req.headers().into())
+            .header(header::AUTHORIZATION, &ingestor.token)
+            .body(body.clone())
+            .send()
+            .await
+            .map_err(|err| {
+                log::error!(
+                    "Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
+                    ingestor.domain_name,
+                    err
+                );
+                StreamError::Network(err)
+            })?;

-        match send_stream_sync_request(
-            &url,
-            ingestor.clone(),
-            time_partition,
-            static_schema,
-            schema.clone(),
-        )
-        .await
-        {
-            Ok(_) => continue,
-            Err(_) => {
-                errored = true;
-                break;
-            }
-        }
-    }
-
-    if errored {
-        for ingestor in ingestor_infos {
-            let url = format!(
-                "{}{}/logstream/{}",
+        if !res.status().is_success() {
+            log::error!(
+                "failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
                 ingestor.domain_name,
-                base_path_without_preceding_slash(),
-                stream_name
+                res
             );
-
-            // delete the stream
-            send_stream_delete_request(&url, ingestor.clone()).await?;
         }
-
-        // this might be a bit too much
-        return Err(StreamError::Custom {
-            msg: "Failed to sync stream with ingestors".to_string(),
-            status: StatusCode::INTERNAL_SERVER_ERROR,
-        });
     }

     Ok(())
@@ -301,49 +285,6 @@ pub async fn fetch_stats_from_ingestors(
     Ok(vec![qs])
 }

-#[allow(dead_code)]
-async fn send_stream_sync_request(
-    url: &str,
-    ingestor: IngestorMetadata,
-    time_partition: &str,
-    static_schema: &str,
-    schema: Bytes,
-) -> Result<(), StreamError> {
-    if !utils::check_liveness(&ingestor.domain_name).await {
-        return Ok(());
-    }
-
-    let client = reqwest::Client::new();
-    let res = client
-        .put(url)
-        .header(header::CONTENT_TYPE, "application/json")
-        .header(TIME_PARTITION_KEY, time_partition)
-        .header(STATIC_SCHEMA_FLAG, static_schema)
-        .header(header::AUTHORIZATION, ingestor.token)
-        .body(schema)
-        .send()
-        .await
-        .map_err(|err| {
-            log::error!(
-                "Fatal: failed to forward create stream request to ingestor: {}\n Error: {:?}",
-                ingestor.domain_name,
-                err
-            );
-            StreamError::Network(err)
-        })?;
-
-    if !res.status().is_success() {
-        log::error!(
-            "failed to forward create stream request to ingestor: {}\nResponse Returned: {:?}",
-            ingestor.domain_name,
-            res
-        );
-        return Err(StreamError::Network(res.error_for_status().unwrap_err()));
-    }
-
-    Ok(())
-}
-
 /// send a delete stream request to all ingestors
 pub async fn send_stream_delete_request(
     url: &str,
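
The deleted helper above guarded each forward with utils::check_liveness before sending. As a hedged sketch of what such a liveness probe can look like (the "/liveness" path and the is_live name are assumptions, not taken from this diff; only the reqwest calls are real):

    /// Return true if the ingestor answers its liveness route with a 2xx status.
    async fn is_live(domain_name: &str) -> bool {
        // domain_name is assumed to end with '/', as the URLs built in the diff suggest.
        let url = format!("{domain_name}liveness");
        reqwest::Client::new()
            .get(url)
            .send()
            .await
            .map(|res| res.status().is_success())
            .unwrap_or(false)
    }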

server/src/handlers/http/logstream.rs

Lines changed: 100 additions & 83 deletions
@@ -19,7 +19,9 @@
 use self::error::{CreateStreamError, StreamError};
 use super::base_path_without_preceding_slash;
 use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
-use super::cluster::{fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors};
+use super::cluster::{
+    fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, sync_streams_with_ingestors,
+};
 use crate::alerts::Alerts;
 use crate::handlers::{
     CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
@@ -169,89 +171,14 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>

 pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-    let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) =
-        fetch_headers_from_put_stream_request(&req);
-
-    if metadata::STREAM_INFO.stream_exists(&stream_name) && update_stream != "true" {
-        // Error if the log stream already exists
-        return Err(StreamError::Custom {
-            msg: format!(
-                "Logstream {stream_name} already exists, please create a new log stream with unique name"
-            ),
-            status: StatusCode::BAD_REQUEST,
-        });
-    }
-
-    if !time_partition.is_empty() && update_stream == "true" {
-        return Err(StreamError::Custom {
-            msg: "Altering the time partition of an existing stream is restricted.".to_string(),
-            status: StatusCode::BAD_REQUEST,
-        });
-    }
-    let mut time_partition_in_days: &str = "";
-    if !time_partition_limit.is_empty() {
-        let time_partition_days = validate_time_partition_limit(&time_partition_limit);
-        if let Err(err) = time_partition_days {
-            return Err(StreamError::CreateStream(err));
-        } else {
-            time_partition_in_days = time_partition_days.unwrap();
-            if update_stream == "true" {
-                if let Err(err) = update_time_partition_limit_in_stream(
-                    stream_name.clone(),
-                    time_partition_in_days,
-                )
-                .await
-                {
-                    return Err(StreamError::CreateStream(err));
-                }
-                return Ok(("Log stream updated", StatusCode::OK));
-            }
-        }
-    }
-
-    if !static_schema_flag.is_empty() && update_stream == "true" {
-        return Err(StreamError::Custom {
-            msg: "Altering the schema of an existing stream is restricted.".to_string(),
-            status: StatusCode::BAD_REQUEST,
-        });
-    }

-    if !custom_partition.is_empty() {
-        if let Err(err) = validate_custom_partition(&custom_partition) {
-            return Err(StreamError::CreateStream(err));
-        }
-        if update_stream == "true" {
-            if let Err(err) =
-                update_custom_partition_in_stream(stream_name.clone(), &custom_partition).await
-            {
-                return Err(StreamError::CreateStream(err));
-            }
-            return Ok(("Log stream updated", StatusCode::OK));
-        }
-    }
-
-    let schema = validate_static_schema(
-        &body,
-        &stream_name,
-        &time_partition,
-        &custom_partition,
-        &static_schema_flag,
-    );
-    if let Err(err) = schema {
-        return Err(StreamError::CreateStream(err));
+    if CONFIG.parseable.mode == Mode::Query {
+        create_update_stream(&req, &body, &stream_name).await?;
+        sync_streams_with_ingestors(req, body, &stream_name).await?;
+    } else {
+        create_update_stream(&req, &body, &stream_name).await?;
     }

-    create_stream(
-        stream_name,
-        &time_partition,
-        time_partition_in_days,
-        &custom_partition,
-        &static_schema_flag,
-        schema.unwrap(),
-        false,
-    )
-    .await?;
-
     Ok(("Log stream created", StatusCode::OK))
 }

@@ -355,6 +282,96 @@ fn validate_static_schema(
     Ok(schema)
 }

+async fn create_update_stream(
+    req: &HttpRequest,
+    body: &Bytes,
+    stream_name: &str,
+) -> Result<(), StreamError> {
+    let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) =
+        fetch_headers_from_put_stream_request(req);
+
+    if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream != "true" {
+        // Error if the log stream already exists
+        return Err(StreamError::Custom {
+            msg: format!(
+                "Logstream {stream_name} already exists, please create a new log stream with unique name"
+            ),
+            status: StatusCode::BAD_REQUEST,
+        });
+    }
+
+    if !time_partition.is_empty() && update_stream == "true" {
+        return Err(StreamError::Custom {
+            msg: "Altering the time partition of an existing stream is restricted.".to_string(),
+            status: StatusCode::BAD_REQUEST,
+        });
+    }
+    let mut time_partition_in_days: &str = "";
+    if !time_partition_limit.is_empty() {
+        let time_partition_days = validate_time_partition_limit(&time_partition_limit);
+        if let Err(err) = time_partition_days {
+            return Err(StreamError::CreateStream(err));
+        } else {
+            time_partition_in_days = time_partition_days.unwrap();
+            if update_stream == "true" {
+                if let Err(err) = update_time_partition_limit_in_stream(
+                    stream_name.to_string(),
+                    time_partition_in_days,
+                )
+                .await
+                {
+                    return Err(StreamError::CreateStream(err));
+                }
+                return Ok(());
+            }
+        }
+    }
+
+    if !static_schema_flag.is_empty() && update_stream == "true" {
+        return Err(StreamError::Custom {
+            msg: "Altering the schema of an existing stream is restricted.".to_string(),
+            status: StatusCode::BAD_REQUEST,
+        });
+    }
+
+    if !custom_partition.is_empty() {
+        if let Err(err) = validate_custom_partition(&custom_partition) {
+            return Err(StreamError::CreateStream(err));
+        }
+        if update_stream == "true" {
+            if let Err(err) =
+                update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await
+            {
+                return Err(StreamError::CreateStream(err));
+            }
+            return Ok(());
+        }
+    }
+
+    let schema = validate_static_schema(
+        body,
+        stream_name,
+        &time_partition,
+        &custom_partition,
+        &static_schema_flag,
+    );
+    if let Err(err) = schema {
+        return Err(StreamError::CreateStream(err));
+    }
+
+    create_stream(
+        stream_name.to_string(),
+        &time_partition,
+        time_partition_in_days,
+        &custom_partition,
+        &static_schema_flag,
+        schema.unwrap(),
+        false,
+    )
+    .await?;
+
+    Ok(())
+}
 pub async fn put_alert(
     req: HttpRequest,
     body: web::Json<serde_json::Value>,
@@ -471,7 +488,7 @@ pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, Strea
         _ => {}
     }

-    let cache_enabled = STREAM_INFO.cache_enabled(&stream_name)?;
+    let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?;
     Ok((web::Json(cache_enabled), StatusCode::OK))
 }

@@ -545,7 +562,7 @@ pub async fn put_enable_cache(
         .put_stream_manifest(&stream_name, &stream_metadata)
         .await?;

-    STREAM_INFO.set_stream_cache(&stream_name, enable_cache)?;
+    STREAM_INFO.set_cache_enabled(&stream_name, enable_cache)?;
     Ok((
         format!("Cache set to {enable_cache} for log stream {stream_name}"),
         StatusCode::OK,
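
As a usage note on the logstream.rs changes above: put_stream now accepts the same PUT whether it creates or updates a stream, and the update path is selected by request headers parsed in fetch_headers_from_put_stream_request. A hedged client-side sketch follows; the header names here (e.g. "X-P-Update-Stream") are assumptions modelled on the constants imported from crate::handlers, and put_stream_via_querier is an illustrative name, not part of this repository.

    /// Sketch of a caller's PUT against the querier under the assumptions above.
    async fn put_stream_via_querier(
        querier_url: &str, // e.g. "http://querier:8000"
        stream_name: &str,
        auth: &str,        // value for the Authorization header
        update: bool,
    ) -> Result<(), reqwest::Error> {
        let res = reqwest::Client::new()
            .put(format!("{querier_url}/api/v1/logstream/{stream_name}"))
            .header("Authorization", auth)
            .header("X-P-Update-Stream", if update { "true" } else { "false" })
            .send()
            .await?;
        // In Query mode the querier applies the change locally and then forwards the
        // same request to every ingestor via sync_streams_with_ingestors.
        res.error_for_status()?;
        Ok(())
    }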
