Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/di class #260

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
{
"machine_info": {
"node": "Woile-MacBook-Pro.local",
"processor": "arm",
"machine": "arm64",
"python_compiler": "Clang 16.0.6 ",
"python_implementation": "CPython",
"python_implementation_version": "3.11.10",
"python_version": "3.11.10",
"python_build": [
"main",
"Sep 7 2024 01:03:31"
],
"release": "24.2.0",
"system": "Darwin",
"cpu": {
"python_version": "3.11.10.final.0 (64 bit)",
"cpuinfo_version": [
9,
0,
0
],
"cpuinfo_version_string": "9.0.0",
"arch": "ARM_8",
"bits": 64,
"count": 12,
"arch_string_raw": "arm64",
"brand_raw": "Apple M3 Pro"
}
},
"commit_info": {
"id": "ce85f3617c6291590250df4dd5247280067a4773",
"time": "2024-12-30T18:30:06+01:00",
"author_time": "2024-12-30T18:30:06+01:00",
"dirty": true,
"project": "kstreams",
"branch": "feat/di-class"
},
"benchmarks": [
{
"group": null,
"name": "test_startup_and_processing_single_consumer_record",
"fullname": "tests/test_benchmarks.py::test_startup_and_processing_single_consumer_record",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 4.329101648181677e-05,
"max": 0.010624374961480498,
"mean": 0.00010476869028665672,
"stddev": 0.00014913532189803988,
"rounds": 5273,
"median": 0.0001015830785036087,
"iqr": 5.8636273024603724e-05,
"q1": 7.308297790586948e-05,
"q3": 0.0001317192509304732,
"iqr_outliers": 11,
"stddev_outliers": 11,
"outliers": "11;11",
"ld15iqr": 4.329101648181677e-05,
"hd15iqr": 0.0002988340565934777,
"ops": 9544.836317643263,
"total": 0.5524453038815409,
"iterations": 1
}
},
{
"group": null,
"name": "test_startup_and_inject_all",
"fullname": "tests/test_benchmarks.py::test_startup_and_inject_all",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 4.475004971027374e-05,
"max": 0.019209666061215103,
"mean": 0.00021860343115481218,
"stddev": 0.00027677358158200696,
"rounds": 14520,
"median": 0.00020762498024851084,
"iqr": 0.0001650420017540455,
"q1": 0.0001278329873457551,
"q3": 0.0002928749890998006,
"iqr_outliers": 37,
"stddev_outliers": 46,
"outliers": "46;37",
"ld15iqr": 4.475004971027374e-05,
"hd15iqr": 0.0005416660569608212,
"ops": 4574.493614840898,
"total": 3.1741218203678727,
"iterations": 1
}
},
{
"group": null,
"name": "test_consume_many",
"fullname": "tests/test_benchmarks.py::test_consume_many",
"params": null,
"param": null,
"extra_info": {},
"options": {
"disable_gc": false,
"timer": "perf_counter",
"min_rounds": 5,
"max_time": 1.0,
"min_time": 5e-06,
"warmup": false
},
"stats": {
"min": 0.0006425828905776143,
"max": 0.0007407079683616757,
"mean": 0.0006802422760321444,
"stddev": 2.059963208525067e-05,
"rounds": 1207,
"median": 0.0006902500754222274,
"iqr": 3.9405771531164646e-05,
"q1": 0.0006585522787645459,
"q3": 0.0006979580502957106,
"iqr_outliers": 0,
"stddev_outliers": 496,
"outliers": "496;0",
"ld15iqr": 0.0006425828905776143,
"hd15iqr": 0.0007407079683616757,
"ops": 1470.064468549358,
"total": 0.8210524271707982,
"iterations": 1
}
}
],
"datetime": "2024-12-30T17:33:48.146167+00:00",
"version": "5.1.0"
}
13 changes: 1 addition & 12 deletions .github/workflows/bench-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,8 @@ jobs:
- name: Setup python
uses: actions/setup-python@v5
with:
python-version: '3.13'
python-version: '3.10'
architecture: x64
- name: Set Cache
uses: actions/cache@v4
id: cache # name for referring later
with:
path: .venv/
# The cache key depends on poetry.lock
key: ${{ runner.os }}-cache-${{ hashFiles('poetry.lock') }}
restore-keys: |
${{ runner.os }}-cache-
${{ runner.os }}-

- name: Install Dependencies
# if: steps.cache.outputs.cache-hit != 'true'
run: |
Expand Down
12 changes: 1 addition & 11 deletions .github/workflows/pr-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,8 @@ jobs:
- name: Setup python
uses: actions/setup-python@v5
with:
python-version: '3.13'
python-version: '3.10'
architecture: x64
- name: Set Cache
uses: actions/cache@v4
id: cache # name for referring later
with:
path: .venv/
# The cache key depends on poetry.lock
key: ${{ runner.os }}-cache-${{ hashFiles('poetry.lock') }}
restore-keys: |
${{ runner.os }}-cache-
${{ runner.os }}-
- name: Install Dependencies
# if: steps.cache.outputs.cache-hit != 'true'
run: |
Expand Down
63 changes: 52 additions & 11 deletions kstreams/middleware/udf_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,70 @@ async def anext(async_gen: typing.AsyncGenerator):
return await async_gen.__anext__()


class UdfParam(typing.NamedTuple):
    """Pre-computed typing information for one parameter of a user defined function.

    Attributes:
        annotation: The parameter's base type — the generic origin when the
            annotation is parameterized (e.g. ``ConsumerRecord`` for
            ``ConsumerRecord[str, Customer]``), otherwise the annotation itself.
        args: The generic type arguments returned by ``typing.get_args``,
            e.g. ``(str, Customer)``; empty for non-generic annotations.
        is_generic: Whether the annotation was a parameterized generic.
    """

    annotation: type
    # Variadic tuple: get_args may return any number of type arguments
    # (the previous ``Tuple[typing.Any]`` annotated a 1-tuple only).
    args: typing.Tuple[typing.Any, ...]
    is_generic: bool = False


def build_params(signature: inspect.Signature) -> typing.List[UdfParam]:
    """Extract typing metadata for every parameter of a UDF signature.

    Args:
        signature: The inspected signature of the user defined function.

    Returns:
        One ``UdfParam`` per parameter, in declaration order.
    """
    params: typing.List[UdfParam] = []
    for param in signature.parameters.values():
        # Compute the origin once per parameter instead of calling
        # typing.get_origin twice (base type and generic flag both need it).
        origin = typing.get_origin(param.annotation)
        params.append(
            UdfParam(
                annotation=origin or param.annotation,
                args=typing.get_args(param.annotation),
                is_generic=origin is not None,
            )
        )
    return params


class UdfHandler(BaseMiddleware):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
signature = inspect.signature(self.next_call)
self.params: typing.List[typing.Any] = [
typing.get_origin(param.annotation) or param.annotation
self.params: typing.List[UdfParam] = [
UdfParam(
typing.get_origin(param.annotation) or param.annotation,
typing.get_args(param.annotation),
typing.get_origin(param.annotation) is not None,
)
for param in signature.parameters.values()
]
self.type: UDFType = setup_type(self.params)

def get_type(self) -> UDFType:
return self.type
self.type: UDFType = setup_type([p.annotation for p in self.params])

def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List:
# NOTE: When `no typing` support is deprecated then this can
# be more efficient, as the CR will always be there.
ANNOTATIONS_TO_PARAMS = {
types.ConsumerRecord: cr,
self.ANNOTATIONS_TO_PARAMS: dict[type, typing.Any] = {
types.ConsumerRecord: None,
Stream: self.stream,
types.Send: self.send,
}

return [ANNOTATIONS_TO_PARAMS[param_type] for param_type in self.params]
def get_type(self) -> UDFType:
"""Used by the stream_engine to know whether to call this middleware or not."""
return self.type

def build_param(self, param: UdfParam, cr: types.ConsumerRecord) -> type:
if (
param.annotation is types.ConsumerRecord
and param.is_generic
and len(param.args) == 2 # guarantees ConsumerRecord has two args
):
cr_type = param.args[1]

# Check if it's compatible with a pydantic model
if hasattr(cr_type, "model_validate"):
pydantic_value = cr_type.model_validate(cr.value)
self.ANNOTATIONS_TO_PARAMS[types.ConsumerRecord] = cr._replace(
value=pydantic_value
)
return param.annotation

def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List:
self.ANNOTATIONS_TO_PARAMS[types.ConsumerRecord] = cr
return [
self.ANNOTATIONS_TO_PARAMS[self.build_param(param, cr)]
for param in self.params
]

async def __call__(self, cr: types.ConsumerRecord) -> typing.Any:
"""
Expand Down
2 changes: 1 addition & 1 deletion kstreams/streams_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# NOTE: remove this module when Stream with `no typing` support is deprecated


def setup_type(params: List[inspect.Parameter]) -> UDFType:
def setup_type(params: List[type]) -> UDFType:
"""
Inspect the user defined function (coroutine) to get the proper way to call it

Expand Down
19 changes: 19 additions & 0 deletions kstreams/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@ def __call__(

@dataclass
class ConsumerRecord(typing.Generic[KT, VT]):
"""
ConsumerRecord represents a record received from a Kafka topic.

Attributes:
topic (str): The topic this record is received from.
partition (int): The partition from which this record is received.
offset (int): The position of this record in the corresponding Kafka partition.
timestamp (int): The timestamp of this record.
timestamp_type (int): The timestamp type of this record.
key (Optional[KT]): The key (or `None` if no key is specified).
value (Optional[VT]): The value.
checksum (Optional[int]): Deprecated.
serialized_key_size (int): The size of the serialized,
uncompressed key in bytes.
serialized_value_size (int): The size of the serialized,
uncompressed value in bytes.
headers (EncodedHeaders): The headers.
"""

topic: str
"The topic this record is received from"

Expand Down
60 changes: 60 additions & 0 deletions tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from unittest import mock

import pytest
from pydantic import BaseModel

from kstreams import ConsumerRecord, Send, TopicPartition
from kstreams.clients import Consumer, Producer
Expand Down Expand Up @@ -124,6 +125,65 @@ async def stream(cr: ConsumerRecord[str, bytes]):
await stream.stop()


@pytest.mark.asyncio
async def test_stream_generic_cr_with_pydantic_type(
    stream_engine: StreamEngine, consumer_record_factory
):
    """Allow to use Pydantic models as generic types in ConsumerRecord.

    ```python
    class Customer(BaseModel):
        id: int

    @stream_engine.stream("local--kstreams")
    async def stream(cr: ConsumerRecord[str, Customer]):
        assert cr.value.id == 1
    ```
    """

    # Nested model so the test also covers validation of sub-models.
    class Profile(BaseModel):
        name: str

    class Customer(BaseModel):
        id: int
        profile: Profile

    data = {"id": 1, "profile": {"name": "John"}}
    topic_name = "local--kstreams"
    value = "John"

    # The mocked consumer yields a record whose value is a raw dict; the
    # assertions inside ``stream`` below check it arrives as a validated
    # ``Customer`` instance (attribute access on ``cr.value.profile``).
    async def getone(_):
        return consumer_record_factory(value=data)

    with mock.patch.multiple(
        Consumer,
        start=mock.DEFAULT,
        subscribe=mock.DEFAULT,
        getone=getone,
    ):

        @stream_engine.stream(topic_name)
        async def stream(cr: ConsumerRecord[str, Customer]):
            if cr.value is None:
                raise ValueError("Value is None")
            assert cr.value.profile.name == value
            await asyncio.sleep(0.1)

        # Registration alone must not create a consumer or start consuming.
        assert stream.consumer is None
        assert stream.topics == [topic_name]

        with contextlib.suppress(TimeoutErrorException):
            # now it is possible to run a stream directly, so we need
            # to stop the `forever` consumption
            await asyncio.wait_for(stream.start(), timeout=0.1)

        assert stream.consumer
        Consumer.subscribe.assert_called_once_with(
            topics=[topic_name], listener=stream.rebalance_listener, pattern=None
        )
        await stream.stop()


@pytest.mark.asyncio
async def test_stream_cr_and_stream_with_typing(
stream_engine: StreamEngine, consumer_record_factory
Expand Down
Loading