Skip to content

feat(tracer): add support for generators #13377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 80 additions & 3 deletions ddtrace/_trace/tracer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from contextlib import contextmanager
import functools
import inspect
from inspect import isasyncgenfunction
from inspect import iscoroutinefunction
from itertools import chain
import logging
Expand Down Expand Up @@ -780,6 +782,56 @@ def flush(self):
"""Flush the buffer of the trace writer. This does nothing if an unbuffered trace writer is used."""
self._span_aggregator.writer.flush_queue()

def _wrap_generator(
self,
f: AnyCallable,
span_name: str,
service: Optional[str] = None,
resource: Optional[str] = None,
span_type: Optional[str] = None,
) -> AnyCallable:
"""Wrap a generator function with tracing."""

@functools.wraps(f)
def func_wrapper(*args, **kwargs):
if getattr(self, "_wrap_executor", None):
return self._wrap_executor(
self,
f,
args,
kwargs,
span_name,
service=service,
resource=resource,
span_type=span_type,
)

with self.trace(span_name, service=service, resource=resource, span_type=span_type) as span:
gen = f(*args, **kwargs)
for value in gen:
yield value

return func_wrapper

def _wrap_generator_async(
self,
f: AnyCallable,
span_name: str,
service: Optional[str] = None,
resource: Optional[str] = None,
span_type: Optional[str] = None,
) -> AnyCallable:
"""Wrap a generator function with tracing."""

@functools.wraps(f)
async def func_wrapper(*args, **kwargs):
with self.trace(span_name, service=service, resource=resource, span_type=span_type) as span:
agen = f(*args, **kwargs)
async for value in agen:
yield value

return func_wrapper

def wrap(
self,
name: Optional[str] = None,
Expand Down Expand Up @@ -817,6 +869,15 @@ async def coroutine():
def coroutine():
return 'executed'

>>> # or use it on generators
@tracer.wrap()
def gen():
yield 'executed'

>>> @tracer.wrap()
async def gen():
yield 'executed'

You can access the current span using `tracer.current_span()` to set
tags:

Expand All @@ -830,10 +891,26 @@ def wrap_decorator(f: AnyCallable) -> AnyCallable:
# FIXME[matt] include the class name for methods.
span_name = name if name else "%s.%s" % (f.__module__, f.__name__)

# detect if the the given function is a coroutine to use the
# right decorator; this initial check ensures that the
# detect if the the given function is a coroutine and/or a generator
# to use the right decorator; this initial check ensures that the
# evaluation is done only once for each @tracer.wrap
if iscoroutinefunction(f):
if inspect.isgeneratorfunction(f):
func_wrapper = self._wrap_generator(
f,
span_name,
service=service,
resource=resource,
span_type=span_type,
)
elif inspect.isasyncgenfunction(f):
func_wrapper = self._wrap_generator_async(
f,
span_name,
service=service,
resource=resource,
span_type=span_type,
)
elif iscoroutinefunction(f):
# call the async factory that creates a tracing decorator capable
# to await the coroutine execution before finishing the span. This
# code is used for compatibility reasons to prevent Syntax errors
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
tracing: Fixes support for wrapping generator and async generator functions with `tracer.wrap()`. Previously, calling `tracer.current_span()` inside a wrapped generator function would return `None`, leading to `AttributeError` when interacting with the span. Additionally, traces reported to Datadog showed incorrect durations, as span context was not maintained across generator iteration. This change ensures that `tracer.wrap()` now correctly handles both sync and async generators by preserving the tracing context throughout their execution and finalizing spans correctly. Users can now safely use `tracer.current_span()` within generator functions and expect accurate trace reporting.
29 changes: 29 additions & 0 deletions tests/contrib/asyncio/test_tracer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Ensure that the tracer works with asynchronous executions within the same ``IOLoop``."""

import asyncio
import os
import re
Expand Down Expand Up @@ -223,3 +224,31 @@ async def my_function():
rb"created at .*/dd-trace-py/ddtrace/contrib/internal/asyncio/patch.py:.* took .* seconds"
match = re.match(pattern, err)
assert match, err


@pytest.mark.asyncio
async def test_wrapped_generator(tracer):
@tracer.wrap("decorated_generator", service="s", resource="r", span_type="t")
async def f(tag_name, tag_value):
# make sure we can still set tags
span = tracer.current_span()
span.set_tag(tag_name, tag_value)

for i in range(3):
yield i

result = [item async for item in f("a", "b")]
assert result == [0, 1, 2]

traces = tracer.pop_traces()

assert 1 == len(traces)
spans = traces[0]
assert 1 == len(spans)
span = spans[0]

assert span.name == "decorated_generator"
assert span.service == "s"
assert span.resource == "r"
assert span.span_type == "t"
assert span.get_tag("a") == "b"
29 changes: 29 additions & 0 deletions tests/tracer/test_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
from os import getpid
import threading
import time
from unittest.case import SkipTest

import mock
Expand Down Expand Up @@ -284,6 +285,34 @@ def wrapped_function(param, kw_param=None):
(dict(name="wrap.overwrite", service="webserver", meta=dict(args="(42,)", kwargs="{'kw_param': 42}")),),
)

def test_tracer_wrap_generator(self):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered adding a test that validates the duration of these spans?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point -- updated in c6029f5.

@self.tracer.wrap("decorated_generator", service="s", resource="r", span_type="t")
def f(tag_name, tag_value):
# make sure we can still set tags
span = self.tracer.current_span()
span.set_tag(tag_name, tag_value)

for i in range(3):
time.sleep(0.01)
yield i

result = list(f("a", "b"))
assert result == [0, 1, 2]

self.assert_span_count(1)
span = self.get_root_span()
span.assert_matches(
name="decorated_generator",
service="s",
resource="r",
span_type="t",
meta=dict(a="b"),
)

# tracer should finish _after_ the generator has been exhausted
assert span.duration is not None
assert span.duration > 0.03

def test_tracer_disabled(self):
self.tracer.enabled = True
with self.trace("foo") as s:
Expand Down