Skip to content

Commit

Permalink
feat: add dependency injection framework
Browse files Browse the repository at this point in the history
  • Loading branch information
woile committed Nov 26, 2024
1 parent 079e2a8 commit 3ae6e78
Show file tree
Hide file tree
Showing 28 changed files with 1,102 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
{
"machine_info": {
"node": "Woile-MacBook-Pro.local",
"processor": "arm",
"machine": "arm64",
"python_compiler": "Clang 16.0.6 ",
"python_implementation": "CPython",
"python_implementation_version": "3.11.10",
"python_version": "3.11.10",
"python_build": [
"main",
"Sep 7 2024 01:03:31"
],
"release": "24.1.0",
"system": "Darwin",
"cpu": {
"python_version": "3.11.10.final.0 (64 bit)",
"cpuinfo_version": [
9,
0,
0
],
"cpuinfo_version_string": "9.0.0",
"arch": "ARM_8",
"bits": 64,
"count": 12,
"arch_string_raw": "arm64",
"brand_raw": "Apple M3 Pro"
}
},
"commit_info": {
"id": "4beaca18138343fa989b1283ae577de131abb733",
"time": "2024-11-26T16:04:54+01:00",
"author_time": "2022-09-17T09:45:33+02:00",
"dirty": true,
"project": "kstreams",
"branch": "feat/dependency-injection"
},
"benchmarks": [
{
"group": null,
"name": "test_startup_and_processing_single_consumer_record",
"fullname": "tests/test_benchmarks.py::test_startup_and_processing_single_consumer_record",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 0.00010604201816022396,
"max": 0.010822750016814098,
"mean": 0.00016831592185808216,
"stddev": 0.0003149759900475096,
"rounds": 1544,
"median": 0.00013091700384393334,
"iqr": 1.879199407994747e-05,
"q1": 0.00012045800394844264,
"q3": 0.0001392499980283901,
"iqr_outliers": 114,
"stddev_outliers": 63,
"outliers": "63;114",
"ld15iqr": 0.00010604201816022396,
"hd15iqr": 0.000167582998983562,
"ops": 5941.2085853836425,
"total": 0.25987978334887885,
"iterations": 1
}
},
{
"group": null,
"name": "test_startup_and_inject_all",
"fullname": "tests/test_benchmarks.py::test_startup_and_inject_all",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 0.0001525410043541342,
"max": 0.03395479201572016,
"mean": 0.00024245004325373947,
"stddev": 0.0008005369642820076,
"rounds": 4560,
"median": 0.00021754149929620326,
"iqr": 5.6313510867767036e-05,
"q1": 0.00018539548909757286,
"q3": 0.0002417089999653399,
"iqr_outliers": 65,
"stddev_outliers": 5,
"outliers": "5;65",
"ld15iqr": 0.0001525410043541342,
"hd15iqr": 0.00034166700788773596,
"ops": 4124.561029479529,
"total": 1.105572197237052,
"iterations": 1
}
},
{
"group": null,
"name": "test_consume_many",
"fullname": "tests/test_benchmarks.py::test_consume_many",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 0.0034218749788124114,
"max": 0.004076749988598749,
"mean": 0.0034961712951928296,
"stddev": 7.397271227296705e-05,
"rounds": 268,
"median": 0.00347295799292624,
"iqr": 7.147900760173798e-05,
"q1": 0.003452624994679354,
"q3": 0.003524104002281092,
"iqr_outliers": 14,
"stddev_outliers": 24,
"outliers": "24;14",
"ld15iqr": 0.0034218749788124114,
"hd15iqr": 0.0036370840098243207,
"ops": 286.02717532032295,
"total": 0.9369739071116783,
"iterations": 1
}
}
],
"datetime": "2024-11-26T15:14:17.201596+00:00",
"version": "5.1.0"
}
4 changes: 2 additions & 2 deletions .github/workflows/bench-release.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Bump version
name: Benchmark latest release

on:
push:
Expand Down Expand Up @@ -46,5 +46,5 @@ jobs:
git config --global user.email "[email protected]"
git config --global user.name "GitHub Action"
git add .benchmarks/
git commit -m "bench: bench: add benchmark current release"
git commit -m "bench: current release"
git push origin master
40 changes: 34 additions & 6 deletions .github/workflows/pr-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ on:
required: true

jobs:
build_test_bench:
test:
runs-on: ubuntu-latest
strategy:
matrix:
Expand Down Expand Up @@ -56,15 +56,43 @@ jobs:
git config --global user.email "[email protected]"
git config --global user.name "GitHub Action"
./scripts/test
- name: Benchmark regression test
run: |
./scripts/bench-compare
- name: Upload coverage to Codecov
uses: codecov/[email protected]
with:
file: ./coverage.xml
name: kstreams
fail_ci_if_error: true
token: ${{secrets.CODECOV_TOKEN}}
bench:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup python
uses: actions/setup-python@v5
with:
python-version: '3.13'
architecture: x64
- name: Set Cache
uses: actions/cache@v4
id: cache # name for referring later
with:
path: .venv/
# The cache key depends on poetry.lock
key: ${{ runner.os }}-cache-${{ hashFiles('poetry.lock') }}
restore-keys: |
${{ runner.os }}-cache-
${{ runner.os }}-
- name: Install Dependencies
# if: steps.cache.outputs.cache-hit != 'true'
run: |
python -m pip install -U pip poetry
poetry --version
poetry config --local virtualenvs.in-project true
poetry install
- name: Benchmark regression test
run: |
./scripts/bench-current
./scripts/bench-compare
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ if __name__ == "__main__":
- [ ] Store (kafka streams pattern)
- [ ] Stream Join
- [ ] Windowing
- [ ] PEP 593

## Development

Expand Down
6 changes: 6 additions & 0 deletions kstreams/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from aiokafka.structs import RecordMetadata, TopicPartition

from ._di.parameters import FromHeader, Header
from .backends.kafka import Kafka
from .clients import Consumer, Producer
from .create import StreamEngine, create_engine
from .prometheus.monitor import PrometheusMonitor, PrometheusMonitorType
Expand Down Expand Up @@ -31,4 +33,8 @@
"TestStreamClient",
"TopicPartition",
"TopicPartitionOffset",
"Kafka",
"StreamDependencyManager",
"FromHeader",
"Header",
]
68 changes: 68 additions & 0 deletions kstreams/_di/binders/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import inspect
from typing import Any, AsyncIterator, Awaitable, Protocol, TypeVar, Union

from di.api.dependencies import CacheKey
from di.dependent import Dependent, Marker

from kstreams.types import ConsumerRecord


class ExtractorTrait(Protocol):
"""Implement to extract data from incoming `ConsumerRecord`.
Consumers will always work with a consumer Record.
Implementing this would let you extract information from the `ConsumerRecord`.
"""

def __hash__(self) -> int:
"""Required by di in order to cache the deps"""
...

def __eq__(self, __o: object) -> bool:
"""Required by di in order to cache the deps"""
...

async def extract(
self, consumer_record: ConsumerRecord
) -> Union[Awaitable[Any], AsyncIterator[Any]]:
"""This is where the magic should happen.
For example, you could "extract" here a json from the `ConsumerRecord.value`
"""
...


T = TypeVar("T", covariant=True)


class MarkerTrait(Protocol[T]):
def register_parameter(self, param: inspect.Parameter) -> T: ...


class Binder(Dependent[Any]):
def __init__(
self,
*,
extractor: ExtractorTrait,
) -> None:
super().__init__(call=extractor.extract, scope="consumer_record")
self.extractor = extractor

@property
def cache_key(self) -> CacheKey:
return self.extractor


class BinderMarker(Marker):
"""Bind together the different dependencies.
NETX: Add asyncapi marker here, like `MarkerTrait[AsyncApiTrait]`.
Recommendation to wait until 3.0:
- [#618](https://github.com/asyncapi/spec/issues/618)
"""

def __init__(self, *, extractor_marker: MarkerTrait[ExtractorTrait]) -> None:
self.extractor_marker = extractor_marker

def register_parameter(self, param: inspect.Parameter) -> Binder:
return Binder(extractor=self.extractor_marker.register_parameter(param))
44 changes: 44 additions & 0 deletions kstreams/_di/binders/header.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import inspect
from typing import Any, NamedTuple, Optional

from kstreams.exceptions import HeaderNotFound
from kstreams.types import ConsumerRecord


class HeaderExtractor(NamedTuple):
name: str

def __hash__(self) -> int:
return hash((self.__class__, self.name))

def __eq__(self, __o: object) -> bool:
return isinstance(__o, HeaderExtractor) and __o.name == self.name

async def extract(self, consumer_record: ConsumerRecord) -> Any:
headers = dict(consumer_record.headers)
try:
header = headers[self.name]
except KeyError as e:
message = (
f"No header `{self.name}` found.\n"
"Check if your broker is sending the header.\n"
"Try adding a default value to your parameter like `None`.\n"
"Or set `convert_underscores = False`."
)
raise HeaderNotFound(message) from e
else:
return header


class HeaderMarker(NamedTuple):
alias: Optional[str]
convert_underscores: bool

def register_parameter(self, param: inspect.Parameter) -> HeaderExtractor:
if self.alias is not None:
name = self.alias
elif self.convert_underscores:
name = param.name.replace("_", "-")
else:
name = param.name
return HeaderExtractor(name=name)
Loading

0 comments on commit 3ae6e78

Please sign in to comment.