Skip to content

Commit 9396d9b

Browse files
feat: detect schema for a log event (#966)
Server can generate schema for a sample log event before creating stream. This helps user create the static schema New API endpoint `POST /logstream/schema/detect` with body as a json object returns schema. Also supports json array input. eg. for request json - ``` { "datetime": "2024-10-21T05:40:58.280Z", "b": 2.0, "c": 1, "host": "backend", "a": false, "id": "duzrixscdpavbdgc", "message": "Tom is interested in mathematics.", "method": "GET", "p_metadata": "state=fatal", "p_tags": "environment=development", "p_timestamp": "2024-10-15T14:00:00+05:30", "referrer": "http://www.facebook.com/", "status": 404, "user-identifier": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36 OPR/91.0.4516.20" } ``` API returns ``` { "fields": [ { "name": "datetime", "data_type": { "Timestamp": [ "Millisecond", null ] }, "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "b", "data_type": "Float64", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "c", "data_type": "Int64", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "host", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "a", "data_type": "Boolean", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "id", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "message", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "method", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "p_metadata", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "p_tags", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "p_timestamp", "data_type": { "Timestamp": [ "Millisecond", null ] }, "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "referrer", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "status", "data_type": "Int64", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} }, { "name": "user-identifier", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } ], "metadata": {} } ``` console then asks user to confirm this schema or change as per his requirement once confirmed, he can create stream with the static schema
1 parent d39e2f1 commit 9396d9b

File tree

5 files changed

+76
-0
lines changed

5 files changed

+76
-0
lines changed

server/src/event/format.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use std::{collections::HashMap, sync::Arc};
2222
use anyhow::{anyhow, Error as AnyError};
2323
use arrow_array::{RecordBatch, StringArray};
2424
use arrow_schema::{DataType, Field, Schema, TimeUnit};
25+
use chrono::DateTime;
26+
use serde_json::Value;
2527

2628
use crate::utils::{self, arrow::get_field};
2729

@@ -171,3 +173,30 @@ pub fn update_field_type_in_schema(
171173
.collect();
172174
Arc::new(Schema::new(new_schema))
173175
}
176+
177+
pub fn update_data_type_to_datetime(schema: Schema, value: Value) -> Schema {
178+
let new_schema: Vec<Field> = schema
179+
.fields()
180+
.iter()
181+
.map(|field| {
182+
if field.data_type() == &DataType::Utf8 {
183+
if let Value::Object(map) = &value {
184+
if let Some(Value::String(s)) = map.get(field.name()) {
185+
if DateTime::parse_from_rfc3339(s).is_ok() {
186+
// Update the field's data type to Timestamp
187+
return Field::new(
188+
field.name().clone(),
189+
DataType::Timestamp(TimeUnit::Millisecond, None),
190+
true,
191+
);
192+
}
193+
}
194+
}
195+
}
196+
// Return the original field if no update is needed
197+
Field::new(field.name(), field.data_type().clone(), true)
198+
})
199+
.collect();
200+
201+
Schema::new(new_schema)
202+
}

server/src/handlers/http/logstream.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
2222
use super::ingest::create_stream_if_not_exists;
2323
use super::modal::utils::logstream_utils::create_update_stream;
2424
use crate::alerts::Alerts;
25+
use crate::event::format::update_data_type_to_datetime;
2526
use crate::handlers::STREAM_TYPE_KEY;
2627
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
2728
use crate::metadata::STREAM_INFO;
@@ -36,6 +37,7 @@ use crate::{metadata, validator};
3637
use actix_web::http::header::{self, HeaderMap};
3738
use actix_web::http::StatusCode;
3839
use actix_web::{web, HttpRequest, Responder};
40+
use arrow_json::reader::infer_json_schema_from_iterator;
3941
use arrow_schema::{Field, Schema};
4042
use bytes::Bytes;
4143
use chrono::Utc;
@@ -89,6 +91,26 @@ pub async fn list(_: HttpRequest) -> impl Responder {
8991
web::Json(res)
9092
}
9193

94+
pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
95+
let body_val: Value = serde_json::from_slice(&body)?;
96+
let value_arr: Vec<Value> = match body_val {
97+
Value::Array(arr) => arr,
98+
value @ Value::Object(_) => vec![value],
99+
_ => {
100+
return Err(StreamError::Custom {
101+
msg: "please send json events as part of the request".to_string(),
102+
status: StatusCode::BAD_REQUEST,
103+
})
104+
}
105+
};
106+
107+
let mut schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)).unwrap();
108+
for value in value_arr {
109+
schema = update_data_type_to_datetime(schema, value);
110+
}
111+
Ok((web::Json(schema), StatusCode::OK))
112+
}
113+
92114
pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
93115
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
94116
let schema = STREAM_INFO.schema(&stream_name)?;

server/src/handlers/http/modal/query_server.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,17 @@ impl QueryServer {
258258
web::resource("")
259259
.route(web::get().to(logstream::list).authorize(Action::ListStream)),
260260
)
261+
.service(
262+
web::scope("/schema/detect").service(
263+
web::resource("")
264+
// PUT "/logstream/{logstream}" ==> Create log stream
265+
.route(
266+
web::post()
267+
.to(logstream::detect_schema)
268+
.authorize(Action::DetectSchema),
269+
),
270+
),
271+
)
261272
.service(
262273
web::scope("/{logstream}")
263274
.service(

server/src/handlers/http/modal/server.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,17 @@ impl Server {
292292
web::resource("")
293293
.route(web::get().to(logstream::list).authorize(Action::ListStream)),
294294
)
295+
.service(
296+
web::scope("/schema/detect").service(
297+
web::resource("")
298+
// PUT "/logstream/{logstream}" ==> Create log stream
299+
.route(
300+
web::post()
301+
.to(logstream::detect_schema)
302+
.authorize(Action::DetectSchema),
303+
),
304+
),
305+
)
295306
.service(
296307
web::scope("/{logstream}")
297308
.service(

server/src/rbac/role.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub enum Action {
2525
CreateStream,
2626
ListStream,
2727
GetStreamInfo,
28+
DetectSchema,
2829
GetSchema,
2930
GetStats,
3031
DeleteStream,
@@ -140,6 +141,7 @@ impl RoleBuilder {
140141
| Action::GetAnalytics => Permission::Unit(action),
141142
Action::Ingest
142143
| Action::GetSchema
144+
| Action::DetectSchema
143145
| Action::GetStats
144146
| Action::GetRetention
145147
| Action::PutRetention
@@ -214,6 +216,7 @@ pub mod model {
214216
Action::DeleteStream,
215217
Action::ListStream,
216218
Action::GetStreamInfo,
219+
Action::DetectSchema,
217220
Action::GetSchema,
218221
Action::GetStats,
219222
Action::GetRetention,

0 commit comments

Comments
 (0)