Skip to content

Commit

Permalink
Revert "feat(spans): Set origin_timestamp for spans (#5367)"
Browse files Browse the repository at this point in the history
This reverts commit 0fa6894.

Co-authored-by: untitaker <[email protected]>
  • Loading branch information
getsentry-bot and untitaker committed Jan 10, 2024
1 parent c30e29e commit 1c963c3
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 27 deletions.
3 changes: 1 addition & 2 deletions rust_snuba/src/processors/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ pub fn process_message(
) -> anyhow::Result<InsertBatch> {
let payload_bytes = payload.payload().context("Expected payload")?;
let msg: InputMessage = serde_json::from_slice(payload_bytes)?;

let functions = msg.functions.iter().map(|from| {
Function {
profile_id: msg.profile_id,
Expand All @@ -40,8 +39,8 @@ pub fn process_message(
});

Ok(InsertBatch {
origin_timestamp: DateTime::from_timestamp(msg.received, 0),
rows: RowData::from_rows(functions)?,
origin_timestamp: DateTime::from_timestamp(msg.received, 0),
sentry_received_timestamp: None,
})
}
Expand Down
8 changes: 3 additions & 5 deletions rust_snuba/src/processors/profiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,24 @@ use rust_arroyo::backends::kafka::types::KafkaPayload;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::processors::utils::enforce_retention;
use crate::types::{InsertBatch, KafkaMessageMetadata, RowData};

pub fn process_message(
payload: KafkaPayload,
metadata: KafkaMessageMetadata,
config: &ProcessorConfig,
_config: &ProcessorConfig,
) -> anyhow::Result<InsertBatch> {
let payload_bytes = payload.payload().context("Expected payload")?;
let mut msg: ProfileMessage = serde_json::from_slice(payload_bytes)?;

msg.retention_days = Some(enforce_retention(msg.retention_days, &config.env_config));
msg.offset = metadata.offset;
msg.partition = metadata.partition;

let origin_timestamp = DateTime::from_timestamp(msg.received, 0);

Ok(InsertBatch {
origin_timestamp,
rows: RowData::from_rows([msg])?,
origin_timestamp,
sentry_received_timestamp: None,
})
}
Expand Down Expand Up @@ -52,7 +50,7 @@ struct ProfileMessage {
profile_id: Uuid,
project_id: u64,
received: i64,
retention_days: Option<u16>,
retention_days: u32,
trace_id: Uuid,
transaction_id: Uuid,
transaction_name: String,
Expand Down
31 changes: 11 additions & 20 deletions rust_snuba/src/processors/spans.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::collections::BTreeMap;

use anyhow::Context;
use chrono::DateTime;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;

use crate::config::ProcessorConfig;
use crate::processors::utils::{enforce_retention, hex_to_u64};
use crate::types::{InsertBatch, KafkaMessageMetadata, RowData};
use crate::types::{InsertBatch, KafkaMessageMetadata};

pub fn process_message(
payload: KafkaPayload,
Expand All @@ -19,18 +18,13 @@ pub fn process_message(
let payload_bytes = payload.payload().context("Expected payload")?;
let msg: FromSpanMessage = serde_json::from_slice(payload_bytes)?;

let origin_timestamp = DateTime::from_timestamp(msg.received as i64, 0);
let mut span: Span = msg.try_into()?;

span.retention_days = Some(enforce_retention(span.retention_days, &config.env_config));

span.offset = metadata.offset;
span.partition = metadata.partition;

Ok(InsertBatch {
origin_timestamp,
rows: RowData::from_rows([span])?,
sentry_received_timestamp: None,
})
InsertBatch::from_rows([span])
}

#[derive(Debug, Default, Deserialize)]
Expand All @@ -49,7 +43,6 @@ struct FromSpanMessage {
parent_span_id: u64,
profile_id: Option<Uuid>,
project_id: u64,
received: f64,
retention_days: Option<u16>,
#[serde(default, deserialize_with = "hex_to_u64")]
segment_id: u64,
Expand Down Expand Up @@ -418,7 +411,6 @@ mod tests {
parent_span_id: Option<String>,
profile_id: Option<Uuid>,
project_id: Option<u64>,
received: Option<f64>,
retention_days: Option<u16>,
segment_id: Option<String>,
sentry_tags: TestSentryTags,
Expand All @@ -439,8 +431,15 @@ mod tests {
profile_id: Some(Uuid::new_v4()),
project_id: Some(1),
retention_days: Some(90),
received: Some(1691105878.720),
segment_id: Some("deadbeefdeadbeef".into()),
span_id: Some("deadbeefdeadbeef".into()),
start_timestamp_ms: Some(1691105878720),
trace_id: Some(Uuid::new_v4()),
tags: Some(BTreeMap::from([
("tag1".into(), "value1".into()),
("tag2".into(), "123".into()),
("tag3".into(), "true".into()),
])),
sentry_tags: TestSentryTags {
action: Some("GET".into()),
domain: Some("targetdomain.tld:targetport".into()),
Expand All @@ -455,14 +454,6 @@ mod tests {
transaction_method: Some("GET".into()),
transaction_op: Some("navigation".into()),
},
span_id: Some("deadbeefdeadbeef".into()),
start_timestamp_ms: Some(1691105878720),
tags: Some(BTreeMap::from([
("tag1".into(), "value1".into()),
("tag2".into(), "123".into()),
("tag3".into(), "true".into()),
])),
trace_id: Some(Uuid::new_v4()),
}
}

Expand Down

0 comments on commit 1c963c3

Please sign in to comment.