feat(flags): Update errors processor to index feature flag data #6713

Merged: 17 commits, merged Jan 23, 2025

Changes from all commits
37 changes: 37 additions & 0 deletions rust_snuba/src/processors/errors.rs
@@ -161,6 +161,8 @@ type GenericContext = BTreeMap<String, ContextStringify>;

#[derive(Debug, Default, Deserialize, JsonSchema)]
struct Contexts {
#[serde(default)]
flags: Option<FlagContext>,
#[serde(default)]
replay: Option<ReplayContext>,
#[serde(default)]
@@ -183,6 +185,20 @@ struct TraceContext
other: GenericContext,
}

#[derive(Debug, Default, Deserialize, JsonSchema)]
struct FlagContext {
#[serde(default)]
values: Option<Vec<Option<FlagContextItem>>>,
}

#[derive(Debug, Default, Deserialize, JsonSchema)]
struct FlagContextItem {
#[serde(default)]
flag: Unicodify,
#[serde(default)]
result: Unicodify,
}

#[derive(Debug, Default, Deserialize, JsonSchema)]
struct ReplayContext {
#[serde(default, skip_serializing_if = "Option::is_none")]
@@ -384,6 +400,10 @@ struct ErrorRow
tags_key: Vec<String>,
#[serde(rename = "tags.value")]
tags_value: Vec<String>,
#[serde(rename = "flags.key")]
flags_key: Vec<String>,
#[serde(rename = "flags.value")]
flags_value: Vec<String>,
timestamp: u32,
title: String,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -563,6 +583,21 @@ impl ErrorRow
replay_id = Some(rid)
}

// Split feature flag keys and values into two parallel vectors if the context could be parsed.
let mut flags_key = Vec::new();
let mut flags_value = Vec::new();

if let Some(ctx) = from_context.flags {
if let Some(values) = ctx.values {
for item in values.into_iter().flatten() {
if let (Some(k), Some(v)) = (item.flag.0, item.result.0) {
flags_key.push(k);
flags_value.push(v);
}
}
}
};

// Stacktrace.

let exceptions = from
@@ -668,6 +703,8 @@ impl ErrorRow
exception_stacks_mechanism_type: stack_mechanism_types,
exception_stacks_type: stack_types,
exception_stacks_value: stack_values,
flags_key,
flags_value,
group_id: from.group_id,
http_method: from_request.method.0,
http_referer,
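
In short, the processor change deserializes an optional `flags` context leniently and flattens well-formed entries into the parallel `flags.key` / `flags.value` columns added to `ErrorRow`. Below is a minimal standalone sketch of that behavior, assuming only `serde` and `serde_json` as dependencies; the processor's `Unicodify` wrapper (which also stringifies non-string JSON values) is approximated here with plain `Option` types.

use serde::Deserialize;

#[derive(Debug, Default, Deserialize)]
struct FlagContext {
    #[serde(default)]
    values: Option<Vec<Option<FlagContextItem>>>,
}

#[derive(Debug, Default, Deserialize)]
struct FlagContextItem {
    #[serde(default)]
    flag: Option<String>, // Unicodify in the real processor
    #[serde(default)]
    result: Option<serde_json::Value>, // Unicodify in the real processor
}

fn main() {
    // One well-formed entry, one null entry, one entry missing "result":
    // only the complete entry survives, and nothing fails the row.
    let raw = r#"{"values": [{"flag": "abc", "result": true}, null, {"flag": "orphan"}]}"#;
    let ctx: FlagContext = serde_json::from_str(raw).unwrap();

    let (mut flags_key, mut flags_value) = (Vec::new(), Vec::new());
    if let Some(values) = ctx.values {
        for item in values.into_iter().flatten() {
            if let (Some(k), Some(v)) = (item.flag, item.result) {
                flags_key.push(k);
                flags_value.push(v.to_string()); // "true", matching json.dumps in the test
            }
        }
    }
    assert_eq!(flags_key, vec!["abc"]);
    assert_eq!(flags_value, vec!["true"]);
}
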
@@ -3,6 +3,13 @@ source: src/processors/mod.rs
expression: diff
---
[
Change {
path: ".<anyOf:0>.2.data.contexts.<anyOf:0>",
change: PropertyAdd {
lhs_additional_properties: true,
added: "flags",
},
},
Change {
path: ".<anyOf:0>.2.data.contexts.<anyOf:0>.trace.<anyOf:0>",
change: PropertyAdd {
@@ -262,6 +262,8 @@ expression: snapshot_payload
"exception_stacks.value": [
"Some exception."
],
"flags.key": [],
"flags.value": [],
"group_id": 124,
"http_method": null,
"http_referer": null,
@@ -262,6 +262,8 @@ expression: snapshot_payload
"exception_stacks.value": [
"Some exception."
],
"flags.key": [],
"flags.value": [],
"group_id": 124,
"http_method": null,
"http_referer": null,
@@ -263,6 +263,8 @@ expression: snapshot_payload
"exception_stacks.value": [
"Some exception."
],
"flags.key": [],
"flags.value": [],
"group_id": 124,
"http_method": null,
"http_referer": null,
@@ -25,6 +25,8 @@ expression: snapshot_payload
"exception_stacks.mechanism_type": [],
"exception_stacks.type": [],
"exception_stacks.value": [],
"flags.key": [],
"flags.value": [],
"group_id": 124,
"http_method": null,
"http_referer": null,
@@ -25,6 +25,8 @@ expression: snapshot_payload
"exception_stacks.mechanism_type": [],
"exception_stacks.type": [],
"exception_stacks.value": [],
"flags.key": [],
"flags.value": [],
"group_id": 123123123,
"http_method": null,
"http_referer": null,
@@ -25,6 +25,8 @@ expression: snapshot_payload
"exception_stacks.mechanism_type": [],
"exception_stacks.type": [],
"exception_stacks.value": [],
"flags.key": [],
"flags.value": [],
"group_id": 124,
"http_method": null,
"http_referer": null,
@@ -25,6 +25,8 @@ expression: snapshot_payload
"exception_stacks.mechanism_type": [],
"exception_stacks.type": [],
"exception_stacks.value": [],
"flags.key": [],
"flags.value": [],
"group_id": 123123,
"http_method": null,
"http_referer": null,
@@ -25,6 +25,8 @@ expression: snapshot_payload
"exception_stacks.mechanism_type": [],
"exception_stacks.type": [],
"exception_stacks.value": [],
"flags.key": [],
"flags.value": [],
"group_id": 123,
"http_method": null,
"http_referer": null,
141 changes: 141 additions & 0 deletions tests/datasets/test_errors_processor.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import json
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
@@ -35,6 +36,7 @@ class ErrorEvent:
trace_sampled: bool | None
environment: str
replay_id: uuid.UUID | None
flags: list[Mapping[str, Any]]
received_timestamp: datetime
errors: Sequence[Mapping[str, Any]] | None

@@ -135,6 +137,7 @@ def serialize(self) -> tuple[int, str, Mapping[str, Any]]:
]
},
"contexts": {
"flags": {"values": self.flags},
"runtime": {
"version": "3.7.6",
"type": "runtime",
@@ -377,8 +380,15 @@ def build_result(self, meta: KafkaMessageMetadata) -> Mapping[str, Any]:
"modules.version": ["1.13.2", "0.2.0", "0.6.0"],
"transaction_name": "",
"num_processing_errors": len(self.errors) if self.errors is not None else 0,
"flags.key": [],
"flags.value": [],
}

if self.flags:
for flag in self.flags:
expected_result["flags.key"].append(flag["flag"])
expected_result["flags.value"].append(json.dumps(flag["result"]))

if self.replay_id:
expected_result["replay_id"] = str(self.replay_id)

@@ -423,6 +433,7 @@ def __get_error_event(self, timestamp: datetime, recieved: datetime) -> ErrorEvent:
"subdivision": "fake_subdivision",
},
errors=None,
flags=[],
)

def test_errors_basic(self) -> None:
@@ -465,6 +476,7 @@ def test_errors_replayid_context(self) -> None:
},
replay_id=uuid.uuid4(),
errors=None,
flags=[],
)

payload = message.serialize()
@@ -503,6 +515,7 @@ def test_errors_replayid_tag(self) -> None:
},
replay_id=None,
errors=None,
flags=[],
)
replay_id = uuid.uuid4()
payload = message.serialize()
@@ -548,6 +561,7 @@ def test_errors_replayid_tag_and_context(self) -> None:
},
replay_id=replay_id,
errors=None,
flags=[],
)

payload = message.serialize()
@@ -591,6 +605,7 @@ def test_errors_replayid_invalid_tag(self) -> None:
},
replay_id=None,
errors=None,
flags=[],
)
invalid_replay_id = "imnotavaliduuid"
payload = message.serialize()
@@ -649,6 +664,7 @@ def test_exception_main_thread_true(self) -> None:
]
},
errors=None,
flags=[],
)
payload = message.serialize()
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)
@@ -703,6 +719,7 @@ def test_exception_main_thread_false(self) -> None:
]
},
errors=None,
flags=[],
)
payload = message.serialize()
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)
@@ -743,6 +760,7 @@ def test_trace_sampled(self) -> None:
replay_id=None,
threads=None,
errors=None,
flags=[],
)
payload = message.serialize()
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)
@@ -794,6 +812,59 @@ def test_errors_processed(self) -> None:
replay_id=None,
threads=None,
errors=[{"type": "one"}, {"type": "two"}, {"type": "three"}],
flags=[],
)
payload = message.serialize()
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)

result = message.build_result(meta)
result["num_processing_errors"] = 3

assert self.processor.process_message(payload, meta) == InsertBatch(
[result], ANY
)

# Ensure the old behavior: data.errors=None does not set 'num_processing_errors'.
message.errors = None
payload = message.serialize()
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)

result = message.build_result(meta)

assert self.processor.process_message(payload, meta) == InsertBatch(
[result], ANY
)

def test_errors_with_flags(self) -> None:
timestamp, recieved = self.__get_timestamps()
message = ErrorEvent(
event_id=str(uuid.UUID("dcb9d002cac548c795d1c9adbfc68040")),
organization_id=1,
project_id=2,
group_id=100,
platform="python",
message="",
trace_id=str(uuid.uuid4()),
trace_sampled=False,
timestamp=timestamp,
received_timestamp=recieved,
release="1.0.0",
dist="dist",
environment="prod",
email="[email protected]",
ip_address="127.0.0.1",
user_id="myself",
username="me",
geo={
"country_code": "XY",
"region": "fake_region",
"city": "fake_city",
"subdivision": "fake_subdivision",
},
replay_id=None,
threads=None,
errors=[{"type": "one"}, {"type": "two"}, {"type": "three"}],
flags=[{"flag": "abc", "result": True}],
)
payload = message.serialize()
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)
@@ -815,3 +886,73 @@ def test_errors_processed(self) -> None:
assert self.processor.process_message(payload, meta) == InsertBatch(
[result], ANY
)

def test_errors_with_malformed_flags(self) -> None:
timestamp, recieved = self.__get_timestamps()
message = ErrorEvent(
event_id=str(uuid.UUID("dcb9d002cac548c795d1c9adbfc68040")),
organization_id=1,
project_id=2,
group_id=100,
platform="python",
message="",
trace_id=str(uuid.uuid4()),
trace_sampled=False,
timestamp=timestamp,
received_timestamp=recieved,
release="1.0.0",
dist="dist",
environment="prod",
email="[email protected]",
ip_address="127.0.0.1",
user_id="myself",
username="me",
geo={
"country_code": "XY",
"region": "fake_region",
"city": "fake_city",
"subdivision": "fake_subdivision",
},
replay_id=None,
threads=None,
errors=[{"type": "one"}, {"type": "two"}, {"type": "three"}],
flags=[],
)
payload = message.serialize()
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)

# Assert malformed context type is ignored.
payload[2]["data"]["contexts"]["flags"] = {"key": "value"}
result = self.processor.process_message(payload, meta)
assert result.rows[0]["flags.key"] == []
assert result.rows[0]["flags.value"] == []

# Assert malformed values type is ignored.
payload[2]["data"]["contexts"]["flags"] = {"values": None}
result = self.processor.process_message(payload, meta)
assert result.rows[0]["flags.key"] == []
assert result.rows[0]["flags.value"] == []

# Assert malformed item type is ignored.
payload[2]["data"]["contexts"]["flags"] = {"values": [None]}
result = self.processor.process_message(payload, meta)
assert result.rows[0]["flags.key"] == []
assert result.rows[0]["flags.value"] == []

# Assert an item with incorrect contents is ignored.
payload[2]["data"]["contexts"]["flags"] = {"values": [{"key": "value"}]}
result = self.processor.process_message(payload, meta)
assert result.rows[0]["flags.key"] == []
assert result.rows[0]["flags.value"] == []

# Assert missing "result" key means the whole item is ignored.
payload[2]["data"]["contexts"]["flags"] = {"values": [{"flag": "value"}]}
result = self.processor.process_message(payload, meta)
assert result.rows[0]["flags.key"] == []
assert result.rows[0]["flags.value"] == []

# Assert missing "flag" key means the whole item is ignored.
payload[2]["data"]["contexts"]["flags"] = {"values": [{"result": "value"}]}
result = self.processor.process_message(payload, meta)
assert result.rows[0]["flags.key"] == []
assert result.rows[0]["flags.value"] == []
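
The malformed-payload cases above pass because every field in the Rust structs is optional and defaulted, and serde ignores unknown keys, so a bad context degrades to empty columns instead of rejecting the event. A standalone sketch under the same assumptions as the earlier one (plain serde types standing in for `Unicodify`):

use serde::Deserialize;

#[derive(Debug, Default, Deserialize)]
struct FlagContext {
    #[serde(default)]
    values: Option<Vec<Option<FlagContextItem>>>,
}

#[derive(Debug, Default, Deserialize)]
struct FlagContextItem {
    #[serde(default)]
    flag: Option<String>,
    #[serde(default)]
    result: Option<serde_json::Value>,
}

fn main() {
    // Every malformed shape from the test still deserializes; each yields
    // either no items or an item missing "flag"/"result", which the
    // processor's loop skips, leaving flags.key and flags.value empty.
    for raw in [
        r#"{"key": "value"}"#,                  // wrong context shape
        r#"{"values": null}"#,                  // values is not a list
        r#"{"values": [null]}"#,                // null item
        r#"{"values": [{"key": "value"}]}"#,    // item without flag/result
        r#"{"values": [{"flag": "value"}]}"#,   // missing "result"
        r#"{"values": [{"result": "value"}]}"#, // missing "flag"
    ] {
        let ctx: FlagContext = serde_json::from_str(raw).expect("still parses");
        let all_skipped = ctx
            .values
            .into_iter()
            .flatten() // unwrap the Option<Vec<_>> layer
            .flatten() // drop null items
            .all(|item| item.flag.is_none() || item.result.is_none());
        assert!(all_skipped);
    }
}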