
Commit cc259d5

refactor and expose ingest server util (#1091)

Author: Devdutt Shenoi
1 parent 838dd7f commit cc259d5

File tree

2 files changed: +82 -87 lines changed

src/handlers/http/modal/ingest_server.rs
src/storage/staging.rs
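The commit title promises to "expose" an ingest server util, and the first diff below shows how: `set_ingestor_metadata` changes from a private `&self` method on `IngestServer` into a free `pub async fn`, while `check_querier_state` and `validate_credentials` become private free functions in the same module. Here is a minimal sketch of what the `pub` change enables for other code in the crate; the caller name and setup are hypothetical, and it assumes the enclosing modules are reachable from the call site. Only the `use` path (read off the file path below) and the function signature come from this commit.

```rust
// Hypothetical caller elsewhere in the Parseable crate (not part of this commit).
// After the refactor the util is imported and awaited directly; no IngestServer
// instance has to be constructed first.
use crate::handlers::http::modal::ingest_server::set_ingestor_metadata;

async fn prepare_ingestor() -> anyhow::Result<()> {
    // Creates or updates the .ingestor.json metadata in the object store.
    set_ingestor_metadata().await?;
    Ok(())
}
```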

src/handlers/http/modal/ingest_server.rs

Lines changed: 80 additions & 85 deletions
@@ -45,7 +45,6 @@ use crate::{handlers::http::base_path, option::CONFIG};
 use actix_web::web;
 use actix_web::web::resource;
 use actix_web::Scope;
-use anyhow::anyhow;
 use async_trait::async_trait;
 use base64::Engine;
 use bytes::Bytes;

@@ -85,14 +84,14 @@ impl ParseableServer for IngestServer {
         // parseable can't use local storage for persistence when running a distributed setup
         if CONFIG.get_storage_mode_string() == "Local drive" {
             return Err(anyhow::Error::msg(
-                "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.",
-            ));
+                "This instance of the Parseable server has been configured to run in a distributed setup, it doesn't support local storage.",
+            ));
         }

         // check for querier state. Is it there, or was it there in the past
-        let parseable_json = self.check_querier_state().await?;
+        let parseable_json = check_querier_state().await?;
         // to get the .parseable.json file in staging
-        self.validate_credentials().await?;
+        validate_credentials().await?;

         Ok(parseable_json)
     }

@@ -112,7 +111,7 @@ impl ParseableServer for IngestServer {
         tokio::spawn(airplane::server());

         // set the ingestor metadata
-        self.set_ingestor_metadata().await?;
+        set_ingestor_metadata().await?;

         // Ingestors shouldn't have to deal with OpenId auth flow
         let app = self.start(prometheus, None);

@@ -278,96 +277,92 @@ impl IngestServer {
             ),
         )
     }
+}
+
+// create the ingestor metadata and put the .ingestor.json file in the object store
+pub async fn set_ingestor_metadata() -> anyhow::Result<()> {
+    let storage_ingestor_metadata = migrate_ingester_metadata().await?;
+    let store = CONFIG.storage().get_object_store();
+
+    // find the meta file in staging if not generate new metadata
+    let resource = INGESTOR_META.clone();
+    // use the id that was generated/found in the staging and
+    // generate the path for the object store
+    let path = ingestor_metadata_path(None);
+
+    // we are considering that we can always get from object store
+    if let Some(mut store_data) = storage_ingestor_metadata {
+        if store_data.domain_name != INGESTOR_META.domain_name {
+            store_data
+                .domain_name
+                .clone_from(&INGESTOR_META.domain_name);
+            store_data.port.clone_from(&INGESTOR_META.port);

-    // create the ingestor metadata and put the .ingestor.json file in the object store
-    async fn set_ingestor_metadata(&self) -> anyhow::Result<()> {
-        let storage_ingestor_metadata = migrate_ingester_metadata().await?;
-        let store = CONFIG.storage().get_object_store();
-
-        // find the meta file in staging if not generate new metadata
-        let resource = INGESTOR_META.clone();
-        // use the id that was generated/found in the staging and
-        // generate the path for the object store
-        let path = ingestor_metadata_path(None);
-
-        // we are considering that we can always get from object store
-        if storage_ingestor_metadata.is_some() {
-            let mut store_data = storage_ingestor_metadata.unwrap();
-
-            if store_data.domain_name != INGESTOR_META.domain_name {
-                store_data
-                    .domain_name
-                    .clone_from(&INGESTOR_META.domain_name);
-                store_data.port.clone_from(&INGESTOR_META.port);
-
-                let resource = Bytes::from(serde_json::to_vec(&store_data)?);
-
-                // if pushing to object store fails propagate the error
-                return store
-                    .put_object(&path, resource)
-                    .await
-                    .map_err(|err| anyhow!(err));
-            }
-        } else {
-            let resource = Bytes::from(serde_json::to_vec(&resource)?);
+            let resource = Bytes::from(serde_json::to_vec(&store_data)?);

+            // if pushing to object store fails propagate the error
             store.put_object(&path, resource).await?;
         }
+    } else {
+        let resource = Bytes::from(serde_json::to_vec(&resource)?);

-        Ok(())
+        store.put_object(&path, resource).await?;
     }

-    // check for querier state. Is it there, or was it there in the past
-    // this should happen before the set the ingestor metadata
-    async fn check_querier_state(&self) -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
-        // how do we check for querier state?
-        // based on the work flow of the system, the querier will always need to start first
-        // i.e the querier will create the `.parseable.json` file
-
-        let store = CONFIG.storage().get_object_store();
-        let path = parseable_json_path();
+    Ok(())
+}

-        let parseable_json = store.get_object(&path).await;
-        match parseable_json {
-            Ok(_) => Ok(Some(parseable_json.unwrap())),
-            Err(_) => Err(ObjectStorageError::Custom(
+// check for querier state. Is it there, or was it there in the past
+// this should happen before the set the ingestor metadata
+async fn check_querier_state() -> anyhow::Result<Option<Bytes>, ObjectStorageError> {
+    // how do we check for querier state?
+    // based on the work flow of the system, the querier will always need to start first
+    // i.e the querier will create the `.parseable.json` file
+    let parseable_json = CONFIG
+        .storage()
+        .get_object_store()
+        .get_object(&parseable_json_path())
+        .await
+        .map_err(|_| {
+            ObjectStorageError::Custom(
                 "Query Server has not been started yet. Please start the querier server first."
                     .to_string(),
-            )),
-        }
-    }
-
-    async fn validate_credentials(&self) -> anyhow::Result<()> {
-        // check if your creds match with others
-        let store = CONFIG.storage().get_object_store();
-        let base_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
-        let ingestor_metadata = store
-            .get_objects(
-                Some(&base_path),
-                Box::new(|file_name| file_name.starts_with("ingestor")),
             )
-            .await?;
-        if !ingestor_metadata.is_empty() {
-            let ingestor_metadata_value: Value =
-                serde_json::from_slice(&ingestor_metadata[0]).expect("ingestor.json is valid json");
-            let check = ingestor_metadata_value
-                .as_object()
-                .and_then(|meta| meta.get("token"))
-                .and_then(|token| token.as_str())
-                .unwrap();
-
-            let token = base64::prelude::BASE64_STANDARD.encode(format!(
-                "{}:{}",
-                CONFIG.parseable.username, CONFIG.parseable.password
-            ));
-
-            let token = format!("Basic {}", token);
-
-            if check != token {
-                return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again."));
-            }
-        }
+        })?;
+
+    Ok(Some(parseable_json))
+}

-        Ok(())
+async fn validate_credentials() -> anyhow::Result<()> {
+    // check if your creds match with others
+    let store = CONFIG.storage().get_object_store();
+    let base_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
+    let ingestor_metadata = store
+        .get_objects(
+            Some(&base_path),
+            Box::new(|file_name| file_name.starts_with("ingestor")),
+        )
+        .await?;
+    if !ingestor_metadata.is_empty() {
+        let ingestor_metadata_value: Value =
+            serde_json::from_slice(&ingestor_metadata[0]).expect("ingestor.json is valid json");
+        let check = ingestor_metadata_value
+            .as_object()
+            .and_then(|meta| meta.get("token"))
+            .and_then(|token| token.as_str())
+            .unwrap();
+
+        let token = base64::prelude::BASE64_STANDARD.encode(format!(
+            "{}:{}",
+            CONFIG.parseable.username, CONFIG.parseable.password
+        ));
+
+        let token = format!("Basic {}", token);
+
+        if check != token {
+            return Err(anyhow::anyhow!("Credentials do not match with other ingestors. Please check your credentials and try again."));
+        }
    }
+
+    Ok(())
 }
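Besides moving the three helpers out of `impl IngestServer`, the commit tightens the body of `set_ingestor_metadata`: an `is_some()` check followed by `unwrap()` becomes a single `if let Some(..)` binding. Below is a self-contained sketch of that idiom; the `Meta` struct and the URLs are made up for illustration and are not Parseable types.

```rust
// Stand-in for stored ingestor metadata; the field is illustrative only.
struct Meta {
    domain_name: String,
}

fn main() {
    let stored: Option<Meta> = Some(Meta {
        domain_name: "http://old-host:8000".into(),
    });

    // Before the refactor: test the Option, then unwrap the same Option again.
    if stored.is_some() {
        let meta = stored.as_ref().unwrap();
        println!("unwrap path: {}", meta.domain_name);
    }

    // After the refactor: one pattern match proves the value exists and binds it
    // mutably, so no unwrap() is left behind to panic if the code is reordered.
    if let Some(mut meta) = stored {
        meta.domain_name = "http://new-host:8000".into();
        println!("if let path: {}", meta.domain_name);
    } else {
        println!("no stored metadata");
    }
}
```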

src/storage/staging.rs

Lines changed: 2 additions & 2 deletions
@@ -129,7 +129,7 @@ impl StorageDir {
         let paths = dir
             .flatten()
             .map(|file| file.path())
-            .filter(|file| file.extension().map_or(false, |ext| ext.eq("arrows")))
+            .filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows")))
             .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap())
             .collect();

@@ -199,7 +199,7 @@ impl StorageDir {

         dir.flatten()
             .map(|file| file.path())
-            .filter(|file| file.extension().map_or(false, |ext| ext.eq("parquet")))
+            .filter(|file| file.extension().is_some_and(|ext| ext.eq("parquet")))
             .collect()
     }
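The staging.rs hunks are a small readability cleanup: `Option::map_or(false, ..)` becomes `Option::is_some_and(..)`, which states the boolean test directly (`is_some_and` has been stable since Rust 1.70). A short self-contained check that the two forms agree on a file extension; the path is invented for the example.

```rust
use std::path::Path;

fn main() {
    // Illustrative staging path; only the ".arrows" extension matters here.
    let staged = Path::new("stream/date=2024-01-01/data.arrows");

    // Old form: map the Option<&OsStr> to a bool, defaulting to false when absent.
    let old_way = staged.extension().map_or(false, |ext| ext.eq("arrows"));
    // New form: same behaviour, read as "the extension is Some and equals arrows".
    let new_way = staged.extension().is_some_and(|ext| ext.eq("arrows"));

    assert_eq!(old_way, new_way);
    println!("is an .arrows staging file: {new_way}");
}
```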

0 commit comments
