Skip to content

Commit

Permalink
feat(metrics-summaries): Ingest new segment-related columns (#5480)
Browse files (browse the repository at this point in the history)
  • Loading branch information
phacops authored Feb 1, 2024
1 parent aa20511 commit f57f752
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 12 deletions.
26 changes: 23 additions & 3 deletions rust_snuba/src/processors/metrics_summaries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ pub fn process_message(
let from: FromSpanMessage = serde_json::from_slice(payload_bytes)?;

let mut metrics_summaries: Vec<MetricsSummary> = Vec::new();
let sentry_tags = from.sentry_tags.unwrap_or_default();
let group: u64 = sentry_tags
.get("group")
.map(|group| u64::from_str_radix(group, 16).unwrap_or_default())
.unwrap_or_default();
let span_id = u64::from_str_radix(&from.span_id, 16)?;
let segment_id = from.segment_id.map_or(span_id, |segment_id| {
u64::from_str_radix(&segment_id, 16).unwrap_or_default()
});

let end_timestamp_ms = from.start_timestamp_ms + from.duration_ms as u64;
for (metric_mri, summaries) in &from._metrics_summary {
Expand All @@ -33,13 +42,17 @@ pub fn process_message(
metrics_summaries.push(MetricsSummary {
count: summary.count as u64,
deleted: 0,
duration_ms: from.duration_ms,
end_timestamp: end_timestamp_ms / 1000,
group,
is_segment: if from.is_segment { 1 } else { 0 },
max: summary.max,
metric_mri,
min: summary.min,
project_id: from.project_id,
retention_days: enforce_retention(from.retention_days, &config.env_config),
span_id: u64::from_str_radix(&from.span_id, 16)?,
segment_id,
span_id,
sum: summary.sum,
tag_keys,
tag_values,
Expand All @@ -57,12 +70,13 @@ pub fn process_message(
struct FromSpanMessage {
#[serde(default)]
_metrics_summary: BTreeMap<String, Vec<FromMetricsSummary>>,
#[serde(default)]
duration_ms: u32,
is_segment: bool,
project_id: u64,
received: f64,
#[serde(default)]
retention_days: Option<u16>,
segment_id: Option<String>,
sentry_tags: Option<BTreeMap<String, String>>,
span_id: String,
start_timestamp_ms: u64,
trace_id: Uuid,
Expand Down Expand Up @@ -90,12 +104,16 @@ struct FromMetricsSummary {
struct MetricsSummary<'a> {
count: u64,
deleted: u8,
duration_ms: u32,
end_timestamp: u64,
group: u64,
is_segment: u8,
max: f64,
metric_mri: &'a str,
min: f64,
project_id: u64,
retention_days: u16,
segment_id: u64,
span_id: u64,
sum: f64,
#[serde(rename(serialize = "tags.key"))]
Expand Down Expand Up @@ -151,6 +169,7 @@ mod tests {
]
},
"duration_ms": 1000,
"is_segment": false,
"project_id": 1,
"received": 1691105878.720,
"retention_days": 90,
Expand Down Expand Up @@ -190,6 +209,7 @@ mod tests {
]
},
"duration_ms": 1000,
"is_segment": false,
"project_id": 1,
"received": 1691105878.720,
"retention_days": 90,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ expression: snapshot_payload
{
"count": 1,
"deleted": 0,
"duration_ms": 1000,
"end_timestamp": 1691105879,
"group": 16045690984833335023,
"is_segment": 0,
"max": 1.0,
"metric_mri": "c:sentry.events.outcomes@none",
"min": 1.0,
"project_id": 1,
"retention_days": 90,
"segment_id": 16045690984833335023,
"span_id": 16045690984833335023,
"sum": 1.0,
"tags.key": [
Expand All @@ -38,12 +42,16 @@ expression: snapshot_payload
{
"count": 1,
"deleted": 0,
"duration_ms": 1000,
"end_timestamp": 1691105879,
"group": 16045690984833335023,
"is_segment": 0,
"max": 0.0,
"metric_mri": "c:sentry.events.post_save.normalize.errors@none",
"min": 0.0,
"project_id": 1,
"retention_days": 90,
"segment_id": 16045690984833335023,
"span_id": 16045690984833335023,
"sum": 0.0,
"tags.key": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ name: metrics_summaries
schema:
[
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: span_id, type: UInt, args: { size: 64 } },
{ name: trace_id, type: UUID },
{ name: metric_mri, type: String },
{ name: trace_id, type: UUID },
{ name: segment_id, type: UInt, args: { size: 64 } },
{ name: span_id, type: UInt, args: { size: 64 } },
{ name: group, type: UInt, args: { size: 64 } },
{ name: duration_ms, type: UInt, args: { size: 32 } },
{ name: is_segment, type: UInt, args: { size: 8 } },
{ name: min, type: Float, args: { size: 64 } },
{ name: max, type: Float, args: { size: 64 } },
{ name: sum, type: Float, args: { size: 64 } },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ schema:
[
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: metric_mri, type: String },
{ name: span_id, type: UInt, args: { size: 64 } },
{ name: trace_id, type: UUID},
{ name: segment_id, type: UInt, args: { size: 64 } },
{ name: span_id, type: UInt, args: { size: 64 } },
{ name: group, type: UInt, args: { size: 64 } },
{ name: duration_ms, type: UInt, args: { size: 32 } },
{ name: is_segment, type: UInt, args: { size: 8 } },
{ name: min, type: Float, args: { size: 64 } },
{ name: max, type: Float, args: { size: 64 } },
{ name: sum, type: Float, args: { size: 64 } },
Expand Down Expand Up @@ -69,7 +73,7 @@ query_processors:
columns: [trace_id]
- processor: HexIntColumnProcessor
args:
columns: [span_id]
columns: [span_id, segment_id, group]
- processor: MappingOptimizer
args:
column_name: tags
Expand All @@ -89,7 +93,7 @@ query_processors:
query_splitters:
- splitter: TimeSplitQueryStrategy
args:
timestamp_col: start_timestamp
timestamp_col: end_timestamp

mandatory_condition_checkers:
- condition: ProjectIdEnforcer
Expand Down
12 changes: 8 additions & 4 deletions tests/datasets/test_spans_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,17 +174,21 @@ def build_result(self, meta: KafkaMessageMetadata) -> Sequence[Mapping[str, Any]

def build_metrics_summary_result(self) -> Sequence[Mapping[str, Any]]:
common_fields = {
"project_id": 1,
"trace_id": str(UUID(self.trace_id)),
"span_id": int(self.span_id, 16),
"deleted": 0,
"duration_ms": self.duration_ms,
"end_timestamp": int(
datetime.fromtimestamp(
(self.start_timestamp_ms + self.duration_ms) / 1000,
tz=timezone.utc,
).timestamp()
),
"deleted": 0,
"group": int(self.group, 16),
"is_segment": self.parent_span_id == "",
"project_id": 1,
"retention_days": 90,
"segment_id": int(self.segment_id, 16),
"span_id": int(self.span_id, 16),
"trace_id": str(UUID(self.trace_id)),
}
return [
{
Expand Down

0 comments on commit f57f752

Please sign in to comment.