Skip to content

Commit

Permalink
Adding Datadog to worker and scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
aweiland committed Jan 20, 2025
1 parent aaeab69 commit 333ff04
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 26 deletions.
1 change: 0 additions & 1 deletion compose/fastapi/start-scheduler
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,4 @@ set -o errexit
set -o pipefail
set -o nounset


taskiq scheduler scheduler:scheduler -fsd
8 changes: 3 additions & 5 deletions src/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@

class StructlogMiddleware(TaskiqMiddleware):
def pre_execute(
self,
message: TaskiqMessage,
self,
message: TaskiqMessage,
) -> "Union[TaskiqMessage, Coroutine[Any, Any, TaskiqMessage]]":
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(
taskiq_task_id=message.task_id, taskiq_task_name=message.task_name
)
structlog.contextvars.bind_contextvars(taskiq_task_id=message.task_id, taskiq_task_name=message.task_name)

return message

Expand Down
34 changes: 18 additions & 16 deletions src/infrastructure/dependency/structured_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Much of this is borrowed from: https://gist.github.com/Brymes/cd8f9f138e12845417a246822f64ca26
"""

import logging
import time

Expand Down Expand Up @@ -79,10 +80,10 @@ def setup_structured_logging(json_logs: bool = False, log_level: str = "INFO"):

structlog.configure(
processors=shared_processors
+ [
# Prepare event dict for `ProcessorFormatter`.
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
+ [
# Prepare event dict for `ProcessorFormatter`.
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
Expand Down Expand Up @@ -176,18 +177,19 @@ async def dispatch(self, request: Request, call_next) -> Response:

# Recreate the Uvicorn access log format, but add all parameters as structured information
logger_fn = access_logger.warn if 400 < status_code < 500 else access_logger.info
logger_fn(f"""{real_host}:{client_port} - "{http_method} {url} HTTP/{http_version}" {status_code}""",
http={
"url": str(request.url),
"request_path": str(path),
"status_code": status_code,
"method": http_method,
"request_id": request_id,
"version": http_version,
},
network={"client": {"ip": real_host, "port": client_port}},
duration=process_time,
)
logger_fn(
f"""{real_host}:{client_port} - "{http_method} {url} HTTP/{http_version}" {status_code}""",
http={
"url": str(request.url),
"request_path": str(path),
"status_code": status_code,
"method": http_method,
"request_id": request_id,
"version": http_version,
},
network={"client": {"ip": real_host, "port": client_port}},
duration=process_time,
)

# if 400 < status_code < 500:
# access_logger.warn(
Expand Down
3 changes: 0 additions & 3 deletions src/infrastructure/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import structlog


if os.environ.get("ENV") == "testing":
# Some tests check logging output, so use the old logger
fmt = "%(levelname)s: %(message)s"
Expand All @@ -17,5 +16,3 @@
else:
# Default to structured logger, enable JSON format if env set
logger = structlog.stdlib.get_logger("api") # type: ignore


1 change: 1 addition & 0 deletions src/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Entrypoint for the backend scheduler
"""

import datadog # noqa: F401, I001

from taskiq import TaskiqScheduler
Expand Down
3 changes: 2 additions & 1 deletion src/worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Entrypoint for the backend worker
"""

import datadog # noqa: F401, I001

from broker import broker as worker # noqa: F401
from broker import broker as worker # noqa: F401

0 comments on commit 333ff04

Please sign in to comment.