Skip to content

Commit ad39f57

Browse files
feat: allow otel compatibility changes (#803)
This PR ensures we're compatible to OTEL with - allow /v1/logs endpoint for otel log ingestion - make all fields optional to allow ingestion
1 parent 1b6f992 commit ad39f57

7 files changed

+159
-59
lines changed

server/src/handlers/http/ingest.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,43 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
105105
Ok(())
106106
}
107107

108+
// Handler for POST /v1/logs to ingest OTEL logs
109+
// ingests events by extracting stream name from header
110+
// creates if stream does not exist
111+
pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
112+
if let Some((_, stream_name)) = req
113+
.headers()
114+
.iter()
115+
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
116+
{
117+
let stream_name = stream_name.to_str().unwrap().to_owned();
118+
create_stream_if_not_exists(&stream_name).await?;
119+
120+
//flatten logs
121+
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
122+
{
123+
let log_source: String = log_source.to_str().unwrap().to_owned();
124+
if log_source == LOG_SOURCE_OTEL {
125+
let mut json = otel::flatten_otel_logs(&body);
126+
for record in json.iter_mut() {
127+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
128+
push_logs(stream_name.to_string(), req.clone(), body).await?;
129+
}
130+
} else {
131+
log::warn!("Unknown log source: {}", log_source);
132+
return Err(PostError::CustomError("Unknown log source".to_string()));
133+
}
134+
} else {
135+
return Err(PostError::CustomError(
136+
"log source key header is missing".to_string(),
137+
));
138+
}
139+
} else {
140+
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
141+
}
142+
Ok(HttpResponse::Ok().finish())
143+
}
144+
108145
async fn flatten_and_push_logs(
109146
req: HttpRequest,
110147
body: Bytes,
@@ -116,7 +153,9 @@ async fn flatten_and_push_logs(
116153
let log_source: String = log_source.to_str().unwrap().to_owned();
117154
match log_source.as_str() {
118155
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
119-
LOG_SOURCE_OTEL => json = otel::flatten_otel_logs(&body),
156+
LOG_SOURCE_OTEL => {
157+
json = otel::flatten_otel_logs(&body);
158+
}
120159
_ => {
121160
log::warn!("Unknown log source: {}", log_source);
122161
push_logs(stream_name.to_string(), req.clone(), body).await?;

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,16 +139,18 @@ impl ParseableServer for IngestServer {
139139
impl IngestServer {
140140
// configure the api routes
141141
fn configure_routes(config: &mut web::ServiceConfig, _oidc_client: Option<OpenIdClient>) {
142-
config.service(
143-
// Base path "{url}/api/v1"
144-
web::scope(&base_path())
145-
.service(Server::get_ingest_factory())
146-
.service(Self::logstream_api())
147-
.service(Server::get_about_factory())
148-
.service(Self::analytics_factory())
149-
.service(Server::get_liveness_factory())
150-
.service(Server::get_readiness_factory()),
151-
);
142+
config
143+
.service(
144+
// Base path "{url}/api/v1"
145+
web::scope(&base_path())
146+
.service(Server::get_ingest_factory())
147+
.service(Self::logstream_api())
148+
.service(Server::get_about_factory())
149+
.service(Self::analytics_factory())
150+
.service(Server::get_liveness_factory())
151+
.service(Server::get_readiness_factory()),
152+
)
153+
.service(Server::get_ingest_otel_factory());
152154
}
153155

154156
fn analytics_factory() -> Scope {

server/src/handlers/http/modal/server.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ impl Server {
151151
.service(Self::get_oauth_webscope(oidc_client))
152152
.service(Self::get_user_role_webscope()),
153153
)
154+
.service(Self::get_ingest_otel_factory())
154155
.service(Self::get_generated());
155156
}
156157

@@ -347,6 +348,17 @@ impl Server {
347348
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE))
348349
}
349350

351+
// /v1/logs endpoint to be used for OTEL log ingestion only
352+
pub fn get_ingest_otel_factory() -> Resource {
353+
web::resource("/v1/logs")
354+
.route(
355+
web::post()
356+
.to(ingest::ingest_otel_logs)
357+
.authorize_for_stream(Action::Ingest),
358+
)
359+
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE))
360+
}
361+
350362
// get the oauth webscope
351363
pub fn get_oauth_webscope(oidc_client: Option<OpenIdClient>) -> Scope {
352364
let oauth = web::scope("/o")

server/src/handlers/http/otel.rs

Lines changed: 80 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,18 @@ fn collect_json_from_any_value(
5656
if value.array_val.is_some() {
5757
let array_val = value.array_val.as_ref().unwrap();
5858
let values = &array_val.values;
59-
6059
for value in values {
61-
let value = &value.value;
62-
value_json = collect_json_from_any_value(key, value.clone());
60+
let array_value_json = collect_json_from_any_value(key, value.clone());
61+
for key in array_value_json.keys() {
62+
value_json.insert(
63+
format!(
64+
"{}_{}",
65+
key.to_owned(),
66+
value_to_string(array_value_json[key].to_owned())
67+
),
68+
array_value_json[key].to_owned(),
69+
);
70+
}
6371
}
6472
}
6573

@@ -69,7 +77,22 @@ fn collect_json_from_any_value(
6977
let kv_list_val = value.kv_list_val.unwrap();
7078
for key_value in kv_list_val.values {
7179
let value = key_value.value;
72-
value_json = collect_json_from_values(&value, key);
80+
if value.is_some() {
81+
let value = value.unwrap();
82+
let key_value_json = collect_json_from_any_value(key, value);
83+
84+
for key in key_value_json.keys() {
85+
value_json.insert(
86+
format!(
87+
"{}_{}_{}",
88+
key.to_owned(),
89+
key_value.key,
90+
value_to_string(key_value_json[key].to_owned())
91+
),
92+
key_value_json[key].to_owned(),
93+
);
94+
}
95+
}
7396
}
7497
}
7598
if value.bytes_val.is_some() {
@@ -96,6 +119,14 @@ fn collect_json_from_values(
96119
value_json
97120
}
98121

122+
fn value_to_string(value: serde_json::Value) -> String {
123+
match value.clone() {
124+
e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
125+
Value::String(s) => s,
126+
_ => "".to_string(),
127+
}
128+
}
129+
99130
pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
100131
let mut vec_otel_json: Vec<BTreeMap<String, Value>> = Vec::new();
101132
let body_str = std::str::from_utf8(body).unwrap();
@@ -117,27 +148,33 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
117148
}
118149
}
119150
}
120-
if resource.dropped_attributes_count > 0 {
151+
if resource.dropped_attributes_count.is_some() {
121152
otel_json.insert(
122153
"resource_dropped_attributes_count".to_string(),
123-
Value::Number(serde_json::Number::from(resource.dropped_attributes_count)),
154+
Value::Number(serde_json::Number::from(
155+
resource.dropped_attributes_count.unwrap(),
156+
)),
124157
);
125158
}
126159
}
127160

128161
for scope_logs in record.scope_logs.iter() {
129162
for scope_log in scope_logs.iter() {
130163
for instrumentation_scope in scope_log.scope.iter() {
131-
if !instrumentation_scope.name.is_empty() {
164+
if instrumentation_scope.name.is_some() {
132165
otel_json.insert(
133166
"instrumentation_scope_name".to_string(),
134-
Value::String(instrumentation_scope.name.to_string()),
167+
Value::String(
168+
instrumentation_scope.name.as_ref().unwrap().to_string(),
169+
),
135170
);
136171
}
137-
if !instrumentation_scope.version.is_empty() {
172+
if instrumentation_scope.version.is_some() {
138173
otel_json.insert(
139174
"instrumentation_scope_version".to_string(),
140-
Value::String(instrumentation_scope.version.to_string()),
175+
Value::String(
176+
instrumentation_scope.version.as_ref().unwrap().to_string(),
177+
),
141178
);
142179
}
143180
let attributes = &instrumentation_scope.attributes;
@@ -154,37 +191,45 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
154191
}
155192
}
156193
}
157-
if instrumentation_scope.dropped_attributes_count > 0 {
194+
if instrumentation_scope.dropped_attributes_count.is_some() {
158195
otel_json.insert(
159196
"instrumentation_scope_dropped_attributes_count".to_string(),
160197
Value::Number(serde_json::Number::from(
161-
instrumentation_scope.dropped_attributes_count,
198+
instrumentation_scope.dropped_attributes_count.unwrap(),
162199
)),
163200
);
164201
}
165202
}
166203

167204
for log_record in scope_log.log_records.iter() {
168205
let mut log_record_json: BTreeMap<String, Value> = BTreeMap::new();
169-
if !log_record.time_unix_nano > 0 {
206+
if log_record.time_unix_nano.is_some() {
170207
log_record_json.insert(
171208
"time_unix_nano".to_string(),
172-
Value::String(log_record.time_unix_nano.to_string()),
209+
Value::String(
210+
log_record.time_unix_nano.as_ref().unwrap().to_string(),
211+
),
173212
);
174213
}
175-
if !log_record.observed_time_unix_nano > 0 {
214+
if log_record.observed_time_unix_nano.is_some() {
176215
log_record_json.insert(
177216
"observed_time_unix_nano".to_string(),
178-
Value::String(log_record.observed_time_unix_nano.to_string()),
217+
Value::String(
218+
log_record
219+
.observed_time_unix_nano
220+
.as_ref()
221+
.unwrap()
222+
.to_string(),
223+
),
179224
);
180225
}
181-
if log_record.severity_number > 0 {
182-
let severity_number: i32 = log_record.severity_number;
226+
if log_record.severity_number.is_some() {
227+
let severity_number: i32 = log_record.severity_number.unwrap();
183228
log_record_json.insert(
184229
"severity_number".to_string(),
185230
Value::Number(serde_json::Number::from(severity_number)),
186231
);
187-
if log_record.severity_text.is_empty() {
232+
if log_record.severity_text.is_none() {
188233
log_record_json.insert(
189234
"severity_text".to_string(),
190235
Value::String(
@@ -193,10 +238,12 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
193238
);
194239
}
195240
}
196-
if !log_record.severity_text.is_empty() {
241+
if log_record.severity_text.is_some() {
197242
log_record_json.insert(
198243
"severity_text".to_string(),
199-
Value::String(log_record.severity_text.to_string()),
244+
Value::String(
245+
log_record.severity_text.as_ref().unwrap().to_string(),
246+
),
200247
);
201248
}
202249

@@ -221,17 +268,17 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
221268
}
222269
}
223270

224-
if log_record.dropped_attributes_count > 0 {
271+
if log_record.dropped_attributes_count.is_some() {
225272
log_record_json.insert(
226273
"log_record_dropped_attributes_count".to_string(),
227274
Value::Number(serde_json::Number::from(
228-
log_record.dropped_attributes_count,
275+
log_record.dropped_attributes_count.unwrap(),
229276
)),
230277
);
231278
}
232279

233-
if log_record.flags > 0 {
234-
let flags: u32 = log_record.flags;
280+
if log_record.flags.is_some() {
281+
let flags: u32 = log_record.flags.unwrap();
235282
log_record_json.insert(
236283
"flags_number".to_string(),
237284
Value::Number(serde_json::Number::from(flags)),
@@ -242,17 +289,17 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
242289
);
243290
}
244291

245-
if !log_record.span_id.is_empty() {
292+
if log_record.span_id.is_some() {
246293
log_record_json.insert(
247294
"span_id".to_string(),
248-
Value::String(log_record.span_id.to_string()),
295+
Value::String(log_record.span_id.as_ref().unwrap().to_string()),
249296
);
250297
}
251298

252-
if !log_record.trace_id.is_empty() {
299+
if log_record.trace_id.is_some() {
253300
log_record_json.insert(
254301
"trace_id".to_string(),
255-
Value::String(log_record.trace_id.to_string()),
302+
Value::String(log_record.trace_id.as_ref().unwrap().to_string()),
256303
);
257304
}
258305
for key in log_record_json.keys() {
@@ -261,18 +308,18 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
261308
vec_otel_json.push(otel_json.clone());
262309
}
263310

264-
if !scope_log.schema_url.is_empty() {
311+
if scope_log.schema_url.is_some() {
265312
otel_json.insert(
266313
"scope_log_schema_url".to_string(),
267-
Value::String(scope_log.schema_url.to_string()),
314+
Value::String(scope_log.schema_url.as_ref().unwrap().to_string()),
268315
);
269316
}
270317
}
271318
}
272-
if !record.schema_url.is_empty() {
319+
if record.schema_url.is_some() {
273320
otel_json.insert(
274321
"resource_schema_url".to_string(),
275-
Value::String(record.schema_url.to_string()),
322+
Value::String(record.schema_url.as_ref().unwrap().to_string()),
276323
);
277324
}
278325
}

server/src/handlers/http/otel/opentelemetry.proto.common.v1.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
/// since oneof in AnyValue does not allow repeated fields.
5454
pub struct ArrayValue {
5555
/// Array of values. The array may be empty (contain 0 elements).
56-
pub values: Vec<AnyValue>,
56+
pub values: Vec<Value>,
5757
}
5858

5959
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -83,13 +83,13 @@
8383
/// such as the fully qualified name and version.
8484
pub struct InstrumentationScope {
8585
/// An empty instrumentation scope name means the name is unknown.
86-
pub name: String,
87-
pub version: String,
86+
pub name: Option<String>,
87+
pub version: Option<String>,
8888
/// Additional attributes that describe the scope. \[Optional\].
8989
/// Attribute keys MUST be unique (it is not allowed to have more than one
9090
/// attribute with the same key).
9191
pub attributes: Option<Vec<KeyValue>>,
9292
#[serde(rename = "droppedAttributesCount")]
93-
pub dropped_attributes_count: u32,
93+
pub dropped_attributes_count: Option<u32>,
9494
}
9595

0 commit comments

Comments
 (0)