Skip to content

Commit

Permalink
Merge pull request #8 from Airbase/SM-python3.12
Browse files Browse the repository at this point in the history
Upgrade to python 3.12, upgrade packages, pre-commit hooks
  • Loading branch information
sidmitra authored Sep 10, 2024
2 parents 05c78f0 + a929d85 commit f3ffe2b
Show file tree
Hide file tree
Showing 12 changed files with 661 additions and 764 deletions.
27 changes: 15 additions & 12 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ default_language_version:

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0
rev: v4.6.0
hooks:
- id: check-added-large-files
- id: check-case-conflict
Expand All @@ -25,33 +25,36 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/asottile/pyupgrade
rev: v2.34.0
rev: v3.17.0
hooks:
- id: pyupgrade
args: ["--py39-plus", "--keep-runtime-typing"]
args: ["--py312-plus", "--keep-runtime-typing"]

- repo: https://github.com/PyCQA/autoflake
rev: v1.4
rev: v2.3.1
hooks:
- id: autoflake
args: [--in-place, --remove-all-unused-imports]

- repo: https://github.com/pycqa/isort
rev: 5.10.1
rev: 5.13.2
hooks:
- id: isort
name: isort (python)
- id: isort
name: isort (pyi)
types: [pyi]

- repo: https://github.com/ambv/black
rev: 22.3.0
hooks:
- id: black
args: [--line-length=88, --safe]

- repo: https://github.com/pycqa/flake8
rev: 4.0.1
rev: 7.1.1
hooks:
- id: flake8

- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.6.4 # Upgrade along with version in pyproject.toml dev dependencies
hooks:
- id: ruff
types_or: [ python, pyi ]
args: [ --fix ]
- id: ruff-format
types_or: [ python, pyi ]
4 changes: 2 additions & 2 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from store import InMemoryStore


def setup_logging():
def setup_logging() -> None:
prod_log_format = (
"%(asctime)s [%(process)d] %(levelname)-8.8s %(name)s: %(message)s"
)
Expand All @@ -31,7 +31,7 @@ def setup_logging():

@click.command()
@click.option("--broker", default="redis://localhost:6379/1", help="celery broker uri")
def run(broker):
def run(broker) -> None:
setup_logging()

# start all the exporters in different threads
Expand Down
3 changes: 1 addition & 2 deletions exporters/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from abc import ABC, abstractmethod
from typing import Union

from celery.events.state import Task, Worker


class Exporter(ABC):
@abstractmethod
def process_event(self, event: Union[Task, Worker]):
def process_event(self, event: Task | Worker):
pass
25 changes: 15 additions & 10 deletions exporters/dd.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from enum import Enum
from threading import Thread
from time import sleep
from typing import Union

import daiquiri
from celery.events.state import Task, Worker
Expand All @@ -21,21 +20,22 @@ class DataDogMetrics(Enum):


class DataDogSummary: # maybe this can be generic
def __init__(self, events):
def __init__(self, events) -> None:
self.events = events

@property
def wait_time(self):
"""Returns wait time of a task in seconds.
Note:
-----
----
observation: tasks started from celery-beat don't have a pending or task-sent event
Returns
--------
Returns:
-------
int
Wait Time in seconds
"""
try:
client_sent_time = self.events[PENDING]["timestamp"]
Expand All @@ -50,9 +50,10 @@ def run_time(self):
"""Returns execution of a task in seconds.
Returns
--------
-------
int
Run Time in seconds
"""
try:
return self.events[SUCCESS]["runtime"]
Expand All @@ -62,7 +63,7 @@ def run_time(self):


class DataDogExporter(Exporter, Thread):
def __init__(self, config_option=None, store=None):
def __init__(self, config_option=None, store=None) -> None:
Thread.__init__(self)
self.daemon = True
self.store = store
Expand All @@ -84,7 +85,7 @@ def get_tags(events):
except KeyError:
logger.exception(f"Pending Event missing in {events}")

def process_event(self, event: Union[Task, Worker]):
def process_event(self, event: Task | Worker) -> None:
self.store.add_event(event.uuid, event.state, event)
if event.state in READY_STATES:
logger.debug(f"task: {event.uuid} ended with state: {event.state}")
Expand All @@ -103,11 +104,15 @@ def run(self) -> None:
summary = DataDogSummary(events)
if (wait_time := summary.wait_time) is not None:
statsd.histogram(
DataDogMetrics.TASK_WAIT_TIME.value, wait_time, tags=tags
DataDogMetrics.TASK_WAIT_TIME.value,
wait_time,
tags=tags,
)
if (run_time := summary.run_time) is not None:
statsd.histogram(
DataDogMetrics.TASK_RUNTIME_TIME.value, run_time, tags=tags
DataDogMetrics.TASK_RUNTIME_TIME.value,
run_time,
tags=tags,
)
if SUCCESS in events:
statsd.increment(DataDogMetrics.TOTAL_SUCCESS.value, tags=tags)
Expand Down
Loading

0 comments on commit f3ffe2b

Please sign in to comment.