diff --git a/logfire/_internal/integrations/httpx.py b/logfire/_internal/integrations/httpx.py index aa327ed2..7a0eabe3 100644 --- a/logfire/_internal/integrations/httpx.py +++ b/logfire/_internal/integrations/httpx.py @@ -1,16 +1,18 @@ from __future__ import annotations +import contextlib +import functools import inspect from contextlib import suppress from email.headerregistry import ContentTypeHeader from email.policy import EmailPolicy -from functools import lru_cache -from typing import TYPE_CHECKING, Any, Callable, Literal, Mapping, cast +from functools import cached_property, lru_cache +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Literal, Mapping, cast import httpx +from opentelemetry.trace import NonRecordingSpan, Span, use_span from logfire._internal.stack_info import warn_at_user_stacklevel -from logfire.propagate import attach_context, get_context try: from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor @@ -30,15 +32,13 @@ " pip install 'logfire[httpx]'" ) -from logfire import Logfire +from logfire import Logfire, LogfireSpan from logfire._internal.main import set_user_attributes_on_raw_span from logfire._internal.utils import handle_internal_errors if TYPE_CHECKING: from typing import ParamSpec, TypedDict, TypeVar - from opentelemetry.trace import Span - class AsyncClientKwargs(TypedDict, total=False): request_hook: RequestHook | AsyncRequestHook response_hook: ResponseHook | AsyncResponseHook @@ -71,6 +71,7 @@ def instrument_httpx( capture_request_json_body: bool, capture_request_text_body: bool, capture_response_json_body: bool, + capture_response_text_body: bool, capture_request_form_data: bool, **kwargs: Any, ) -> None: @@ -116,7 +117,11 @@ def instrument_httpx( capture_request_form_data, ) final_kwargs['response_hook'] = make_response_hook( - response_hook, should_capture_response_headers, capture_response_json_body, logfire_instance + response_hook, + should_capture_response_headers, + capture_response_json_body, + capture_response_text_body, + logfire_instance, ) final_kwargs['async_request_hook'] = make_async_request_hook( async_request_hook, @@ -126,7 +131,11 @@ def instrument_httpx( capture_request_form_data, ) final_kwargs['async_response_hook'] = make_async_response_hook( - async_response_hook, should_capture_response_headers, capture_response_json_body, logfire_instance + async_response_hook, + should_capture_response_headers, + capture_response_json_body, + capture_response_text_body, + logfire_instance, ) instrumentor.instrument(**final_kwargs) @@ -143,7 +152,11 @@ def instrument_httpx( capture_request_form_data, ) response_hook = make_async_response_hook( - response_hook, should_capture_response_headers, capture_response_json_body, logfire_instance + response_hook, + should_capture_response_headers, + capture_response_json_body, + capture_response_text_body, + logfire_instance, ) else: request_hook = cast('RequestHook | None', final_kwargs.get('request_hook')) @@ -157,14 +170,38 @@ def instrument_httpx( capture_request_form_data, ) response_hook = make_response_hook( - response_hook, should_capture_response_headers, capture_response_json_body, logfire_instance + response_hook, + should_capture_response_headers, + capture_response_json_body, + capture_response_text_body, + logfire_instance, ) tracer_provider = final_kwargs['tracer_provider'] instrumentor.instrument_client(client, tracer_provider, request_hook, response_hook) # type: ignore[reportArgumentType] -class LogfireHttpxRequestInfo(RequestInfo): +class LogfireHttpxInfoMixin: + headers: httpx.Headers + + @property + def content_type_header_object(self) -> ContentTypeHeader: + return content_type_header_from_string(self.content_type_header_string) + + @property + def content_type_header_string(self) -> str: + return self.headers.get('content-type', '') + + @property + def content_type_is_json(self): + return is_json_type(self.content_type_header_string) + + @property + def content_type_is_text(self): + return is_text_type(self.content_type_header_string) + + +class LogfireHttpxRequestInfo(RequestInfo, LogfireHttpxInfoMixin): span: Span def capture_headers(self): @@ -197,22 +234,6 @@ def capture_text_as_json(self, attr_name: str = 'http.request.body.json', text: def body_is_streaming(self): return not isinstance(self.stream, httpx.ByteStream) - @property - def content_type_header_object(self) -> ContentTypeHeader: - return content_type_header_from_string(self.content_type_header_string) - - @property - def content_type_header_string(self) -> str: - return self.headers.get('content-type', '') - - @property - def content_type_is_json(self): - return is_json_type(self.content_type_header_string) - - @property - def content_type_is_text(self): - return is_text_type(self.content_type_header_string) - @property def content_type_is_form(self): content_type = self.content_type_header_string @@ -248,6 +269,102 @@ def set_complex_span_attributes(self, attributes: dict[str, Any]): set_user_attributes_on_raw_span(self.span, attributes) # type: ignore +class LogfireHttpxResponseInfo(ResponseInfo, LogfireHttpxInfoMixin): + span: Span + logfire_instance: Logfire + is_async: bool + + def capture_headers(self): + capture_headers(self.span, self.headers, 'response') + + def capture_body_if_json(self, attr_name: str = 'http.response.body.json'): + if self.content_type_is_json: + + def hook(span: LogfireSpan): + self.capture_text_as_json(span, attr_name=attr_name) + + self.on_response_read(hook) + + def capture_body_if_text(self, attr_name: str = 'http.response.body.text'): + if self.content_type_is_text: + + def hook(span: LogfireSpan): + span.set_attribute(attr_name, self.response.text) + + self.on_response_read(hook) + + @cached_property + def response(self) -> httpx.Response: + frame = inspect.currentframe().f_back.f_back # type: ignore + while frame: # pragma: no branch + response = frame.f_locals.get('response') + frame = frame.f_back + if isinstance(response, httpx.Response): + return response + raise RuntimeError('Could not find the response object') # pragma: no cover + + def on_response_read(self, hook: Callable[[LogfireSpan], None]): + if self.is_async: + + async def aread(original_aread: Callable[[], Awaitable[bytes]]) -> bytes: + with self.attach_original_span_context(), self.logfire_instance.span('Reading response body') as span: + content = await original_aread() + hook(span) + return content + + self.wrap_response_aread(aread) + else: + + def read(original_read: Callable[[], bytes]) -> bytes: + with self.attach_original_span_context(), self.logfire_instance.span('Reading response body') as span: + content = original_read() + hook(span) + return content + + self.wrap_response_read(read) + + def wrap_response_read(self, hook: Callable[[Callable[[], bytes]], bytes]): + response = self.response + original_read = response.read + + @functools.wraps(original_read) + def read() -> bytes: + try: + # Only log the body the first time it's read + return response.content + except httpx.ResponseNotRead: + return hook(original_read) + + response.read = read + + def wrap_response_aread(self, hook: Callable[[Callable[[], Awaitable[bytes]]], Awaitable[bytes]]): + response = self.response + original_aread = response.aread + + @functools.wraps(original_aread) + async def aread() -> bytes: + try: + # Only log the body the first time it's read + return response.content + except httpx.ResponseNotRead: + return await hook(original_aread) + + response.aread = aread + + @contextlib.contextmanager + def attach_original_span_context(self): + with use_span(NonRecordingSpan(self.span.get_span_context())): + yield + + def capture_text_as_json( + self, span: LogfireSpan, *, text: str | None = None, attr_name: str = 'http.response.body.json' + ): + span.set_attribute(attr_name, {}) # Set the JSON schema + # Set the attribute to the raw text so that the backend can parse it + text = text if text is not None else self.response.text + span._span.set_attribute(attr_name, text) # type: ignore + + def make_request_hook( hook: RequestHook | None, should_capture_headers: bool, @@ -260,10 +377,13 @@ def make_request_hook( def new_hook(span: Span, request: RequestInfo) -> None: with handle_internal_errors(): - request = LogfireHttpxRequestInfo(*request) - request.span = span - capture_request( - request, should_capture_headers, should_capture_json, should_capture_text, should_capture_form_data + request = capture_request( + span, + request, + should_capture_headers, + should_capture_json, + should_capture_text, + should_capture_form_data, ) run_hook(hook, span, request) @@ -282,45 +402,41 @@ def make_async_request_hook( async def new_hook(span: Span, request: RequestInfo) -> None: with handle_internal_errors(): - request = LogfireHttpxRequestInfo(*request) - request.span = span - capture_request( - request, should_capture_headers, should_capture_json, should_capture_text, should_capture_form_data + request = capture_request( + span, + request, + should_capture_headers, + should_capture_json, + should_capture_text, + should_capture_form_data, ) await run_async_hook(hook, span, request) return new_hook -def capture_request( - request: LogfireHttpxRequestInfo, +def make_response_hook( + hook: ResponseHook | None, should_capture_headers: bool, should_capture_json: bool, should_capture_text: bool, - should_capture_form_data: bool, -) -> None: - if should_capture_headers: - request.capture_headers() - if should_capture_json: - request.capture_body_if_json() - if should_capture_text and not (should_capture_json and request.content_type_is_json): - request.capture_body_if_text() - if should_capture_form_data: - request.capture_body_if_form() - - -def make_response_hook( - hook: ResponseHook | None, should_capture_headers: bool, should_capture_json: bool, logfire_instance: Logfire + logfire_instance: Logfire, ) -> ResponseHook | None: - if not should_capture_headers and not should_capture_json and not hook: + if not (should_capture_headers or should_capture_json or should_capture_text or hook): return None def new_hook(span: Span, request: RequestInfo, response: ResponseInfo) -> None: with handle_internal_errors(): - if should_capture_headers: - capture_response_headers(span, response) - if should_capture_json: - capture_response_json(logfire_instance, response, False) + request, response = capture_response( + span, + request, + response, + logfire_instance, + should_capture_headers, + should_capture_json, + should_capture_text, + is_async=False, + ) run_hook(hook, span, request, response) return new_hook @@ -330,72 +446,79 @@ def make_async_response_hook( hook: ResponseHook | AsyncResponseHook | None, should_capture_headers: bool, should_capture_json: bool, + should_capture_text: bool, logfire_instance: Logfire, ) -> AsyncResponseHook | None: - if not should_capture_headers and not should_capture_json and not hook: + if not (should_capture_headers or should_capture_json or should_capture_text or hook): return None async def new_hook(span: Span, request: RequestInfo, response: ResponseInfo) -> None: with handle_internal_errors(): - if should_capture_headers: - capture_response_headers(span, response) - if should_capture_json: - capture_response_json(logfire_instance, response, True) + request, response = capture_response( + span, + request, + response, + logfire_instance, + should_capture_headers, + should_capture_json, + should_capture_text, + is_async=True, + ) await run_async_hook(hook, span, request, response) return new_hook -def capture_response_json(logfire_instance: Logfire, response_info: ResponseInfo, is_async: bool) -> None: - if not is_json_type(response_info.headers.get('content-type', '')): - return +def capture_request( + span: Span, + request: RequestInfo, + should_capture_headers: bool, + should_capture_json: bool, + should_capture_text: bool, + should_capture_form_data: bool, +) -> LogfireHttpxRequestInfo: + request = LogfireHttpxRequestInfo(*request) + request.span = span - frame = inspect.currentframe().f_back.f_back # type: ignore - while frame: - response = frame.f_locals.get('response') - frame = frame.f_back - if isinstance(response, httpx.Response): # pragma: no branch - break - else: # pragma: no cover - return + if should_capture_headers: + request.capture_headers() + if should_capture_json: + request.capture_body_if_json() + if should_capture_text and not (should_capture_json and request.content_type_is_json): + request.capture_body_if_text() + if should_capture_form_data: + request.capture_body_if_form() - ctx = get_context() - attr_name = 'http.response.body.json' + return request - if is_async: # these two branches should be kept almost identical - original_aread = response.aread - async def aread(*args: Any, **kwargs: Any): - try: - # Only log the body the first time it's read - return response.content - except httpx.ResponseNotRead: - pass - with attach_context(ctx), logfire_instance.span('Reading response body') as span: - content = await original_aread(*args, **kwargs) - span.set_attribute(attr_name, {}) # Set the JSON schema - # Set the attribute to the raw text so that the backend can parse it - span._span.set_attribute(attr_name, response.text) # type: ignore - return content +def capture_response( + span: Span, + request: RequestInfo, + response: ResponseInfo, + logfire_instance: Logfire, + should_capture_headers: bool, + should_capture_json: bool, + should_capture_text: bool, + *, + is_async: bool, +) -> tuple[LogfireHttpxRequestInfo, LogfireHttpxResponseInfo]: + request = LogfireHttpxRequestInfo(*request) + request.span = span - response.aread = aread - else: - original_read = response.read + response = LogfireHttpxResponseInfo(*response) + response.span = span + response.logfire_instance = logfire_instance + response.is_async = is_async - def read(*args: Any, **kwargs: Any): - try: - # Only log the body the first time it's read - return response.content - except httpx.ResponseNotRead: - pass - with attach_context(ctx), logfire_instance.span('Reading response body') as span: - content = original_read(*args, **kwargs) - span.set_attribute(attr_name, {}) # Set the JSON schema - # Set the attribute to the raw text so that the backend can parse it - span._span.set_attribute(attr_name, response.text) # type: ignore - return content + if should_capture_headers: + response.capture_headers() + if should_capture_json: + response.capture_body_if_json() + if should_capture_text and not (should_capture_json and request.content_type_is_json): + response.capture_body_if_text() - response.read = read + return request, response async def run_async_hook(hook: Callable[P, Any] | None, *args: P.args, **kwargs: P.kwargs) -> None: @@ -410,10 +533,6 @@ def run_hook(hook: Callable[P, Any] | None, *args: P.args, **kwargs: P.kwargs) - hook(*args, **kwargs) -def capture_response_headers(span: Span, response: ResponseInfo) -> None: - capture_headers(span, response.headers, 'response') - - def capture_headers(span: Span, headers: httpx.Headers, request_or_response: Literal['request', 'response']) -> None: span.set_attributes( { diff --git a/logfire/_internal/main.py b/logfire/_internal/main.py index beba2070..b594f854 100644 --- a/logfire/_internal/main.py +++ b/logfire/_internal/main.py @@ -1183,6 +1183,7 @@ def instrument_httpx( capture_request_text_body: bool = False, capture_request_json_body: bool = False, capture_response_json_body: bool = False, + capture_response_text_body: bool = False, capture_request_form_data: bool = False, **kwargs: Unpack[ClientKwargs], ) -> None: ... @@ -1196,6 +1197,7 @@ def instrument_httpx( capture_request_json_body: bool = False, capture_request_text_body: bool = False, capture_response_json_body: bool = False, + capture_response_text_body: bool = False, capture_request_form_data: bool = False, **kwargs: Unpack[AsyncClientKwargs], ) -> None: ... @@ -1209,6 +1211,7 @@ def instrument_httpx( capture_request_json_body: bool = False, capture_request_text_body: bool = False, capture_response_json_body: bool = False, + capture_response_text_body: bool = False, capture_request_form_data: bool = False, **kwargs: Unpack[HTTPXInstrumentKwargs], ) -> None: ... @@ -1221,6 +1224,7 @@ def instrument_httpx( capture_request_json_body: bool = False, capture_request_text_body: bool = False, capture_response_json_body: bool = False, + capture_response_text_body: bool = False, capture_request_form_data: bool = False, **kwargs: Any, ) -> None: @@ -1251,6 +1255,9 @@ def instrument_httpx( which happens automatically for non-streaming requests. For streaming requests, the body is not captured if it's merely iterated over. Doesn't check if the body is actually JSON. + capture_response_text_body: Set to `True` to capture the response text body + if the content type is either `text/*` + or `application/` followed by a known human-readable text format, e.g. XML. capture_request_form_data: Set to `True` to capture the request form data. Specifically captures the `data` argument of `httpx` methods like `post` and `put`. Doesn't inspect or parse the raw request body. @@ -1266,6 +1273,7 @@ def instrument_httpx( capture_request_json_body=capture_request_json_body, capture_request_text_body=capture_request_text_body, capture_response_json_body=capture_response_json_body, + capture_response_text_body=capture_response_text_body, capture_request_form_data=capture_request_form_data, **kwargs, ) diff --git a/tests/otel_integrations/test_httpx.py b/tests/otel_integrations/test_httpx.py index c53761b6..0ecccdf4 100644 --- a/tests/otel_integrations/test_httpx.py +++ b/tests/otel_integrations/test_httpx.py @@ -357,6 +357,7 @@ def test_httpx_client_capture_full(exporter: TestExporter): capture_request_json_body=True, capture_request_text_body=True, capture_response_headers=True, + capture_response_text_body=True, capture_response_json_body=True, ) response = client.post('https://example.org/', json={'hello': 'world'}) @@ -409,7 +410,7 @@ def test_httpx_client_capture_full(exporter: TestExporter): { 'name': 'Reading response body', 'context': {'trace_id': 1, 'span_id': 5, 'is_remote': False}, - 'parent': {'trace_id': 1, 'span_id': 3, 'is_remote': True}, + 'parent': {'trace_id': 1, 'span_id': 3, 'is_remote': False}, 'start_time': 4000000000, 'end_time': 5000000000, 'attributes': { @@ -451,6 +452,7 @@ async def test_async_httpx_client_capture_full(exporter: TestExporter): capture_request_json_body=True, capture_request_text_body=True, capture_response_headers=True, + capture_response_text_body=True, capture_response_json_body=True, capture_request_form_data=True, ) @@ -504,7 +506,7 @@ async def test_async_httpx_client_capture_full(exporter: TestExporter): { 'name': 'Reading response body', 'context': {'trace_id': 1, 'span_id': 5, 'is_remote': False}, - 'parent': {'trace_id': 1, 'span_id': 3, 'is_remote': True}, + 'parent': {'trace_id': 1, 'span_id': 3, 'is_remote': False}, 'start_time': 4000000000, 'end_time': 5000000000, 'attributes': { @@ -566,7 +568,12 @@ def test_httpx_client_capture_request_form_data(exporter: TestExporter): assert [code.co_name for code in CODES_FOR_METHODS_WITH_DATA_PARAM] == ['request', 'stream', 'request', 'stream'] with httpx.Client(transport=create_transport()) as client: - logfire.instrument_httpx(client, capture_request_form_data=True, capture_request_text_body=True) + logfire.instrument_httpx( + client, + capture_request_form_data=True, + capture_request_text_body=True, + capture_response_text_body=True, + ) client.post('https://example.org/', data={'form': 'values'}) assert exporter.exported_spans_as_dict() == snapshot( @@ -635,6 +642,57 @@ def test_httpx_client_capture_request_text_body(exporter: TestExporter): ) +def test_httpx_client_capture_response_text_body(exporter: TestExporter): + with httpx.Client(transport=create_transport()) as client: + logfire.instrument_httpx(client, capture_response_text_body=True) + client.post('https://example.org/', headers={'Content-Type': 'text/plain'}, content='hello') + + assert exporter.exported_spans_as_dict() == snapshot( + [ + { + 'name': 'POST', + 'context': {'trace_id': 1, 'span_id': 1, 'is_remote': False}, + 'parent': None, + 'start_time': 1000000000, + 'end_time': 2000000000, + 'attributes': { + 'http.method': 'POST', + 'http.request.method': 'POST', + 'http.url': 'https://example.org/', + 'url.full': 'https://example.org/', + 'http.host': 'example.org', + 'server.address': 'example.org', + 'network.peer.address': 'example.org', + 'logfire.span_type': 'span', + 'logfire.msg': 'POST /', + 'http.status_code': 200, + 'http.response.status_code': 200, + 'http.flavor': '1.1', + 'network.protocol.version': '1.1', + 'http.target': '/', + }, + }, + { + 'name': 'Reading response body', + 'context': {'trace_id': 1, 'span_id': 3, 'is_remote': False}, + 'parent': {'trace_id': 1, 'span_id': 1, 'is_remote': False}, + 'start_time': 3000000000, + 'end_time': 4000000000, + 'attributes': { + 'code.filepath': 'test_httpx.py', + 'code.function': 'test_httpx_client_capture_response_text_body', + 'code.lineno': 123, + 'logfire.msg_template': 'Reading response body', + 'logfire.msg': 'Reading response body', + 'logfire.span_type': 'span', + 'http.response.body.text': '{"good": "response"}', + 'logfire.json_schema': '{"type":"object","properties":{"http.response.body.text":{}}}', + }, + }, + ] + ) + + def test_is_json_type(): assert is_json_type('application/json') assert is_json_type(' APPLICATION / JSON ')