
Commit f240dde

bug: reject event if fields count exceed 250 (#1311)
Also remove other_attributes from otel logs/traces/metrics. We keep all attributes as individual columns in the ingested event. If the total column count in the flattened event exceeds the allowed limit, log the error and reject the event.

Fixes: #1310
Signed-off-by: Nikhil Sinha <[email protected]>
Co-authored-by: Nitish Tiwari <[email protected]>
1 parent 3d844b3 commit f240dde
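
The rejection rule described in the commit message can be illustrated with a small standalone sketch; exceeds_limit and the sample event below are hypothetical, not code from this commit. A flattened event is a JSON object, and it is rejected once its column count crosses the configured limit (250 by default).

// Illustration only: `exceeds_limit` is a hypothetical helper, not part of this commit.
use serde_json::{json, Map, Value};

fn exceeds_limit(flattened: &Map<String, Value>, limit: usize) -> bool {
    flattened.len() > limit
}

fn main() {
    let event = json!({
        "time_unix_nano": "2024-01-01T00:00:00.000000000",
        "severity_text": "INFO",
        "body": "user logged in"
    });
    if let Value::Object(map) = event {
        // Three columns against the default limit of 250: accepted.
        assert!(!exceeds_limit(&map, 250));
    }
}

The actual check in this commit counts the fields of the stream's schema, as the ingest_utils.rs diff below shows.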

File tree

8 files changed: +171 -300 lines changed


src/cli.rs (+10)

@@ -35,6 +35,7 @@ use crate::{
 pub const DEFAULT_USERNAME: &str = "admin";
 pub const DEFAULT_PASSWORD: &str = "admin";
 
+pub const DATASET_FIELD_COUNT_LIMIT: usize = 250;
 #[derive(Parser)]
 #[command(
     name = "parseable",
@@ -368,6 +369,15 @@ pub struct Options {
 
     #[arg(long, env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
     pub ms_clarity_tag: Option<String>,
+
+    #[arg(
+        long,
+        env = "P_DATASET_FIELD_COUNT_LIMIT",
+        default_value_t = DATASET_FIELD_COUNT_LIMIT,
+        value_parser = validation::validate_dataset_fields_allowed_limit,
+        help = "total number of fields recommended in a dataset"
+    )]
+    pub dataset_fields_allowed_limit: usize,
 }
 
 #[derive(Parser, Debug)]
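
A minimal sketch of how a flag like this resolves with clap (assuming the derive and env features; this is not Parseable's actual Options struct, and the validator is omitted here): the command-line value wins, then the P_DATASET_FIELD_COUNT_LIMIT environment variable, then the 250 default.

// Minimal sketch, assuming clap with the "derive" and "env" features enabled.
use clap::Parser;

const DATASET_FIELD_COUNT_LIMIT: usize = 250;

#[derive(Parser, Debug)]
struct Opts {
    // Resolves from --dataset-fields-allowed-limit, then P_DATASET_FIELD_COUNT_LIMIT, then the default.
    #[arg(
        long,
        env = "P_DATASET_FIELD_COUNT_LIMIT",
        default_value_t = DATASET_FIELD_COUNT_LIMIT,
        help = "total number of fields recommended in a dataset"
    )]
    dataset_fields_allowed_limit: usize,
}

fn main() {
    let opts = Opts::parse();
    println!("dataset field limit: {}", opts.dataset_fields_allowed_limit);
}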

src/handlers/http/ingest.rs (+3)

@@ -467,6 +467,8 @@ pub enum PostError {
     KnownFormat(#[from] known_schema::Error),
     #[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")]
     IncorrectLogFormat(String),
+    #[error("Failed to ingest events in dataset {0}. Total number of fields {1} exceeds the permissible limit of {2}. We recommend creating a new dataset beyond {2} for better query performance.")]
+    FieldsCountLimitExceeded(String, usize, usize),
 }
 
 impl actix_web::ResponseError for PostError {
@@ -495,6 +497,7 @@ impl actix_web::ResponseError for PostError {
             PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST,
             PostError::KnownFormat(_) => StatusCode::BAD_REQUEST,
             PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST,
+            PostError::FieldsCountLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST,
         }
     }
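
A hedged sketch of how the new variant renders, assuming the thiserror crate that the #[error(...)] attribute above comes from; the dataset name and counts are made up.

// Sketch only: a stripped-down enum carrying just the new variant from the diff above.
use thiserror::Error;

#[derive(Debug, Error)]
enum PostError {
    #[error("Failed to ingest events in dataset {0}. Total number of fields {1} exceeds the permissible limit of {2}. We recommend creating a new dataset beyond {2} for better query performance.")]
    FieldsCountLimitExceeded(String, usize, usize),
}

fn main() {
    // Hypothetical dataset with 260 fields against the default limit of 250.
    let err = PostError::FieldsCountLimitExceeded("app_logs".to_string(), 260, 250);
    println!("{err}");
}

A client hitting this case receives the rendered message together with the 400 Bad Request status added in the second hunk.
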
src/handlers/http/modal/utils/ingest_utils.rs (+35)

@@ -54,6 +54,9 @@ pub async fn flatten_and_push_logs(
     log_source: &LogSource,
     p_custom_fields: &HashMap<String, String>,
 ) -> Result<(), PostError> {
+    // Verify the dataset fields count
+    verify_dataset_fields_count(stream_name)?;
+
     match log_source {
         LogSource::Kinesis => {
             //custom flattening required for Amazon Kinesis
@@ -205,6 +208,38 @@ pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap<String, Strin
     p_custom_fields
 }
 
+fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> {
+    let fields_count = PARSEABLE
+        .get_stream(stream_name)?
+        .get_schema()
+        .fields()
+        .len();
+    let dataset_fields_warn_threshold = 0.8 * PARSEABLE.options.dataset_fields_allowed_limit as f64;
+    // Check if the fields count exceeds the warn threshold
+    if fields_count > dataset_fields_warn_threshold as usize {
+        tracing::warn!(
+            "Dataset {0} has {1} fields, which exceeds the warning threshold of {2}. Ingestion will not be possible after reaching {3} fields. We recommend creating a new dataset.",
+            stream_name,
+            fields_count,
+            dataset_fields_warn_threshold as usize,
+            PARSEABLE.options.dataset_fields_allowed_limit
+        );
+    }
+    // Check if the fields count exceeds the limit
+    // Return an error if the fields count exceeds the limit
+    if fields_count > PARSEABLE.options.dataset_fields_allowed_limit {
+        let error = PostError::FieldsCountLimitExceeded(
+            stream_name.to_string(),
+            fields_count,
+            PARSEABLE.options.dataset_fields_allowed_limit,
+        );
+        tracing::error!("{}", error);
+        // Return an error if the fields count exceeds the limit
+        return Err(error);
+    }
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
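
With the default limit of 250, the warning threshold works out to 0.8 × 250 = 200 fields. Below is a standalone sketch of just that decision logic; the real verify_dataset_fields_count additionally reads the live schema through PARSEABLE and returns PostError::FieldsCountLimitExceeded on rejection.

// Standalone sketch of the warn/reject thresholds; the real function reads the
// stream schema via PARSEABLE and rejects with PostError::FieldsCountLimitExceeded.
fn field_count_status(fields_count: usize, limit: usize) -> &'static str {
    let warn_threshold = (0.8 * limit as f64) as usize;
    if fields_count > limit {
        "reject"
    } else if fields_count > warn_threshold {
        "warn"
    } else {
        "ok"
    }
}

fn main() {
    assert_eq!(field_count_status(150, 250), "ok");     // well under the threshold
    assert_eq!(field_count_status(201, 250), "warn");   // above 0.8 * 250 = 200
    assert_eq!(field_count_status(251, 250), "reject"); // above the hard limit
}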

src/option.rs (+16)

@@ -91,6 +91,7 @@ pub mod validation {
         path::{Path, PathBuf},
     };
 
+    use crate::cli::DATASET_FIELD_COUNT_LIMIT;
     use path_clean::PathClean;
 
     use super::{Compression, Mode};
@@ -173,4 +174,19 @@ pub mod validation {
             Err("Invalid value for max disk usage. It should be given as 90.0 for 90%".to_string())
         }
     }
+
+    pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result<usize, String> {
+        if let Ok(size) = s.parse::<usize>() {
+            if (1..=DATASET_FIELD_COUNT_LIMIT).contains(&size) {
+                Ok(size)
+            } else {
+                Err(format!(
+                    "Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be between 1 and {}",
+                    DATASET_FIELD_COUNT_LIMIT
+                ))
+            }
+        } else {
+            Err("Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be given as integer value".to_string())
+        }
+    }
 }
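
A quick standalone check of the validator's accepted range. The function body mirrors the diff above, restructured into a match so the snippet is self-contained; DATASET_FIELD_COUNT_LIMIT is the 250 default from src/cli.rs.

const DATASET_FIELD_COUNT_LIMIT: usize = 250;

// Same behaviour as the validator above: accept 1..=250, reject everything else.
fn validate_dataset_fields_allowed_limit(s: &str) -> Result<usize, String> {
    match s.parse::<usize>() {
        Ok(size) if (1..=DATASET_FIELD_COUNT_LIMIT).contains(&size) => Ok(size),
        Ok(_) => Err(format!(
            "Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be between 1 and {}",
            DATASET_FIELD_COUNT_LIMIT
        )),
        Err(_) => Err(
            "Invalid value for P_DATASET_FIELD_COUNT_LIMIT. It should be given as integer value".to_string(),
        ),
    }
}

fn main() {
    assert_eq!(validate_dataset_fields_allowed_limit("100"), Ok(100));
    assert!(validate_dataset_fields_allowed_limit("0").is_err());   // below 1
    assert!(validate_dataset_fields_allowed_limit("500").is_err()); // above the 250 cap
    assert!(validate_dataset_fields_allowed_limit("abc").is_err()); // not an integer
}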

src/otel/logs.rs (+22 -39)

@@ -15,27 +15,33 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  *
  */
-
+use super::otel_utils::collect_json_from_values;
+use super::otel_utils::convert_epoch_nano_to_timestamp;
+use super::otel_utils::insert_attributes;
 use opentelemetry_proto::tonic::logs::v1::LogRecord;
 use opentelemetry_proto::tonic::logs::v1::LogsData;
 use opentelemetry_proto::tonic::logs::v1::ScopeLogs;
 use opentelemetry_proto::tonic::logs::v1::SeverityNumber;
 use serde_json::Map;
 use serde_json::Value;
 
-use super::otel_utils::add_other_attributes_if_not_empty;
-use super::otel_utils::collect_json_from_values;
-use super::otel_utils::convert_epoch_nano_to_timestamp;
-use super::otel_utils::insert_attributes;
-use super::otel_utils::merge_attributes_in_json;
-
-pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [
+pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 16] = [
+    "scope_name",
+    "scope_version",
+    "scope_log_schema_url",
+    "scope_dropped_attributes_count",
+    "resource_dropped_attributes_count",
+    "schema_url",
     "time_unix_nano",
+    "observed_time_unix_nano",
     "severity_number",
     "severity_text",
     "body",
+    "flags",
+    "log_record_dropped_attributes_count",
     "span_id",
     "trace_id",
+    "event_name",
 ];
 /// otel log event has severity number
 /// there is a mapping of severity number to severity text provided in proto
@@ -60,7 +66,6 @@ fn flatten_severity(severity_number: i32) -> Map<String, Value> {
 /// this function is called recursively for each log record object in the otel logs
 pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
     let mut log_record_json: Map<String, Value> = Map::new();
-    let mut other_attributes = Map::new();
     log_record_json.insert(
         "time_unix_nano".to_string(),
         Value::String(convert_epoch_nano_to_timestamp(
@@ -83,11 +88,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
             log_record_json.insert(key.to_owned(), body_json[key].to_owned());
         }
     }
-    insert_attributes(
-        &mut log_record_json,
-        &log_record.attributes,
-        &mut other_attributes,
-    );
+    insert_attributes(&mut log_record_json, &log_record.attributes);
     log_record_json.insert(
         "log_record_dropped_attributes_count".to_string(),
         Value::Number(log_record.dropped_attributes_count.into()),
@@ -106,9 +107,6 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
         Value::String(hex::encode(&log_record.trace_id)),
     );
 
-    // Add the `other_attributes` to the log record json
-    add_other_attributes_if_not_empty(&mut log_record_json, &other_attributes);
-
     log_record_json
 }
 
@@ -117,18 +115,13 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
 fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
     let mut vec_scope_log_json = Vec::new();
     let mut scope_log_json = Map::new();
-    let mut other_attributes = Map::new();
     if let Some(scope) = &scope_log.scope {
         scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
         scope_log_json.insert(
             "scope_version".to_string(),
             Value::String(scope.version.clone()),
         );
-        insert_attributes(
-            &mut scope_log_json,
-            &scope.attributes,
-            &mut other_attributes,
-        );
+        insert_attributes(&mut scope_log_json, &scope.attributes);
         scope_log_json.insert(
             "scope_dropped_attributes_count".to_string(),
             Value::Number(scope.dropped_attributes_count.into()),
@@ -146,26 +139,17 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
         vec_scope_log_json.push(combined_json);
     }
 
-    // Add the `other_attributes` to the scope log json
-    merge_attributes_in_json(other_attributes, &mut vec_scope_log_json);
-
     vec_scope_log_json
 }
 
 /// this function performs the custom flattening of the otel logs
 /// and returns a `Vec` of `Value::Object` of the flattened json
 pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
     let mut vec_otel_json = Vec::new();
-
     for record in &message.resource_logs {
         let mut resource_log_json = Map::new();
-        let mut other_attributes = Map::new();
         if let Some(resource) = &record.resource {
-            insert_attributes(
-                &mut resource_log_json,
-                &resource.attributes,
-                &mut other_attributes,
-            );
+            insert_attributes(&mut resource_log_json, &resource.attributes);
             resource_log_json.insert(
                 "resource_dropped_attributes_count".to_string(),
                 Value::Number(resource.dropped_attributes_count.into()),
@@ -176,19 +160,18 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
         for scope_log in &record.scope_logs {
            vec_resource_logs_json.extend(flatten_scope_log(scope_log));
         }
+
         resource_log_json.insert(
             "schema_url".to_string(),
             Value::String(record.schema_url.clone()),
         );
 
         for resource_logs_json in &mut vec_resource_logs_json {
             resource_logs_json.extend(resource_log_json.clone());
-        }
-
-        // Add the `other_attributes` to the resource log json
-        merge_attributes_in_json(other_attributes, &mut vec_resource_logs_json);
 
-        vec_otel_json.extend(vec_resource_logs_json);
+            vec_otel_json.push(Value::Object(resource_logs_json.clone()));
+        }
     }
-    vec_otel_json.into_iter().map(Value::Object).collect()
+
+    vec_otel_json
 }
