Skip to content

Commit 4775fb1

Browse files
Author: Devdutt Shenoi
test + refactor: staging (#1129)
1 parent 986749b commit 4775fb1

29 files changed

+1161
-1046
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
target
22
data*
3-
staging/
3+
staging/*
44
limitcache
55
examples
66
cert.pem

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ zip = { version = "2.2.0", default-features = false, features = ["deflate"] }
129129
[dev-dependencies]
130130
rstest = "0.23.0"
131131
arrow = "53.0.0"
132+
temp-dir = "0.1.14"
132133

133134
[package.metadata.parseable_ui]
134135
assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.18/build.zip"

src/cli.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ pub struct BlobStoreArgs {
102102
pub storage: AzureBlobConfig,
103103
}
104104

105-
#[derive(Parser, Debug)]
105+
#[derive(Parser, Debug, Default)]
106106
pub struct Options {
107107
// Authentication
108108
#[arg(long, env = "P_USERNAME", help = "Admin username to be set for this Parseable server", default_value = DEFAULT_USERNAME)]
@@ -283,7 +283,7 @@ pub struct Options {
283283
pub ingestor_endpoint: String,
284284

285285
#[command(flatten)]
286-
oidc: Option<OidcConfig>,
286+
pub oidc: Option<OidcConfig>,
287287

288288
// Kafka configuration (conditionally compiled)
289289
#[cfg(any(

src/event/mod.rs

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,14 @@
1818
*/
1919

2020
pub mod format;
21-
mod writer;
2221

2322
use arrow_array::RecordBatch;
2423
use arrow_schema::{Field, Fields, Schema};
2524
use itertools::Itertools;
2625
use std::sync::Arc;
2726

2827
use self::error::EventError;
29-
pub use self::writer::STREAM_WRITERS;
30-
use crate::{metadata, storage::StreamType};
28+
use crate::{metadata, staging::STAGING, storage::StreamType};
3129
use chrono::NaiveDateTime;
3230
use std::collections::HashMap;
3331

@@ -52,36 +50,32 @@ impl Event {
5250
let mut key = get_schema_key(&self.rb.schema().fields);
5351
if self.time_partition.is_some() {
5452
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
55-
key = format!("{key}{parsed_timestamp_to_min}");
53+
key.push_str(&parsed_timestamp_to_min);
5654
}
5755

5856
if !self.custom_partition_values.is_empty() {
59-
let mut custom_partition_key = String::default();
6057
for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) {
61-
custom_partition_key = format!("{custom_partition_key}&{k}={v}");
58+
key.push_str(&format!("&{k}={v}"));
6259
}
63-
key = format!("{key}{custom_partition_key}");
6460
}
6561

66-
let num_rows = self.rb.num_rows() as u64;
6762
if self.is_first_event {
6863
commit_schema(&self.stream_name, self.rb.schema())?;
6964
}
7065

71-
STREAM_WRITERS.append_to_local(
72-
&self.stream_name,
66+
STAGING.get_or_create_stream(&self.stream_name).push(
7367
&key,
7468
&self.rb,
7569
self.parsed_timestamp,
7670
&self.custom_partition_values,
77-
&self.stream_type,
71+
self.stream_type,
7872
)?;
7973

8074
metadata::STREAM_INFO.update_stats(
8175
&self.stream_name,
8276
self.origin_format,
8377
self.origin_size,
84-
num_rows,
78+
self.rb.num_rows(),
8579
self.parsed_timestamp,
8680
)?;
8781

@@ -93,21 +87,16 @@ impl Event {
9387
pub fn process_unchecked(&self) -> Result<(), EventError> {
9488
let key = get_schema_key(&self.rb.schema().fields);
9589

96-
STREAM_WRITERS.append_to_local(
97-
&self.stream_name,
90+
STAGING.get_or_create_stream(&self.stream_name).push(
9891
&key,
9992
&self.rb,
10093
self.parsed_timestamp,
10194
&self.custom_partition_values,
102-
&self.stream_type,
95+
self.stream_type,
10396
)?;
10497

10598
Ok(())
10699
}
107-
108-
pub fn clear(&self, stream_name: &str) {
109-
STREAM_WRITERS.clear(stream_name);
110-
}
111100
}
112101

113102
pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
@@ -138,14 +127,13 @@ pub mod error {
138127
use arrow_schema::ArrowError;
139128

140129
use crate::metadata::error::stream_info::MetadataError;
130+
use crate::staging::StagingError;
141131
use crate::storage::ObjectStorageError;
142132

143-
use super::writer::errors::StreamWriterError;
144-
145133
#[derive(Debug, thiserror::Error)]
146134
pub enum EventError {
147135
#[error("Stream Writer Failed: {0}")]
148-
StreamWriter(#[from] StreamWriterError),
136+
StreamWriter(#[from] StagingError),
149137
#[error("Metadata Error: {0}")]
150138
Metadata(#[from] MetadataError),
151139
#[error("Stream Writer Failed: {0}")]

src/event/writer/file_writer.rs

Lines changed: 0 additions & 94 deletions
This file was deleted.

0 commit comments

Comments (0)