Skip to content

Commit 4969b31

Browse files
fix: time partition limit (#792)
additional header to be provided X-P-Time-Partition-Limit with a value of unsigned integer with ending 'd'. eg. 90d for 90 days if not provided, default constraint of 30 days will be applied using this, user can ingest logs older than 30 days as well fixes #752
1 parent ebf2c16 commit 4969b31

File tree

9 files changed

+108
-28
lines changed

9 files changed

+108
-28
lines changed

server/src/event/format/json.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl EventFormat for Event {
4848
static_schema_flag: Option<String>,
4949
time_partition: Option<String>,
5050
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
51-
let data = flatten_json_body(self.data, None, false)?;
51+
let data = flatten_json_body(self.data, None, None, false)?;
5252
let stream_schema = schema;
5353

5454
// incoming event may be a single json or a json array

server/src/handlers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const PREFIX_META: &str = "x-p-meta-";
2424
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
2525
const LOG_SOURCE_KEY: &str = "x-p-log-source";
2626
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
27+
const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
2728
const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
2829
const AUTHORIZATION_KEY: &str = "authorization";
2930
const SEPARATOR: char = '^';

server/src/handlers/http/ingest.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
104104
.map_err(|_err| PostError::StreamNotFound(stream_name.clone()))?;
105105

106106
let time_partition = object_store_format.time_partition;
107+
let time_partition_limit = object_store_format.time_partition_limit;
107108
let static_schema_flag = object_store_format.static_schema_flag;
108109
let body_val: Value = serde_json::from_slice(&body)?;
109110
let size: usize = body.len();
@@ -129,7 +130,11 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
129130
.process()
130131
.await?;
131132
} else {
132-
let data = convert_array_to_object(body_val.clone(), time_partition.clone())?;
133+
let data = convert_array_to_object(
134+
body_val.clone(),
135+
time_partition.clone(),
136+
time_partition_limit,
137+
)?;
133138
for value in data {
134139
let body_timestamp = value.get(&time_partition.clone().unwrap().to_string());
135140
parsed_timestamp = body_timestamp
@@ -210,6 +215,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
210215
stream_name.to_string(),
211216
"",
212217
"",
218+
"",
213219
Arc::new(Schema::empty()),
214220
)
215221
.await?;

server/src/handlers/http/logstream.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use super::base_path_without_preceding_slash;
2121
use super::cluster::fetch_stats_from_ingestors;
2222
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
2323
use crate::alerts::Alerts;
24-
use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
24+
use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY};
2525
use crate::metadata::STREAM_INFO;
2626
use crate::option::{Mode, CONFIG};
2727
use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
@@ -40,6 +40,7 @@ use itertools::Itertools;
4040
use serde_json::Value;
4141
use std::collections::HashMap;
4242
use std::fs;
43+
use std::num::NonZeroU32;
4344
use std::sync::Arc;
4445

4546
pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
@@ -191,6 +192,29 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
191192
} else {
192193
""
193194
};
195+
let mut time_partition_in_days: &str = "";
196+
if let Some((_, time_partition_limit_name)) = req
197+
.headers()
198+
.iter()
199+
.find(|&(key, _)| key == TIME_PARTITION_LIMIT_KEY)
200+
{
201+
let time_partition_limit = time_partition_limit_name.to_str().unwrap();
202+
if !time_partition_limit.ends_with('d') {
203+
return Err(StreamError::Custom {
204+
msg: "missing 'd' suffix for duration value".to_string(),
205+
status: StatusCode::BAD_REQUEST,
206+
});
207+
}
208+
let days = &time_partition_limit[0..time_partition_limit.len() - 1];
209+
if days.parse::<NonZeroU32>().is_err() {
210+
return Err(StreamError::Custom {
211+
msg: "could not convert duration to an unsigned number".to_string(),
212+
status: StatusCode::BAD_REQUEST,
213+
});
214+
} else {
215+
time_partition_in_days = days;
216+
}
217+
}
194218
let static_schema_flag = if let Some((_, static_schema_flag)) = req
195219
.headers()
196220
.iter()
@@ -235,7 +259,14 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
235259
});
236260
}
237261

238-
create_stream(stream_name, time_partition, static_schema_flag, schema).await?;
262+
create_stream(
263+
stream_name,
264+
time_partition,
265+
time_partition_in_days,
266+
static_schema_flag,
267+
schema,
268+
)
269+
.await?;
239270

240271
Ok(("log stream created", StatusCode::OK))
241272
}
@@ -516,6 +547,7 @@ fn remove_id_from_alerts(value: &mut Value) {
516547
pub async fn create_stream(
517548
stream_name: String,
518549
time_partition: &str,
550+
time_partition_limit: &str,
519551
static_schema_flag: &str,
520552
schema: Arc<Schema>,
521553
) -> Result<(), CreateStreamError> {
@@ -528,6 +560,7 @@ pub async fn create_stream(
528560
.create_stream(
529561
&stream_name,
530562
time_partition,
563+
time_partition_limit,
531564
static_schema_flag,
532565
schema.clone(),
533566
)
@@ -557,6 +590,7 @@ pub async fn create_stream(
557590
stream_name.to_string(),
558591
created_at,
559592
time_partition.to_string(),
593+
time_partition_limit.to_string(),
560594
static_schema_flag.to_string(),
561595
static_schema,
562596
);
@@ -595,6 +629,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
595629
created_at: stream_meta.created_at.clone(),
596630
first_event_at: stream_meta.first_event_at.clone(),
597631
time_partition: stream_meta.time_partition.clone(),
632+
time_partition_limit: stream_meta.time_partition_limit.clone(),
598633
cache_enabled: stream_meta.cache_enabled,
599634
static_schema_flag: stream_meta.static_schema_flag.clone(),
600635
};

server/src/metadata.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,11 @@ use once_cell::sync::Lazy;
2424
use std::collections::HashMap;
2525
use std::sync::{Arc, RwLock};
2626

27+
use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
2728
use crate::alerts::Alerts;
2829
use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE};
2930
use crate::storage::{LogStream, ObjectStorage, StorageDir};
3031
use crate::utils::arrow::MergedRecordReader;
31-
32-
use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
3332
use derive_more::{Deref, DerefMut};
3433

3534
// TODO: make return type be of 'static lifetime instead of cloning
@@ -47,6 +46,7 @@ pub struct LogStreamMetadata {
4746
pub created_at: String,
4847
pub first_event_at: Option<String>,
4948
pub time_partition: Option<String>,
49+
pub time_partition_limit: Option<String>,
5050
pub static_schema_flag: Option<String>,
5151
}
5252

@@ -166,6 +166,7 @@ impl StreamInfo {
166166
stream_name: String,
167167
created_at: String,
168168
time_partition: String,
169+
time_partition_limit: String,
169170
static_schema_flag: String,
170171
static_schema: HashMap<String, Arc<Field>>,
171172
) {
@@ -181,6 +182,11 @@ impl StreamInfo {
181182
} else {
182183
Some(time_partition)
183184
},
185+
time_partition_limit: if time_partition_limit.is_empty() {
186+
None
187+
} else {
188+
Some(time_partition_limit)
189+
},
184190
static_schema_flag: if static_schema_flag != "true" {
185191
None
186192
} else {
@@ -237,6 +243,7 @@ impl StreamInfo {
237243
created_at: meta.created_at,
238244
first_event_at: meta.first_event_at,
239245
time_partition: meta.time_partition,
246+
time_partition_limit: meta.time_partition_limit,
240247
static_schema_flag: meta.static_schema_flag,
241248
};
242249

server/src/storage.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,15 @@ mod s3;
3030
pub mod staging;
3131
mod store_metadata;
3232

33+
use self::retention::Retention;
34+
pub use self::staging::StorageDir;
3335
pub use localfs::FSConfig;
3436
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
3537
pub use s3::S3Config;
3638
pub use store_metadata::{
3739
put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
3840
};
3941

40-
use self::retention::Retention;
41-
pub use self::staging::StorageDir;
42-
4342
// metadata file names in a Stream prefix
4443
pub const STREAM_METADATA_FILE_NAME: &str = ".stream.json";
4544
pub const PARSEABLE_METADATA_FILE_NAME: &str = ".parseable.json";
@@ -94,6 +93,8 @@ pub struct ObjectStoreFormat {
9493
#[serde(skip_serializing_if = "Option::is_none")]
9594
pub time_partition: Option<String>,
9695
#[serde(skip_serializing_if = "Option::is_none")]
96+
pub time_partition_limit: Option<String>,
97+
#[serde(skip_serializing_if = "Option::is_none")]
9798
pub static_schema_flag: Option<String>,
9899
}
99100

@@ -109,6 +110,8 @@ pub struct StreamInfo {
109110
#[serde(skip_serializing_if = "Option::is_none")]
110111
pub time_partition: Option<String>,
111112
#[serde(skip_serializing_if = "Option::is_none")]
113+
pub time_partition_limit: Option<String>,
114+
#[serde(skip_serializing_if = "Option::is_none")]
112115
pub static_schema_flag: Option<String>,
113116
}
114117

@@ -155,6 +158,7 @@ impl Default for ObjectStoreFormat {
155158
cache_enabled: false,
156159
retention: None,
157160
time_partition: None,
161+
time_partition_limit: None,
158162
static_schema_flag: None,
159163
}
160164
}

server/src/storage/object_storage.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ pub trait ObjectStorage: Sync + 'static {
127127
&self,
128128
stream_name: &str,
129129
time_partition: &str,
130+
time_partition_limit: &str,
130131
static_schema_flag: &str,
131132
schema: Arc<Schema>,
132133
) -> Result<(), ObjectStorageError> {
@@ -139,6 +140,11 @@ pub trait ObjectStorage: Sync + 'static {
139140
} else {
140141
format.time_partition = Some(time_partition.to_string());
141142
}
143+
if time_partition_limit.is_empty() {
144+
format.time_partition_limit = None;
145+
} else {
146+
format.time_partition_limit = Some(time_partition_limit.to_string());
147+
}
142148
if static_schema_flag != "true" {
143149
format.static_schema_flag = None;
144150
} else {

server/src/utils/json.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,24 @@ pub mod flatten;
2424
pub fn flatten_json_body(
2525
body: serde_json::Value,
2626
time_partition: Option<String>,
27+
time_partition_limit: Option<String>,
2728
validation_required: bool,
2829
) -> Result<Value, anyhow::Error> {
29-
flatten::flatten(body, "_", time_partition, validation_required)
30+
flatten::flatten(
31+
body,
32+
"_",
33+
time_partition,
34+
time_partition_limit,
35+
validation_required,
36+
)
3037
}
3138

3239
pub fn convert_array_to_object(
3340
body: Value,
3441
time_partition: Option<String>,
42+
time_partition_limit: Option<String>,
3543
) -> Result<Vec<Value>, anyhow::Error> {
36-
let data = flatten_json_body(body, time_partition, true)?;
44+
let data = flatten_json_body(body, time_partition, time_partition_limit, true)?;
3745
let value_arr = match data {
3846
Value::Array(arr) => arr,
3947
value @ Value::Object(_) => vec![value],

0 commit comments

Comments
 (0)