diff --git a/benchmarks/ddtrace_run/scenario.py b/benchmarks/ddtrace_run/scenario.py
index 983d2c7e870..d94d5d83d0e 100644
--- a/benchmarks/ddtrace_run/scenario.py
+++ b/benchmarks/ddtrace_run/scenario.py
@@ -20,6 +20,11 @@ def run(self):
         env["DD_RUNTIME_METRICS_ENABLED"] = str(self.runtimemetrics)
         env["DD_APPSEC_ENABLED"] = str(self.appsec)

+        if self.telemetry:
+            # Force the app-started event so telemetry is enabled immediately
+            # after the process starts, avoiding the 10 second delay.
+            env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true"
+
         # initialize subprocess args
         subp_cmd = []
         code = "import ddtrace; ddtrace._monkey._patch_all()\n"
@@ -29,7 +34,7 @@ def run(self):

         if self.http:
             # mock requests to the trace agent before starting services
-            env["DD_TRACE_API_VERSION"] = "v0.4"
+            env["DD_TRACE_API_VERSION"] = "v0.5"
             code += """
 import httpretty
 from ddtrace.trace import tracer
@@ -41,10 +46,6 @@ def run(self):
 # profiler will collect snapshot during shutdown
 httpretty.register_uri(httpretty.POST, '%s/%s' % (tracer.agent_trace_url, 'profiling/v1/input'))
 """
-
-        if self.telemetry:
-            code += "telemetry_writer.enable()\n"
-
         if self.tracing:
             code += "span = tracer.trace('test-x', service='bench-test'); span.finish()\n"
diff --git a/benchmarks/startup/config.yaml b/benchmarks/startup/config.yaml
index c7603fc6c29..d397a994b9a 100644
--- a/benchmarks/startup/config.yaml
+++ b/benchmarks/startup/config.yaml
@@ -32,3 +32,9 @@ import_ddtrace_auto_django:
   <<: *defaults
   import_ddtrace_auto: true
   import_django: true
+import_ddtrace_auto_start_telemetry:
+  <<: *defaults
+  # By default, telemetry collection starts 10 seconds after startup; force it
+  # to start immediately so the benchmark captures its overhead.
+  env: {"_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED": "true"}
+  import_ddtrace_auto: true
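Not part of the change itself, but a quick sketch of how the new knob is exercised; the env var name comes from this diff, and `import ddtrace.auto` mirrors the startup benchmark's `import_ddtrace_auto` case (values are illustrative):

```python
import os
import subprocess
import sys

# Run a child process with telemetry forced to send app-started immediately,
# the same way the benchmark scenarios above populate their env dicts.
env = os.environ.copy()
env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true"

result = subprocess.run(
    [sys.executable, "-c", "import ddtrace.auto"],  # assumes ddtrace is installed
    env=env,
    capture_output=True,
)
print(result.returncode)  # 0 on a clean startup
```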
diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py
index 0069b76073d..ea870ca59eb 100644
--- a/ddtrace/internal/telemetry/writer.py
+++ b/ddtrace/internal/telemetry/writer.py
@@ -29,10 +29,7 @@
 from ..utils.version import version as tracer_version
 from . import modules
 from .constants import TELEMETRY_APM_PRODUCT
-from .constants import TELEMETRY_LOG_LEVEL  # noqa:F401
 from .constants import TELEMETRY_NAMESPACE
-from .constants import TELEMETRY_TYPE_DISTRIBUTION
-from .constants import TELEMETRY_TYPE_GENERATE_METRICS
 from .constants import TELEMETRY_TYPE_LOGS
 from .data import get_application
 from .data import get_host_info
@@ -202,8 +199,8 @@ def __init__(self, is_periodic=True, agentless=None):
             # This will occur when the agent writer starts.
             self.enable()
             # Force app started for unit tests
-            if config.FORCE_START:
-                self._app_started()
+            if config.FORCE_START and (app_started := self._app_started()):
+                self._events_queue.append(app_started)
             if config.LOG_COLLECTION_ENABLED:
                 get_logger("ddtrace").addHandler(DDTelemetryLogHandler(self))
@@ -259,18 +256,17 @@ def add_event(self, payload, payload_type):
         Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change
         """
         if self.enable():
-            event = {
-                "tracer_time": int(time.time()),
-                "runtime_id": get_runtime_id(),
-                "api_version": "v2",
-                "seq_id": next(self._sequence),
-                "debug": self._debug,
-                "application": get_application(config.SERVICE, config.VERSION, config.ENV),
-                "host": get_host_info(),
-                "payload": payload,
-                "request_type": payload_type,
-            }
-            self._events_queue.append(event)
+            with self._service_lock:
+                self._events_queue.append({"payload": payload, "request_type": payload_type})
+
+    def add_events(self, events):
+        # type: (List[Dict[str, Any]]) -> None
+        """
+        Adds a list of Telemetry events to the TelemetryWriter event buffer
+        """
+        if self.enable():
+            with self._service_lock:
+                self._events_queue.extend(events)

     def add_integration(self, integration_name, patched, auto_patched=None, error_msg=None, version=""):
         # type: (str, bool, Optional[bool], Optional[str], Optional[str]) -> None
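Worth spelling out what this hunk changes on the wire: queued events no longer carry their own envelope. A hedged before/after sketch (field values are illustrative, not taken from a real capture):

```python
# Before: add_event() built a fully enveloped request per event.
old_event = {
    "tracer_time": 1700000000,
    "runtime_id": "abc123",
    "api_version": "v2",
    "seq_id": 7,
    "debug": False,
    "application": {"service_name": "my-service"},  # get_application(...)
    "host": {"hostname": "my-host"},                 # get_host_info()
    "payload": {"integrations": []},
    "request_type": "app-integrations-change",
}

# After: the queue holds only the event payload and its request type; the
# envelope is attached once per batch in periodic() (see further below).
new_event = {"payload": {"integrations": []}, "request_type": "app-integrations-change"}
```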
@@ -280,6 +276,9 @@ def add_integration(self, integration_name, patched, auto_patched=None, error_ms
         :param str integration_name: name of patched module
         :param bool auto_enabled: True if module is enabled in _monkey.PATCH_MODULES
         """
+        if not self.enable():
+            return
+
         # Integrations can be patched before the telemetry writer is enabled.
         with self._service_lock:
             if integration_name not in self._integrations_queue:
@@ -305,11 +304,11 @@ def add_error(self, code, msg, filename, line_number):
         self._error = (code, msg)

     def _app_started(self, register_app_shutdown=True):
-        # type: (bool) -> None
+        # type: (bool) -> Optional[Dict[str, Any]]
         """Sent when TelemetryWriter is enabled or forks"""
         if self._forked or self.started:
             # app-started events should only be sent by the main process
-            return
+            return None

         # List of configurations to be collected
         self.started = True
@@ -340,10 +339,10 @@
         # Reset the error after it has been reported.
         self._error = (0, "")
-        self.add_event(payload, "app-started")
+        return {"payload": payload, "request_type": "app-started"}

     def _app_heartbeat_event(self):
-        # type: () -> None
+        # type: () -> Dict[str, Any]
         if config.DEPENDENCY_COLLECTION and time.monotonic() - self._extended_time > self._extended_heartbeat_interval:
             self._extended_time += self._extended_heartbeat_interval
             self._app_dependencies_loaded_event()
@@ -352,26 +351,29 @@
                     {"name": name, "version": version} for name, version in self._imported_dependencies.items()
                 ]
             }
-            self.add_event(payload, "app-extended-heartbeat")
+            request_type = "app-extended-heartbeat"
         else:
-            self.add_event({}, "app-heartbeat")
+            payload = {}
+            request_type = "app-heartbeat"
+        return {"payload": payload, "request_type": request_type}

     def _app_closing_event(self):
-        # type: () -> None
+        # type: () -> Optional[Dict[str, Any]]
         """Adds a Telemetry event which notifies the agent that an application instance has terminated"""
         if self._forked:
             # app-closing event should only be sent by the main process
-            return
-        payload = {}  # type: Dict
-        self.add_event(payload, "app-closing")
+            return None
+        return {"payload": {}, "request_type": "app-closing"}

     def _app_integrations_changed_event(self, integrations):
-        # type: (List[Dict]) -> None
+        # type: (List[Dict]) -> Dict
         """Adds a Telemetry event which sends a list of configured integrations to the agent"""
-        payload = {
-            "integrations": integrations,
+        return {
+            "payload": {
+                "integrations": integrations,
+            },
+            "request_type": "app-integrations-change",
         }
-        self.add_event(payload, "app-integrations-change")

     def _flush_integrations_queue(self):
         # type: () -> List[Dict]
@@ -390,46 +392,48 @@ def _flush_configuration_queue(self):
         return configurations

     def _app_client_configuration_changed_event(self, configurations):
-        # type: (List[Dict]) -> None
+        # type: (List[Dict]) -> Dict[str, Any]
         """Adds a Telemetry event which sends list of modified configurations to the agent"""
-        payload = {
-            "configuration": configurations,
+        return {
+            "payload": {
+                "configuration": configurations,
+            },
+            "request_type": "app-client-configuration-change",
         }
-        self.add_event(payload, "app-client-configuration-change")

     def _app_dependencies_loaded_event(self):
+        # type: () -> Optional[Dict[str, Any]]
         """Adds events to report imports done since the last periodic run"""
         if not config.DEPENDENCY_COLLECTION or not self._enabled:
-            return
+            return None

         with self._service_lock:
             newly_imported_deps = modules.get_newly_imported_modules(self._modules_already_imported)

         if not newly_imported_deps:
-            return
+            return None

         with self._service_lock:
-            packages = update_imported_dependencies(self._imported_dependencies, newly_imported_deps)
-
-        if packages:
-            payload = {"dependencies": packages}
-            self.add_event(payload, "app-dependencies-loaded")
+            if packages := update_imported_dependencies(self._imported_dependencies, newly_imported_deps):
+                return {"payload": {"dependencies": packages}, "request_type": "app-dependencies-loaded"}
+        return None

     def _app_product_change(self):
-        # type: () -> None
+        # type: () -> Optional[Dict[str, Any]]
         """Adds a Telemetry event which reports the enablement of an APM product"""
         if not self._send_product_change_updates:
-            return
+            return None

-        payload = {
-            "products": {
-                product: {"version": tracer_version, "enabled": status}
-                for product, status in self._product_enablement.items()
-            }
-        }
-        self.add_event(payload, "app-product-change")
         self._send_product_change_updates = False
+
+        return {
+            "payload": {
+                "products": {
+                    product: {"version": tracer_version, "enabled": status}
+                    for product, status in self._product_enablement.items()
+                }
+            },
+            "request_type": "app-product-change",
+        }

     def product_activated(self, product, enabled):
         # type: (str, bool) -> None
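A small usage sketch of the new convention (assuming the `telemetry_writer` fixture used by the tests later in this diff): helper methods now return event dicts, or `None` when there is nothing to report, and the caller decides whether to queue or batch them:

```python
# Heartbeat helpers always return an event dict under this change.
event = telemetry_writer._app_heartbeat_event()
assert event["request_type"] in ("app-heartbeat", "app-extended-heartbeat")

# Optional helpers return None when the event should not be sent,
# e.g. app-closing from a forked worker.
closing = telemetry_writer._app_closing_event()
assert closing is None or closing == {"payload": {}, "request_type": "app-closing"}
```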
@@ -482,7 +486,7 @@ def add_configurations(self, configuration_list):

     def add_log(self, level, message, stack_trace="", tags=None):
         """
-        Queues log. This event is meant to send library logs to Datadog’s backend through the Telemetry intake.
+        Queues log. This event is meant to send library logs to Datadog's backend through the Telemetry intake.
         This will make support cycles easier and ensure we know about potentially silent issues in libraries.
         """
         if tags is None:
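The next hunk rewrites metric and log event generation plus `periodic()`. As a reading aid, a sketch of the input that `_generate_metrics_event()` iterates over and what it should produce, with illustrative metric names and simplified series entries; each flushed (payload type, namespace) pair becomes one event, so nothing drained from `self._namespace` is dropped:

```python
# Illustrative input shape (series entries simplified).
namespace_metrics = {
    "generate-metrics": {
        "tracers": [{"metric": "spans_created", "points": [[1700000000, 1]]}],
        "telemetry": [{"metric": "events_enqueued", "points": [[1700000000, 2]]}],
    },
}
# Expected output: one event per namespace with data, e.g.
# [{"payload": {"namespace": "tracers", "series": [...]}, "request_type": "generate-metrics"},
#  {"payload": {"namespace": "telemetry", "series": [...]}, "request_type": "generate-metrics"}]
```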
@@ -564,65 +568,112 @@ def _flush_log_metrics(self):
             self._logs = set()
         return log_metrics

-    def _generate_metrics_event(self, namespace_metrics) -> None:
+    def _generate_metrics_event(self, namespace_metrics):
+        # type: (Dict[str, Dict[str, List[Dict[str, Any]]]]) -> List[Dict[str, Any]]
+        metric_events = []
         for payload_type, namespaces in namespace_metrics.items():
             for namespace, metrics in namespaces.items():
                 if metrics:
-                    payload = {
-                        "namespace": namespace,
-                        "series": metrics,
-                    }
                     log.debug("%s request payload, namespace %s", payload_type, namespace)
-                    if payload_type == TELEMETRY_TYPE_DISTRIBUTION:
-                        self.add_event(payload, TELEMETRY_TYPE_DISTRIBUTION)
-                    elif payload_type == TELEMETRY_TYPE_GENERATE_METRICS:
-                        self.add_event(payload, TELEMETRY_TYPE_GENERATE_METRICS)
+                    # One event per (payload type, namespace) pair so no flushed
+                    # metrics are dropped when several namespaces have data.
+                    metric_events.append(
+                        {
+                            "payload": {"namespace": namespace, "series": metrics},
+                            "request_type": payload_type,
+                        }
+                    )
+        return metric_events

     def _generate_logs_event(self, logs):
-        # type: (Set[Dict[str, str]]) -> None
+        # type: (Set[Dict[str, str]]) -> Dict[str, Any]
         log.debug("%s request payload", TELEMETRY_TYPE_LOGS)
-        self.add_event({"logs": list(logs)}, TELEMETRY_TYPE_LOGS)
+        return {"payload": {"logs": list(logs)}, "request_type": TELEMETRY_TYPE_LOGS}

     def periodic(self, force_flush=False, shutting_down=False):
-        # ensure app_started is called at least once in case traces weren't flushed
-        self._app_started()
-        self._app_product_change()
-
-        namespace_metrics = self._namespace.flush(float(self.interval))
-        if namespace_metrics:
-            self._generate_metrics_event(namespace_metrics)
+        """Process and send telemetry events in batches.
+
+        This method handles the periodic collection and sending of telemetry data with two main timing intervals:
+        1. Metrics collection interval (10 seconds by default): collects metrics and logs
+        2. Heartbeat interval (60 seconds by default): sends all collected data to the telemetry endpoint
+
+        The method follows this flow:
+        1. Collects the metrics and logs that have accumulated since the last collection
+        2. If not at the heartbeat interval and not force_flush:
+           - Queues the metrics and logs for future sending
+           - Returns early
+        3. At the heartbeat interval or on force_flush:
+           - Collects app status (started, product changes)
+           - Collects integration changes
+           - Collects configuration changes
+           - Collects dependency changes
+           - Collects stored events (e.g. metrics and logs)
+           - Sends everything as a single batch
+
+        Args:
+            force_flush: If True, bypasses the heartbeat interval check and sends immediately
+            shutting_down: If True, includes an app-closing event in the batch
+
+        Note:
+            - Metrics are collected every 10 seconds to ensure accurate time-based data
+            - All data is sent in a single batch every 60 seconds to minimize network overhead
+            - A heartbeat event is always included to keep RC connections alive
+        """
+        # Collect the metrics and logs that have accumulated since the last batch
+        events = []
+        if namespace_metrics := self._namespace.flush(float(self.interval)):
+            if metrics_events := self._generate_metrics_event(namespace_metrics):
+                events.extend(metrics_events)

-        logs_metrics = self._flush_log_metrics()
-        if logs_metrics:
-            self._generate_logs_event(logs_metrics)
+        if logs_metrics := self._flush_log_metrics():
+            events.append(self._generate_logs_event(logs_metrics))

-        # Telemetry metrics and logs should be aggregated into payloads every time periodic is called.
-        # This ensures metrics and logs are submitted in 10 second time buckets.
+        # Queue the metrics and logs if we are not yet at the heartbeat interval
         if self._is_periodic and force_flush is False:
             if self._periodic_count < self._periodic_threshold:
                 self._periodic_count += 1
+                if events:
+                    self.add_events(events)
                 return
             self._periodic_count = 0

-        integrations = self._flush_integrations_queue()
-        if integrations:
-            self._app_integrations_changed_event(integrations)
+        # At the heartbeat interval, collect and send all telemetry data
+        if app_started := self._app_started():
+            events.append(app_started)

-        configurations = self._flush_configuration_queue()
-        if configurations:
-            self._app_client_configuration_changed_event(configurations)
+        if app_product_change := self._app_product_change():
+            events.append(app_product_change)

-        self._app_dependencies_loaded_event()
+        if integrations := self._flush_integrations_queue():
+            events.append(self._app_integrations_changed_event(integrations))

-        if shutting_down:
-            self._app_closing_event()
+        if configurations := self._flush_configuration_queue():
+            events.append(self._app_client_configuration_changed_event(configurations))

-        # Send a heartbeat event to the agent, this is required to keep RC connections alive
-        self._app_heartbeat_event()
+        if app_dependencies_loaded := self._app_dependencies_loaded_event():
+            events.append(app_dependencies_loaded)

-        telemetry_events = self._flush_events_queue()
-        for telemetry_event in telemetry_events:
-            self._client.send_event(telemetry_event)
+        if shutting_down and (app_closing := self._app_closing_event()):
+            events.append(app_closing)
+
+        # Always include a heartbeat to keep RC connections alive
+        events.append(self._app_heartbeat_event())
+
+        # Combine any queued events with the current batch
+        if queued_events := self._flush_events_queue():
+            events.extend(queued_events)
+
+        # Prepare and send the final batch
+        batch_event = {
+            "tracer_time": int(time.time()),
+            "runtime_id": get_runtime_id(),
+            "api_version": "v2",
+            "seq_id": next(self._sequence),
+            "debug": self._debug,
+            "application": get_application(config.SERVICE, config.VERSION, config.ENV),
+            "host": get_host_info(),
+            "payload": events,
+            "request_type": "message-batch",
+        }
+        self._client.send_event(batch_event)

     def app_shutdown(self):
         if self.started:
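For reference, a sketch of the resulting wire format: one enveloped request whose payload is the list of collected events (values illustrative):

```python
batch_event = {
    "tracer_time": 1700000000,
    "runtime_id": "abc123",
    "api_version": "v2",
    "seq_id": 1,
    "debug": False,
    "application": {"service_name": "my-service"},  # from get_application(...)
    "host": {"hostname": "my-host"},                # from get_host_info()
    "payload": [
        {"payload": {}, "request_type": "app-heartbeat"},
        {"payload": {"integrations": []}, "request_type": "app-integrations-change"},
    ],
    "request_type": "message-batch",
}
```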
@@ -704,8 +755,8 @@ def _telemetry_excepthook(self, tp, value, root_traceback):
                     error_msg = "{}:{} {}".format(filename, lineno, str(value))
                     self.add_integration(integration_name, True, error_msg=error_msg)

-            if self._enabled and not self.started:
-                self._app_started(False)
+            if app_started := self._app_started(False):
+                self._events_queue.append(app_started)

             self.app_shutdown()
diff --git a/tests/conftest.py b/tests/conftest.py
index 4d1bd26d633..309a8340980 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,6 +1,7 @@
 import ast
 import base64
 import contextlib
+import copy
 import functools
 import http.client as httplib
 import importlib
@@ -600,7 +600,7 @@ def clear(self):
             pytest.fail("Failed to clear session: %s" % self.token)
         return True

-    def get_requests(self, request_type=None, filter_heartbeats=True):
+    def get_requests(self, filter_heartbeats=True):
         """Get a list of the requests sent to the test agent

         Results are in reverse order by ``seq_id``
@@ -615,11 +615,10 @@
                 # /test/session/requests captures non telemetry payloads, ignore these requests
                 continue
             req["body"] = json.loads(base64.b64decode(req["body"]))
-            # filter heartbeat requests to reduce noise
+
             if req["body"]["request_type"] == "app-heartbeat" and filter_heartbeats:
                 continue
-            if request_type is None or req["body"]["request_type"] == request_type:
-                requests.append(req)
+            requests.append(req)

         return sorted(requests, key=lambda r: r["body"]["seq_id"], reverse=True)

@@ -628,12 +627,32 @@ def get_events(self, event_type=None, filter_heartbeats=True, subprocess=False):

         Results are in reverse order by ``seq_id``
         """
-        requests = self.get_requests(event_type, filter_heartbeats)
-        if subprocess:
-            # Use get_runtime_id to filter telemetry events generated in the current process
-            runtime_id = get_runtime_id()
-            requests = [req for req in requests if req["body"]["runtime_id"] != runtime_id]
-        return [req["body"] for req in requests]
+        requests = self.get_requests()
+        events = []
+        for req in requests:
+            if subprocess and req["body"]["runtime_id"] == get_runtime_id():
+                # Use get_runtime_id to filter out telemetry events generated in the current process
+                continue
+            for req_body in self._get_request_bodies(req):
+                if filter_heartbeats and req_body["request_type"] == "app-heartbeat":
+                    # filter heartbeat events to reduce noise
+                    continue
+                if event_type is None or req_body["request_type"] == event_type:
+                    events.append(req_body)
+        return events
+
+    def _get_request_bodies(self, req):
+        if req["body"]["request_type"] == "message-batch":
+            payloads = req["body"]["payload"]
+        else:
+            payloads = [{"payload": req["body"]["payload"], "request_type": req["body"]["request_type"]}]
+
+        requests = []
+        for payload in payloads:
+            req_body = copy.deepcopy(req["body"])
+            req_body["request_type"] = payload["request_type"]
+            req_body["payload"] = payload["payload"]
+            requests.append(req_body)
+        return requests

     def get_metrics(self, name=None):
         metrics = []
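Usage sketch for the updated helpers (same fixtures these tests use): `get_requests()` now returns enveloped `message-batch` requests, while `get_events()` transparently unbatches them via `_get_request_bodies()`:

```python
telemetry_writer.periodic(force_flush=True)

# Raw requests are message-batch envelopes...
requests = test_agent_session.get_requests()
assert requests[0]["body"]["request_type"] == "message-batch"

# ...while get_events() yields per-event bodies that keep the envelope fields.
events = test_agent_session.get_events("app-started")
assert all(e["request_type"] == "app-started" for e in events)
```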
diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py
index 09771f56e5f..a14a5def8b0 100644
--- a/tests/telemetry/test_telemetry.py
+++ b/tests/telemetry/test_telemetry.py
@@ -161,7 +161,7 @@ def process_trace(self, trace):

 # force app_started event (instead of waiting for 10 seconds)
 from ddtrace.internal.telemetry import telemetry_writer
-telemetry_writer._app_started()
+telemetry_writer.periodic(force_flush=True)
 """
     _, stderr, status, _ = run_python_code_in_subprocess(code)
     assert status == 0, stderr
diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py
index 82c86987a1e..7942f00a17c 100644
--- a/tests/telemetry/test_writer.py
+++ b/tests/telemetry/test_writer.py
@@ -4,6 +4,7 @@
 import time
 from typing import Any  # noqa:F401
 from typing import Dict  # noqa:F401
+from typing import Optional  # noqa:F401

 import httpretty
 import mock
@@ -16,7 +17,6 @@
 from ddtrace.internal.telemetry.data import get_host_info
 from ddtrace.internal.telemetry.writer import get_runtime_id
 from ddtrace.internal.utils.version import _pep440_to_semver
-from ddtrace.settings._config import DD_TRACE_OBFUSCATION_QUERY_STRING_REGEXP_DEFAULT
 from tests.conftest import DEFAULT_DDTRACE_SUBPROCESS_TEST_SERVICE_NAME
 from tests.utils import call_program
 from tests.utils import override_global_config
@@ -31,15 +31,18 @@ def test_add_event(telemetry_writer, test_agent_session, mock_time):
     # send request to the agent
     telemetry_writer.periodic(force_flush=True)

-    requests = test_agent_session.get_requests(payload_type)
+    requests = test_agent_session.get_requests()
     assert len(requests) == 1
     assert requests[0]["headers"]["Content-Type"] == "application/json"
     assert requests[0]["headers"]["DD-Client-Library-Language"] == "python"
     assert requests[0]["headers"]["DD-Client-Library-Version"] == _pep440_to_semver()
-    assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == payload_type
+    assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == "message-batch"
     assert requests[0]["headers"]["DD-Telemetry-API-Version"] == "v2"
     assert requests[0]["headers"]["DD-Telemetry-Debug-Enabled"] == "False"
-    assert requests[0]["body"] == _get_request_body(payload, payload_type)
+
+    events = test_agent_session.get_events(payload_type)
+    assert len(events) == 1
+    validate_request_body(events[0], payload, payload_type)


 def test_add_event_disabled_writer(telemetry_writer, test_agent_session):
@@ -51,7 +54,7 @@ def test_add_event_disabled_writer(telemetry_writer, test_agent_session):

     # ensure no request were sent
     telemetry_writer.periodic(force_flush=True)
-    assert len(test_agent_session.get_requests(payload_type)) == 1
+    assert len(test_agent_session.get_events(payload_type)) == 1


 @pytest.mark.parametrize(
{"name": "DD_DOGSTATSD_PORT", "origin": "unknown", "value": None}, - {"name": "DD_DOGSTATSD_URL", "origin": "unknown", "value": None}, - {"name": "DD_DYNAMIC_INSTRUMENTATION_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_EXCEPTION_REPLAY_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_INSTRUMENTATION_TELEMETRY_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_STACK_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_MEMORY_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_HEAP_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_LOCK_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_PROFILING_EXPORT_LIBDD_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_PROFILING_CAPTURE_PCT", "origin": "unknown", "value": 1.0}, - {"name": "DD_PROFILING_UPLOAD_INTERVAL", "origin": "unknown", "value": 60.0}, - {"name": "DD_PROFILING_MAX_FRAMES", "origin": "unknown", "value": 64}, - {"name": "DD_REMOTE_CONFIGURATION_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "origin": "unknown", "value": 5.0}, - {"name": "DD_RUNTIME_METRICS_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_SERVICE_MAPPING", "origin": "unknown", "value": ""}, - {"name": "DD_SPAN_SAMPLING_RULES", "origin": "unknown", "value": None}, - {"name": "DD_SPAN_SAMPLING_RULES_FILE", "origin": "unknown", "value": None}, - {"name": "DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_TRACE_AGENT_HOSTNAME", "origin": "default", "value": None}, - {"name": "DD_TRACE_AGENT_PORT", "origin": "default", "value": None}, - {"name": "DD_TRACE_AGENT_TIMEOUT_SECONDS", "origin": "unknown", "value": 2.0}, - {"name": "DD_TRACE_API_VERSION", "origin": "unknown", "value": None}, - {"name": "DD_TRACE_CLIENT_IP_ENABLED", "origin": "unknown", "value": None}, - {"name": "DD_TRACE_COMPUTE_STATS", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_DEBUG", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_HEALTH_METRICS_ENABLED", "origin": "unknown", "value": False}, - { - "name": "DD_TRACE_OBFUSCATION_QUERY_STRING_REGEXP", - "origin": "unknown", - "value": DD_TRACE_OBFUSCATION_QUERY_STRING_REGEXP_DEFAULT, - }, - {"name": "DD_TRACE_OTEL_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_PARTIAL_FLUSH_ENABLED", "origin": "unknown", "value": True}, - {"name": "DD_TRACE_PARTIAL_FLUSH_MIN_SPANS", "origin": "unknown", "value": 300}, - { - "name": "DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED", - "origin": "default", - "value": False, - }, - { - "name": "DD_TRACE_PEER_SERVICE_MAPPING", - "origin": "env_var", - "value": "default_service:remapped_service", - }, - {"name": "DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_PEER_SERVICE_MAPPING", "origin": "unknown", "value": ""}, - { - "name": "DD_TRACE_PROPAGATION_STYLE_EXTRACT", - "origin": "unknown", - "value": "datadog,tracecontext", - }, - {"name": "DD_TRACE_PROPAGATION_STYLE_INJECT", "origin": "unknown", "value": "datadog,tracecontext"}, - {"name": "DD_TRACE_RATE_LIMIT", "origin": "unknown", "value": 100}, - {"name": "DD_TRACE_REMOVE_INTEGRATION_SERVICE_NAMES_ENABLED", "origin": "unknown", "value": False}, - {"name": "DD_TRACE_SPAN_ATTRIBUTE_SCHEMA", "origin": "unknown", "value": "v0"}, - {"name": "DD_TRACE_STARTUP_LOGS", "origin": "unknown", "value": False}, - {"name": 
"DD_TRACE_WRITER_BUFFER_SIZE_BYTES", "origin": "unknown", "value": 20 << 20}, - {"name": "DD_TRACE_WRITER_INTERVAL_SECONDS", "origin": "unknown", "value": 1.0}, - {"name": "DD_TRACE_WRITER_MAX_PAYLOAD_SIZE_BYTES", "origin": "unknown", "value": 20 << 20}, - {"name": "DD_TRACE_WRITER_REUSE_CONNECTIONS", "origin": "unknown", "value": False}, - {"name": "instrumentation_source", "origin": "code", "value": "manual"}, - {"name": "profiling_enabled", "origin": "default", "value": "false"}, - {"name": "data_streams_enabled", "origin": "default", "value": "false"}, - {"name": "appsec_enabled", "origin": "default", "value": "false"}, - {"name": "crashtracking_create_alt_stack", "origin": "unknown", "value": True}, - {"name": "crashtracking_use_alt_stack", "origin": "unknown", "value": True}, - {"name": "crashtracking_available", "origin": "unknown", "value": sys.platform == "linux"}, - {"name": "crashtracking_debug_url", "origin": "unknown", "value": None}, - {"name": "crashtracking_enabled", "origin": "unknown", "value": sys.platform == "linux"}, - {"name": "crashtracking_stacktrace_resolver", "origin": "unknown", "value": "full"}, - {"name": "crashtracking_started", "origin": "unknown", "value": False}, - {"name": "crashtracking_stderr_filename", "origin": "unknown", "value": None}, - {"name": "crashtracking_stdout_filename", "origin": "unknown", "value": None}, - { - "name": "python_build_gnu_type", - "origin": "unknown", - "value": sysconfig.get_config_var("BUILD_GNU_TYPE"), - }, - { - "name": "python_host_gnu_type", - "origin": "unknown", - "value": sysconfig.get_config_var("HOST_GNU_TYPE"), - }, - {"name": "python_soabi", "origin": "unknown", "value": sysconfig.get_config_var("SOABI")}, - {"name": "trace_sample_rate", "origin": "default", "value": "1.0"}, - {"name": "trace_sampling_rules", "origin": "default", "value": ""}, - {"name": "trace_header_tags", "origin": "default", "value": ""}, - {"name": "logs_injection_enabled", "origin": "default", "value": "false"}, - {"name": "trace_tags", "origin": "default", "value": ""}, - {"name": "trace_enabled", "origin": "default", "value": "true"}, - {"name": "instrumentation_config_id", "origin": "default", "value": ""}, - {"name": "DD_INJECT_FORCE", "origin": "unknown", "value": True}, - {"name": "DD_LIB_INJECTED", "origin": "unknown", "value": False}, - {"name": "DD_LIB_INJECTION_ATTEMPTED", "origin": "unknown", "value": False}, - ], - key=lambda x: x["name"], - ), - "error": { - "code": 0, - "message": "", - }, - } - requests[0]["body"]["payload"]["configuration"].sort(key=lambda c: c["name"]) - result = _get_request_body(payload, "app-started") - result["payload"]["configuration"] = [ - a for a in result["payload"]["configuration"] if a["name"] != "DD_TRACE_AGENT_URL" - ] - assert payload == result["payload"] + assert requests[0]["headers"]["DD-Telemetry-Request-Type"] == "message-batch" + app_started_events = test_agent_session.get_events("app-started") + assert len(app_started_events) == 1 + validate_request_body(app_started_events[0], None, "app-started") + assert len(app_started_events[0]["payload"]) == 3 + assert app_started_events[0]["payload"].get("configuration") + assert app_started_events[0]["payload"].get("products") + assert app_started_events[0]["payload"].get("error") == {"code": 0, "message": ""} def test_app_started_event_configuration_override(test_agent_session, run_python_code_in_subprocess, tmpdir): @@ -616,11 +512,12 @@ def test_app_closing_event(telemetry_writer, test_agent_session, mock_time): # send app closed event 
@@ -616,11 +512,12 @@ def test_app_closing_event(telemetry_writer, test_agent_session, mock_time):
     # send app closed event
     telemetry_writer.app_shutdown()

-    requests = test_agent_session.get_requests("app-closing")
-    assert len(requests) == 1
+    num_requests = len(test_agent_session.get_requests())
+    assert num_requests == 1
     # ensure a valid request body was sent
-    totel_events = len(test_agent_session.get_events())
-    assert requests[0]["body"] == _get_request_body({}, "app-closing", totel_events)
+    events = test_agent_session.get_events("app-closing")
+    assert len(events) == 1
+    validate_request_body(events[0], {}, "app-closing", num_requests)


 def test_add_integration(telemetry_writer, test_agent_session, mock_time):
@@ -632,12 +529,11 @@

     # send integrations to the agent
     telemetry_writer.periodic(force_flush=True)

-    requests = test_agent_session.get_requests("app-integrations-change")
+    events = test_agent_session.get_events("app-integrations-change")

     # assert integration change telemetry request was sent
-    assert len(requests) == 1
-
+    assert len(events) == 1
     # assert that the request had a valid request body
-    requests[0]["body"]["payload"]["integrations"].sort(key=lambda x: x["name"])
+    events[0]["payload"]["integrations"].sort(key=lambda x: x["name"])
     expected_payload = {
         "integrations": [
             {
@@ -658,7 +554,7 @@ def test_add_integration(telemetry_writer, test_agent_session, mock_time):
             },
         ]
     }
-    assert requests[0]["body"] == _get_request_body(expected_payload, "app-integrations-change", seq_id=2)
+    validate_request_body(events[0], expected_payload, "app-integrations-change")


 def test_app_client_configuration_changed_event(telemetry_writer, test_agent_session, mock_time):
@@ -696,7 +592,7 @@ def test_add_integration_disabled_writer(telemetry_writer, test_agent_session):
     telemetry_writer.add_integration("integration-name", True, False, "")

     telemetry_writer.periodic(force_flush=True)
-    assert len(test_agent_session.get_requests("app-integrations-change")) == 0
+    assert len(test_agent_session.get_events("app-integrations-change")) == 0


 @pytest.mark.parametrize("mock_status", [300, 400, 401, 403, 500])
@@ -794,20 +690,21 @@ def test_app_product_change_event(mock_time, telemetry_writer, test_agent_sessio
     }


-def _get_request_body(payload, payload_type, seq_id=1):
-    # type: (Dict, str, int) -> Dict
+def validate_request_body(received_body, payload, payload_type, seq_id=None):
+    # type: (Dict, Dict, str, Optional[int]) -> None
     """used to test the body of requests received by the testagent"""
-    return {
-        "tracer_time": time.time(),
-        "runtime_id": get_runtime_id(),
-        "api_version": "v2",
-        "debug": False,
-        "seq_id": seq_id,
-        "application": get_application(config.service, config.version, config.env),
-        "host": get_host_info(),
-        "payload": payload,
-        "request_type": payload_type,
-    }
+    assert len(received_body) == 9
+    assert received_body["tracer_time"] == time.time()
+    assert received_body["runtime_id"] == get_runtime_id()
+    assert received_body["api_version"] == "v2"
+    assert received_body["debug"] is False
+    if seq_id is not None:
+        assert received_body["seq_id"] == seq_id
+    assert received_body["application"] == get_application(config.service, config.version, config.env)
+    assert received_body["host"] == get_host_info()
+    if payload is not None:
+        assert received_body["payload"] == payload
+    assert received_body["request_type"] == payload_type


 def test_telemetry_writer_agent_setup():