Skip to content

Commit 46686dc

Browse files
feat: allow stream creation from ingestor in distributed deployments (#980)
Co-authored-by: Akshat Agarwal <[email protected]>
1 parent d361b69 commit 46686dc

File tree

8 files changed

+179
-11
lines changed

8 files changed

+179
-11
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::metrics::prom_utils::Metrics;
3030
use crate::rbac::role::model::DefaultPrivilege;
3131
use crate::rbac::user::User;
3232
use crate::stats::Stats;
33+
use crate::storage::get_staging_metadata;
3334
use crate::storage::object_storage::ingestor_metadata_path;
3435
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
3536
use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
@@ -64,6 +65,7 @@ pub async fn sync_streams_with_ingestors(
6465
headers: HeaderMap,
6566
body: Bytes,
6667
stream_name: &str,
68+
skip_ingestor: Option<String>,
6769
) -> Result<(), StreamError> {
6870
let mut reqwest_headers = http_header::HeaderMap::new();
6971

@@ -76,7 +78,16 @@ pub async fn sync_streams_with_ingestors(
7678
})?;
7779

7880
let client = reqwest::Client::new();
79-
for ingestor in ingestor_infos.iter() {
81+
82+
let final_ingestor_infos = match skip_ingestor {
83+
None => ingestor_infos,
84+
Some(skip_ingestor) => ingestor_infos
85+
.into_iter()
86+
.filter(|ingestor| ingestor.domain_name != to_url_string(skip_ingestor.clone()))
87+
.collect::<Vec<IngestorMetadata>>(),
88+
};
89+
90+
for ingestor in final_ingestor_infos {
8091
if !utils::check_liveness(&ingestor.domain_name).await {
8192
log::warn!("Ingestor {} is not live", ingestor.domain_name);
8293
continue;
@@ -841,3 +852,62 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
841852

842853
Ok(())
843854
}
855+
856+
pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), StreamError> {
857+
let client = reqwest::Client::new();
858+
859+
let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| {
860+
StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata"))
861+
})?;
862+
let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.unwrap());
863+
let token = staging_metadata.querier_auth_token.unwrap();
864+
865+
if !check_liveness(&querier_endpoint).await {
866+
log::warn!("Querier {} is not live", querier_endpoint);
867+
return Err(StreamError::Anyhow(anyhow::anyhow!("Querier is not live")));
868+
}
869+
870+
let url = format!(
871+
"{}{}/logstream/{}?skip_ingestors={}",
872+
querier_endpoint,
873+
base_path_without_preceding_slash(),
874+
stream_name,
875+
CONFIG.parseable.ingestor_endpoint,
876+
);
877+
878+
let response = client
879+
.put(&url)
880+
.header(header::AUTHORIZATION, &token)
881+
.send()
882+
.await
883+
.map_err(|err| {
884+
log::error!(
885+
"Fatal: failed to forward create stream request to querier: {}\n Error: {:?}",
886+
&url,
887+
err
888+
);
889+
StreamError::Network(err)
890+
})?;
891+
892+
let status = response.status();
893+
894+
if !status.is_success() {
895+
let response_text = response.text().await.map_err(|err| {
896+
log::error!("Failed to read response text from querier: {}", &url);
897+
StreamError::Network(err)
898+
})?;
899+
900+
log::error!(
901+
"Failed to forward create stream request to querier: {}\nResponse Returned: {:?}",
902+
&url,
903+
response_text
904+
);
905+
906+
return Err(StreamError::Anyhow(anyhow::anyhow!(
907+
"Request failed with status: {}",
908+
status,
909+
)));
910+
}
911+
912+
Ok(())
913+
}

server/src/handlers/http/ingest.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::event::{
2626
error::EventError,
2727
format::{self, EventFormat},
2828
};
29+
use crate::handlers::http::cluster::forward_create_stream_request;
2930
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
3031
use crate::localcache::CacheError;
3132
use crate::metadata::error::stream_info::MetadataError;
@@ -210,11 +211,16 @@ pub async fn create_stream_if_not_exists(
210211
if !streams.contains(&LogStream {
211212
name: stream_name.to_owned(),
212213
}) {
213-
log::error!("Stream {} not found", stream_name);
214-
return Err(PostError::Invalid(anyhow::anyhow!(
215-
"Stream `{}` not found. Please create it using the Query server.",
216-
stream_name
217-
)));
214+
match forward_create_stream_request(stream_name).await {
215+
Ok(()) => log::info!("Stream {} created", stream_name),
216+
Err(e) => {
217+
return Err(PostError::Invalid(anyhow::anyhow!(
218+
"Unable to create stream: {} using query server. Error: {}",
219+
stream_name,
220+
e.to_string(),
221+
)))
222+
}
223+
};
218224
}
219225
metadata::STREAM_INFO
220226
.upsert_stream_info(

server/src/handlers/http/logstream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,7 @@ pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
654654
header::CONTENT_TYPE,
655655
HeaderValue::from_static("application/json"),
656656
);
657-
sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
657+
sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME, None).await?;
658658
}
659659
Ok(())
660660
}

server/src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1+
use core::str;
12
use std::fs;
23

34
use actix_web::{web, HttpRequest, Responder};
45
use bytes::Bytes;
56
use chrono::Utc;
67
use http::StatusCode;
8+
use serde::Deserialize;
9+
use tokio::sync::Mutex;
10+
11+
static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());
712

813
use crate::{
914
event,
@@ -74,11 +79,22 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
7479
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
7580
}
7681

77-
pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
82+
#[derive(Deserialize)]
83+
pub struct PutStreamQuery {
84+
skip_ingestors: Option<String>,
85+
}
86+
87+
pub async fn put_stream(
88+
req: HttpRequest,
89+
body: Bytes,
90+
info: web::Query<PutStreamQuery>,
91+
) -> Result<impl Responder, StreamError> {
7892
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
7993

94+
let _ = CREATE_STREAM_LOCK.lock().await;
8095
let headers = create_update_stream(&req, &body, &stream_name).await?;
81-
sync_streams_with_ingestors(headers, body, &stream_name).await?;
96+
97+
sync_streams_with_ingestors(headers, body, &stream_name, info.skip_ingestors.clone()).await?;
8298

8399
Ok(("Log stream created", StatusCode::OK))
84100
}

server/src/migration.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ pub async fn run_metadata_migration(
7878
let metadata = metadata_migration::v3_v4(storage_metadata);
7979
put_remote_metadata(&*object_store, &metadata).await?;
8080
}
81+
Some("v4") => {
82+
let metadata = metadata_migration::v4_v5(storage_metadata);
83+
put_remote_metadata(&*object_store, &metadata).await?;
84+
}
8185
_ => (),
8286
}
8387
}

server/src/migration/metadata_migration.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*
1717
*/
1818

19+
use base64::Engine;
1920
use rand::distributions::DistString;
2021
use serde_json::{Map, Value as JsonValue};
2122

@@ -148,6 +149,55 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue {
148149
storage_metadata
149150
}
150151

152+
// maybe rename
153+
pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue {
154+
let metadata = storage_metadata.as_object_mut().unwrap();
155+
metadata.remove_entry("version");
156+
metadata.insert("version".to_string(), JsonValue::String("v5".to_string()));
157+
158+
match metadata.get("server_mode") {
159+
None => {
160+
metadata.insert(
161+
"server_mode".to_string(),
162+
JsonValue::String(CONFIG.parseable.mode.to_string()),
163+
);
164+
}
165+
Some(JsonValue::String(mode)) => match mode.as_str() {
166+
"Query" => {
167+
metadata.insert(
168+
"querier_endpoint".to_string(),
169+
JsonValue::String(CONFIG.parseable.address.clone()),
170+
);
171+
}
172+
"All" => {
173+
metadata.insert(
174+
"server_mode".to_string(),
175+
JsonValue::String(CONFIG.parseable.mode.to_string()),
176+
);
177+
metadata.insert(
178+
"querier_endpoint".to_string(),
179+
JsonValue::String(CONFIG.parseable.address.clone()),
180+
);
181+
}
182+
_ => (),
183+
},
184+
_ => (),
185+
}
186+
187+
metadata.insert(
188+
"querier_auth_token".to_string(),
189+
JsonValue::String(format!(
190+
"Basic {}",
191+
base64::prelude::BASE64_STANDARD.encode(format!(
192+
"{}:{}",
193+
CONFIG.parseable.username, CONFIG.parseable.password
194+
))
195+
)),
196+
);
197+
198+
storage_metadata
199+
}
200+
151201
pub async fn migrate_ingester_metadata() -> anyhow::Result<Option<IngestorMetadata>> {
152202
let imp = ingestor_metadata_path(None);
153203
let bytes = match CONFIG.storage().get_object_store().get_object(&imp).await {

server/src/storage.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ pub use localfs::FSConfig;
4040
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
4141
pub use s3::S3Config;
4242
pub use store_metadata::{
43-
put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
43+
get_staging_metadata, put_remote_metadata, put_staging_metadata, resolve_parseable_metadata,
44+
StorageMetadata,
4445
};
4546

4647
// metadata file names in a Stream prefix

server/src/storage/store_metadata.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::{
2222
path::PathBuf,
2323
};
2424

25+
use base64::Engine;
2526
use bytes::Bytes;
2627
use once_cell::sync::OnceCell;
2728
use relative_path::RelativePathBuf;
@@ -63,10 +64,29 @@ pub struct StorageMetadata {
6364
pub roles: HashMap<String, Vec<DefaultPrivilege>>,
6465
#[serde(default)]
6566
pub default_role: Option<String>,
67+
pub querier_endpoint: Option<String>,
68+
pub querier_auth_token: Option<String>,
6669
}
6770

6871
impl StorageMetadata {
6972
pub fn new() -> Self {
73+
let (querier_endpoint, querier_auth_token) = match CONFIG.parseable.mode {
74+
Mode::All | Mode::Query => {
75+
let querier_auth_token = format!(
76+
"Basic {}",
77+
base64::prelude::BASE64_STANDARD.encode(format!(
78+
"{}:{}",
79+
CONFIG.parseable.username, CONFIG.parseable.password
80+
))
81+
);
82+
(
83+
Some(CONFIG.parseable.address.clone()),
84+
Some(querier_auth_token),
85+
)
86+
}
87+
Mode::Ingest => (None, None),
88+
};
89+
7090
Self {
7191
version: CURRENT_STORAGE_METADATA_VERSION.to_string(),
7292
mode: CONFIG.storage_name.to_owned(),
@@ -78,9 +98,10 @@ impl StorageMetadata {
7898
streams: Vec::new(),
7999
roles: HashMap::default(),
80100
default_role: None,
101+
querier_endpoint,
102+
querier_auth_token,
81103
}
82104
}
83-
84105
pub fn global() -> &'static StaticStorageMetadata {
85106
STORAGE_METADATA
86107
.get()

0 commit comments

Comments
 (0)