From 333ff0440b3d0efe0ca29406cf8256555a8e7d35 Mon Sep 17 00:00:00 2001 From: Andrew Weiland Date: Mon, 20 Jan 2025 15:39:03 -0500 Subject: [PATCH] Adding Datadog to worker and scheduler --- compose/fastapi/start-scheduler | 1 - src/broker.py | 8 ++--- .../dependency/structured_logs.py | 34 ++++++++++--------- src/infrastructure/logger.py | 3 -- src/scheduler.py | 1 + src/worker.py | 3 +- 6 files changed, 24 insertions(+), 26 deletions(-) diff --git a/compose/fastapi/start-scheduler b/compose/fastapi/start-scheduler index 15c1ff209d5..0a86d1bc3f8 100644 --- a/compose/fastapi/start-scheduler +++ b/compose/fastapi/start-scheduler @@ -10,5 +10,4 @@ set -o errexit set -o pipefail set -o nounset - taskiq scheduler scheduler:scheduler -fsd \ No newline at end of file diff --git a/src/broker.py b/src/broker.py index 84358e3c3cc..d07fc3a4b38 100644 --- a/src/broker.py +++ b/src/broker.py @@ -19,13 +19,11 @@ class StructlogMiddleware(TaskiqMiddleware): def pre_execute( - self, - message: TaskiqMessage, + self, + message: TaskiqMessage, ) -> "Union[TaskiqMessage, Coroutine[Any, Any, TaskiqMessage]]": structlog.contextvars.clear_contextvars() - structlog.contextvars.bind_contextvars( - taskiq_task_id=message.task_id, taskiq_task_name=message.task_name - ) + structlog.contextvars.bind_contextvars(taskiq_task_id=message.task_id, taskiq_task_name=message.task_name) return message diff --git a/src/infrastructure/dependency/structured_logs.py b/src/infrastructure/dependency/structured_logs.py index 3b32a883990..7252e2eb2bc 100644 --- a/src/infrastructure/dependency/structured_logs.py +++ b/src/infrastructure/dependency/structured_logs.py @@ -3,6 +3,7 @@ Much of this is borrowed from: https://gist.github.com/Brymes/cd8f9f138e12845417a246822f64ca26 """ + import logging import time @@ -79,10 +80,10 @@ def setup_structured_logging(json_logs: bool = False, log_level: str = "INFO"): structlog.configure( processors=shared_processors - + [ - # Prepare event dict for `ProcessorFormatter`. - structlog.stdlib.ProcessorFormatter.wrap_for_formatter, - ], + + [ + # Prepare event dict for `ProcessorFormatter`. + structlog.stdlib.ProcessorFormatter.wrap_for_formatter, + ], logger_factory=structlog.stdlib.LoggerFactory(), cache_logger_on_first_use=True, ) @@ -176,18 +177,19 @@ async def dispatch(self, request: Request, call_next) -> Response: # Recreate the Uvicorn access log format, but add all parameters as structured information logger_fn = access_logger.warn if 400 < status_code < 500 else access_logger.info - logger_fn(f"""{real_host}:{client_port} - "{http_method} {url} HTTP/{http_version}" {status_code}""", - http={ - "url": str(request.url), - "request_path": str(path), - "status_code": status_code, - "method": http_method, - "request_id": request_id, - "version": http_version, - }, - network={"client": {"ip": real_host, "port": client_port}}, - duration=process_time, - ) + logger_fn( + f"""{real_host}:{client_port} - "{http_method} {url} HTTP/{http_version}" {status_code}""", + http={ + "url": str(request.url), + "request_path": str(path), + "status_code": status_code, + "method": http_method, + "request_id": request_id, + "version": http_version, + }, + network={"client": {"ip": real_host, "port": client_port}}, + duration=process_time, + ) # if 400 < status_code < 500: # access_logger.warn( diff --git a/src/infrastructure/logger.py b/src/infrastructure/logger.py index 34d5e8f0d55..6a28f3aabd0 100644 --- a/src/infrastructure/logger.py +++ b/src/infrastructure/logger.py @@ -3,7 +3,6 @@ import structlog - if os.environ.get("ENV") == "testing": # Some tests check logging output, so use the old logger fmt = "%(levelname)s: %(message)s" @@ -17,5 +16,3 @@ else: # Default to structured logger, enable JSON format if env set logger = structlog.stdlib.get_logger("api") # type: ignore - - diff --git a/src/scheduler.py b/src/scheduler.py index 31baaf14b15..61db60709e8 100644 --- a/src/scheduler.py +++ b/src/scheduler.py @@ -1,6 +1,7 @@ """ Entrypoint for the backend scheduler """ + import datadog # noqa: F401, I001 from taskiq import TaskiqScheduler diff --git a/src/worker.py b/src/worker.py index 67ea43b7d2a..b55ea2a7b12 100644 --- a/src/worker.py +++ b/src/worker.py @@ -1,6 +1,7 @@ """ Entrypoint for the backend worker """ + import datadog # noqa: F401, I001 -from broker import broker as worker # noqa: F401 \ No newline at end of file +from broker import broker as worker # noqa: F401