Skip to content

Commit 43a793d

Browse files
Authored by: parmesant, nikhilsinha (Parseable), Devdutt Shenoi, and nitisht
feat: split arrow conversion and sync (#1138)
Upload and conversion tasks are handled by a monitor task, which checks whether these tasks have crossed a pre-defined threshold. If either task crosses its threshold, a warning is printed.

---------

Co-authored-by: Nikhil Sinha <[email protected]>
Co-authored-by: Devdutt Shenoi <[email protected]>
Co-authored-by: Nitish Tiwari <[email protected]>
1 parent a71ce87 commit 43a793d

File tree

9 files changed

+459
-167
lines changed

9 files changed

+459
-167
lines changed

src/alerts/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -330,7 +330,7 @@ pub struct RollingWindow {
330330
// should always be "now"
331331
pub eval_end: String,
332332
// x minutes (5m)
333-
pub eval_frequency: u32,
333+
pub eval_frequency: u64,
334334
}
335335

336336
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
@@ -641,7 +641,7 @@ impl AlertConfig {
641641
columns
642642
}
643643

644-
pub fn get_eval_frequency(&self) -> u32 {
644+
pub fn get_eval_frequency(&self) -> u64 {
645645
match &self.eval_type {
646646
EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_frequency,
647647
}

src/handlers/http/logstream.rs

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -103,8 +103,6 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
103103
.unwrap()
104104
.into_iter()
105105
.filter(|logstream| {
106-
warn!("logstream-\n{logstream:?}");
107-
108106
Users.authorize(key.clone(), Action::ListStream, Some(logstream), None)
109107
== crate::rbac::Response::Authorized
110108
})

src/handlers/http/modal/ingest_server.rs

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -203,6 +203,11 @@ impl ParseableServer for IngestServer {
203203
sync::run_local_sync().await;
204204
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
205205
sync::object_store_sync().await;
206+
let (
207+
mut remote_conversion_handler,
208+
mut remote_conversion_outbox,
209+
mut remote_conversion_inbox,
210+
) = sync::arrow_conversion().await;
206211

207212
tokio::spawn(airplane::server());
208213

@@ -219,12 +224,16 @@ impl ParseableServer for IngestServer {
219224
// actix server finished .. stop other threads and stop the server
220225
remote_sync_inbox.send(()).unwrap_or(());
221226
localsync_inbox.send(()).unwrap_or(());
227+
remote_conversion_inbox.send(()).unwrap_or(());
222228
if let Err(e) = localsync_handler.await {
223229
error!("Error joining remote_sync_handler: {:?}", e);
224230
}
225231
if let Err(e) = remote_sync_handler.await {
226232
error!("Error joining remote_sync_handler: {:?}", e);
227233
}
234+
if let Err(e) = remote_conversion_handler.await {
235+
error!("Error joining remote_conversion_handler: {:?}", e);
236+
}
228237
return e
229238
},
230239
_ = &mut localsync_outbox => {
@@ -238,6 +247,13 @@ impl ParseableServer for IngestServer {
238247
error!("Error joining remote_sync_handler: {:?}", e);
239248
}
240249
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
250+
},
251+
_ = &mut remote_conversion_outbox => {
252+
// remote_conversion failed, this is recoverable by just starting remote_conversion thread again
253+
if let Err(e) = remote_conversion_handler.await {
254+
error!("Error joining remote_conversion_handler: {:?}", e);
255+
}
256+
(remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await;
241257
}
242258

243259
}

src/handlers/http/modal/mod.rs

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -139,7 +139,19 @@ pub trait ParseableServer {
139139

140140
// Perform S3 sync and wait for completion
141141
info!("Starting data sync to S3...");
142-
if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
142+
143+
if let Err(e) = CONFIG.storage().get_object_store().conversion(true).await {
144+
warn!("Failed to convert arrow files to parquet. {:?}", e);
145+
} else {
146+
info!("Successfully converted arrow files to parquet.");
147+
}
148+
149+
if let Err(e) = CONFIG
150+
.storage()
151+
.get_object_store()
152+
.upload_files_from_staging()
153+
.await
154+
{
143155
warn!("Failed to sync local data with object store. {:?}", e);
144156
} else {
145157
info!("Successfully synced all data to S3.");

src/handlers/http/modal/server.rs

Lines changed: 16 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -134,7 +134,11 @@ impl ParseableServer for Server {
134134
sync::run_local_sync().await;
135135
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
136136
sync::object_store_sync().await;
137-
137+
let (
138+
mut remote_conversion_handler,
139+
mut remote_conversion_outbox,
140+
mut remote_conversion_inbox,
141+
) = sync::arrow_conversion().await;
138142
if CONFIG.options.send_analytics {
139143
analytics::init_analytics_scheduler()?;
140144
}
@@ -152,12 +156,16 @@ impl ParseableServer for Server {
152156
// actix server finished .. stop other threads and stop the server
153157
remote_sync_inbox.send(()).unwrap_or(());
154158
localsync_inbox.send(()).unwrap_or(());
159+
remote_conversion_inbox.send(()).unwrap_or(());
155160
if let Err(e) = localsync_handler.await {
156161
error!("Error joining remote_sync_handler: {:?}", e);
157162
}
158163
if let Err(e) = remote_sync_handler.await {
159164
error!("Error joining remote_sync_handler: {:?}", e);
160165
}
166+
if let Err(e) = remote_conversion_handler.await {
167+
error!("Error joining remote_conversion_handler: {:?}", e);
168+
}
161169
return e
162170
},
163171
_ = &mut localsync_outbox => {
@@ -171,6 +179,13 @@ impl ParseableServer for Server {
171179
error!("Error joining remote_sync_handler: {:?}", e);
172180
}
173181
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
182+
},
183+
_ = &mut remote_conversion_outbox => {
184+
// remote_conversion failed, this is recoverable by just starting remote_conversion thread again
185+
if let Err(e) = remote_conversion_handler.await {
186+
error!("Error joining remote_conversion_handler: {:?}", e);
187+
}
188+
(remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await;
174189
}
175190

176191
};

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ pub use handlers::http::modal::{
5656
use once_cell::sync::Lazy;
5757
use reqwest::{Client, ClientBuilder};
5858

59-
pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;
59+
pub const STORAGE_CONVERSION_INTERVAL: u32 = 60;
60+
pub const STORAGE_UPLOAD_INTERVAL: u32 = 30;
6061

6162
// A single HTTP client for all outgoing HTTP requests from the parseable server
6263
static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {

src/staging/streams.rs

Lines changed: 55 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@
1919

2020
use std::{
2121
collections::HashMap,
22-
fs::{remove_file, OpenOptions},
22+
fs::{remove_file, File, OpenOptions},
2323
path::{Path, PathBuf},
2424
process,
2525
sync::{Arc, Mutex, RwLock},
@@ -165,6 +165,11 @@ impl<'a> Stream<'a> {
165165
paths
166166
}
167167

168+
/// Groups arrow files which are to be included in one parquet
169+
///
170+
/// Excludes the arrow file being written for the current minute (data is still being written to that one)
171+
///
172+
/// Only includes ones starting from the previous minute
168173
pub fn arrow_files_grouped_exclude_time(
169174
&self,
170175
exclude: NaiveDateTime,
@@ -173,6 +178,8 @@ impl<'a> Stream<'a> {
173178
let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
174179
let mut arrow_files = self.arrow_files();
175180

181+
// if the shutdown signal is false i.e. normal condition
182+
// don't keep the ones for the current minute
176183
if !shutdown_signal {
177184
arrow_files.retain(|path| {
178185
!path
@@ -215,6 +222,45 @@ impl<'a> Stream<'a> {
215222
.collect()
216223
}
217224

225+
pub fn schema_files(&self) -> Vec<PathBuf> {
226+
let Ok(dir) = self.data_path.read_dir() else {
227+
return vec![];
228+
};
229+
230+
dir.flatten()
231+
.map(|file| file.path())
232+
.filter(|file| file.extension().is_some_and(|ext| ext.eq("schema")))
233+
.collect()
234+
}
235+
236+
pub fn get_schemas_if_present(&self) -> Option<Vec<Schema>> {
237+
let Ok(dir) = self.data_path.read_dir() else {
238+
return None;
239+
};
240+
241+
let mut schemas: Vec<Schema> = Vec::new();
242+
243+
for file in dir.flatten() {
244+
if let Some(ext) = file.path().extension() {
245+
if ext.eq("schema") {
246+
let file = File::open(file.path()).expect("Schema File should exist");
247+
248+
let schema = match serde_json::from_reader(file) {
249+
Ok(schema) => schema,
250+
Err(_) => continue,
251+
};
252+
schemas.push(schema);
253+
}
254+
}
255+
}
256+
257+
if !schemas.is_empty() {
258+
Some(schemas)
259+
} else {
260+
None
261+
}
262+
}
263+
218264
fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf {
219265
let filename = path.file_stem().unwrap().to_str().unwrap();
220266
let (_, filename) = filename.split_once('.').unwrap();
@@ -249,6 +295,9 @@ impl<'a> Stream<'a> {
249295
}
250296
}
251297

298+
/// This function reads arrow files, groups their schemas
299+
///
300+
/// converts them into parquet files and returns a merged schema
252301
pub fn convert_disk_files_to_parquet(
253302
&self,
254303
time_partition: Option<&String>,
@@ -272,12 +321,12 @@ impl<'a> Stream<'a> {
272321
}
273322

274323
// warn!("staging files-\n{staging_files:?}\n");
275-
for (parquet_path, files) in staging_files {
324+
for (parquet_path, arrow_files) in staging_files {
276325
metrics::STAGING_FILES
277326
.with_label_values(&[&self.stream_name])
278-
.set(files.len() as i64);
327+
.set(arrow_files.len() as i64);
279328

280-
for file in &files {
329+
for file in &arrow_files {
281330
let file_size = file.metadata().unwrap().len();
282331
let file_type = file.extension().unwrap().to_str().unwrap();
283332

@@ -286,7 +335,7 @@ impl<'a> Stream<'a> {
286335
.add(file_size as i64);
287336
}
288337

289-
let record_reader = MergedReverseRecordReader::try_new(&files);
338+
let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
290339
if record_reader.readers.is_empty() {
291340
continue;
292341
}
@@ -319,7 +368,7 @@ impl<'a> Stream<'a> {
319368
);
320369
remove_file(parquet_path).unwrap();
321370
} else {
322-
for file in files {
371+
for file in arrow_files {
323372
// warn!("file-\n{file:?}\n");
324373
let file_size = file.metadata().unwrap().len();
325374
let file_type = file.extension().unwrap().to_str().unwrap();

0 commit comments

Comments (0)