From 4c362c0d6ebdc7308b39b42f1ee0278b431bfd38 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 13 Dec 2024 10:50:40 -0500 Subject: [PATCH 01/16] feat(eap): Add a FindTraces endpoint --- snuba/web/rpc/v1/endpoint_find_traces.py | 262 +++++++++++++++ tests/web/rpc/v1/test_endpoint_find_traces.py | 311 ++++++++++++++++++ 2 files changed, 573 insertions(+) create mode 100644 snuba/web/rpc/v1/endpoint_find_traces.py create mode 100644 tests/web/rpc/v1/test_endpoint_find_traces.py diff --git a/snuba/web/rpc/v1/endpoint_find_traces.py b/snuba/web/rpc/v1/endpoint_find_traces.py new file mode 100644 index 0000000000..be80917c0f --- /dev/null +++ b/snuba/web/rpc/v1/endpoint_find_traces.py @@ -0,0 +1,262 @@ +import uuid +from collections import defaultdict +from typing import Any, Callable, Dict, Iterable, Sequence, Type + +from google.protobuf.json_format import MessageToDict +from sentry_protos.snuba.v1.endpoint_find_traces_pb2 import ( + FindTracesRequest, + FindTracesResponse, + TraceColumn, +) +from sentry_protos.snuba.v1.request_common_pb2 import PageToken +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue + +from snuba.attribution.appid import AppID +from snuba.attribution.attribution_info import AttributionInfo +from snuba.datasets.entities.entity_key import EntityKey +from snuba.datasets.entities.factory import get_entity +from snuba.datasets.pluggable_dataset import PluggableDataset +from snuba.query import OrderBy, OrderByDirection, SelectedExpression +from snuba.query.data_source.simple import Entity +from snuba.query.dsl import Functions as f +from snuba.query.dsl import column +from snuba.query.expressions import Expression +from snuba.query.logical import Query +from snuba.query.query_settings import HTTPQuerySettings +from snuba.request import Request as SnubaRequest +from snuba.web.query import run_query +from snuba.web.rpc import RPCEndpoint +from snuba.web.rpc.common.common import ( + base_conditions_and, + trace_item_filters_to_expression, + treeify_or_and_conditions, +) +from snuba.web.rpc.common.debug_info import ( + extract_response_meta, + setup_trace_query_settings, +) +from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException + +_DEFAULT_ROW_LIMIT = 10_000 + +_COLUMN_TO_NAME: dict[TraceColumn.Name, str] = { + TraceColumn.Name.TRACE_ID: "trace_id", + TraceColumn.Name.START_TIMESTAMP: "start_timestamp", +} + +_NAME_TO_COLUMN: dict[str, TraceColumn.Name] = { + v: k for k, v in _COLUMN_TO_NAME.items() +} + +_TYPES_TO_CLICKHOUSE: dict[AttributeKey.Type, str] = { + AttributeKey.Type.TYPE_STRING: "String", + AttributeKey.Type.TYPE_INT: "Int64", + AttributeKey.Type.TYPE_FLOAT: "Float64", +} + +_POSSIBLE_TYPES: dict[TraceColumn.Name, set[AttributeKey.Type]] = { + TraceColumn.Name.TRACE_ID: { + AttributeKey.Type.TYPE_STRING, + }, + TraceColumn.Name.START_TIMESTAMP: { + AttributeKey.Type.TYPE_STRING, + AttributeKey.Type.TYPE_INT, + AttributeKey.Type.TYPE_FLOAT, + }, +} + + +def _column_to_expression(trace_column: TraceColumn) -> Expression: + if ( + trace_column.name in _COLUMN_TO_NAME + and trace_column.type in _POSSIBLE_TYPES.get(trace_column.name, {}) + ): + return f.CAST( + column( + _COLUMN_TO_NAME[trace_column.name], + ), + _TYPES_TO_CLICKHOUSE[trace_column.type], + alias=_COLUMN_TO_NAME[trace_column.name], + ) + raise BadSnubaRPCRequestException( + f"{trace_column.name} had an unknown or unset type: {trace_column.type}" + ) + + +def _convert_order_by( + order_by: Sequence[FindTracesRequest.OrderBy], +) -> Sequence[OrderBy]: + res: list[OrderBy] = [] + for x in order_by: + direction = OrderByDirection.DESC if x.descending else OrderByDirection.ASC + res.append( + OrderBy( + direction=direction, + expression=_column_to_expression(x.column), + ) + ) + return res + + +def _build_query(request: FindTracesRequest) -> Query: + entity = Entity( + key=EntityKey("eap_spans"), + schema=get_entity(EntityKey("eap_spans")).get_data_model(), + sample=None, + ) + + selected_columns = [] + + for trace_column in request.columns: + expression = _column_to_expression(trace_column) + selected_columns.append( + SelectedExpression( + name=_COLUMN_TO_NAME[trace_column.name], + expression=expression, + ) + ) + + res = Query( + from_clause=entity, + selected_columns=selected_columns, + condition=base_conditions_and( + request.meta, + trace_item_filters_to_expression(request.filter), + ), + order_by=_convert_order_by(request.order_by), + groupby=[ + _column_to_expression( + TraceColumn( + type=AttributeKey.TYPE_STRING, + name=TraceColumn.Name.TRACE_ID, + ), + ), + ], + limit=request.limit if request.limit > 0 else _DEFAULT_ROW_LIMIT, + ) + treeify_or_and_conditions(res) + return res + + +def _build_snuba_request(request: FindTracesRequest) -> SnubaRequest: + query_settings = ( + setup_trace_query_settings() if request.meta.debug else HTTPQuerySettings() + ) + + return SnubaRequest( + id=uuid.UUID(request.meta.request_id), + original_body=MessageToDict(request), + query=_build_query(request), + query_settings=query_settings, + attribution_info=AttributionInfo( + referrer=request.meta.referrer, + team="eap", + feature="eap", + tenant_ids={ + "organization_id": request.meta.organization_id, + "referrer": request.meta.referrer, + }, + app_id=AppID("eap"), + parent_api="eap_span_samples", + ), + ) + + +def _convert_results( + request: FindTracesRequest, data: Iterable[Dict[str, Any]] +) -> list[FindTracesResponse.Trace]: + converters: Dict[str, Callable[[Any], AttributeValue]] = {} + + for trace_column in request.columns: + if trace_column.type == AttributeKey.TYPE_BOOLEAN: + converters[trace_column.name] = lambda x: AttributeValue(val_bool=bool(x)) + elif trace_column.type == AttributeKey.TYPE_STRING: + converters[trace_column.name] = lambda x: AttributeValue(val_str=str(x)) + elif trace_column.type == AttributeKey.TYPE_INT: + converters[trace_column.name] = lambda x: AttributeValue(val_int=int(x)) + elif trace_column.type == AttributeKey.TYPE_FLOAT: + converters[trace_column.name] = lambda x: AttributeValue(val_float=float(x)) + + res: list[FindTracesResponse.Trace] = [] + column_ordering = { + trace_column.name: i for i, trace_column in enumerate(request.columns) + } + + for row in data: + values: defaultdict[ + TraceColumn.Name, FindTracesResponse.Trace.Column + ] = defaultdict(FindTracesResponse.Trace.Column) + for column_name, value in row.items(): + name = _NAME_TO_COLUMN[column_name] + if name in converters.keys(): + values[name] = FindTracesResponse.Trace.Column( + value=converters[name](value), + ) + res.append( + FindTracesResponse.Trace( + # we return the columns in the order they were requested + columns=sorted( + values.values(), + key=lambda c: column_ordering.__getitem__(c.name), + ) + ) + ) + + return res + + +def _get_page_token( + request: FindTracesRequest, response: list[FindTracesResponse.Trace] +) -> PageToken: + if not response: + return PageToken(offset=0) + num_rows = len(response) + return PageToken(offset=request.page_token.offset + num_rows) + + +def _validate_order_by(in_msg: FindTracesRequest) -> None: + order_by_cols = set([ob.column.name for ob in in_msg.order_by]) + selected_columns = set([c.name for c in in_msg.columns]) + if not order_by_cols.issubset(selected_columns): + raise BadSnubaRPCRequestException( + f"Ordered by columns {order_by_cols} not selected: {selected_columns}" + ) + + +class EndpointFindTraces(RPCEndpoint[FindTracesRequest, FindTracesResponse]): + @classmethod + def version(cls) -> str: + return "v1" + + @classmethod + def request_class(cls) -> Type[FindTracesRequest]: + return FindTracesRequest + + @classmethod + def response_class(cls) -> Type[FindTracesResponse]: + return FindTracesResponse + + def _execute(self, in_msg: FindTracesRequest) -> FindTracesResponse: + _validate_order_by(in_msg) + + in_msg.meta.request_id = getattr(in_msg.meta, "request_id", None) or str( + uuid.uuid4() + ) + snuba_request = _build_snuba_request(in_msg) + res = run_query( + dataset=PluggableDataset(name="eap", all_entities=[]), + request=snuba_request, + timer=self._timer, + ) + traces = _convert_results(in_msg, res.result.get("data", [])) + response_meta = extract_response_meta( + in_msg.meta.request_id, + in_msg.meta.debug, + [res], + [self._timer], + ) + return FindTracesResponse( + traces=traces, + page_token=_get_page_token(in_msg, traces), + meta=response_meta, + ) diff --git a/tests/web/rpc/v1/test_endpoint_find_traces.py b/tests/web/rpc/v1/test_endpoint_find_traces.py new file mode 100644 index 0000000000..4558956469 --- /dev/null +++ b/tests/web/rpc/v1/test_endpoint_find_traces.py @@ -0,0 +1,311 @@ +import random +import uuid +from datetime import datetime, timedelta, timezone +from typing import Any, Mapping + +import pytest +from google.protobuf.timestamp_pb2 import Timestamp +from sentry_protos.snuba.v1.endpoint_find_traces_pb2 import ( + FindTracesRequest, + FindTracesResponse, + TraceColumn, +) +from sentry_protos.snuba.v1.error_pb2 import Error as ErrorProto +from sentry_protos.snuba.v1.request_common_pb2 import ( + PageToken, + RequestMeta, + ResponseMeta, +) +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( + ComparisonFilter, + TraceItemFilter, +) + +from snuba.datasets.storages.factory import get_storage +from snuba.datasets.storages.storage_key import StorageKey +from snuba.web.rpc.v1.endpoint_find_traces import EndpointFindTraces +from tests.base import BaseApiTest +from tests.helpers import write_raw_unprocessed_events + +_RELEASE_TAG = "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b" +_SERVER_NAME = "D23CXQ4GK2.local" +_TRACE_IDS = [uuid.uuid4().hex for _ in range(10)] +_BASE_TIME = datetime.now(tz=timezone.utc).replace( + minute=0, + second=0, + microsecond=0, +) - timedelta(minutes=180) + + +def gen_message( + dt: datetime, + trace_id: str, + measurements: dict[str, dict[str, float]] | None = None, + tags: dict[str, str] | None = None, +) -> Mapping[str, Any]: + measurements = measurements or {} + tags = tags or {} + return { + "description": "/api/0/relays/projectconfigs/", + "duration_ms": 152, + "event_id": "d826225de75d42d6b2f01b957d51f18f", + "exclusive_time_ms": 0.228, + "is_segment": True, + "data": { + "sentry.environment": "development", + "sentry.release": _RELEASE_TAG, + "thread.name": "uWSGIWorker1Core0", + "thread.id": "8522009600", + "sentry.segment.name": "/api/0/relays/projectconfigs/", + "sentry.sdk.name": "sentry.python.django", + "sentry.sdk.version": "2.7.0", + "my.float.field": 101.2, + "my.int.field": 2000, + "my.neg.field": -100, + "my.neg.float.field": -101.2, + "my.true.bool.field": True, + "my.false.bool.field": False, + }, + "measurements": { + "num_of_spans": {"value": 50.0}, + "eap.measurement": {"value": random.choice([1, 100, 1000])}, + **measurements, + }, + "organization_id": 1, + "origin": "auto.http.django", + "project_id": 1, + "received": 1721319572.877828, + "retention_days": 90, + "segment_id": "8873a98879faf06d", + "sentry_tags": { + "category": "http", + "environment": "development", + "op": "http.server", + "platform": "python", + "release": _RELEASE_TAG, + "sdk.name": "sentry.python.django", + "sdk.version": "2.7.0", + "status": "ok", + "status_code": "200", + "thread.id": "8522009600", + "thread.name": "uWSGIWorker1Core0", + "trace.status": "ok", + "transaction": "/api/0/relays/projectconfigs/", + "transaction.method": "POST", + "transaction.op": "http.server", + "user": "ip:127.0.0.1", + }, + "span_id": "123456781234567D", + "tags": { + "http.status_code": "200", + "relay_endpoint_version": "3", + "relay_id": "88888888-4444-4444-8444-cccccccccccc", + "relay_no_cache": "False", + "relay_protocol_version": "3", + "relay_use_post_or_schedule": "True", + "relay_use_post_or_schedule_rejected": "version", + "server_name": _SERVER_NAME, + "spans_over_limit": "False", + "color": random.choice(["red", "green", "blue"]), + "location": random.choice(["mobile", "frontend", "backend"]), + **tags, + }, + "trace_id": trace_id, + "start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)), + "start_timestamp_precise": dt.timestamp(), + "end_timestamp_precise": dt.timestamp() + 1, + } + + +@pytest.fixture(autouse=False) +def setup_teardown(clickhouse_db: None, redis_db: None) -> None: + spans_storage = get_storage(StorageKey("eap_spans")) + start = _BASE_TIME + messages = [ + gen_message( + dt=start - timedelta(minutes=i), + trace_id=_TRACE_IDS[i % len(_TRACE_IDS)], + ) + for i in range(120) + ] + write_raw_unprocessed_events(spans_storage, messages) # type: ignore + + +@pytest.mark.clickhouse_db +@pytest.mark.redis_db +class TestFindTraces(BaseApiTest): + def test_no_data(self) -> None: + ts = Timestamp() + ts.GetCurrentTime() + message = FindTracesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=ts, + end_timestamp=ts, + ), + columns=[ + TraceColumn( + name=TraceColumn.Name.TRACE_ID, + type=AttributeKey.TYPE_STRING, + ) + ], + limit=10, + ) + response = self.app.post( + "/rpc/EndpointFindTraces/v1", data=message.SerializeToString() + ) + error_proto = ErrorProto() + if response.status_code != 200: + error_proto.ParseFromString(response.data) + assert response.status_code == 200, error_proto + + def test_with_data_and_order_by(self, setup_teardown: Any) -> None: + ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) + hour_ago = int((_BASE_TIME - timedelta(hours=1)).timestamp()) + message = FindTracesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", + ), + columns=[ + TraceColumn( + name=TraceColumn.Name.TRACE_ID, + type=AttributeKey.TYPE_STRING, + ) + ], + order_by=[ + FindTracesRequest.OrderBy( + column=TraceColumn( + name=TraceColumn.Name.TRACE_ID, + type=AttributeKey.TYPE_STRING, + ), + ), + ], + ) + response = EndpointFindTraces().execute(message) + expected_response = FindTracesResponse( + traces=[ + FindTracesResponse.Trace( + columns=[ + FindTracesResponse.Trace.Column( + name=TraceColumn.Name.TRACE_ID, + value=AttributeValue( + val_str=trace_id, + ), + ), + ], + ) + for trace_id in sorted(_TRACE_IDS) + ], + page_token=PageToken(offset=len(_TRACE_IDS)), + meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), + ) + assert response == expected_response + + def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: + ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) + hour_ago = int((_BASE_TIME - timedelta(hours=1)).timestamp()) + message = FindTracesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", + ), + columns=[ + TraceColumn( + name=TraceColumn.Name.TRACE_ID, + type=AttributeKey.TYPE_STRING, + ), + ], + order_by=[ + FindTracesRequest.OrderBy( + column=TraceColumn( + name=TraceColumn.Name.TRACE_ID, + type=AttributeKey.TYPE_STRING, + ), + ), + ], + limit=1, + ) + response = EndpointFindTraces().execute(message) + expected_response = FindTracesResponse( + traces=[ + FindTracesResponse.Trace( + columns=[ + FindTracesResponse.Trace.Column( + name=TraceColumn.Name.TRACE_ID, + value=AttributeValue( + val_str=sorted(_TRACE_IDS)[0], + ), + ) + ], + ) + ], + page_token=PageToken(offset=1), + meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), + ) + assert response == expected_response + + def test_with_data_and_filter(self, setup_teardown: Any) -> None: + ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) + hour_ago = int((_BASE_TIME - timedelta(hours=1)).timestamp()) + message = FindTracesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=hour_ago), + end_timestamp=ts, + request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", + ), + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey( + type=AttributeKey.TYPE_STRING, + name="sentry.trace_id", + ), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue( + val_str=_TRACE_IDS[0], + ), + ), + ), + columns=[ + TraceColumn( + name=TraceColumn.Name.TRACE_ID, + type=AttributeKey.TYPE_STRING, + ), + ], + ) + response = EndpointFindTraces().execute(message) + expected_response = FindTracesResponse( + traces=[ + FindTracesResponse.Trace( + columns=[ + FindTracesResponse.Trace.Column( + name=TraceColumn.Name.TRACE_ID, + value=AttributeValue( + val_str=_TRACE_IDS[0], + ), + ) + ], + ) + ], + page_token=PageToken(offset=1), + meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), + ) + assert response == expected_response From 3afc79392ebb9df0019252df192244201e1d1d9b Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 13 Dec 2024 11:31:55 -0500 Subject: [PATCH 02/16] Switch name to GetTraces --- requirements.txt | 2 +- ..._find_traces.py => endpoint_get_traces.py} | 57 ++++++++-------- ..._traces.py => test_endpoint_get_traces.py} | 67 ++++++++++--------- 3 files changed, 64 insertions(+), 62 deletions(-) rename snuba/web/rpc/v1/{endpoint_find_traces.py => endpoint_get_traces.py} (83%) rename tests/web/rpc/v1/{test_endpoint_find_traces.py => test_endpoint_get_traces.py} (85%) diff --git a/requirements.txt b/requirements.txt index d9a484583e..9c4f195bcb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -46,4 +46,4 @@ sqlparse==0.5.0 google-api-python-client==2.88.0 sentry-usage-accountant==0.0.11 freezegun==1.2.2 -sentry-protos==0.1.40 +sentry-protos @ git+ssh://git@github.com/getsentry/sentry-protos.git@pierre/eap-find-traces#subdirectory=py diff --git a/snuba/web/rpc/v1/endpoint_find_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py similarity index 83% rename from snuba/web/rpc/v1/endpoint_find_traces.py rename to snuba/web/rpc/v1/endpoint_get_traces.py index be80917c0f..f970fb65f4 100644 --- a/snuba/web/rpc/v1/endpoint_find_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -3,9 +3,9 @@ from typing import Any, Callable, Dict, Iterable, Sequence, Type from google.protobuf.json_format import MessageToDict -from sentry_protos.snuba.v1.endpoint_find_traces_pb2 import ( - FindTracesRequest, - FindTracesResponse, +from sentry_protos.snuba.v1.endpoint_get_traces_pb2 import ( + GetTracesRequest, + GetTracesResponse, TraceColumn, ) from sentry_protos.snuba.v1.request_common_pb2 import PageToken @@ -40,8 +40,8 @@ _DEFAULT_ROW_LIMIT = 10_000 _COLUMN_TO_NAME: dict[TraceColumn.Name, str] = { - TraceColumn.Name.TRACE_ID: "trace_id", - TraceColumn.Name.START_TIMESTAMP: "start_timestamp", + TraceColumn.Name.NAME_TRACE_ID: "trace_id", + TraceColumn.Name.NAME_START_TIMESTAMP: "start_timestamp", } _NAME_TO_COLUMN: dict[str, TraceColumn.Name] = { @@ -55,10 +55,10 @@ } _POSSIBLE_TYPES: dict[TraceColumn.Name, set[AttributeKey.Type]] = { - TraceColumn.Name.TRACE_ID: { + TraceColumn.Name.NAME_TRACE_ID: { AttributeKey.Type.TYPE_STRING, }, - TraceColumn.Name.START_TIMESTAMP: { + TraceColumn.Name.NAME_START_TIMESTAMP: { AttributeKey.Type.TYPE_STRING, AttributeKey.Type.TYPE_INT, AttributeKey.Type.TYPE_FLOAT, @@ -84,7 +84,7 @@ def _column_to_expression(trace_column: TraceColumn) -> Expression: def _convert_order_by( - order_by: Sequence[FindTracesRequest.OrderBy], + order_by: Sequence[GetTracesRequest.OrderBy], ) -> Sequence[OrderBy]: res: list[OrderBy] = [] for x in order_by: @@ -98,7 +98,7 @@ def _convert_order_by( return res -def _build_query(request: FindTracesRequest) -> Query: +def _build_query(request: GetTracesRequest) -> Query: entity = Entity( key=EntityKey("eap_spans"), schema=get_entity(EntityKey("eap_spans")).get_data_model(), @@ -128,7 +128,7 @@ def _build_query(request: FindTracesRequest) -> Query: _column_to_expression( TraceColumn( type=AttributeKey.TYPE_STRING, - name=TraceColumn.Name.TRACE_ID, + name=TraceColumn.Name.NAME_TRACE_ID, ), ), ], @@ -138,7 +138,7 @@ def _build_query(request: FindTracesRequest) -> Query: return res -def _build_snuba_request(request: FindTracesRequest) -> SnubaRequest: +def _build_snuba_request(request: GetTracesRequest) -> SnubaRequest: query_settings = ( setup_trace_query_settings() if request.meta.debug else HTTPQuerySettings() ) @@ -163,8 +163,8 @@ def _build_snuba_request(request: FindTracesRequest) -> SnubaRequest: def _convert_results( - request: FindTracesRequest, data: Iterable[Dict[str, Any]] -) -> list[FindTracesResponse.Trace]: + request: GetTracesRequest, data: Iterable[Dict[str, Any]] +) -> list[GetTracesResponse.Trace]: converters: Dict[str, Callable[[Any], AttributeValue]] = {} for trace_column in request.columns: @@ -177,27 +177,28 @@ def _convert_results( elif trace_column.type == AttributeKey.TYPE_FLOAT: converters[trace_column.name] = lambda x: AttributeValue(val_float=float(x)) - res: list[FindTracesResponse.Trace] = [] + res: list[GetTracesResponse.Trace] = [] column_ordering = { trace_column.name: i for i, trace_column in enumerate(request.columns) } for row in data: values: defaultdict[ - TraceColumn.Name, FindTracesResponse.Trace.Column - ] = defaultdict(FindTracesResponse.Trace.Column) + TraceColumn.Name, GetTracesResponse.Trace.Column + ] = defaultdict(GetTracesResponse.Trace.Column) for column_name, value in row.items(): name = _NAME_TO_COLUMN[column_name] if name in converters.keys(): - values[name] = FindTracesResponse.Trace.Column( + values[name] = GetTracesResponse.Trace.Column( + name=name, value=converters[name](value), ) res.append( - FindTracesResponse.Trace( + GetTracesResponse.Trace( # we return the columns in the order they were requested columns=sorted( values.values(), - key=lambda c: column_ordering.__getitem__(c.name), + key=lambda c: column_ordering[c.name], ) ) ) @@ -206,7 +207,7 @@ def _convert_results( def _get_page_token( - request: FindTracesRequest, response: list[FindTracesResponse.Trace] + request: GetTracesRequest, response: list[GetTracesResponse.Trace] ) -> PageToken: if not response: return PageToken(offset=0) @@ -214,7 +215,7 @@ def _get_page_token( return PageToken(offset=request.page_token.offset + num_rows) -def _validate_order_by(in_msg: FindTracesRequest) -> None: +def _validate_order_by(in_msg: GetTracesRequest) -> None: order_by_cols = set([ob.column.name for ob in in_msg.order_by]) selected_columns = set([c.name for c in in_msg.columns]) if not order_by_cols.issubset(selected_columns): @@ -223,20 +224,20 @@ def _validate_order_by(in_msg: FindTracesRequest) -> None: ) -class EndpointFindTraces(RPCEndpoint[FindTracesRequest, FindTracesResponse]): +class EndpointGetTraces(RPCEndpoint[GetTracesRequest, GetTracesResponse]): @classmethod def version(cls) -> str: return "v1" @classmethod - def request_class(cls) -> Type[FindTracesRequest]: - return FindTracesRequest + def request_class(cls) -> Type[GetTracesRequest]: + return GetTracesRequest @classmethod - def response_class(cls) -> Type[FindTracesResponse]: - return FindTracesResponse + def response_class(cls) -> Type[GetTracesResponse]: + return GetTracesResponse - def _execute(self, in_msg: FindTracesRequest) -> FindTracesResponse: + def _execute(self, in_msg: GetTracesRequest) -> GetTracesResponse: _validate_order_by(in_msg) in_msg.meta.request_id = getattr(in_msg.meta, "request_id", None) or str( @@ -255,7 +256,7 @@ def _execute(self, in_msg: FindTracesRequest) -> FindTracesResponse: [res], [self._timer], ) - return FindTracesResponse( + return GetTracesResponse( traces=traces, page_token=_get_page_token(in_msg, traces), meta=response_meta, diff --git a/tests/web/rpc/v1/test_endpoint_find_traces.py b/tests/web/rpc/v1/test_endpoint_get_traces.py similarity index 85% rename from tests/web/rpc/v1/test_endpoint_find_traces.py rename to tests/web/rpc/v1/test_endpoint_get_traces.py index 4558956469..5eb6e5e8f8 100644 --- a/tests/web/rpc/v1/test_endpoint_find_traces.py +++ b/tests/web/rpc/v1/test_endpoint_get_traces.py @@ -5,9 +5,9 @@ import pytest from google.protobuf.timestamp_pb2 import Timestamp -from sentry_protos.snuba.v1.endpoint_find_traces_pb2 import ( - FindTracesRequest, - FindTracesResponse, +from sentry_protos.snuba.v1.endpoint_get_traces_pb2 import ( + GetTracesRequest, + GetTracesResponse, TraceColumn, ) from sentry_protos.snuba.v1.error_pb2 import Error as ErrorProto @@ -24,7 +24,7 @@ from snuba.datasets.storages.factory import get_storage from snuba.datasets.storages.storage_key import StorageKey -from snuba.web.rpc.v1.endpoint_find_traces import EndpointFindTraces +from snuba.web.rpc.v1.endpoint_get_traces import EndpointGetTraces from tests.base import BaseApiTest from tests.helpers import write_raw_unprocessed_events @@ -134,11 +134,11 @@ def setup_teardown(clickhouse_db: None, redis_db: None) -> None: @pytest.mark.clickhouse_db @pytest.mark.redis_db -class TestFindTraces(BaseApiTest): +class TestGetTraces(BaseApiTest): def test_no_data(self) -> None: ts = Timestamp() ts.GetCurrentTime() - message = FindTracesRequest( + message = GetTracesRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, @@ -149,14 +149,14 @@ def test_no_data(self) -> None: ), columns=[ TraceColumn( - name=TraceColumn.Name.TRACE_ID, + name=TraceColumn.Name.NAME_TRACE_ID, type=AttributeKey.TYPE_STRING, ) ], limit=10, ) response = self.app.post( - "/rpc/EndpointFindTraces/v1", data=message.SerializeToString() + "/rpc/EndpointGetTraces/v1", data=message.SerializeToString() ) error_proto = ErrorProto() if response.status_code != 200: @@ -166,7 +166,7 @@ def test_no_data(self) -> None: def test_with_data_and_order_by(self, setup_teardown: Any) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) hour_ago = int((_BASE_TIME - timedelta(hours=1)).timestamp()) - message = FindTracesRequest( + message = GetTracesRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, @@ -178,26 +178,26 @@ def test_with_data_and_order_by(self, setup_teardown: Any) -> None: ), columns=[ TraceColumn( - name=TraceColumn.Name.TRACE_ID, + name=TraceColumn.Name.NAME_TRACE_ID, type=AttributeKey.TYPE_STRING, ) ], order_by=[ - FindTracesRequest.OrderBy( + GetTracesRequest.OrderBy( column=TraceColumn( - name=TraceColumn.Name.TRACE_ID, + name=TraceColumn.Name.NAME_TRACE_ID, type=AttributeKey.TYPE_STRING, ), ), ], ) - response = EndpointFindTraces().execute(message) - expected_response = FindTracesResponse( + response = EndpointGetTraces().execute(message) + expected_response = GetTracesResponse( traces=[ - FindTracesResponse.Trace( + GetTracesResponse.Trace( columns=[ - FindTracesResponse.Trace.Column( - name=TraceColumn.Name.TRACE_ID, + GetTracesResponse.Trace.Column( + name=TraceColumn.Name.NAME_TRACE_ID, value=AttributeValue( val_str=trace_id, ), @@ -214,7 +214,7 @@ def test_with_data_and_order_by(self, setup_teardown: Any) -> None: def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) hour_ago = int((_BASE_TIME - timedelta(hours=1)).timestamp()) - message = FindTracesRequest( + message = GetTracesRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, @@ -226,27 +226,27 @@ def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: ), columns=[ TraceColumn( - name=TraceColumn.Name.TRACE_ID, + name=TraceColumn.Name.NAME_TRACE_ID, type=AttributeKey.TYPE_STRING, ), ], order_by=[ - FindTracesRequest.OrderBy( + GetTracesRequest.OrderBy( column=TraceColumn( - name=TraceColumn.Name.TRACE_ID, + name=TraceColumn.Name.NAME_TRACE_ID, type=AttributeKey.TYPE_STRING, ), ), ], limit=1, ) - response = EndpointFindTraces().execute(message) - expected_response = FindTracesResponse( + response = EndpointGetTraces().execute(message) + expected_response = GetTracesResponse( traces=[ - FindTracesResponse.Trace( + GetTracesResponse.Trace( columns=[ - FindTracesResponse.Trace.Column( - name=TraceColumn.Name.TRACE_ID, + GetTracesResponse.Trace.Column( + name=TraceColumn.Name.NAME_TRACE_ID, value=AttributeValue( val_str=sorted(_TRACE_IDS)[0], ), @@ -262,7 +262,7 @@ def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: def test_with_data_and_filter(self, setup_teardown: Any) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) hour_ago = int((_BASE_TIME - timedelta(hours=1)).timestamp()) - message = FindTracesRequest( + message = GetTracesRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, @@ -286,18 +286,18 @@ def test_with_data_and_filter(self, setup_teardown: Any) -> None: ), columns=[ TraceColumn( - name=TraceColumn.Name.TRACE_ID, + name=TraceColumn.Name.NAME_TRACE_ID, type=AttributeKey.TYPE_STRING, ), ], ) - response = EndpointFindTraces().execute(message) - expected_response = FindTracesResponse( + response = EndpointGetTraces().execute(message) + expected_response = GetTracesResponse( traces=[ - FindTracesResponse.Trace( + GetTracesResponse.Trace( columns=[ - FindTracesResponse.Trace.Column( - name=TraceColumn.Name.TRACE_ID, + GetTracesResponse.Trace.Column( + name=TraceColumn.Name.NAME_TRACE_ID, value=AttributeValue( val_str=_TRACE_IDS[0], ), @@ -308,4 +308,5 @@ def test_with_data_and_filter(self, setup_teardown: Any) -> None: page_token=PageToken(offset=1), meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), ) + print(response, expected_response) assert response == expected_response From f68fa69ed4c4b47246d820a4150b12cc38a0a751 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 13 Dec 2024 13:02:13 -0500 Subject: [PATCH 03/16] Remove unneeded print --- tests/web/rpc/v1/test_endpoint_get_traces.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/web/rpc/v1/test_endpoint_get_traces.py b/tests/web/rpc/v1/test_endpoint_get_traces.py index 5eb6e5e8f8..57ab295388 100644 --- a/tests/web/rpc/v1/test_endpoint_get_traces.py +++ b/tests/web/rpc/v1/test_endpoint_get_traces.py @@ -308,5 +308,4 @@ def test_with_data_and_filter(self, setup_teardown: Any) -> None: page_token=PageToken(offset=1), meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), ) - print(response, expected_response) assert response == expected_response From f9d043bb67f15b5122114ced5cae4d3b8eeb9d14 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 16 Dec 2024 10:06:02 -0500 Subject: [PATCH 04/16] Add support for some aggregated fields --- snuba/web/rpc/v1/endpoint_get_traces.py | 34 +++-- tests/web/rpc/v1/test_endpoint_get_traces.py | 127 +++++++++++++++++-- 2 files changed, 138 insertions(+), 23 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index f970fb65f4..0d4423d3c9 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -42,6 +42,8 @@ _COLUMN_TO_NAME: dict[TraceColumn.Name, str] = { TraceColumn.Name.NAME_TRACE_ID: "trace_id", TraceColumn.Name.NAME_START_TIMESTAMP: "start_timestamp", + TraceColumn.Name.NAME_TOTAL_SPAN_COUNT: "total_span_count", + TraceColumn.Name.NAME_FILTERED_SPAN_COUNT: "filtered_span_count", } _NAME_TO_COLUMN: dict[str, TraceColumn.Name] = { @@ -66,7 +68,17 @@ } -def _column_to_expression(trace_column: TraceColumn) -> Expression: +def _column_to_expression(trace_column: TraceColumn, conditions=None) -> Expression: + if trace_column.name == TraceColumn.Name.NAME_TOTAL_SPAN_COUNT: + return f.count(alias="total_span_count") + if trace_column.name == TraceColumn.Name.NAME_FILTERED_SPAN_COUNT: + return f.countIf(conditions, alias="filtered_span_count") + if trace_column.name == TraceColumn.Name.NAME_START_TIMESTAMP: + return f.CAST( + f.min(column("start_timestamp")), + _TYPES_TO_CLICKHOUSE[trace_column.type], + alias=_COLUMN_TO_NAME[trace_column.name], + ) if ( trace_column.name in _COLUMN_TO_NAME and trace_column.type in _POSSIBLE_TYPES.get(trace_column.name, {}) @@ -99,16 +111,11 @@ def _convert_order_by( def _build_query(request: GetTracesRequest) -> Query: - entity = Entity( - key=EntityKey("eap_spans"), - schema=get_entity(EntityKey("eap_spans")).get_data_model(), - sample=None, - ) - + trace_item_filter_expressions = trace_item_filters_to_expression(request.filter) selected_columns = [] for trace_column in request.columns: - expression = _column_to_expression(trace_column) + expression = _column_to_expression(trace_column, trace_item_filter_expressions) selected_columns.append( SelectedExpression( name=_COLUMN_TO_NAME[trace_column.name], @@ -116,12 +123,16 @@ def _build_query(request: GetTracesRequest) -> Query: ) ) + entity = Entity( + key=EntityKey("eap_spans"), + schema=get_entity(EntityKey("eap_spans")).get_data_model(), + sample=None, + ) res = Query( from_clause=entity, selected_columns=selected_columns, condition=base_conditions_and( request.meta, - trace_item_filters_to_expression(request.filter), ), order_by=_convert_order_by(request.order_by), groupby=[ @@ -134,7 +145,9 @@ def _build_query(request: GetTracesRequest) -> Query: ], limit=request.limit if request.limit > 0 else _DEFAULT_ROW_LIMIT, ) + treeify_or_and_conditions(res) + return res @@ -184,7 +197,8 @@ def _convert_results( for row in data: values: defaultdict[ - TraceColumn.Name, GetTracesResponse.Trace.Column + TraceColumn.Name, + GetTracesResponse.Trace.Column, ] = defaultdict(GetTracesResponse.Trace.Column) for column_name, value in row.items(): name = _NAME_TO_COLUMN[column_name] diff --git a/tests/web/rpc/v1/test_endpoint_get_traces.py b/tests/web/rpc/v1/test_endpoint_get_traces.py index 57ab295388..eaf8f104df 100644 --- a/tests/web/rpc/v1/test_endpoint_get_traces.py +++ b/tests/web/rpc/v1/test_endpoint_get_traces.py @@ -1,5 +1,6 @@ import random import uuid +from collections import defaultdict from datetime import datetime, timedelta, timezone from typing import Any, Mapping @@ -36,6 +37,7 @@ second=0, microsecond=0, ) - timedelta(minutes=180) +_SPAN_COUNT = 120 def gen_message( @@ -43,13 +45,14 @@ def gen_message( trace_id: str, measurements: dict[str, dict[str, float]] | None = None, tags: dict[str, str] | None = None, + span_op: str = "http.server", ) -> Mapping[str, Any]: measurements = measurements or {} tags = tags or {} return { "description": "/api/0/relays/projectconfigs/", "duration_ms": 152, - "event_id": "d826225de75d42d6b2f01b957d51f18f", + "event_id": uuid.uuid4().hex, "exclusive_time_ms": 0.228, "is_segment": True, "data": { @@ -81,7 +84,7 @@ def gen_message( "sentry_tags": { "category": "http", "environment": "development", - "op": "http.server", + "op": span_op, "platform": "python", "release": _RELEASE_TAG, "sdk.name": "sentry.python.django", @@ -96,7 +99,7 @@ def gen_message( "transaction.op": "http.server", "user": "ip:127.0.0.1", }, - "span_id": "123456781234567D", + "span_id": uuid.uuid4().hex[:16], "tags": { "http.status_code": "200", "relay_endpoint_version": "3", @@ -118,18 +121,20 @@ def gen_message( } +_SPANS = [ + gen_message( + dt=_BASE_TIME - timedelta(minutes=i), + trace_id=_TRACE_IDS[i % len(_TRACE_IDS)], + span_op="http.server" if i < len(_TRACE_IDS) else "db", + ) + for i in range(_SPAN_COUNT) +] + + @pytest.fixture(autouse=False) def setup_teardown(clickhouse_db: None, redis_db: None) -> None: spans_storage = get_storage(StorageKey("eap_spans")) - start = _BASE_TIME - messages = [ - gen_message( - dt=start - timedelta(minutes=i), - trace_id=_TRACE_IDS[i % len(_TRACE_IDS)], - ) - for i in range(120) - ] - write_raw_unprocessed_events(spans_storage, messages) # type: ignore + write_raw_unprocessed_events(spans_storage, _SPANS) # type: ignore @pytest.mark.clickhouse_db @@ -180,7 +185,7 @@ def test_with_data_and_order_by(self, setup_teardown: Any) -> None: TraceColumn( name=TraceColumn.Name.NAME_TRACE_ID, type=AttributeKey.TYPE_STRING, - ) + ), ], order_by=[ GetTracesRequest.OrderBy( @@ -309,3 +314,99 @@ def test_with_data_and_filter(self, setup_teardown: Any) -> None: meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), ) assert response == expected_response + + def test_with_data_and_order_by_and_aggregated_fields( + self, setup_teardown: Any + ) -> None: + ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) + three_hours_ago = int((_BASE_TIME - timedelta(hours=3)).timestamp()) + start_timestamp_per_trace_id = defaultdict(lambda: 2 * 1e10) + for s in _SPANS: + start_timestamp_per_trace_id[s["trace_id"]] = min( + start_timestamp_per_trace_id[s["trace_id"]], + s["start_timestamp_precise"], + ) + message = GetTracesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=three_hours_ago), + end_timestamp=ts, + request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", + ), + columns=[ + TraceColumn( + name=TraceColumn.Name.NAME_TRACE_ID, + type=AttributeKey.TYPE_STRING, + ), + TraceColumn( + name=TraceColumn.Name.NAME_START_TIMESTAMP, + type=AttributeKey.TYPE_FLOAT, + ), + TraceColumn( + name=TraceColumn.Name.NAME_TOTAL_SPAN_COUNT, + type=AttributeKey.TYPE_INT, + ), + TraceColumn( + name=TraceColumn.Name.NAME_FILTERED_SPAN_COUNT, + type=AttributeKey.TYPE_INT, + ), + ], + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey( + name="sentry.op", + type=AttributeKey.TYPE_STRING, + ), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue(val_str="db"), + ), + ), + order_by=[ + GetTracesRequest.OrderBy( + column=TraceColumn( + name=TraceColumn.Name.NAME_TRACE_ID, + type=AttributeKey.TYPE_STRING, + ), + ), + ], + ) + response = EndpointGetTraces().execute(message) + expected_response = GetTracesResponse( + traces=[ + GetTracesResponse.Trace( + columns=[ + GetTracesResponse.Trace.Column( + name=TraceColumn.Name.NAME_TRACE_ID, + value=AttributeValue( + val_str=trace_id, + ), + ), + GetTracesResponse.Trace.Column( + name=TraceColumn.Name.NAME_START_TIMESTAMP, + value=AttributeValue( + val_float=start_timestamp_per_trace_id[trace_id], + ), + ), + GetTracesResponse.Trace.Column( + name=TraceColumn.Name.NAME_TOTAL_SPAN_COUNT, + value=AttributeValue( + val_int=_SPAN_COUNT // len(_TRACE_IDS), + ), + ), + GetTracesResponse.Trace.Column( + name=TraceColumn.Name.NAME_FILTERED_SPAN_COUNT, + value=AttributeValue( + val_int=(_SPAN_COUNT // len(_TRACE_IDS)) - 1, + ), + ), + ], + ) + for trace_id in sorted(_TRACE_IDS) + ], + page_token=PageToken(offset=len(_TRACE_IDS)), + meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), + ) + assert response == expected_response From 92ff17647f1acd9c9ca40474ec7739d3bdf5acb8 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 16 Dec 2024 10:32:13 -0500 Subject: [PATCH 05/16] Add support for root span name --- snuba/web/rpc/v1/endpoint_get_traces.py | 20 ++++++++++--- tests/web/rpc/v1/test_endpoint_get_traces.py | 30 ++++++++++++++++++-- 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index 0d4423d3c9..f79690fe98 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -40,10 +40,11 @@ _DEFAULT_ROW_LIMIT = 10_000 _COLUMN_TO_NAME: dict[TraceColumn.Name, str] = { - TraceColumn.Name.NAME_TRACE_ID: "trace_id", + TraceColumn.Name.NAME_FILTERED_SPAN_COUNT: "filtered_span_count", + TraceColumn.Name.NAME_ROOT_SPAN_NAME: "root_span_name", TraceColumn.Name.NAME_START_TIMESTAMP: "start_timestamp", TraceColumn.Name.NAME_TOTAL_SPAN_COUNT: "total_span_count", - TraceColumn.Name.NAME_FILTERED_SPAN_COUNT: "filtered_span_count", + TraceColumn.Name.NAME_TRACE_ID: "trace_id", } _NAME_TO_COLUMN: dict[str, TraceColumn.Name] = { @@ -70,15 +71,26 @@ def _column_to_expression(trace_column: TraceColumn, conditions=None) -> Expression: if trace_column.name == TraceColumn.Name.NAME_TOTAL_SPAN_COUNT: - return f.count(alias="total_span_count") + return f.count( + alias=_COLUMN_TO_NAME[trace_column.name], + ) if trace_column.name == TraceColumn.Name.NAME_FILTERED_SPAN_COUNT: - return f.countIf(conditions, alias="filtered_span_count") + return f.countIf( + conditions, + alias=_COLUMN_TO_NAME[trace_column.name], + ) if trace_column.name == TraceColumn.Name.NAME_START_TIMESTAMP: return f.CAST( f.min(column("start_timestamp")), _TYPES_TO_CLICKHOUSE[trace_column.type], alias=_COLUMN_TO_NAME[trace_column.name], ) + if trace_column.name == TraceColumn.Name.NAME_ROOT_SPAN_NAME: + return f.anyIf( + column("name"), + f.equals(column("is_segment"), True), + alias=_COLUMN_TO_NAME[trace_column.name], + ) if ( trace_column.name in _COLUMN_TO_NAME and trace_column.type in _POSSIBLE_TYPES.get(trace_column.name, {}) diff --git a/tests/web/rpc/v1/test_endpoint_get_traces.py b/tests/web/rpc/v1/test_endpoint_get_traces.py index eaf8f104df..dcb56f6cca 100644 --- a/tests/web/rpc/v1/test_endpoint_get_traces.py +++ b/tests/web/rpc/v1/test_endpoint_get_traces.py @@ -46,15 +46,17 @@ def gen_message( measurements: dict[str, dict[str, float]] | None = None, tags: dict[str, str] | None = None, span_op: str = "http.server", + span_name: str = "root", + is_segment: bool = False, ) -> Mapping[str, Any]: measurements = measurements or {} tags = tags or {} return { - "description": "/api/0/relays/projectconfigs/", + "description": span_name, "duration_ms": 152, "event_id": uuid.uuid4().hex, "exclusive_time_ms": 0.228, - "is_segment": True, + "is_segment": is_segment, "data": { "sentry.environment": "development", "sentry.release": _RELEASE_TAG, @@ -80,7 +82,7 @@ def gen_message( "project_id": 1, "received": 1721319572.877828, "retention_days": 90, - "segment_id": "8873a98879faf06d", + "segment_id": trace_id[:16], "sentry_tags": { "category": "http", "environment": "development", @@ -126,6 +128,12 @@ def gen_message( dt=_BASE_TIME - timedelta(minutes=i), trace_id=_TRACE_IDS[i % len(_TRACE_IDS)], span_op="http.server" if i < len(_TRACE_IDS) else "db", + span_name=( + "root" + if i < len(_TRACE_IDS) + else f"child {i%len(_TRACE_IDS)+1} of {_SPAN_COUNT//len(_TRACE_IDS)-1}" + ), + is_segment=True if i < len(_TRACE_IDS) else False, ) for i in range(_SPAN_COUNT) ] @@ -353,6 +361,10 @@ def test_with_data_and_order_by_and_aggregated_fields( name=TraceColumn.Name.NAME_FILTERED_SPAN_COUNT, type=AttributeKey.TYPE_INT, ), + TraceColumn( + name=TraceColumn.Name.NAME_ROOT_SPAN_NAME, + type=AttributeKey.TYPE_STRING, + ), ], filter=TraceItemFilter( comparison_filter=ComparisonFilter( @@ -371,6 +383,12 @@ def test_with_data_and_order_by_and_aggregated_fields( type=AttributeKey.TYPE_STRING, ), ), + GetTracesRequest.OrderBy( + column=TraceColumn( + name=TraceColumn.Name.NAME_START_TIMESTAMP, + type=AttributeKey.TYPE_FLOAT, + ), + ), ], ) response = EndpointGetTraces().execute(message) @@ -402,6 +420,12 @@ def test_with_data_and_order_by_and_aggregated_fields( val_int=(_SPAN_COUNT // len(_TRACE_IDS)) - 1, ), ), + GetTracesResponse.Trace.Column( + name=TraceColumn.Name.NAME_ROOT_SPAN_NAME, + value=AttributeValue( + val_str="root", + ), + ), ], ) for trace_id in sorted(_TRACE_IDS) From 89d89cf9f25165005c7e51d79a7e828ec382f4a4 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 16 Dec 2024 10:59:51 -0500 Subject: [PATCH 06/16] Use https to fetch other repo --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 9c4f195bcb..92c6e03fc7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -46,4 +46,4 @@ sqlparse==0.5.0 google-api-python-client==2.88.0 sentry-usage-accountant==0.0.11 freezegun==1.2.2 -sentry-protos @ git+ssh://git@github.com/getsentry/sentry-protos.git@pierre/eap-find-traces#subdirectory=py +sentry-protos @ git+https://github.com/getsentry/sentry-protos@pierre/eap-find-traces#subdirectory=py From f99e2d174be77485fb28b1f4e9118d90cdc3919d Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 23 Dec 2024 14:20:44 -0500 Subject: [PATCH 07/16] Split into 2 queries to gather trace data fast --- requirements.txt | 2 +- snuba/web/rpc/v1/endpoint_get_traces.py | 351 ++++++++++++------- tests/web/rpc/v1/test_endpoint_get_traces.py | 185 +++++----- 3 files changed, 325 insertions(+), 213 deletions(-) diff --git a/requirements.txt b/requirements.txt index 92c6e03fc7..911e20e30c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -46,4 +46,4 @@ sqlparse==0.5.0 google-api-python-client==2.88.0 sentry-usage-accountant==0.0.11 freezegun==1.2.2 -sentry-protos @ git+https://github.com/getsentry/sentry-protos@pierre/eap-find-traces#subdirectory=py +-e file:///Users/phacops/code/sentry-protos/py#sentry-protos diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index f79690fe98..338a50d6fa 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -6,20 +6,21 @@ from sentry_protos.snuba.v1.endpoint_get_traces_pb2 import ( GetTracesRequest, GetTracesResponse, - TraceColumn, + TraceAttribute, ) -from sentry_protos.snuba.v1.request_common_pb2 import PageToken +from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemName from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue +from sentry_protos.snuba.v1.trace_item_filter_pb2 import TraceItemFilter from snuba.attribution.appid import AppID from snuba.attribution.attribution_info import AttributionInfo from snuba.datasets.entities.entity_key import EntityKey from snuba.datasets.entities.factory import get_entity from snuba.datasets.pluggable_dataset import PluggableDataset -from snuba.query import OrderBy, OrderByDirection, SelectedExpression +from snuba.query import LimitBy, OrderBy, OrderByDirection, SelectedExpression from snuba.query.data_source.simple import Entity from snuba.query.dsl import Functions as f -from snuba.query.dsl import column +from snuba.query.dsl import and_cond, column, in_cond, literal, literals_array from snuba.query.expressions import Expression from snuba.query.logical import Query from snuba.query.query_settings import HTTPQuerySettings @@ -28,6 +29,8 @@ from snuba.web.rpc import RPCEndpoint from snuba.web.rpc.common.common import ( base_conditions_and, + project_id_and_org_conditions, + timestamp_in_range_condition, trace_item_filters_to_expression, treeify_or_and_conditions, ) @@ -39,16 +42,31 @@ _DEFAULT_ROW_LIMIT = 10_000 -_COLUMN_TO_NAME: dict[TraceColumn.Name, str] = { - TraceColumn.Name.NAME_FILTERED_SPAN_COUNT: "filtered_span_count", - TraceColumn.Name.NAME_ROOT_SPAN_NAME: "root_span_name", - TraceColumn.Name.NAME_START_TIMESTAMP: "start_timestamp", - TraceColumn.Name.NAME_TOTAL_SPAN_COUNT: "total_span_count", - TraceColumn.Name.NAME_TRACE_ID: "trace_id", +_ATTRIBUTES: dict[TraceAttribute.Key, tuple[str, AttributeKey.Type]] = { + TraceAttribute.Key.KEY_FILTERED_ITEM_COUNT: ( + "filtered_item_count", + AttributeKey.Type.TYPE_INT, + ), + TraceAttribute.Key.KEY_ROOT_SPAN_NAME: ( + "root_span_name", + AttributeKey.Type.TYPE_STRING, + ), + TraceAttribute.Key.KEY_START_TIMESTAMP: ( + "start_timestamp", + AttributeKey.Type.TYPE_FLOAT, + ), + TraceAttribute.Key.KEY_TOTAL_ITEM_COUNT: ( + "total_item_count", + AttributeKey.Type.TYPE_INT, + ), + TraceAttribute.Key.KEY_TRACE_ID: ( + "trace_id", + AttributeKey.Type.TYPE_STRING, + ), } -_NAME_TO_COLUMN: dict[str, TraceColumn.Name] = { - v: k for k, v in _COLUMN_TO_NAME.items() +_NAME_TO_ATTRIBUTE: dict[str, TraceAttribute.Key] = { + v[0]: k for k, v in _ATTRIBUTES.items() } _TYPES_TO_CLICKHOUSE: dict[AttributeKey.Type, str] = { @@ -57,53 +75,41 @@ AttributeKey.Type.TYPE_FLOAT: "Float64", } -_POSSIBLE_TYPES: dict[TraceColumn.Name, set[AttributeKey.Type]] = { - TraceColumn.Name.NAME_TRACE_ID: { - AttributeKey.Type.TYPE_STRING, - }, - TraceColumn.Name.NAME_START_TIMESTAMP: { - AttributeKey.Type.TYPE_STRING, - AttributeKey.Type.TYPE_INT, - AttributeKey.Type.TYPE_FLOAT, - }, -} - -def _column_to_expression(trace_column: TraceColumn, conditions=None) -> Expression: - if trace_column.name == TraceColumn.Name.NAME_TOTAL_SPAN_COUNT: +def _attribute_to_expression( + trace_attribute: TraceAttribute, conditions=None +) -> Expression: + if trace_attribute.key == TraceAttribute.Key.KEY_TOTAL_ITEM_COUNT: return f.count( - alias=_COLUMN_TO_NAME[trace_column.name], + alias=_ATTRIBUTES[trace_attribute.key][0], ) - if trace_column.name == TraceColumn.Name.NAME_FILTERED_SPAN_COUNT: + if trace_attribute.key == TraceAttribute.Key.KEY_FILTERED_ITEM_COUNT: return f.countIf( conditions, - alias=_COLUMN_TO_NAME[trace_column.name], + alias=_ATTRIBUTES[trace_attribute.key][0], ) - if trace_column.name == TraceColumn.Name.NAME_START_TIMESTAMP: + if trace_attribute.key == TraceAttribute.Key.KEY_START_TIMESTAMP: + attribute = _ATTRIBUTES[trace_attribute.key] return f.CAST( f.min(column("start_timestamp")), - _TYPES_TO_CLICKHOUSE[trace_column.type], - alias=_COLUMN_TO_NAME[trace_column.name], + _TYPES_TO_CLICKHOUSE[attribute[1]], + alias=_ATTRIBUTES[trace_attribute.key][0], ) - if trace_column.name == TraceColumn.Name.NAME_ROOT_SPAN_NAME: + if trace_attribute.key == TraceAttribute.Key.KEY_ROOT_SPAN_NAME: return f.anyIf( column("name"), f.equals(column("is_segment"), True), - alias=_COLUMN_TO_NAME[trace_column.name], + alias=_ATTRIBUTES[trace_attribute.key][0], ) - if ( - trace_column.name in _COLUMN_TO_NAME - and trace_column.type in _POSSIBLE_TYPES.get(trace_column.name, {}) - ): + if trace_attribute.key in _ATTRIBUTES: + attribute = _ATTRIBUTES[trace_attribute.key] return f.CAST( - column( - _COLUMN_TO_NAME[trace_column.name], - ), - _TYPES_TO_CLICKHOUSE[trace_column.type], - alias=_COLUMN_TO_NAME[trace_column.name], + column(attribute[0]), + _TYPES_TO_CLICKHOUSE[attribute[1]], + alias=attribute[0], ) raise BadSnubaRPCRequestException( - f"{trace_column.name} had an unknown or unset type: {trace_column.type}" + f"{trace_attribute.key} had an unknown or unset type: {trace_attribute.type}" ) @@ -112,58 +118,18 @@ def _convert_order_by( ) -> Sequence[OrderBy]: res: list[OrderBy] = [] for x in order_by: - direction = OrderByDirection.DESC if x.descending else OrderByDirection.ASC res.append( OrderBy( - direction=direction, - expression=_column_to_expression(x.column), - ) - ) - return res - - -def _build_query(request: GetTracesRequest) -> Query: - trace_item_filter_expressions = trace_item_filters_to_expression(request.filter) - selected_columns = [] - - for trace_column in request.columns: - expression = _column_to_expression(trace_column, trace_item_filter_expressions) - selected_columns.append( - SelectedExpression( - name=_COLUMN_TO_NAME[trace_column.name], - expression=expression, + direction=( + OrderByDirection.DESC if x.descending else OrderByDirection.ASC + ), + expression=_attribute_to_expression(TraceAttribute(key=x.key)), ) ) - - entity = Entity( - key=EntityKey("eap_spans"), - schema=get_entity(EntityKey("eap_spans")).get_data_model(), - sample=None, - ) - res = Query( - from_clause=entity, - selected_columns=selected_columns, - condition=base_conditions_and( - request.meta, - ), - order_by=_convert_order_by(request.order_by), - groupby=[ - _column_to_expression( - TraceColumn( - type=AttributeKey.TYPE_STRING, - name=TraceColumn.Name.NAME_TRACE_ID, - ), - ), - ], - limit=request.limit if request.limit > 0 else _DEFAULT_ROW_LIMIT, - ) - - treeify_or_and_conditions(res) - return res -def _build_snuba_request(request: GetTracesRequest) -> SnubaRequest: +def _build_snuba_request(request: GetTracesRequest, query: Query) -> SnubaRequest: query_settings = ( setup_trace_query_settings() if request.meta.debug else HTTPQuerySettings() ) @@ -171,7 +137,7 @@ def _build_snuba_request(request: GetTracesRequest) -> SnubaRequest: return SnubaRequest( id=uuid.UUID(request.meta.request_id), original_body=MessageToDict(request), - query=_build_query(request), + query=query, query_settings=query_settings, attribution_info=AttributionInfo( referrer=request.meta.referrer, @@ -192,39 +158,42 @@ def _convert_results( ) -> list[GetTracesResponse.Trace]: converters: Dict[str, Callable[[Any], AttributeValue]] = {} - for trace_column in request.columns: - if trace_column.type == AttributeKey.TYPE_BOOLEAN: - converters[trace_column.name] = lambda x: AttributeValue(val_bool=bool(x)) - elif trace_column.type == AttributeKey.TYPE_STRING: - converters[trace_column.name] = lambda x: AttributeValue(val_str=str(x)) - elif trace_column.type == AttributeKey.TYPE_INT: - converters[trace_column.name] = lambda x: AttributeValue(val_int=int(x)) - elif trace_column.type == AttributeKey.TYPE_FLOAT: - converters[trace_column.name] = lambda x: AttributeValue(val_float=float(x)) + for trace_attribute in request.attributes: + attribute_type = _ATTRIBUTES[trace_attribute.key][1] + if attribute_type == AttributeKey.TYPE_BOOLEAN: + converters[trace_attribute.key] = lambda x: AttributeValue(val_bool=bool(x)) + elif attribute_type == AttributeKey.TYPE_STRING: + converters[trace_attribute.key] = lambda x: AttributeValue(val_str=str(x)) + elif attribute_type == AttributeKey.TYPE_INT: + converters[trace_attribute.key] = lambda x: AttributeValue(val_int=int(x)) + elif attribute_type == AttributeKey.TYPE_FLOAT: + converters[trace_attribute.key] = lambda x: AttributeValue( + val_float=float(x) + ) res: list[GetTracesResponse.Trace] = [] column_ordering = { - trace_column.name: i for i, trace_column in enumerate(request.columns) + trace_attribute.key: i for i, trace_attribute in enumerate(request.attributes) } for row in data: values: defaultdict[ - TraceColumn.Name, - GetTracesResponse.Trace.Column, - ] = defaultdict(GetTracesResponse.Trace.Column) + TraceAttribute.Key, + TraceAttribute, + ] = defaultdict(TraceAttribute) for column_name, value in row.items(): - name = _NAME_TO_COLUMN[column_name] - if name in converters.keys(): - values[name] = GetTracesResponse.Trace.Column( - name=name, - value=converters[name](value), - ) + key = _NAME_TO_ATTRIBUTE[column_name] + values[key] = TraceAttribute( + key=key, + value=converters[key](value), + type=_ATTRIBUTES[key][1], + ) res.append( GetTracesResponse.Trace( # we return the columns in the order they were requested - columns=sorted( + attributes=sorted( values.values(), - key=lambda c: column_ordering[c.name], + key=lambda c: column_ordering[c.key], ) ) ) @@ -233,17 +202,18 @@ def _convert_results( def _get_page_token( - request: GetTracesRequest, response: list[GetTracesResponse.Trace] + request: GetTracesRequest, + rows: list[GetTracesResponse.Trace], ) -> PageToken: - if not response: + if not rows: return PageToken(offset=0) - num_rows = len(response) + num_rows = len(rows) return PageToken(offset=request.page_token.offset + num_rows) def _validate_order_by(in_msg: GetTracesRequest) -> None: - order_by_cols = set([ob.column.name for ob in in_msg.order_by]) - selected_columns = set([c.name for c in in_msg.columns]) + order_by_cols = set([ob.key for ob in in_msg.order_by]) + selected_columns = set([c.key for c in in_msg.attributes]) if not order_by_cols.issubset(selected_columns): raise BadSnubaRPCRequestException( f"Ordered by columns {order_by_cols} not selected: {selected_columns}" @@ -269,21 +239,156 @@ def _execute(self, in_msg: GetTracesRequest) -> GetTracesResponse: in_msg.meta.request_id = getattr(in_msg.meta, "request_id", None) or str( uuid.uuid4() ) - snuba_request = _build_snuba_request(in_msg) - res = run_query( - dataset=PluggableDataset(name="eap", all_entities=[]), - request=snuba_request, - timer=self._timer, - ) - traces = _convert_results(in_msg, res.result.get("data", [])) response_meta = extract_response_meta( in_msg.meta.request_id, in_msg.meta.debug, - [res], + [], [self._timer], ) + + # Get a dict of trace IDs and timestamps. + trace_ids = self._list_trace_ids(request=in_msg) + if len(trace_ids) == 0: + return GetTracesResponse(meta=response_meta) + + # Get metadata for those traces. + traces = self._get_metadata_for_traces(request=in_msg, trace_ids=trace_ids) return GetTracesResponse( traces=traces, page_token=_get_page_token(in_msg, traces), meta=response_meta, ) + + def _list_trace_ids( + self, + request: GetTracesRequest, + ) -> dict[str, int]: + # Find first span filter. + # TODO: support more than one filter. + trace_item_filters_expression = trace_item_filters_to_expression( + next( + f.filter + for f in request.filters + if f.item_name == TraceItemName.TRACE_ITEM_NAME_EAP_SPANS + ) + if len(request.filters) > 0 + else TraceItemFilter() + ) + selected_columns: list[SelectedExpression] = [ + SelectedExpression( + name="trace_id", + expression=f.cast( + column("trace_id"), + "String", + alias="trace_id", + ), + ), + SelectedExpression( + name="_sort_timestamp", + expression=f.cast( + column("_sort_timestamp"), + "UInt32", + alias="_sort_timestamp", + ), + ), + ] + entity = Entity( + key=EntityKey("eap_spans"), + schema=get_entity(EntityKey("eap_spans")).get_data_model(), + sample=None, + ) + query = Query( + from_clause=entity, + selected_columns=selected_columns, + condition=base_conditions_and( + request.meta, + trace_item_filters_expression, + ), + order_by=[ + OrderBy( + direction=OrderByDirection.DESC, + expression=column("_sort_timestamp"), + ), + ], + limitby=LimitBy(limit=1, columns=[column("trace_id")]), + limit=request.limit if request.limit > 0 else _DEFAULT_ROW_LIMIT, + ) + + treeify_or_and_conditions(query) + + results = run_query( + dataset=PluggableDataset(name="eap", all_entities=[]), + request=_build_snuba_request(request, query), + timer=self._timer, + ) + trace_ids: dict[str, int] = {} + for row in results.result.get("data", []): + trace_ids[row["trace_id"]] = row["_sort_timestamp"] + return trace_ids + + def _get_metadata_for_traces( + self, + request: GetTracesRequest, + trace_ids: dict[str, int], + ): + trace_item_filters_expression = trace_item_filters_to_expression( + request.filters[0].filter if len(request.filters) > 0 else TraceItemFilter() + ) + + selected_columns: list[SelectedExpression] = [] + for trace_attribute in request.attributes: + selected_columns.append( + SelectedExpression( + name=_ATTRIBUTES[trace_attribute.key][0], + expression=_attribute_to_expression( + trace_attribute, + trace_item_filters_expression, + ), + ) + ) + + entity = Entity( + key=EntityKey("eap_spans"), + schema=get_entity(EntityKey("eap_spans")).get_data_model(), + sample=None, + ) + timestamps = trace_ids.values() + query = Query( + from_clause=entity, + selected_columns=selected_columns, + condition=and_cond( + project_id_and_org_conditions(request.meta), + timestamp_in_range_condition( + min(timestamps) - 2 * 3600, + max(timestamps) + 2 * 3600, + ), + in_cond( + f.cast( + column("trace_id"), + "String", + alias="trace_id", + ), + literals_array( + None, [literal(trace_id) for trace_id in trace_ids.keys()] + ), + ), + ), + groupby=[ + _attribute_to_expression( + TraceAttribute( + key=TraceAttribute.Key.KEY_TRACE_ID, + ), + ), + ], + order_by=_convert_order_by(request.order_by), + ) + + treeify_or_and_conditions(query) + + results = run_query( + dataset=PluggableDataset(name="eap", all_entities=[]), + request=_build_snuba_request(request, query), + timer=self._timer, + ) + + return _convert_results(request, results.result.get("data", [])) diff --git a/tests/web/rpc/v1/test_endpoint_get_traces.py b/tests/web/rpc/v1/test_endpoint_get_traces.py index dcb56f6cca..1916bf6602 100644 --- a/tests/web/rpc/v1/test_endpoint_get_traces.py +++ b/tests/web/rpc/v1/test_endpoint_get_traces.py @@ -5,17 +5,19 @@ from typing import Any, Mapping import pytest +from google.protobuf.json_format import MessageToDict from google.protobuf.timestamp_pb2 import Timestamp from sentry_protos.snuba.v1.endpoint_get_traces_pb2 import ( GetTracesRequest, GetTracesResponse, - TraceColumn, + TraceAttribute, ) from sentry_protos.snuba.v1.error_pb2 import Error as ErrorProto from sentry_protos.snuba.v1.request_common_pb2 import ( PageToken, RequestMeta, ResponseMeta, + TraceItemName, ) from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( @@ -148,7 +150,7 @@ def setup_teardown(clickhouse_db: None, redis_db: None) -> None: @pytest.mark.clickhouse_db @pytest.mark.redis_db class TestGetTraces(BaseApiTest): - def test_no_data(self) -> None: + def test_without_data(self) -> None: ts = Timestamp() ts.GetCurrentTime() message = GetTracesRequest( @@ -160,10 +162,9 @@ def test_no_data(self) -> None: start_timestamp=ts, end_timestamp=ts, ), - columns=[ - TraceColumn( - name=TraceColumn.Name.NAME_TRACE_ID, - type=AttributeKey.TYPE_STRING, + attributes=[ + TraceAttribute( + key=TraceAttribute.Key.KEY_TRACE_ID, ) ], limit=10, @@ -189,18 +190,14 @@ def test_with_data_and_order_by(self, setup_teardown: Any) -> None: end_timestamp=ts, request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", ), - columns=[ - TraceColumn( - name=TraceColumn.Name.NAME_TRACE_ID, - type=AttributeKey.TYPE_STRING, + attributes=[ + TraceAttribute( + key=TraceAttribute.Key.KEY_TRACE_ID, ), ], order_by=[ GetTracesRequest.OrderBy( - column=TraceColumn( - name=TraceColumn.Name.NAME_TRACE_ID, - type=AttributeKey.TYPE_STRING, - ), + key=TraceAttribute.Key.KEY_TRACE_ID, ), ], ) @@ -208,9 +205,10 @@ def test_with_data_and_order_by(self, setup_teardown: Any) -> None: expected_response = GetTracesResponse( traces=[ GetTracesResponse.Trace( - columns=[ - GetTracesResponse.Trace.Column( - name=TraceColumn.Name.NAME_TRACE_ID, + attributes=[ + TraceAttribute( + key=TraceAttribute.Key.KEY_TRACE_ID, + type=AttributeKey.TYPE_STRING, value=AttributeValue( val_str=trace_id, ), @@ -222,7 +220,7 @@ def test_with_data_and_order_by(self, setup_teardown: Any) -> None: page_token=PageToken(offset=len(_TRACE_IDS)), meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), ) - assert response == expected_response + assert MessageToDict(response) == MessageToDict(expected_response) def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) @@ -237,18 +235,14 @@ def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: end_timestamp=ts, request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", ), - columns=[ - TraceColumn( - name=TraceColumn.Name.NAME_TRACE_ID, - type=AttributeKey.TYPE_STRING, + attributes=[ + TraceAttribute( + key=TraceAttribute.Key.KEY_TRACE_ID, ), ], order_by=[ GetTracesRequest.OrderBy( - column=TraceColumn( - name=TraceColumn.Name.NAME_TRACE_ID, - type=AttributeKey.TYPE_STRING, - ), + key=TraceAttribute.Key.KEY_TRACE_ID, ), ], limit=1, @@ -257,11 +251,15 @@ def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: expected_response = GetTracesResponse( traces=[ GetTracesResponse.Trace( - columns=[ - GetTracesResponse.Trace.Column( - name=TraceColumn.Name.NAME_TRACE_ID, + attributes=[ + TraceAttribute( + key=TraceAttribute.Key.KEY_TRACE_ID, + type=AttributeKey.Type.TYPE_STRING, value=AttributeValue( - val_str=sorted(_TRACE_IDS)[0], + val_str=sorted( + _SPANS, + key=lambda s: s["start_timestamp_ms"], + )[len(_SPANS) - 1]["trace_id"], ), ) ], @@ -270,7 +268,7 @@ def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: page_token=PageToken(offset=1), meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), ) - assert response == expected_response + assert MessageToDict(response) == MessageToDict(expected_response) def test_with_data_and_filter(self, setup_teardown: Any) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) @@ -285,22 +283,26 @@ def test_with_data_and_filter(self, setup_teardown: Any) -> None: end_timestamp=ts, request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", ), - filter=TraceItemFilter( - comparison_filter=ComparisonFilter( - key=AttributeKey( - type=AttributeKey.TYPE_STRING, - name="sentry.trace_id", - ), - op=ComparisonFilter.OP_EQUALS, - value=AttributeValue( - val_str=_TRACE_IDS[0], + filters=[ + GetTracesRequest.TraceFilter( + item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey( + name="sentry.trace_id", + type=AttributeKey.TYPE_STRING, + ), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue( + val_str=_TRACE_IDS[0], + ), + ), ), ), - ), - columns=[ - TraceColumn( - name=TraceColumn.Name.NAME_TRACE_ID, - type=AttributeKey.TYPE_STRING, + ], + attributes=[ + TraceAttribute( + key=TraceAttribute.Key.KEY_TRACE_ID, ), ], ) @@ -308,9 +310,10 @@ def test_with_data_and_filter(self, setup_teardown: Any) -> None: expected_response = GetTracesResponse( traces=[ GetTracesResponse.Trace( - columns=[ - GetTracesResponse.Trace.Column( - name=TraceColumn.Name.NAME_TRACE_ID, + attributes=[ + TraceAttribute( + key=TraceAttribute.Key.KEY_TRACE_ID, + type=AttributeKey.Type.TYPE_STRING, value=AttributeValue( val_str=_TRACE_IDS[0], ), @@ -321,9 +324,9 @@ def test_with_data_and_filter(self, setup_teardown: Any) -> None: page_token=PageToken(offset=1), meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), ) - assert response == expected_response + assert MessageToDict(response) == MessageToDict(expected_response) - def test_with_data_and_order_by_and_aggregated_fields( + def test_with_data_order_by_and_aggregated_fields( self, setup_teardown: Any ) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) @@ -344,50 +347,49 @@ def test_with_data_and_order_by_and_aggregated_fields( end_timestamp=ts, request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", ), - columns=[ - TraceColumn( - name=TraceColumn.Name.NAME_TRACE_ID, + attributes=[ + TraceAttribute( + key=TraceAttribute.Key.KEY_TRACE_ID, type=AttributeKey.TYPE_STRING, ), - TraceColumn( - name=TraceColumn.Name.NAME_START_TIMESTAMP, + TraceAttribute( + key=TraceAttribute.Key.KEY_START_TIMESTAMP, type=AttributeKey.TYPE_FLOAT, ), - TraceColumn( - name=TraceColumn.Name.NAME_TOTAL_SPAN_COUNT, + TraceAttribute( + key=TraceAttribute.Key.KEY_TOTAL_ITEM_COUNT, type=AttributeKey.TYPE_INT, ), - TraceColumn( - name=TraceColumn.Name.NAME_FILTERED_SPAN_COUNT, + TraceAttribute( + key=TraceAttribute.Key.KEY_FILTERED_ITEM_COUNT, type=AttributeKey.TYPE_INT, ), - TraceColumn( - name=TraceColumn.Name.NAME_ROOT_SPAN_NAME, + TraceAttribute( + key=TraceAttribute.Key.KEY_ROOT_SPAN_NAME, type=AttributeKey.TYPE_STRING, ), ], - filter=TraceItemFilter( - comparison_filter=ComparisonFilter( - key=AttributeKey( - name="sentry.op", - type=AttributeKey.TYPE_STRING, + filters=[ + GetTracesRequest.TraceFilter( + item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey( + name="sentry.op", + type=AttributeKey.TYPE_STRING, + ), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue(val_str="db"), + ), ), - op=ComparisonFilter.OP_EQUALS, - value=AttributeValue(val_str="db"), ), - ), + ], order_by=[ GetTracesRequest.OrderBy( - column=TraceColumn( - name=TraceColumn.Name.NAME_TRACE_ID, - type=AttributeKey.TYPE_STRING, - ), + key=TraceAttribute.Key.KEY_TRACE_ID, ), GetTracesRequest.OrderBy( - column=TraceColumn( - name=TraceColumn.Name.NAME_START_TIMESTAMP, - type=AttributeKey.TYPE_FLOAT, - ), + key=TraceAttribute.Key.KEY_START_TIMESTAMP, ), ], ) @@ -395,33 +397,38 @@ def test_with_data_and_order_by_and_aggregated_fields( expected_response = GetTracesResponse( traces=[ GetTracesResponse.Trace( - columns=[ - GetTracesResponse.Trace.Column( - name=TraceColumn.Name.NAME_TRACE_ID, + attributes=[ + TraceAttribute( + key=TraceAttribute.Key.KEY_TRACE_ID, + type=AttributeKey.TYPE_STRING, value=AttributeValue( val_str=trace_id, ), ), - GetTracesResponse.Trace.Column( - name=TraceColumn.Name.NAME_START_TIMESTAMP, + TraceAttribute( + key=TraceAttribute.Key.KEY_START_TIMESTAMP, + type=AttributeKey.TYPE_FLOAT, value=AttributeValue( val_float=start_timestamp_per_trace_id[trace_id], ), ), - GetTracesResponse.Trace.Column( - name=TraceColumn.Name.NAME_TOTAL_SPAN_COUNT, + TraceAttribute( + key=TraceAttribute.Key.KEY_TOTAL_ITEM_COUNT, + type=AttributeKey.TYPE_INT, value=AttributeValue( val_int=_SPAN_COUNT // len(_TRACE_IDS), ), ), - GetTracesResponse.Trace.Column( - name=TraceColumn.Name.NAME_FILTERED_SPAN_COUNT, + TraceAttribute( + key=TraceAttribute.Key.KEY_FILTERED_ITEM_COUNT, + type=AttributeKey.TYPE_INT, value=AttributeValue( val_int=(_SPAN_COUNT // len(_TRACE_IDS)) - 1, ), ), - GetTracesResponse.Trace.Column( - name=TraceColumn.Name.NAME_ROOT_SPAN_NAME, + TraceAttribute( + key=TraceAttribute.Key.KEY_ROOT_SPAN_NAME, + type=AttributeKey.TYPE_STRING, value=AttributeValue( val_str="root", ), @@ -433,4 +440,4 @@ def test_with_data_and_order_by_and_aggregated_fields( page_token=PageToken(offset=len(_TRACE_IDS)), meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), ) - assert response == expected_response + assert MessageToDict(response) == MessageToDict(expected_response) From 8f71604d808adeea3b06df391d12827967be2b30 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 23 Dec 2024 14:24:56 -0500 Subject: [PATCH 08/16] Upgrade sentry-protos to v0.1.41 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 911e20e30c..a87cb948a8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,6 +29,7 @@ python-rapidjson==1.8 redis==4.5.4 sentry-arroyo==2.19.3 sentry-kafka-schemas==0.1.122 +sentry-protos==0.1.41 sentry-redis-tools==0.3.0 sentry-relay==0.9.2 sentry-sdk==2.18.0 @@ -46,4 +47,3 @@ sqlparse==0.5.0 google-api-python-client==2.88.0 sentry-usage-accountant==0.0.11 freezegun==1.2.2 --e file:///Users/phacops/code/sentry-protos/py#sentry-protos From d4c93c24e6f8467c265db2998b26814b4b8b0be6 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Thu, 2 Jan 2025 10:33:38 -0500 Subject: [PATCH 09/16] Fix type issues --- snuba/web/rpc/v1/endpoint_get_traces.py | 30 +++++++++++++------- tests/web/rpc/v1/test_endpoint_get_traces.py | 12 ++++---- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index 338a50d6fa..4607d8a142 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -42,7 +42,10 @@ _DEFAULT_ROW_LIMIT = 10_000 -_ATTRIBUTES: dict[TraceAttribute.Key, tuple[str, AttributeKey.Type]] = { +_ATTRIBUTES: dict[ + TraceAttribute.Key.ValueType, + tuple[str, AttributeKey.Type.ValueType], +] = { TraceAttribute.Key.KEY_FILTERED_ITEM_COUNT: ( "filtered_item_count", AttributeKey.Type.TYPE_INT, @@ -65,11 +68,11 @@ ), } -_NAME_TO_ATTRIBUTE: dict[str, TraceAttribute.Key] = { +_NAME_TO_ATTRIBUTE: dict[str, TraceAttribute.Key.ValueType] = { v[0]: k for k, v in _ATTRIBUTES.items() } -_TYPES_TO_CLICKHOUSE: dict[AttributeKey.Type, str] = { +_TYPES_TO_CLICKHOUSE: dict[AttributeKey.Type.ValueType, str] = { AttributeKey.Type.TYPE_STRING: "String", AttributeKey.Type.TYPE_INT: "Int64", AttributeKey.Type.TYPE_FLOAT: "Float64", @@ -77,7 +80,8 @@ def _attribute_to_expression( - trace_attribute: TraceAttribute, conditions=None + trace_attribute: TraceAttribute, + *conditions: Expression, ) -> Expression: if trace_attribute.key == TraceAttribute.Key.KEY_TOTAL_ITEM_COUNT: return f.count( @@ -85,12 +89,12 @@ def _attribute_to_expression( ) if trace_attribute.key == TraceAttribute.Key.KEY_FILTERED_ITEM_COUNT: return f.countIf( - conditions, + *conditions, alias=_ATTRIBUTES[trace_attribute.key][0], ) if trace_attribute.key == TraceAttribute.Key.KEY_START_TIMESTAMP: attribute = _ATTRIBUTES[trace_attribute.key] - return f.CAST( + return f.cast( f.min(column("start_timestamp")), _TYPES_TO_CLICKHOUSE[attribute[1]], alias=_ATTRIBUTES[trace_attribute.key][0], @@ -103,7 +107,7 @@ def _attribute_to_expression( ) if trace_attribute.key in _ATTRIBUTES: attribute = _ATTRIBUTES[trace_attribute.key] - return f.CAST( + return f.cast( column(attribute[0]), _TYPES_TO_CLICKHOUSE[attribute[1]], alias=attribute[0], @@ -156,7 +160,13 @@ def _build_snuba_request(request: GetTracesRequest, query: Query) -> SnubaReques def _convert_results( request: GetTracesRequest, data: Iterable[Dict[str, Any]] ) -> list[GetTracesResponse.Trace]: - converters: Dict[str, Callable[[Any], AttributeValue]] = {} + converters: Dict[ + TraceAttribute.Key.ValueType, + Callable[ + [Any], + AttributeValue, + ], + ] = {} for trace_attribute in request.attributes: attribute_type = _ATTRIBUTES[trace_attribute.key][1] @@ -178,7 +188,7 @@ def _convert_results( for row in data: values: defaultdict[ - TraceAttribute.Key, + TraceAttribute.Key.ValueType, TraceAttribute, ] = defaultdict(TraceAttribute) for column_name, value in row.items(): @@ -330,7 +340,7 @@ def _get_metadata_for_traces( self, request: GetTracesRequest, trace_ids: dict[str, int], - ): + ) -> list[GetTracesResponse.Trace]: trace_item_filters_expression = trace_item_filters_to_expression( request.filters[0].filter if len(request.filters) > 0 else TraceItemFilter() ) diff --git a/tests/web/rpc/v1/test_endpoint_get_traces.py b/tests/web/rpc/v1/test_endpoint_get_traces.py index 1916bf6602..adf33ee840 100644 --- a/tests/web/rpc/v1/test_endpoint_get_traces.py +++ b/tests/web/rpc/v1/test_endpoint_get_traces.py @@ -2,6 +2,7 @@ import uuid from collections import defaultdict from datetime import datetime, timedelta, timezone +from operator import itemgetter from typing import Any, Mapping import pytest @@ -248,6 +249,10 @@ def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: limit=1, ) response = EndpointGetTraces().execute(message) + last_span = sorted( + _SPANS, + key=itemgetter("start_timestamp_ms"), + )[len(_SPANS) - 1] expected_response = GetTracesResponse( traces=[ GetTracesResponse.Trace( @@ -256,10 +261,7 @@ def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: key=TraceAttribute.Key.KEY_TRACE_ID, type=AttributeKey.Type.TYPE_STRING, value=AttributeValue( - val_str=sorted( - _SPANS, - key=lambda s: s["start_timestamp_ms"], - )[len(_SPANS) - 1]["trace_id"], + val_str=last_span["trace_id"], ), ) ], @@ -331,7 +333,7 @@ def test_with_data_order_by_and_aggregated_fields( ) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) three_hours_ago = int((_BASE_TIME - timedelta(hours=3)).timestamp()) - start_timestamp_per_trace_id = defaultdict(lambda: 2 * 1e10) + start_timestamp_per_trace_id: dict[str, float] = defaultdict(lambda: 2 * 1e10) for s in _SPANS: start_timestamp_per_trace_id[s["trace_id"]] = min( start_timestamp_per_trace_id[s["trace_id"]], From a24d1a871c38afd80fe1b0cc4c46cc7d0edc8bc7 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Thu, 2 Jan 2025 11:23:40 -0500 Subject: [PATCH 10/16] Change to guarantee a root span name --- snuba/web/rpc/v1/endpoint_get_traces.py | 7 ++-- tests/web/rpc/v1/test_endpoint_get_traces.py | 37 +++++++++++--------- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index 4607d8a142..7f68891a69 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -55,7 +55,7 @@ AttributeKey.Type.TYPE_STRING, ), TraceAttribute.Key.KEY_START_TIMESTAMP: ( - "start_timestamp", + "trace_start_timestamp", AttributeKey.Type.TYPE_FLOAT, ), TraceAttribute.Key.KEY_TOTAL_ITEM_COUNT: ( @@ -100,9 +100,10 @@ def _attribute_to_expression( alias=_ATTRIBUTES[trace_attribute.key][0], ) if trace_attribute.key == TraceAttribute.Key.KEY_ROOT_SPAN_NAME: - return f.anyIf( + # TODO: Change to return the root span name instead of the trace's first span's name. + return f.argMin( column("name"), - f.equals(column("is_segment"), True), + column("start_timestamp"), alias=_ATTRIBUTES[trace_attribute.key][0], ) if trace_attribute.key in _ATTRIBUTES: diff --git a/tests/web/rpc/v1/test_endpoint_get_traces.py b/tests/web/rpc/v1/test_endpoint_get_traces.py index adf33ee840..ba05da807d 100644 --- a/tests/web/rpc/v1/test_endpoint_get_traces.py +++ b/tests/web/rpc/v1/test_endpoint_get_traces.py @@ -54,6 +54,9 @@ def gen_message( ) -> Mapping[str, Any]: measurements = measurements or {} tags = tags or {} + timestamp = dt.timestamp() + if not is_segment: + timestamp += random.random() return { "description": span_name, "duration_ms": 152, @@ -120,15 +123,15 @@ def gen_message( **tags, }, "trace_id": trace_id, - "start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)), - "start_timestamp_precise": dt.timestamp(), - "end_timestamp_precise": dt.timestamp() + 1, + "start_timestamp_ms": int(timestamp * 1000), + "start_timestamp_precise": timestamp, + "end_timestamp_precise": timestamp + 1, } _SPANS = [ gen_message( - dt=_BASE_TIME - timedelta(minutes=i), + dt=_BASE_TIME + timedelta(minutes=i), trace_id=_TRACE_IDS[i % len(_TRACE_IDS)], span_op="http.server" if i < len(_TRACE_IDS) else "db", span_name=( @@ -136,7 +139,7 @@ def gen_message( if i < len(_TRACE_IDS) else f"child {i%len(_TRACE_IDS)+1} of {_SPAN_COUNT//len(_TRACE_IDS)-1}" ), - is_segment=True if i < len(_TRACE_IDS) else False, + is_segment=i < len(_TRACE_IDS), ) for i in range(_SPAN_COUNT) ] @@ -180,15 +183,15 @@ def test_without_data(self) -> None: def test_with_data_and_order_by(self, setup_teardown: Any) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) - hour_ago = int((_BASE_TIME - timedelta(hours=1)).timestamp()) + three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp()) message = GetTracesRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, cogs_category="something", referrer="something", - start_timestamp=Timestamp(seconds=hour_ago), - end_timestamp=ts, + start_timestamp=ts, + end_timestamp=Timestamp(seconds=three_hours_later), request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", ), attributes=[ @@ -225,15 +228,15 @@ def test_with_data_and_order_by(self, setup_teardown: Any) -> None: def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) - hour_ago = int((_BASE_TIME - timedelta(hours=1)).timestamp()) + three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp()) message = GetTracesRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, cogs_category="something", referrer="something", - start_timestamp=Timestamp(seconds=hour_ago), - end_timestamp=ts, + start_timestamp=ts, + end_timestamp=Timestamp(seconds=three_hours_later), request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", ), attributes=[ @@ -274,15 +277,15 @@ def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: def test_with_data_and_filter(self, setup_teardown: Any) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) - hour_ago = int((_BASE_TIME - timedelta(hours=1)).timestamp()) + three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp()) message = GetTracesRequest( meta=RequestMeta( project_ids=[1, 2, 3], organization_id=1, cogs_category="something", referrer="something", - start_timestamp=Timestamp(seconds=hour_ago), - end_timestamp=ts, + start_timestamp=ts, + end_timestamp=Timestamp(seconds=three_hours_later), request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", ), filters=[ @@ -332,7 +335,7 @@ def test_with_data_order_by_and_aggregated_fields( self, setup_teardown: Any ) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) - three_hours_ago = int((_BASE_TIME - timedelta(hours=3)).timestamp()) + three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp()) start_timestamp_per_trace_id: dict[str, float] = defaultdict(lambda: 2 * 1e10) for s in _SPANS: start_timestamp_per_trace_id[s["trace_id"]] = min( @@ -345,8 +348,8 @@ def test_with_data_order_by_and_aggregated_fields( organization_id=1, cogs_category="something", referrer="something", - start_timestamp=Timestamp(seconds=three_hours_ago), - end_timestamp=ts, + start_timestamp=ts, + end_timestamp=Timestamp(seconds=three_hours_later), request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", ), attributes=[ From 136cc0f4d02c98d685f1fb621f2c0ee69941154c Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Thu, 2 Jan 2025 17:02:18 -0500 Subject: [PATCH 11/16] Refactor request_id into a single variable --- tests/web/rpc/v1/test_endpoint_get_traces.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/tests/web/rpc/v1/test_endpoint_get_traces.py b/tests/web/rpc/v1/test_endpoint_get_traces.py index ba05da807d..a3e95e1ed5 100644 --- a/tests/web/rpc/v1/test_endpoint_get_traces.py +++ b/tests/web/rpc/v1/test_endpoint_get_traces.py @@ -41,6 +41,7 @@ microsecond=0, ) - timedelta(minutes=180) _SPAN_COUNT = 120 +_REQUEST_ID = uuid.uuid4().hex def gen_message( @@ -165,6 +166,7 @@ def test_without_data(self) -> None: referrer="something", start_timestamp=ts, end_timestamp=ts, + request_id=_REQUEST_ID, ), attributes=[ TraceAttribute( @@ -192,7 +194,7 @@ def test_with_data_and_order_by(self, setup_teardown: Any) -> None: referrer="something", start_timestamp=ts, end_timestamp=Timestamp(seconds=three_hours_later), - request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", + request_id=_REQUEST_ID, ), attributes=[ TraceAttribute( @@ -222,7 +224,7 @@ def test_with_data_and_order_by(self, setup_teardown: Any) -> None: for trace_id in sorted(_TRACE_IDS) ], page_token=PageToken(offset=len(_TRACE_IDS)), - meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), + meta=ResponseMeta(request_id=_REQUEST_ID), ) assert MessageToDict(response) == MessageToDict(expected_response) @@ -237,7 +239,7 @@ def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: referrer="something", start_timestamp=ts, end_timestamp=Timestamp(seconds=three_hours_later), - request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", + request_id=_REQUEST_ID, ), attributes=[ TraceAttribute( @@ -271,7 +273,7 @@ def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: ) ], page_token=PageToken(offset=1), - meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), + meta=ResponseMeta(request_id=_REQUEST_ID), ) assert MessageToDict(response) == MessageToDict(expected_response) @@ -286,7 +288,7 @@ def test_with_data_and_filter(self, setup_teardown: Any) -> None: referrer="something", start_timestamp=ts, end_timestamp=Timestamp(seconds=three_hours_later), - request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", + request_id=_REQUEST_ID, ), filters=[ GetTracesRequest.TraceFilter( @@ -327,7 +329,7 @@ def test_with_data_and_filter(self, setup_teardown: Any) -> None: ) ], page_token=PageToken(offset=1), - meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), + meta=ResponseMeta(request_id=_REQUEST_ID), ) assert MessageToDict(response) == MessageToDict(expected_response) @@ -350,7 +352,7 @@ def test_with_data_order_by_and_aggregated_fields( referrer="something", start_timestamp=ts, end_timestamp=Timestamp(seconds=three_hours_later), - request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", + request_id=_REQUEST_ID, ), attributes=[ TraceAttribute( @@ -443,6 +445,6 @@ def test_with_data_order_by_and_aggregated_fields( for trace_id in sorted(_TRACE_IDS) ], page_token=PageToken(offset=len(_TRACE_IDS)), - meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), + meta=ResponseMeta(request_id=_REQUEST_ID), ) assert MessageToDict(response) == MessageToDict(expected_response) From 868c71a55b0eee709ca5af6a0507ae1c72a9ebd3 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Thu, 2 Jan 2025 17:17:07 -0500 Subject: [PATCH 12/16] Add error cases when querying with more than 1 filter or not a span filter --- snuba/web/rpc/v1/endpoint_get_traces.py | 27 ++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index 7f68891a69..e3b82f0862 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -2,6 +2,7 @@ from collections import defaultdict from typing import Any, Callable, Dict, Iterable, Sequence, Type +from google.protobuf.internal.containers import RepeatedCompositeFieldContainer from google.protobuf.json_format import MessageToDict from sentry_protos.snuba.v1.endpoint_get_traces_pb2 import ( GetTracesRequest, @@ -231,6 +232,24 @@ def _validate_order_by(in_msg: GetTracesRequest) -> None: ) +def _select_supported_filters( + filters: RepeatedCompositeFieldContainer[GetTracesRequest.TraceFilter], +) -> TraceItemFilter: + filter_count = len(filters) + if filter_count == 0: + return TraceItemFilter() + if filter_count > 1: + raise BadSnubaRPCRequestException("Multiple filters are not supported.") + try: + return next( + f.filter + for f in filters + if f.item_name == TraceItemName.TRACE_ITEM_NAME_EAP_SPANS + ) + except StopIteration: + raise BadSnubaRPCRequestException("Only one span filter is supported.") + + class EndpointGetTraces(RPCEndpoint[GetTracesRequest, GetTracesResponse]): @classmethod def version(cls) -> str: @@ -277,13 +296,7 @@ def _list_trace_ids( # Find first span filter. # TODO: support more than one filter. trace_item_filters_expression = trace_item_filters_to_expression( - next( - f.filter - for f in request.filters - if f.item_name == TraceItemName.TRACE_ITEM_NAME_EAP_SPANS - ) - if len(request.filters) > 0 - else TraceItemFilter() + _select_supported_filters(request.filters), ) selected_columns: list[SelectedExpression] = [ SelectedExpression( From 293f71a61fdd329c97672cc24e5eefb99ac2525a Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Thu, 2 Jan 2025 17:40:59 -0500 Subject: [PATCH 13/16] Use _select_supported_filters everywhere appropriate --- snuba/web/rpc/v1/endpoint_get_traces.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index e3b82f0862..54828e6b9c 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -232,6 +232,7 @@ def _validate_order_by(in_msg: GetTracesRequest) -> None: ) +# TODO: support more than one filter. def _select_supported_filters( filters: RepeatedCompositeFieldContainer[GetTracesRequest.TraceFilter], ) -> TraceItemFilter: @@ -241,6 +242,7 @@ def _select_supported_filters( if filter_count > 1: raise BadSnubaRPCRequestException("Multiple filters are not supported.") try: + # Find first span filter. return next( f.filter for f in filters @@ -293,8 +295,6 @@ def _list_trace_ids( self, request: GetTracesRequest, ) -> dict[str, int]: - # Find first span filter. - # TODO: support more than one filter. trace_item_filters_expression = trace_item_filters_to_expression( _select_supported_filters(request.filters), ) @@ -356,7 +356,7 @@ def _get_metadata_for_traces( trace_ids: dict[str, int], ) -> list[GetTracesResponse.Trace]: trace_item_filters_expression = trace_item_filters_to_expression( - request.filters[0].filter if len(request.filters) > 0 else TraceItemFilter() + _select_supported_filters(request.filters), ) selected_columns: list[SelectedExpression] = [] From 01651caa37f073b564a914290d5a7d5fb8fba861 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Thu, 2 Jan 2025 19:02:35 -0500 Subject: [PATCH 14/16] Order by start_timestamp only --- snuba/web/rpc/v1/endpoint_get_traces.py | 30 +++++--------- tests/web/rpc/v1/test_endpoint_get_traces.py | 42 ++++++++------------ 2 files changed, 26 insertions(+), 46 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index 54828e6b9c..f0a3e40a6d 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -1,6 +1,6 @@ import uuid from collections import defaultdict -from typing import Any, Callable, Dict, Iterable, Sequence, Type +from typing import Any, Callable, Dict, Iterable, Type from google.protobuf.internal.containers import RepeatedCompositeFieldContainer from google.protobuf.json_format import MessageToDict @@ -42,6 +42,7 @@ from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException _DEFAULT_ROW_LIMIT = 10_000 +_BUFFER_WINDOW = 2 * 3600 # 2 hours _ATTRIBUTES: dict[ TraceAttribute.Key.ValueType, @@ -119,22 +120,6 @@ def _attribute_to_expression( ) -def _convert_order_by( - order_by: Sequence[GetTracesRequest.OrderBy], -) -> Sequence[OrderBy]: - res: list[OrderBy] = [] - for x in order_by: - res.append( - OrderBy( - direction=( - OrderByDirection.DESC if x.descending else OrderByDirection.ASC - ), - expression=_attribute_to_expression(TraceAttribute(key=x.key)), - ) - ) - return res - - def _build_snuba_request(request: GetTracesRequest, query: Query) -> SnubaRequest: query_settings = ( setup_trace_query_settings() if request.meta.debug else HTTPQuerySettings() @@ -383,8 +368,8 @@ def _get_metadata_for_traces( condition=and_cond( project_id_and_org_conditions(request.meta), timestamp_in_range_condition( - min(timestamps) - 2 * 3600, - max(timestamps) + 2 * 3600, + min(timestamps) - _BUFFER_WINDOW, + max(timestamps) + _BUFFER_WINDOW, ), in_cond( f.cast( @@ -404,7 +389,12 @@ def _get_metadata_for_traces( ), ), ], - order_by=_convert_order_by(request.order_by), + order_by=[ + OrderBy( + direction=OrderByDirection.DESC, + expression=column("trace_start_timestamp"), + ), + ], ) treeify_or_and_conditions(query) diff --git a/tests/web/rpc/v1/test_endpoint_get_traces.py b/tests/web/rpc/v1/test_endpoint_get_traces.py index a3e95e1ed5..46d28c0442 100644 --- a/tests/web/rpc/v1/test_endpoint_get_traces.py +++ b/tests/web/rpc/v1/test_endpoint_get_traces.py @@ -183,7 +183,7 @@ def test_without_data(self) -> None: error_proto.ParseFromString(response.data) assert response.status_code == 200, error_proto - def test_with_data_and_order_by(self, setup_teardown: Any) -> None: + def test_with_data(self, setup_teardown: Any) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp()) message = GetTracesRequest( @@ -201,11 +201,6 @@ def test_with_data_and_order_by(self, setup_teardown: Any) -> None: key=TraceAttribute.Key.KEY_TRACE_ID, ), ], - order_by=[ - GetTracesRequest.OrderBy( - key=TraceAttribute.Key.KEY_TRACE_ID, - ), - ], ) response = EndpointGetTraces().execute(message) expected_response = GetTracesResponse( @@ -228,7 +223,7 @@ def test_with_data_and_order_by(self, setup_teardown: Any) -> None: ) assert MessageToDict(response) == MessageToDict(expected_response) - def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: + def test_with_data_and_limit(self, setup_teardown: Any) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp()) message = GetTracesRequest( @@ -246,11 +241,6 @@ def test_with_data_order_by_and_limit(self, setup_teardown: Any) -> None: key=TraceAttribute.Key.KEY_TRACE_ID, ), ], - order_by=[ - GetTracesRequest.OrderBy( - key=TraceAttribute.Key.KEY_TRACE_ID, - ), - ], limit=1, ) response = EndpointGetTraces().execute(message) @@ -333,9 +323,7 @@ def test_with_data_and_filter(self, setup_teardown: Any) -> None: ) assert MessageToDict(response) == MessageToDict(expected_response) - def test_with_data_order_by_and_aggregated_fields( - self, setup_teardown: Any - ) -> None: + def test_with_data_and_aggregated_fields(self, setup_teardown: Any) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp()) start_timestamp_per_trace_id: dict[str, float] = defaultdict(lambda: 2 * 1e10) @@ -344,6 +332,10 @@ def test_with_data_order_by_and_aggregated_fields( start_timestamp_per_trace_id[s["trace_id"]], s["start_timestamp_precise"], ) + trace_id_per_start_timestamp: dict[float, str] = { + timestamp: trace_id + for trace_id, timestamp in start_timestamp_per_trace_id.items() + } message = GetTracesRequest( meta=RequestMeta( project_ids=[1, 2, 3], @@ -391,14 +383,6 @@ def test_with_data_order_by_and_aggregated_fields( ), ), ], - order_by=[ - GetTracesRequest.OrderBy( - key=TraceAttribute.Key.KEY_TRACE_ID, - ), - GetTracesRequest.OrderBy( - key=TraceAttribute.Key.KEY_START_TIMESTAMP, - ), - ], ) response = EndpointGetTraces().execute(message) expected_response = GetTracesResponse( @@ -409,14 +393,16 @@ def test_with_data_order_by_and_aggregated_fields( key=TraceAttribute.Key.KEY_TRACE_ID, type=AttributeKey.TYPE_STRING, value=AttributeValue( - val_str=trace_id, + val_str=trace_id_per_start_timestamp[start_timestamp], ), ), TraceAttribute( key=TraceAttribute.Key.KEY_START_TIMESTAMP, type=AttributeKey.TYPE_FLOAT, value=AttributeValue( - val_float=start_timestamp_per_trace_id[trace_id], + val_float=start_timestamp_per_trace_id[ + trace_id_per_start_timestamp[start_timestamp] + ], ), ), TraceAttribute( @@ -442,9 +428,13 @@ def test_with_data_order_by_and_aggregated_fields( ), ], ) - for trace_id in sorted(_TRACE_IDS) + for start_timestamp in reversed( + sorted(trace_id_per_start_timestamp.keys()) + ) ], page_token=PageToken(offset=len(_TRACE_IDS)), meta=ResponseMeta(request_id=_REQUEST_ID), ) + for start_timestamp in reversed(sorted(trace_id_per_start_timestamp.keys())): + print(start_timestamp, trace_id_per_start_timestamp[start_timestamp]) assert MessageToDict(response) == MessageToDict(expected_response) From ff054376f49a4d508be71db2ad4288263941a248 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 3 Jan 2025 10:42:37 -0500 Subject: [PATCH 15/16] Request start_timestamp if needed --- snuba/web/rpc/v1/endpoint_get_traces.py | 77 ++++++++++---------- tests/web/rpc/v1/test_endpoint_get_traces.py | 17 ++++- 2 files changed, 54 insertions(+), 40 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index f0a3e40a6d..464778d6a3 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -69,15 +69,19 @@ AttributeKey.Type.TYPE_STRING, ), } - -_NAME_TO_ATTRIBUTE: dict[str, TraceAttribute.Key.ValueType] = { - v[0]: k for k, v in _ATTRIBUTES.items() -} - -_TYPES_TO_CLICKHOUSE: dict[AttributeKey.Type.ValueType, str] = { - AttributeKey.Type.TYPE_STRING: "String", - AttributeKey.Type.TYPE_INT: "Int64", - AttributeKey.Type.TYPE_FLOAT: "Float64", +_TYPES_TO_CLICKHOUSE: dict[AttributeKey.Type.ValueType, tuple[str, Callable]] = { + AttributeKey.Type.TYPE_STRING: ( + "String", + lambda x: AttributeValue(val_str=str(x)), + ), + AttributeKey.Type.TYPE_INT: ( + "Int64", + lambda x: AttributeValue(val_int=int(x)), + ), + AttributeKey.Type.TYPE_FLOAT: ( + "Float64", + lambda x: AttributeValue(val_float=float(x)), + ), } @@ -98,7 +102,7 @@ def _attribute_to_expression( attribute = _ATTRIBUTES[trace_attribute.key] return f.cast( f.min(column("start_timestamp")), - _TYPES_TO_CLICKHOUSE[attribute[1]], + _TYPES_TO_CLICKHOUSE[attribute[1]][0], alias=_ATTRIBUTES[trace_attribute.key][0], ) if trace_attribute.key == TraceAttribute.Key.KEY_ROOT_SPAN_NAME: @@ -112,7 +116,7 @@ def _attribute_to_expression( attribute = _ATTRIBUTES[trace_attribute.key] return f.cast( column(attribute[0]), - _TYPES_TO_CLICKHOUSE[attribute[1]], + _TYPES_TO_CLICKHOUSE[attribute[1]][0], alias=attribute[0], ) raise BadSnubaRPCRequestException( @@ -147,27 +151,6 @@ def _build_snuba_request(request: GetTracesRequest, query: Query) -> SnubaReques def _convert_results( request: GetTracesRequest, data: Iterable[Dict[str, Any]] ) -> list[GetTracesResponse.Trace]: - converters: Dict[ - TraceAttribute.Key.ValueType, - Callable[ - [Any], - AttributeValue, - ], - ] = {} - - for trace_attribute in request.attributes: - attribute_type = _ATTRIBUTES[trace_attribute.key][1] - if attribute_type == AttributeKey.TYPE_BOOLEAN: - converters[trace_attribute.key] = lambda x: AttributeValue(val_bool=bool(x)) - elif attribute_type == AttributeKey.TYPE_STRING: - converters[trace_attribute.key] = lambda x: AttributeValue(val_str=str(x)) - elif attribute_type == AttributeKey.TYPE_INT: - converters[trace_attribute.key] = lambda x: AttributeValue(val_int=int(x)) - elif attribute_type == AttributeKey.TYPE_FLOAT: - converters[trace_attribute.key] = lambda x: AttributeValue( - val_float=float(x) - ) - res: list[GetTracesResponse.Trace] = [] column_ordering = { trace_attribute.key: i for i, trace_attribute in enumerate(request.attributes) @@ -178,12 +161,13 @@ def _convert_results( TraceAttribute.Key.ValueType, TraceAttribute, ] = defaultdict(TraceAttribute) - for column_name, value in row.items(): - key = _NAME_TO_ATTRIBUTE[column_name] - values[key] = TraceAttribute( - key=key, - value=converters[key](value), - type=_ATTRIBUTES[key][1], + for attribute in request.attributes: + value = row[_ATTRIBUTES[attribute.key][0]] + type = _ATTRIBUTES[attribute.key][1] + values[attribute.key] = TraceAttribute( + key=attribute.key, + value=_TYPES_TO_CLICKHOUSE[type][1](value), + type=type, ) res.append( GetTracesResponse.Trace( @@ -345,7 +329,24 @@ def _get_metadata_for_traces( ) selected_columns: list[SelectedExpression] = [] + start_timestamp_requested = False for trace_attribute in request.attributes: + if trace_attribute.key == TraceAttribute.Key.KEY_START_TIMESTAMP: + start_timestamp_requested = True + selected_columns.append( + SelectedExpression( + name=_ATTRIBUTES[trace_attribute.key][0], + expression=_attribute_to_expression( + trace_attribute, + trace_item_filters_expression, + ), + ) + ) + + # Since we're always ordering by start_timestamp, we need to request + # the field unless it's already been requested. + if not start_timestamp_requested: + trace_attribute = TraceAttribute(key=TraceAttribute.Key.KEY_START_TIMESTAMP) selected_columns.append( SelectedExpression( name=_ATTRIBUTES[trace_attribute.key][0], diff --git a/tests/web/rpc/v1/test_endpoint_get_traces.py b/tests/web/rpc/v1/test_endpoint_get_traces.py index 46d28c0442..64e2c6c6e2 100644 --- a/tests/web/rpc/v1/test_endpoint_get_traces.py +++ b/tests/web/rpc/v1/test_endpoint_get_traces.py @@ -186,6 +186,17 @@ def test_without_data(self) -> None: def test_with_data(self, setup_teardown: Any) -> None: ts = Timestamp(seconds=int(_BASE_TIME.timestamp())) three_hours_later = int((_BASE_TIME + timedelta(hours=3)).timestamp()) + start_timestamp_per_trace_id: dict[str, float] = defaultdict(lambda: 2 * 1e10) + for s in _SPANS: + start_timestamp_per_trace_id[s["trace_id"]] = min( + start_timestamp_per_trace_id[s["trace_id"]], + s["start_timestamp_precise"], + ) + trace_id_per_start_timestamp: dict[float, str] = { + timestamp: trace_id + for trace_id, timestamp in start_timestamp_per_trace_id.items() + } + message = GetTracesRequest( meta=RequestMeta( project_ids=[1, 2, 3], @@ -211,12 +222,14 @@ def test_with_data(self, setup_teardown: Any) -> None: key=TraceAttribute.Key.KEY_TRACE_ID, type=AttributeKey.TYPE_STRING, value=AttributeValue( - val_str=trace_id, + val_str=trace_id_per_start_timestamp[start_timestamp], ), ), ], ) - for trace_id in sorted(_TRACE_IDS) + for start_timestamp in reversed( + sorted(trace_id_per_start_timestamp.keys()) + ) ], page_token=PageToken(offset=len(_TRACE_IDS)), meta=ResponseMeta(request_id=_REQUEST_ID), From 4b212502b14e92a52a09d3c8b67fa828a84c6c8f Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 3 Jan 2025 14:48:54 -0500 Subject: [PATCH 16/16] Fix typing --- snuba/web/rpc/v1/endpoint_get_traces.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/snuba/web/rpc/v1/endpoint_get_traces.py b/snuba/web/rpc/v1/endpoint_get_traces.py index 464778d6a3..e60a8b8da7 100644 --- a/snuba/web/rpc/v1/endpoint_get_traces.py +++ b/snuba/web/rpc/v1/endpoint_get_traces.py @@ -69,7 +69,10 @@ AttributeKey.Type.TYPE_STRING, ), } -_TYPES_TO_CLICKHOUSE: dict[AttributeKey.Type.ValueType, tuple[str, Callable]] = { +_TYPES_TO_CLICKHOUSE: dict[ + AttributeKey.Type.ValueType, + tuple[str, Callable[[Any], AttributeValue]], +] = { AttributeKey.Type.TYPE_STRING: ( "String", lambda x: AttributeValue(val_str=str(x)),