Skip to content

Commit c33d8e9

Browse files
feat: enhancement to allow custom partitions in ingestion (#795)
This PR adds support for custom partitions in ingestion by allowing an optional header, X-P-Custom-Partition, in the stream creation API; the header accepts comma-separated values. The custom partition is stored in the stream.json file, and at ingestion time the following validations are enforced: 1. all events in the batch must contain the custom partition fields; 2. a maximum of 3 partitions is allowed; 3. custom partition values must not contain a '.' in the log event; 4. custom partition values must not be empty in the log event. Prefixes are created in the alphabetical order of the custom partition fields; the sort order in the parquet file is p_timestamp/time_partition followed by the custom partition fields, and the same sort order is reflected in the manifest file.
1 parent f1032cc commit c33d8e9

File tree

16 files changed

+506
-148
lines changed

16 files changed

+506
-148
lines changed

server/src/event.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@ mod writer;
2323
use arrow_array::RecordBatch;
2424
use arrow_schema::{Field, Fields, Schema};
2525
use itertools::Itertools;
26-
2726
use std::sync::Arc;
2827

2928
use self::error::EventError;
3029
pub use self::writer::STREAM_WRITERS;
3130
use crate::{handlers::http::ingest::PostError, metadata};
3231
use chrono::NaiveDateTime;
32+
use std::collections::HashMap;
3333

3434
pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
3535
pub const DEFAULT_TAGS_KEY: &str = "p_tags";
@@ -44,6 +44,7 @@ pub struct Event {
4444
pub is_first_event: bool,
4545
pub parsed_timestamp: NaiveDateTime,
4646
pub time_partition: Option<String>,
47+
pub custom_partition_values: HashMap<String, String>,
4748
}
4849

4950
// Events holds the schema related to a each event for a single log stream
@@ -55,6 +56,14 @@ impl Event {
5556
key = format!("{key}{parsed_timestamp_to_min}");
5657
}
5758

59+
if !self.custom_partition_values.is_empty() {
60+
let mut custom_partition_key = String::default();
61+
for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) {
62+
custom_partition_key = format!("{custom_partition_key}&{k}={v}");
63+
}
64+
key = format!("{key}{custom_partition_key}");
65+
}
66+
5867
let num_rows = self.rb.num_rows() as u64;
5968
if self.is_first_event {
6069
commit_schema(&self.stream_name, self.rb.schema())?;
@@ -65,6 +74,7 @@ impl Event {
6574
&key,
6675
self.rb.clone(),
6776
self.parsed_timestamp,
77+
self.custom_partition_values,
6878
)?;
6979

7080
metadata::STREAM_INFO.update_stats(
@@ -111,8 +121,15 @@ impl Event {
111121
schema_key: &str,
112122
rb: RecordBatch,
113123
parsed_timestamp: NaiveDateTime,
124+
custom_partition_values: HashMap<String, String>,
114125
) -> Result<(), EventError> {
115-
STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
126+
STREAM_WRITERS.append_to_local(
127+
stream_name,
128+
schema_key,
129+
rb,
130+
parsed_timestamp,
131+
custom_partition_values,
132+
)?;
116133
Ok(())
117134
}
118135
}

server/src/event/format/json.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl EventFormat for Event {
4848
static_schema_flag: Option<String>,
4949
time_partition: Option<String>,
5050
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
51-
let data = flatten_json_body(self.data, None, None, false)?;
51+
let data = flatten_json_body(self.data, None, None, None, false)?;
5252
let stream_schema = schema;
5353

5454
// incoming event may be a single json or a json array

server/src/event/writer.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ impl Writer {
5353
schema_key: &str,
5454
rb: RecordBatch,
5555
parsed_timestamp: NaiveDateTime,
56+
custom_partition_values: HashMap<String, String>,
5657
) -> Result<(), StreamWriterError> {
5758
let rb = utils::arrow::replace_columns(
5859
rb.schema(),
@@ -61,8 +62,13 @@ impl Writer {
6162
&[Arc::new(get_timestamp_array(rb.num_rows()))],
6263
);
6364

64-
self.disk
65-
.push(stream_name, schema_key, &rb, parsed_timestamp)?;
65+
self.disk.push(
66+
stream_name,
67+
schema_key,
68+
&rb,
69+
parsed_timestamp,
70+
custom_partition_values,
71+
)?;
6672
self.mem.push(schema_key, rb);
6773
Ok(())
6874
}
@@ -84,6 +90,7 @@ impl WriterTable {
8490
schema_key: &str,
8591
record: RecordBatch,
8692
parsed_timestamp: NaiveDateTime,
93+
custom_partition_values: HashMap<String, String>,
8794
) -> Result<(), StreamWriterError> {
8895
let hashmap_guard = self.read().unwrap();
8996

@@ -95,6 +102,7 @@ impl WriterTable {
95102
schema_key,
96103
record,
97104
parsed_timestamp,
105+
custom_partition_values,
98106
)?;
99107
}
100108
None => {
@@ -149,6 +157,7 @@ impl WriterTable {
149157
schema_key,
150158
record,
151159
parsed_timestamp,
160+
custom_partition_values,
152161
)?;
153162
} else {
154163
writer.lock().unwrap().push_mem(stream_name, record)?;
@@ -157,7 +166,13 @@ impl WriterTable {
157166
None => {
158167
if CONFIG.parseable.mode != Mode::Query {
159168
let mut writer = Writer::default();
160-
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
169+
writer.push(
170+
stream_name,
171+
schema_key,
172+
record,
173+
parsed_timestamp,
174+
custom_partition_values,
175+
)?;
161176
map.insert(stream_name.to_owned(), Mutex::new(writer));
162177
} else {
163178
let mut writer = Writer::default();

server/src/event/writer/file_writer.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ impl FileWriter {
4545
schema_key: &str,
4646
record: &RecordBatch,
4747
parsed_timestamp: NaiveDateTime,
48+
custom_partition_values: HashMap<String, String>,
4849
) -> Result<(), StreamWriterError> {
4950
match self.get_mut(schema_key) {
5051
Some(writer) => {
@@ -56,8 +57,13 @@ impl FileWriter {
5657
// entry is not present thus we create it
5758
None => {
5859
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
59-
let (path, writer) =
60-
init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?;
60+
let (path, writer) = init_new_stream_writer_file(
61+
stream_name,
62+
schema_key,
63+
record,
64+
parsed_timestamp,
65+
custom_partition_values,
66+
)?;
6167
self.insert(
6268
schema_key.to_owned(),
6369
ArrowWriter {
@@ -83,9 +89,10 @@ fn init_new_stream_writer_file(
8389
schema_key: &str,
8490
record: &RecordBatch,
8591
parsed_timestamp: NaiveDateTime,
92+
custom_partition_values: HashMap<String, String>,
8693
) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
8794
let dir = StorageDir::new(stream_name);
88-
let path = dir.path_by_current_time(schema_key, parsed_timestamp);
95+
let path = dir.path_by_current_time(schema_key, parsed_timestamp, custom_partition_values);
8996
std::fs::create_dir_all(dir.data_path)?;
9097

9198
let file = OpenOptions::new().create(true).append(true).open(&path)?;

server/src/handlers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
2626
const LOG_SOURCE_KEY: &str = "x-p-log-source";
2727
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
2828
const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
29+
const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";
2930
const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
3031
const AUTHORIZATION_KEY: &str = "authorization";
3132
const SEPARATOR: char = '^';

0 commit comments

Comments
 (0)