Skip to content

Commit 6129992

Browse files
feat: add update stream API (#810)
user can update time partition limit and custom partition in the same API call * enhancement for update stream user can make below changes to the existing stream - 1. time partition limit 2. custom partition send extra header X-P-Update-Header=true to make the update
1 parent b92c77b commit 6129992

File tree

4 files changed

+278
-75
lines changed

4 files changed

+278
-75
lines changed

server/src/handlers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";
3333
const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
3434
const AUTHORIZATION_KEY: &str = "authorization";
3535
const SEPARATOR: char = '^';
36-
36+
const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
3737
const OIDC_SCOPE: &str = "openid profile email";
3838
const COOKIE_AGE_DAYS: usize = 7;
3939
const SESSION_COOKIE_NAME: &str = "session";

server/src/handlers/http/logstream.rs

Lines changed: 214 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use super::cluster::{fetch_stats_from_ingestors, INTERNAL_STREAM_NAME};
2323
use crate::alerts::Alerts;
2424
use crate::handlers::{
2525
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
26+
UPDATE_STREAM_KEY,
2627
};
2728
use crate::metadata::STREAM_INFO;
2829
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
@@ -187,77 +188,166 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
187188
}
188189

189190
pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
190-
let time_partition = if let Some((_, time_partition_name)) = req
191-
.headers()
192-
.iter()
193-
.find(|&(key, _)| key == TIME_PARTITION_KEY)
194-
{
195-
time_partition_name.to_str().unwrap()
196-
} else {
197-
""
198-
};
191+
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
192+
let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) =
193+
fetch_headers_from_put_stream_request(&req);
194+
195+
if metadata::STREAM_INFO.stream_exists(&stream_name) && update_stream != "true" {
196+
// Error if the log stream already exists
197+
return Err(StreamError::Custom {
198+
msg: format!(
199+
"Logstream {stream_name} already exists, please create a new log stream with unique name"
200+
),
201+
status: StatusCode::BAD_REQUEST,
202+
});
203+
}
204+
205+
if !time_partition.is_empty() && update_stream == "true" {
206+
return Err(StreamError::Custom {
207+
msg: "Altering the time partition of an existing stream is restricted.".to_string(),
208+
status: StatusCode::BAD_REQUEST,
209+
});
210+
}
199211
let mut time_partition_in_days: &str = "";
200-
if let Some((_, time_partition_limit_name)) = req
201-
.headers()
202-
.iter()
203-
.find(|&(key, _)| key == TIME_PARTITION_LIMIT_KEY)
204-
{
205-
let time_partition_limit = time_partition_limit_name.to_str().unwrap();
206-
if !time_partition_limit.ends_with('d') {
207-
return Err(StreamError::Custom {
208-
msg: "missing 'd' suffix for duration value".to_string(),
209-
status: StatusCode::BAD_REQUEST,
210-
});
211-
}
212-
let days = &time_partition_limit[0..time_partition_limit.len() - 1];
213-
if days.parse::<NonZeroU32>().is_err() {
214-
return Err(StreamError::Custom {
215-
msg: "could not convert duration to an unsigned number".to_string(),
216-
status: StatusCode::BAD_REQUEST,
217-
});
212+
if !time_partition_limit.is_empty() {
213+
let time_partition_days = validate_time_partition_limit(&time_partition_limit);
214+
if let Err(err) = time_partition_days {
215+
return Err(StreamError::CreateStream(err));
218216
} else {
219-
time_partition_in_days = days;
217+
time_partition_in_days = time_partition_days.unwrap();
218+
if update_stream == "true" {
219+
if let Err(err) = update_time_partition_limit_in_stream(
220+
stream_name.clone(),
221+
time_partition_in_days,
222+
)
223+
.await
224+
{
225+
return Err(StreamError::CreateStream(err));
226+
}
227+
return Ok(("Log stream updated", StatusCode::OK));
228+
}
220229
}
221230
}
222-
let static_schema_flag = if let Some((_, static_schema_flag)) = req
223-
.headers()
224-
.iter()
225-
.find(|&(key, _)| key == STATIC_SCHEMA_FLAG)
226-
{
227-
static_schema_flag.to_str().unwrap()
228-
} else {
229-
""
230-
};
231-
let mut custom_partition: &str = "";
232-
if let Some((_, custom_partition_key)) = req
233-
.headers()
234-
.iter()
235-
.find(|&(key, _)| key == CUSTOM_PARTITION_KEY)
236-
{
237-
custom_partition = custom_partition_key.to_str().unwrap();
238-
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
239-
if custom_partition_list.len() > 3 {
240-
return Err(StreamError::Custom {
241-
msg: "maximum 3 custom partition keys are supported".to_string(),
242-
status: StatusCode::BAD_REQUEST,
243-
});
231+
232+
if !static_schema_flag.is_empty() && update_stream == "true" {
233+
return Err(StreamError::Custom {
234+
msg: "Altering the schema of an existing stream is restricted.".to_string(),
235+
status: StatusCode::BAD_REQUEST,
236+
});
237+
}
238+
239+
if !custom_partition.is_empty() {
240+
if let Err(err) = validate_custom_partition(&custom_partition) {
241+
return Err(StreamError::CreateStream(err));
242+
}
243+
if update_stream == "true" {
244+
if let Err(err) =
245+
update_custom_partition_in_stream(stream_name.clone(), &custom_partition).await
246+
{
247+
return Err(StreamError::CreateStream(err));
248+
}
249+
return Ok(("Log stream updated", StatusCode::OK));
244250
}
245251
}
246252

247-
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
248-
let mut schema = Arc::new(Schema::empty());
249-
if metadata::STREAM_INFO.stream_exists(&stream_name) {
250-
// Error if the log stream already exists
251-
return Err(StreamError::Custom {
252-
msg: format!(
253-
"logstream {stream_name} already exists, please create a new log stream with unique name"
254-
),
253+
let schema = validate_static_schema(
254+
&body,
255+
&stream_name,
256+
&time_partition,
257+
&custom_partition,
258+
&static_schema_flag,
259+
);
260+
if let Err(err) = schema {
261+
return Err(StreamError::CreateStream(err));
262+
}
263+
264+
create_stream(
265+
stream_name,
266+
&time_partition,
267+
time_partition_in_days,
268+
&custom_partition,
269+
&static_schema_flag,
270+
schema.unwrap(),
271+
)
272+
.await?;
273+
274+
Ok(("Log stream created", StatusCode::OK))
275+
}
276+
277+
fn fetch_headers_from_put_stream_request(
278+
req: &HttpRequest,
279+
) -> (String, String, String, String, String) {
280+
let mut time_partition = String::default();
281+
let mut time_partition_limit = String::default();
282+
let mut custom_partition = String::default();
283+
let mut static_schema_flag = String::default();
284+
let mut update_stream = String::default();
285+
req.headers().iter().for_each(|(key, value)| {
286+
if key == TIME_PARTITION_KEY {
287+
time_partition = value.to_str().unwrap().to_string();
288+
}
289+
if key == TIME_PARTITION_LIMIT_KEY {
290+
time_partition_limit = value.to_str().unwrap().to_string();
291+
}
292+
if key == CUSTOM_PARTITION_KEY {
293+
custom_partition = value.to_str().unwrap().to_string();
294+
}
295+
if key == STATIC_SCHEMA_FLAG {
296+
static_schema_flag = value.to_str().unwrap().to_string();
297+
}
298+
if key == UPDATE_STREAM_KEY {
299+
update_stream = value.to_str().unwrap().to_string();
300+
}
301+
});
302+
303+
(
304+
time_partition,
305+
time_partition_limit,
306+
custom_partition,
307+
static_schema_flag,
308+
update_stream,
309+
)
310+
}
311+
312+
fn validate_time_partition_limit(time_partition_limit: &str) -> Result<&str, CreateStreamError> {
313+
if !time_partition_limit.ends_with('d') {
314+
return Err(CreateStreamError::Custom {
315+
msg: "Missing 'd' suffix for duration value".to_string(),
255316
status: StatusCode::BAD_REQUEST,
256317
});
257318
}
319+
let days = &time_partition_limit[0..time_partition_limit.len() - 1];
320+
if days.parse::<NonZeroU32>().is_err() {
321+
return Err(CreateStreamError::Custom {
322+
msg: "Could not convert duration to an unsigned number".to_string(),
323+
status: StatusCode::BAD_REQUEST,
324+
});
325+
}
326+
327+
Ok(days)
328+
}
258329

330+
fn validate_custom_partition(custom_partition: &str) -> Result<(), CreateStreamError> {
331+
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
332+
if custom_partition_list.len() > 3 {
333+
return Err(CreateStreamError::Custom {
334+
msg: "Maximum 3 custom partition keys are supported".to_string(),
335+
status: StatusCode::BAD_REQUEST,
336+
});
337+
}
338+
Ok(())
339+
}
340+
341+
fn validate_static_schema(
342+
body: &Bytes,
343+
stream_name: &str,
344+
time_partition: &str,
345+
custom_partition: &str,
346+
static_schema_flag: &str,
347+
) -> Result<Arc<Schema>, CreateStreamError> {
348+
let mut schema = Arc::new(Schema::empty());
259349
if !body.is_empty() && static_schema_flag == "true" {
260-
let static_schema: StaticSchema = serde_json::from_slice(&body)?;
350+
let static_schema: StaticSchema = serde_json::from_slice(body)?;
261351

262352
let parsed_schema = convert_static_schema_to_arrow_schema(
263353
static_schema.clone(),
@@ -267,31 +357,21 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
267357
if let Ok(parsed_schema) = parsed_schema {
268358
schema = parsed_schema;
269359
} else {
270-
return Err(StreamError::Custom {
271-
msg: format!("unable to commit static schema, logstream {stream_name} not created"),
360+
return Err(CreateStreamError::Custom {
361+
msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
272362
status: StatusCode::BAD_REQUEST,
273363
});
274364
}
275365
} else if body.is_empty() && static_schema_flag == "true" {
276-
return Err(StreamError::Custom {
366+
return Err(CreateStreamError::Custom {
277367
msg: format!(
278-
"please provide schema in the request body for static schema logstream {stream_name}"
368+
"Please provide schema in the request body for static schema logstream {stream_name}"
279369
),
280370
status: StatusCode::BAD_REQUEST,
281371
});
282372
}
283373

284-
create_stream(
285-
stream_name,
286-
time_partition,
287-
time_partition_in_days,
288-
custom_partition,
289-
static_schema_flag,
290-
schema,
291-
)
292-
.await?;
293-
294-
Ok(("log stream created", StatusCode::OK))
374+
Ok(schema)
295375
}
296376

297377
pub async fn put_alert(
@@ -626,6 +706,56 @@ fn remove_id_from_alerts(value: &mut Value) {
626706
}
627707
}
628708

709+
pub async fn update_time_partition_limit_in_stream(
710+
stream_name: String,
711+
time_partition_limit: &str,
712+
) -> Result<(), CreateStreamError> {
713+
let storage = CONFIG.storage().get_object_store();
714+
if let Err(err) = storage
715+
.update_time_partition_limit_in_stream(&stream_name, time_partition_limit)
716+
.await
717+
{
718+
return Err(CreateStreamError::Storage { stream_name, err });
719+
}
720+
721+
if metadata::STREAM_INFO
722+
.update_time_partition_limit(&stream_name, time_partition_limit.to_string())
723+
.is_err()
724+
{
725+
return Err(CreateStreamError::Custom {
726+
msg: "failed to update time partition limit in metadata".to_string(),
727+
status: StatusCode::EXPECTATION_FAILED,
728+
});
729+
}
730+
731+
Ok(())
732+
}
733+
734+
pub async fn update_custom_partition_in_stream(
735+
stream_name: String,
736+
custom_partition: &str,
737+
) -> Result<(), CreateStreamError> {
738+
let storage = CONFIG.storage().get_object_store();
739+
if let Err(err) = storage
740+
.update_custom_partition_in_stream(&stream_name, custom_partition)
741+
.await
742+
{
743+
return Err(CreateStreamError::Storage { stream_name, err });
744+
}
745+
746+
if metadata::STREAM_INFO
747+
.update_custom_partition(&stream_name, custom_partition.to_string())
748+
.is_err()
749+
{
750+
return Err(CreateStreamError::Custom {
751+
msg: "failed to update custom partition in metadata".to_string(),
752+
status: StatusCode::EXPECTATION_FAILED,
753+
});
754+
}
755+
756+
Ok(())
757+
}
758+
629759
pub async fn create_stream(
630760
stream_name: String,
631761
time_partition: &str,
@@ -757,6 +887,10 @@ pub mod error {
757887
stream_name: String,
758888
err: ObjectStorageError,
759889
},
890+
#[error("{msg}")]
891+
Custom { msg: String, status: StatusCode },
892+
#[error("Could not deserialize into JSON object, {0}")]
893+
SerdeError(#[from] serde_json::Error),
760894
}
761895

762896
#[derive(Debug, thiserror::Error)]
@@ -809,6 +943,12 @@ pub mod error {
809943
StreamError::CreateStream(CreateStreamError::Storage { .. }) => {
810944
StatusCode::INTERNAL_SERVER_ERROR
811945
}
946+
StreamError::CreateStream(CreateStreamError::Custom { .. }) => {
947+
StatusCode::INTERNAL_SERVER_ERROR
948+
}
949+
StreamError::CreateStream(CreateStreamError::SerdeError(_)) => {
950+
StatusCode::BAD_REQUEST
951+
}
812952
StreamError::CacheNotEnabled(_) => StatusCode::BAD_REQUEST,
813953
StreamError::StreamNotFound(_) => StatusCode::NOT_FOUND,
814954
StreamError::Custom { status, .. } => *status,

server/src/metadata.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,33 @@ impl StreamInfo {
171171
metadata.first_event_at = first_event_at;
172172
})
173173
}
174+
175+
pub fn update_time_partition_limit(
176+
&self,
177+
stream_name: &str,
178+
time_partition_limit: String,
179+
) -> Result<(), MetadataError> {
180+
let mut map = self.write().expect(LOCK_EXPECT);
181+
map.get_mut(stream_name)
182+
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
183+
.map(|metadata| {
184+
metadata.time_partition_limit = Some(time_partition_limit);
185+
})
186+
}
187+
188+
pub fn update_custom_partition(
189+
&self,
190+
stream_name: &str,
191+
custom_partition: String,
192+
) -> Result<(), MetadataError> {
193+
let mut map = self.write().expect(LOCK_EXPECT);
194+
map.get_mut(stream_name)
195+
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
196+
.map(|metadata| {
197+
metadata.custom_partition = Some(custom_partition);
198+
})
199+
}
200+
174201
#[allow(clippy::too_many_arguments)]
175202
pub fn add_stream(
176203
&self,

0 commit comments

Comments
 (0)