Skip to content

Commit

Permalink
feat(taskworker) Add tracing continuation to taskworker tasks (#81831)
Browse files Browse the repository at this point in the history
Add sentry-trace and baggage to task headers. With these headers we can
continue traces from within workers so that spawned tasks are included
in the trace they originated from.

This screenshot shows a task that was spawned by
`OrganizationDetails.get()` included in the trace from the endpoint.

Refs #80054
Refs #80254
  • Loading branch information
markstory authored Dec 10, 2024
1 parent afb0c19 commit 3997d3f
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 11 deletions.
9 changes: 9 additions & 0 deletions src/sentry/taskworker/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,19 @@ def producer(self) -> KafkaProducer:
return self._producer

def get(self, name: str) -> Task[Any, Any]:
    """
    Look up a previously registered task by its name.

    Raises KeyError if no task has been registered under `name`.
    """
    registered = self._registered_tasks
    if name in registered:
        return registered[name]
    raise KeyError(f"No task registered with the name {name}. Check your imports")

def contains(self, name: str) -> bool:
    """
    Report whether a task has been registered under `name`.
    """
    registered = self._registered_tasks
    return name in registered

def register(
Expand All @@ -80,6 +88,7 @@ def register(
asynchronously via taskworkers.
Parameters
----------
name: str
The name of the task. This is serialized and must be stable across deploys.
Expand Down
23 changes: 23 additions & 0 deletions src/sentry/taskworker/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from uuid import uuid4

import orjson
import sentry_sdk
from django.conf import settings
from django.utils import timezone
from google.protobuf.timestamp_pb2 import Timestamp
Expand Down Expand Up @@ -58,15 +59,31 @@ def retry(self) -> Retry | None:
return self._retry

def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
    """
    Run the wrapped task function synchronously and return its result.
    """
    func = self._func
    return func(*args, **kwargs)

def delay(self, *args: P.args, **kwargs: P.kwargs) -> None:
    """
    Queue this task for later execution with the given arguments.

    This simply forwards to `apply_async`. The arguments are JSON encoded
    and stored within a `TaskActivation` protobuf that is appended to kafka.
    """
    schedule = self.apply_async
    schedule(*args, **kwargs)

def apply_async(self, *args: P.args, **kwargs: P.kwargs) -> None:
    """
    Queue this task for later execution with the given arguments.

    When `settings.TASK_WORKER_ALWAYS_EAGER` is set the task function is
    invoked inline instead. Otherwise an activation is built from the
    arguments and handed to the task's namespace for sending.
    """
    if settings.TASK_WORKER_ALWAYS_EAGER:
        # Eager mode: run the function right away, discarding its result.
        self._func(*args, **kwargs)
        return

    # TODO(taskworker) promote parameters to headers
    send_task = self._namespace.send_task
    send_task(self.create_activation(*args, **kwargs))

def create_activation(self, *args: P.args, **kwargs: P.kwargs) -> TaskActivation:
Expand All @@ -81,10 +98,16 @@ def create_activation(self, *args: P.args, **kwargs: P.kwargs) -> TaskActivation
if isinstance(expires, datetime.timedelta):
expires = int(expires.total_seconds())

headers = {
"sentry-trace": sentry_sdk.get_traceparent() or "",
"baggage": sentry_sdk.get_baggage() or "",
}

return TaskActivation(
id=uuid4().hex,
namespace=self._namespace.name,
taskname=self.name,
headers=headers,
parameters=orjson.dumps({"args": args, "kwargs": kwargs}).decode("utf8"),
retry_state=self._create_retry_state(),
received_at=received_at,
Expand Down
26 changes: 15 additions & 11 deletions src/sentry/taskworker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import grpc
import orjson
import sentry_sdk
from django.conf import settings
from django.core.cache import cache
from sentry_protos.sentry.v1.taskworker_pb2 import (
Expand All @@ -30,11 +31,20 @@
mp_context = multiprocessing.get_context("fork")


def _process_activation(
namespace: str, task_name: str, args: list[Any], kwargs: dict[str, Any]
) -> None:
def _process_activation(activation: TaskActivation) -> None:
    """
    Multiprocess worker method: execute a single task activation.

    The activation's JSON `parameters` are decoded into positional and
    keyword arguments (each defaulting to empty when absent). The
    activation's headers are passed to `sentry_sdk.continue_trace` so the
    task runs inside a transaction that continues the trace it was
    spawned from.

    Raises KeyError (from the registry lookup) when the task name is unknown.
    """
    parameters = orjson.loads(activation.parameters)
    args = parameters.get("args", [])
    kwargs = parameters.get("kwargs", {})
    # Copy the headers mapping into a plain dict before handing it to the SDK.
    headers = dict(activation.headers)

    transaction = sentry_sdk.continue_trace(
        environ_or_headers=headers,
        op="task.taskworker",
        name=f"{activation.namespace}:{activation.taskname}",
    )
    with sentry_sdk.start_transaction(transaction):
        taskregistry.get(activation.namespace).get(activation.taskname)(*args, **kwargs)


AT_MOST_ONCE_TIMEOUT = 60 * 60 * 24 # 1 day
Expand Down Expand Up @@ -187,17 +197,11 @@ def process_task(self, activation: TaskActivation) -> TaskActivation | None:
result = None
execution_start_time = 0.0
try:
task_data_parameters = orjson.loads(activation.parameters)
execution_start_time = time.time()

result = self._pool.apply_async(
func=_process_activation,
args=(
activation.namespace,
activation.taskname,
task_data_parameters["args"],
task_data_parameters["kwargs"],
),
args=(activation,),
)
# Will trigger a TimeoutError if the task execution runs long
result.get(timeout=processing_timeout)
Expand Down
14 changes: 14 additions & 0 deletions tests/sentry/taskworker/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging

import pytest
import sentry_sdk

from sentry.conf.types.kafka_definition import Topic
from sentry.taskworker.registry import TaskNamespace
Expand Down Expand Up @@ -174,3 +175,16 @@ def with_parameters(one: str, two: int, org_id: int) -> None:
assert params["args"]
assert params["args"] == ["one", 22]
assert params["kwargs"] == {"org_id": 99}


def test_create_activation_tracing(task_namespace: TaskNamespace) -> None:
    """An activation created inside a transaction carries trace headers."""

    @task_namespace.register(name="test.parameters")
    def with_parameters(one: str, two: int, org_id: int) -> None:
        pass

    # Build the activation while a transaction is active so there is a
    # trace to propagate.
    with sentry_sdk.start_transaction(op="test.task"):
        activation = with_parameters.create_activation("one", 22, org_id=99)

    trace_headers = activation.headers
    assert "baggage" in trace_headers
    assert trace_headers["sentry-trace"]

0 comments on commit 3997d3f

Please sign in to comment.