Skip to content

Commit eddd332

Browse files
authored
Fetch schema and stream from obj store (#745)
* Refactor object storage to use `filter_func` instead of `starts_with_pattern` in the `get_objects` method. * Refactor the `fetch_schema` method to use object storage instead of HTTP requests. * Refactor metadata.rs and storage.rs. * Refactor the ingest logic. * Fetch stream info from the store if it is not present in memory; error if the stream info exists in neither S3 nor memory.
1 parent be469a3 commit eddd332

File tree

12 files changed

+122
-78
lines changed

12 files changed

+122
-78
lines changed

server/src/handlers/http.rs

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818

1919
use actix_cors::Cors;
2020
use arrow_schema::Schema;
21+
use itertools::Itertools;
2122
use serde_json::Value;
2223

24+
use crate::option::CONFIG;
25+
2326
use self::{cluster::get_ingester_info, query::Query};
2427

2528
pub(crate) mod about;
@@ -61,32 +64,31 @@ pub fn base_path_without_preceding_slash() -> String {
6164
format!("{API_BASE_PATH}/{API_VERSION}")
6265
}
6366

67+
/// Fetches the schema for the specified stream.
68+
///
69+
/// # Arguments
70+
///
71+
/// * `stream_name` - The name of the stream to fetch the schema for.
72+
///
73+
/// # Returns
74+
///
75+
/// An `anyhow::Result` containing the `arrow_schema::Schema` for the specified stream.
6476
pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Schema> {
65-
let mut res = vec![];
66-
let ima = get_ingester_info().await.unwrap();
67-
68-
for im in ima {
69-
let uri = format!(
70-
"{}{}/logstream/{}/schema",
71-
im.domain_name,
72-
base_path_without_preceding_slash(),
73-
stream_name
74-
);
75-
let reqw = reqwest::Client::new()
76-
.get(uri)
77-
.header(http::header::AUTHORIZATION, im.token.clone())
78-
.header(http::header::CONTENT_TYPE, "application/json")
79-
.send()
80-
.await?;
81-
82-
if reqw.status().is_success() {
83-
let v = serde_json::from_slice(&reqw.bytes().await?)?;
84-
res.push(v);
85-
}
86-
}
77+
let path_prefix =
78+
relative_path::RelativePathBuf::from(format!("{}/{}", stream_name, ".stream"));
79+
let store = CONFIG.storage().get_object_store();
80+
let res: Vec<Schema> = store
81+
.get_objects(
82+
Some(&path_prefix),
83+
Box::new(|file_name: String| file_name.contains(".schema")),
84+
)
85+
.await?
86+
.iter()
87+
// we should be able to unwrap as we know the data is valid schema
88+
.map(|byte_obj| serde_json::from_slice(byte_obj).unwrap())
89+
.collect_vec();
8790

8891
let new_schema = Schema::try_merge(res)?;
89-
9092
Ok(new_schema)
9193
}
9294

server/src/handlers/http/cluster/mod.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use super::base_path_without_preceding_slash;
4949
use super::modal::IngesterMetadata;
5050

5151
// forward the request to all ingesters to keep them in sync
52+
#[allow(dead_code)]
5253
pub async fn sync_streams_with_ingesters(
5354
stream_name: &str,
5455
time_partition: &str,
@@ -117,7 +118,10 @@ pub async fn fetch_stats_from_ingesters(
117118
let obs = CONFIG
118119
.storage()
119120
.get_object_store()
120-
.get_objects(Some(&path), ".ingester")
121+
.get_objects(
122+
Some(&path),
123+
Box::new(|file_name| file_name.starts_with(".ingester")),
124+
)
121125
.await?;
122126
let mut ingestion_size = 0u64;
123127
let mut storage_size = 0u64;
@@ -140,6 +144,7 @@ pub async fn fetch_stats_from_ingesters(
140144
Ok(vec![qs])
141145
}
142146

147+
#[allow(dead_code)]
143148
async fn send_stream_sync_request(
144149
url: &str,
145150
ingester: IngesterMetadata,
@@ -183,6 +188,7 @@ async fn send_stream_sync_request(
183188
}
184189

185190
/// send a rollback request to all ingesters
191+
#[allow(dead_code)]
186192
async fn send_stream_rollback_request(
187193
url: &str,
188194
ingester: IngesterMetadata,
@@ -346,7 +352,10 @@ pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
346352

347353
let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
348354
let arr = store
349-
.get_objects(Some(&root_path), "ingester")
355+
.get_objects(
356+
Some(&root_path),
357+
Box::new(|file_name| file_name.starts_with("ingester")),
358+
)
350359
.await?
351360
.iter()
352361
// this unwrap will most definateley shoot me in the foot later

server/src/handlers/http/ingest.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ use crate::handlers::{
2727
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
2828
STREAM_NAME_HEADER_KEY,
2929
};
30-
use crate::metadata::STREAM_INFO;
30+
use crate::metadata::{self, STREAM_INFO};
3131
use crate::option::{Mode, CONFIG};
32-
use crate::storage::ObjectStorageError;
32+
use crate::storage::{LogStream, ObjectStorageError};
3333
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
3434
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
3535
use arrow_schema::{Field, Schema};
@@ -165,10 +165,28 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
165165
.await?;
166166
}
167167
Mode::Ingest => {
168-
return Err(PostError::Invalid(anyhow::anyhow!(
169-
"Stream {} not found. Has it been created?",
170-
stream_name
171-
)));
168+
// here the ingest server has not found the stream
169+
// so it should check if the stream exists in storage
170+
let store = CONFIG.storage().get_object_store();
171+
let streams = store.list_streams().await?;
172+
if !streams.contains(&LogStream {
173+
name: stream_name.to_owned(),
174+
}) {
175+
log::error!("Stream {} not found", stream_name);
176+
return Err(PostError::Invalid(anyhow::anyhow!(
177+
"Stream {} not found. Has it been created?",
178+
stream_name
179+
)));
180+
}
181+
metadata::STREAM_INFO
182+
.upsert_stream_info(
183+
&*store,
184+
LogStream {
185+
name: stream_name.to_owned(),
186+
},
187+
)
188+
.await
189+
.map_err(|_| PostError::StreamNotFound(stream_name.to_owned()))?;
172190
}
173191
}
174192
Ok(())

server/src/handlers/http/logstream.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
2626
use crate::{catalog, event, stats};
2727
use crate::{metadata, validator};
2828

29+
use super::cluster::fetch_stats_from_ingesters;
2930
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
30-
use super::cluster::{fetch_stats_from_ingesters, sync_streams_with_ingesters};
3131
use actix_web::http::StatusCode;
3232
use actix_web::{web, HttpRequest, Responder};
3333
use arrow_schema::{Field, Schema};
@@ -166,9 +166,6 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
166166
});
167167
}
168168

169-
if CONFIG.parseable.mode == Mode::Query {
170-
sync_streams_with_ingesters(&stream_name, time_partition, static_schema_flag, body).await?;
171-
}
172169
create_stream(stream_name, time_partition, static_schema_flag, schema).await?;
173170

174171
Ok(("log stream created", StatusCode::OK))

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,10 @@ impl IngestServer {
263263
let store = CONFIG.storage().get_object_store();
264264
let base_path = RelativePathBuf::from("");
265265
let ingester_metadata = store
266-
.get_objects(Some(&base_path), "ingester")
266+
.get_objects(
267+
Some(&base_path),
268+
Box::new(|file_name| file_name.starts_with("ingester")),
269+
)
267270
.await?
268271
.iter()
269272
// this unwrap will most definateley shoot me in the foot later

server/src/handlers/http/query.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
7474

7575
if CONFIG.parseable.mode == Mode::Query {
7676
if let Ok(new_schema) = fetch_schema(&table_name).await {
77+
// commit schema merges the schema internally and updates the schema in storage.
7778
commit_schema_to_storage(&table_name, new_schema.clone())
7879
.await
7980
.map_err(QueryError::ObjectStorage)?;

server/src/metadata.rs

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use std::sync::{Arc, RwLock};
2626

2727
use crate::alerts::Alerts;
2828
use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE};
29-
use crate::storage::{ObjectStorage, StorageDir};
29+
use crate::storage::{LogStream, ObjectStorage, StorageDir};
3030
use crate::utils::arrow::MergedRecordReader;
3131

3232
use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
@@ -208,32 +208,41 @@ impl StreamInfo {
208208
// return error in case of an error from object storage itself.
209209

210210
for stream in storage.list_streams().await? {
211-
let alerts = storage.get_alerts(&stream.name).await?;
212-
let schema = storage.get_schema_on_server_start(&stream.name).await?;
213-
let meta = storage.get_stream_metadata(&stream.name).await?;
214-
215-
let schema = update_schema_from_staging(&stream.name, schema);
216-
let schema = HashMap::from_iter(
217-
schema
218-
.fields
219-
.iter()
220-
.map(|v| (v.name().to_owned(), v.clone())),
221-
);
222-
223-
let metadata = LogStreamMetadata {
224-
schema,
225-
alerts,
226-
cache_enabled: meta.cache_enabled,
227-
created_at: meta.created_at,
228-
first_event_at: meta.first_event_at,
229-
time_partition: meta.time_partition,
230-
static_schema_flag: meta.static_schema_flag,
231-
};
232-
233-
let mut map = self.write().expect(LOCK_EXPECT);
234-
235-
map.insert(stream.name, metadata);
211+
self.upsert_stream_info(storage, stream).await?;
236212
}
213+
Ok(())
214+
}
215+
216+
pub async fn upsert_stream_info(
217+
&self,
218+
storage: &(impl ObjectStorage + ?Sized),
219+
stream: LogStream,
220+
) -> Result<(), LoadError> {
221+
let alerts = storage.get_alerts(&stream.name).await?;
222+
let schema = storage.get_schema_on_server_start(&stream.name).await?;
223+
let meta = storage.get_stream_metadata(&stream.name).await?;
224+
225+
let schema = update_schema_from_staging(&stream.name, schema);
226+
let schema = HashMap::from_iter(
227+
schema
228+
.fields
229+
.iter()
230+
.map(|v| (v.name().to_owned(), v.clone())),
231+
);
232+
233+
let metadata = LogStreamMetadata {
234+
schema,
235+
alerts,
236+
cache_enabled: meta.cache_enabled,
237+
created_at: meta.created_at,
238+
first_event_at: meta.first_event_at,
239+
time_partition: meta.time_partition,
240+
static_schema_flag: meta.static_schema_flag,
241+
};
242+
243+
let mut map = self.write().expect(LOCK_EXPECT);
244+
245+
map.insert(stream.name, metadata);
237246

238247
Ok(())
239248
}

server/src/option.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,15 @@ pub struct Config {
4242

4343
impl Config {
4444
fn new() -> Self {
45-
let cli = create_parseable_cli_command().get_matches();
45+
let cli = create_parseable_cli_command()
46+
.name("Parseable")
47+
.about("A Cloud Native, log analytics platform")
48+
.before_help("Log Lake for the cloud-native world")
49+
.arg_required_else_help(true)
50+
.subcommand_required(true)
51+
.color(clap::ColorChoice::Always)
52+
.get_matches();
53+
4654
match cli.subcommand() {
4755
Some(("local-store", m)) => {
4856
let cli = match Cli::from_arg_matches(m) {
@@ -181,7 +189,8 @@ fn create_parseable_cli_command() -> Command {
181189
.next_line_help(false)
182190
.help_template(
183191
r#"
184-
{about} Join the community at https://logg.ing/community.
192+
{about}
193+
Join the community at https://logg.ing/community.
185194
186195
{all-args}
187196
"#,

server/src/storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ impl ObjectStoreFormat {
167167
}
168168
}
169169

170-
#[derive(serde::Serialize)]
170+
#[derive(serde::Serialize, PartialEq)]
171171
pub struct LogStream {
172172
pub name: String,
173173
}

server/src/storage/localfs.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ impl ObjectStorage for LocalFS {
193193
async fn get_objects(
194194
&self,
195195
base_path: Option<&RelativePath>,
196-
_starts_with_pattern: &str,
196+
filter_func: Box<(dyn Fn(String) -> bool + std::marker::Send + 'static)>,
197197
) -> Result<Vec<Bytes>, ObjectStorageError> {
198198
let time = Instant::now();
199199

@@ -206,13 +206,14 @@ impl ObjectStorage for LocalFS {
206206
let mut entries = fs::read_dir(&prefix).await?;
207207
let mut res = Vec::new();
208208
while let Some(entry) = entries.next_entry().await? {
209-
let ingester_file = entry
209+
let path = entry
210210
.path()
211211
.file_name()
212-
.unwrap_or_default()
212+
.unwrap()
213213
.to_str()
214-
.unwrap_or_default()
215-
.contains("ingester");
214+
.unwrap()
215+
.to_owned();
216+
let ingester_file = filter_func(path);
216217

217218
if !ingester_file {
218219
continue;

server/src/storage/object_storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ pub trait ObjectStorage: Sync + 'static {
6969
async fn get_objects(
7070
&self,
7171
base_path: Option<&RelativePath>,
72-
starts_with_pattern: &str,
72+
filter_fun: Box<dyn Fn(String) -> bool + Send>,
7373
) -> Result<Vec<Bytes>, ObjectStorageError>;
7474
async fn put_object(
7575
&self,

server/src/storage/s3.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -412,11 +412,10 @@ impl ObjectStorage for S3 {
412412
Ok(self._get_object(path).await?)
413413
}
414414

415-
// TBD is this the right way or the api calls are too many?
416415
async fn get_objects(
417416
&self,
418417
base_path: Option<&RelativePath>,
419-
starts_with_pattern: &str,
418+
filter_func: Box<dyn Fn(String) -> bool + Send>,
420419
) -> Result<Vec<Bytes>, ObjectStorageError> {
421420
let instant = Instant::now();
422421

@@ -431,11 +430,7 @@ impl ObjectStorage for S3 {
431430
let mut res = vec![];
432431

433432
while let Some(meta) = list_stream.next().await.transpose()? {
434-
let ingester_file = meta
435-
.location
436-
.filename()
437-
.unwrap()
438-
.starts_with(starts_with_pattern);
433+
let ingester_file = filter_func(meta.location.filename().unwrap().to_string());
439434

440435
if !ingester_file {
441436
continue;

0 commit comments

Comments
 (0)