inc984: scrub user from eap_spans (#6691)
Co-authored-by: Rachel Chen <[email protected]>
3 people authored Dec 19, 2024
1 parent 5b718ee commit 4ffbb49
Showing 2 changed files with 381 additions and 0 deletions.
52 changes: 52 additions & 0 deletions snuba/manual_jobs/scrub_users_from_eap_spans.py
@@ -0,0 +1,52 @@
from datetime import datetime
from typing import Any, Mapping, Optional

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
from snuba.clusters.storage_sets import StorageSetKey
from snuba.manual_jobs import Job, JobLogger, JobSpec

_IP_PREFIX = "ip:"
_SCRUBBED = "scrubbed"


class ScrubUserFromEAPSpans(Job):
    """Replace IP-based `user` attribute values in eap_spans with `ip:scrubbed`
    for the given organizations and time range."""

def __init__(self, job_spec: JobSpec) -> None:
self.__validate_job_params(job_spec.params)
super().__init__(job_spec)

def __validate_job_params(self, params: Optional[Mapping[Any, Any]]) -> None:
assert params
assert isinstance(params["organization_ids"], list)
assert all([isinstance(p, int) for p in params["organization_ids"]])
self._organization_ids = params["organization_ids"]
self._start_datetime = datetime.fromisoformat(params["start_datetime"])
self._end_datetime = datetime.fromisoformat(params["end_datetime"])

def _get_query(self, cluster_name: str | None) -> str:
organization_ids = ",".join([str(p) for p in self._organization_ids])
start_datetime = self._start_datetime.isoformat()
end_datetime = self._end_datetime.isoformat()
on_cluster = f"ON CLUSTER '{cluster_name}'" if cluster_name else ""
return f"""ALTER TABLE eap_spans_2_local
{on_cluster}
UPDATE `attr_str_2` = mapApply((k, v) -> (k, if(k = 'user' AND startsWith(v, '{_IP_PREFIX}') AND (isIPv4String(substring(v, 4)) OR isIPv6String(substring(v, 4))), 'ip:scrubbed', v)), `attr_str_2`)
WHERE organization_id IN [{organization_ids}]
AND _sort_timestamp >= toDateTime('{start_datetime}')
AND _sort_timestamp < toDateTime('{end_datetime}')"""

def execute(self, logger: JobLogger) -> None:
cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM)
storage_node = cluster.get_local_nodes()[0]
connection = cluster.get_node_connection(
ClickhouseClientSettings.CLEANUP, storage_node
)
if not cluster.is_single_node():
cluster_name = cluster.get_clickhouse_cluster_name()
else:
cluster_name = None
query = self._get_query(cluster_name)
logger.info("Executing query: {query}")
result = connection.execute(query=query, settings={"mutations_sync": 2})

logger.info("complete")
logger.info(repr(result))
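
For reference, the mapApply lambda in the mutation above only rewrites the `user` attribute when its value is an `ip:`-prefixed IPv4 or IPv6 address; every other key/value pair passes through unchanged. Below is a minimal Python sketch of that predicate (illustrative only, not part of this change; the standard ipaddress module stands in for ClickHouse's isIPv4String/isIPv6String):

import ipaddress


def _scrub_user_value(key: str, value: str) -> str:
    # Mirror of the mapApply lambda: only `user` values of the form
    # "ip:<valid IPv4 or IPv6 address>" are rewritten to "ip:scrubbed".
    if key != "user" or not value.startswith("ip:"):
        return value
    try:
        ipaddress.ip_address(value[3:])
    except ValueError:
        return value
    return "ip:scrubbed"


assert _scrub_user_value("user", "ip:192.168.0.45") == "ip:scrubbed"
assert _scrub_user_value("user", "email:192.168.0.45") == "email:192.168.0.45"
assert _scrub_user_value("user", "ip:not-an-address") == "ip:not-an-address"
assert _scrub_user_value("color", "ip:10.0.0.1") == "ip:10.0.0.1"
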
329 changes: 329 additions & 0 deletions tests/manual_jobs/test_scrub_users_from_eap_spans.py
@@ -0,0 +1,329 @@
import random
import uuid
from datetime import datetime, timedelta
from typing import Any, Mapping

import pytest
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
Column,
TraceItemColumnValues,
TraceItemTableRequest,
TraceItemTableResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import (
PageToken,
RequestMeta,
ResponseMeta,
)
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue
from sentry_protos.snuba.v1.trace_item_filter_pb2 import ExistsFilter, TraceItemFilter

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.job_status import JobStatus
from snuba.manual_jobs.runner import get_job_status, run_job
from snuba.manual_jobs.scrub_users_from_eap_spans import ScrubUserFromEAPSpans
from snuba.web.rpc.v1.endpoint_trace_item_table import EndpointTraceItemTable
from tests.helpers import write_raw_unprocessed_events

_RELEASE_TAG = "[email protected]+c45b49caed1e5fcbf70097ab3f434b487c359b6b"
_USER = "192.168.0.45"


@pytest.mark.redis_db
@pytest.mark.clickhouse_db
def test_basic() -> None:
job_id = "abc"
run_job(
JobSpec(
job_id,
"ScrubUserFromEAPSpans",
False,
{
"organization_ids": [1, 3, 5, 6],
"start_datetime": "2024-12-01 00:00:00",
"end_datetime": "2024-12-10 00:00:00",
},
)
)

assert get_job_status(job_id) == JobStatus.FINISHED


@pytest.mark.parametrize(
("jobspec"),
[
JobSpec(
"abc",
"ScrubUserFromEAPSpans",
False,
{
"organization_ids": [1, "b"],
"start_datetime": "2024-12-01 00:00:00",
"end_datetime": "2024-12-10 00:00:00",
},
),
JobSpec(
"abc",
"ScrubUserFromEAPSpans",
False,
{
"organization_ids": [1, 2],
"start_datetime": "2024-12-01 00:00:0",
"end_datetime": "2024-12-10 00:00:00",
},
),
JobSpec(
"abc",
"ScrubUserFromEAPSpans",
False,
{
"organization_ids": [1, 2],
"start_datetime": "2024-12-01 00:00:00",
"end_datetime": "2024-12-10 00:00:0",
},
),
],
)
@pytest.mark.redis_db
def test_fail_validation(jobspec: JobSpec) -> None:
with pytest.raises(Exception):
run_job(jobspec)


@pytest.mark.redis_db
def test_generate_query() -> None:
job = ScrubUserFromEAPSpans(
JobSpec(
"bassa",
"ScrubUserFromEAPSpans",
False,
{
"organization_ids": [1, 3, 5, 6],
"start_datetime": "2024-12-01 00:00:00",
"end_datetime": "2024-12-10 00:00:00",
},
)
)
assert (
job._get_query(None)
== """ALTER TABLE eap_spans_2_local
UPDATE `attr_str_2` = mapApply((k, v) -> (k, if(k = 'user' AND startsWith(v, 'ip:') AND (isIPv4String(substring(v, 4)) OR isIPv6String(substring(v, 4))), 'ip:scrubbed', v)), `attr_str_2`)
WHERE organization_id IN [1,3,5,6]
AND _sort_timestamp >= toDateTime('2024-12-01T00:00:00')
AND _sort_timestamp < toDateTime('2024-12-10T00:00:00')"""
)


def _gen_message(
dt: datetime,
organization_id: int,
user: str,
tags: dict[str, str] | None = None,
) -> Mapping[str, Any]:
measurements: dict[Any, Any] = {}
tags = tags or {}
return {
"description": "/api/0/relays/projectconfigs/",
"duration_ms": 152,
"event_id": "d826225de75d42d6b2f01b957d51f18f",
"exclusive_time_ms": 0.228,
"is_segment": True,
"data": {
"sentry.environment": "development",
"sentry.release": _RELEASE_TAG,
"thread.name": "uWSGIWorker1Core0",
"thread.id": "8522009600",
"sentry.segment.name": "/api/0/relays/projectconfigs/",
"sentry.sdk.name": "sentry.python.django",
"sentry.sdk.version": "2.7.0",
"my.float.field": 101.2,
"my.int.field": 2000,
"my.neg.field": -100,
"my.neg.float.field": -101.2,
"my.true.bool.field": True,
"my.false.bool.field": False,
},
"measurements": {
"num_of_spans": {"value": 50.0},
"eap.measurement": {"value": random.choice([1, 100, 1000])},
**measurements,
},
"organization_id": organization_id,
"origin": "auto.http.django",
"project_id": 1,
"received": 1721319572.877828,
"retention_days": 90,
"segment_id": "8873a98879faf06d",
"sentry_tags": {
"category": "http",
"environment": "development",
"op": "http.server",
"platform": "python",
"release": _RELEASE_TAG,
"sdk.name": "sentry.python.django",
"sdk.version": "2.7.0",
"status": "ok",
"status_code": "200",
"thread.id": "8522009600",
"thread.name": "uWSGIWorker1Core0",
"trace.status": "ok",
"transaction": "/api/0/relays/projectconfigs/",
"transaction.method": "POST",
"transaction.op": "http.server",
"user": user,
},
"span_id": "123456781234567D",
"tags": {
"http.status_code": "200",
"relay_endpoint_version": "3",
"relay_id": "88888888-4444-4444-8444-cccccccccccc",
"relay_no_cache": "False",
"relay_protocol_version": "3",
"relay_use_post_or_schedule": "True",
"relay_use_post_or_schedule_rejected": "version",
"user.ip": "192.168.0.45",
"user": user,
"spans_over_limit": "False",
"server_name": "blah",
"color": random.choice(["red", "green", "blue"]),
"location": random.choice(["mobile", "frontend", "backend"]),
**tags,
},
"trace_id": uuid.uuid4().hex,
"start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)),
"start_timestamp_precise": dt.timestamp(),
"end_timestamp_precise": dt.timestamp() + 1,
}


def _generate_request(
ts: Any, hour_ago: int, organization_id: int, project_ids: list[int]
) -> TraceItemTableRequest:
# project_ids is added as an argument to avoid this query getting cached
return TraceItemTableRequest(
meta=RequestMeta(
project_ids=project_ids,
organization_id=organization_id,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(seconds=hour_ago),
end_timestamp=ts,
request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480",
),
filter=TraceItemFilter(
exists_filter=ExistsFilter(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="color")
)
),
columns=[Column(key=AttributeKey(type=AttributeKey.TYPE_STRING, name="user"))],
order_by=[
TraceItemTableRequest.OrderBy(
column=Column(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="user")
)
)
],
)


def _generate_expected_response(user: str) -> TraceItemTableResponse:
return TraceItemTableResponse(
column_values=[
TraceItemColumnValues(
attribute_name="user",
results=[AttributeValue(val_str=user) for _ in range(20)],
)
],
page_token=PageToken(offset=20),
meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"),
)


@pytest.mark.clickhouse_db
@pytest.mark.redis_db
def test_span_is_scrubbed() -> None:
BASE_TIME = datetime.utcnow().replace(
minute=0, second=0, microsecond=0
) - timedelta(minutes=180)
organization_ids = [0, 1, 2]
spans_storage = get_storage(StorageKey("eap_spans"))
messages = []
for organization_id in organization_ids:
for i in range(20):
if organization_id == 0:
messages.append(
_gen_message(
BASE_TIME - timedelta(minutes=i), organization_id, "ip:" + _USER
)
)
if organization_id == 1:
messages.append(
_gen_message(
BASE_TIME - timedelta(minutes=i),
organization_id,
"email:" + _USER,
)
)
if organization_id == 2:
messages.append(
_gen_message(
BASE_TIME - timedelta(minutes=i),
organization_id,
"noscrub",
)
)

write_raw_unprocessed_events(spans_storage, messages) # type: ignore

# we inserted spans for organizations 0, 1, 2, and we make sure they look as expected
ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
    for organization_id in organization_ids:
        response = EndpointTraceItemTable().execute(
_generate_request(ts, hour_ago, organization_id, [1, 2, 3])
)
if organization_id == 0:
assert response == _generate_expected_response("ip:" + _USER)
if organization_id == 1:
assert response == _generate_expected_response("email:" + _USER)
if organization_id == 2:
assert response == _generate_expected_response("noscrub")

    # next we scrub organization 0
    start_datetime = datetime.utcfromtimestamp(hour_ago)
end_datetime = datetime.utcfromtimestamp(ts.seconds)

run_job(
JobSpec(
"plswork",
"ScrubUserFromEAPSpans",
False,
{
"organization_ids": [organization_ids[0]],
"start_datetime": start_datetime.strftime("%Y-%m-%d %H:%M:%S"),
"end_datetime": end_datetime.strftime("%Y-%m-%d %H:%M:%S"),
},
)
)

response = EndpointTraceItemTable().execute(
_generate_request(ts, hour_ago, organization_ids[0], [3, 2, 1])
)
assert response == _generate_expected_response("ip:scrubbed")

    # then we make sure organization 1 is NOT SCRUBBED because its user value uses the email: prefix, not ip:
response = EndpointTraceItemTable().execute(
_generate_request(ts, hour_ago, organization_ids[1], [3, 2, 1])
)
assert response == _generate_expected_response("email:" + _USER)

    # then we make sure organization 2 is NOT SCRUBBED because its user value has no ip: prefix at all
response = EndpointTraceItemTable().execute(
_generate_request(ts, hour_ago, organization_ids[2], [3, 2, 1])
)
assert response == _generate_expected_response("noscrub")
