From 20a3290b7a82de46b57a531760ab0bcdd081933f Mon Sep 17 00:00:00 2001 From: Santiago Fraire Willemoes Date: Mon, 30 Dec 2024 18:29:40 +0100 Subject: [PATCH 1/4] fix: add docs to ConsumerRecord --- kstreams/types.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/kstreams/types.py b/kstreams/types.py index 9010772..20839bc 100644 --- a/kstreams/types.py +++ b/kstreams/types.py @@ -35,6 +35,25 @@ def __call__( @dataclass class ConsumerRecord(typing.Generic[KT, VT]): + """ + ConsumerRecord represents a record received from a Kafka topic. + + Attributes: + topic (str): The topic this record is received from. + partition (int): The partition from which this record is received. + offset (int): The position of this record in the corresponding Kafka partition. + timestamp (int): The timestamp of this record. + timestamp_type (int): The timestamp type of this record. + key (Optional[KT]): The key (or `None` if no key is specified). + value (Optional[VT]): The value. + checksum (Optional[int]): Deprecated. + serialized_key_size (int): The size of the serialized, + uncompressed key in bytes. + serialized_value_size (int): The size of the serialized, + uncompressed value in bytes. + headers (EncodedHeaders): The headers. + """ + topic: str "The topic this record is received from" From 68d862dad19a241e4524170f476bb9f62aa49c48 Mon Sep 17 00:00:00 2001 From: Santiago Fraire Willemoes Date: Mon, 30 Dec 2024 18:30:06 +0100 Subject: [PATCH 2/4] feat: add support for pydantic in ConsumerRecord generic value --- ...73_20241230_173342_uncommited-changes.json | 148 ++++++++++++++++++ kstreams/middleware/udf_middleware.py | 59 +++++-- kstreams/streams_utils.py | 2 +- tests/test_streams.py | 60 +++++++ 4 files changed, 257 insertions(+), 12 deletions(-) create mode 100644 .benchmarks/Darwin-CPython-3.11-64bit/0003_ce85f3617c6291590250df4dd5247280067a4773_20241230_173342_uncommited-changes.json diff --git a/.benchmarks/Darwin-CPython-3.11-64bit/0003_ce85f3617c6291590250df4dd5247280067a4773_20241230_173342_uncommited-changes.json b/.benchmarks/Darwin-CPython-3.11-64bit/0003_ce85f3617c6291590250df4dd5247280067a4773_20241230_173342_uncommited-changes.json new file mode 100644 index 0000000..e122ff7 --- /dev/null +++ b/.benchmarks/Darwin-CPython-3.11-64bit/0003_ce85f3617c6291590250df4dd5247280067a4773_20241230_173342_uncommited-changes.json @@ -0,0 +1,148 @@ +{ + "machine_info": { + "node": "Woile-MacBook-Pro.local", + "processor": "arm", + "machine": "arm64", + "python_compiler": "Clang 16.0.6 ", + "python_implementation": "CPython", + "python_implementation_version": "3.11.10", + "python_version": "3.11.10", + "python_build": [ + "main", + "Sep 7 2024 01:03:31" + ], + "release": "24.2.0", + "system": "Darwin", + "cpu": { + "python_version": "3.11.10.final.0 (64 bit)", + "cpuinfo_version": [ + 9, + 0, + 0 + ], + "cpuinfo_version_string": "9.0.0", + "arch": "ARM_8", + "bits": 64, + "count": 12, + "arch_string_raw": "arm64", + "brand_raw": "Apple M3 Pro" + } + }, + "commit_info": { + "id": "ce85f3617c6291590250df4dd5247280067a4773", + "time": "2024-12-30T18:30:06+01:00", + "author_time": "2024-12-30T18:30:06+01:00", + "dirty": true, + "project": "kstreams", + "branch": "feat/di-class" + }, + "benchmarks": [ + { + "group": null, + "name": "test_startup_and_processing_single_consumer_record", + "fullname": "tests/test_benchmarks.py::test_startup_and_processing_single_consumer_record", + "params": null, + "param": null, + "extra_info": {}, + "options": { + "disable_gc": false, + "timer": "perf_counter", + "min_rounds": 5, + "max_time": 1.0, + "min_time": 5e-06, + "warmup": false + }, + "stats": { + "min": 4.329101648181677e-05, + "max": 0.010624374961480498, + "mean": 0.00010476869028665672, + "stddev": 0.00014913532189803988, + "rounds": 5273, + "median": 0.0001015830785036087, + "iqr": 5.8636273024603724e-05, + "q1": 7.308297790586948e-05, + "q3": 0.0001317192509304732, + "iqr_outliers": 11, + "stddev_outliers": 11, + "outliers": "11;11", + "ld15iqr": 4.329101648181677e-05, + "hd15iqr": 0.0002988340565934777, + "ops": 9544.836317643263, + "total": 0.5524453038815409, + "iterations": 1 + } + }, + { + "group": null, + "name": "test_startup_and_inject_all", + "fullname": "tests/test_benchmarks.py::test_startup_and_inject_all", + "params": null, + "param": null, + "extra_info": {}, + "options": { + "disable_gc": false, + "timer": "perf_counter", + "min_rounds": 5, + "max_time": 1.0, + "min_time": 5e-06, + "warmup": false + }, + "stats": { + "min": 4.475004971027374e-05, + "max": 0.019209666061215103, + "mean": 0.00021860343115481218, + "stddev": 0.00027677358158200696, + "rounds": 14520, + "median": 0.00020762498024851084, + "iqr": 0.0001650420017540455, + "q1": 0.0001278329873457551, + "q3": 0.0002928749890998006, + "iqr_outliers": 37, + "stddev_outliers": 46, + "outliers": "46;37", + "ld15iqr": 4.475004971027374e-05, + "hd15iqr": 0.0005416660569608212, + "ops": 4574.493614840898, + "total": 3.1741218203678727, + "iterations": 1 + } + }, + { + "group": null, + "name": "test_consume_many", + "fullname": "tests/test_benchmarks.py::test_consume_many", + "params": null, + "param": null, + "extra_info": {}, + "options": { + "disable_gc": false, + "timer": "perf_counter", + "min_rounds": 5, + "max_time": 1.0, + "min_time": 5e-06, + "warmup": false + }, + "stats": { + "min": 0.0006425828905776143, + "max": 0.0007407079683616757, + "mean": 0.0006802422760321444, + "stddev": 2.059963208525067e-05, + "rounds": 1207, + "median": 0.0006902500754222274, + "iqr": 3.9405771531164646e-05, + "q1": 0.0006585522787645459, + "q3": 0.0006979580502957106, + "iqr_outliers": 0, + "stddev_outliers": 496, + "outliers": "496;0", + "ld15iqr": 0.0006425828905776143, + "hd15iqr": 0.0007407079683616757, + "ops": 1470.064468549358, + "total": 0.8210524271707982, + "iterations": 1 + } + } + ], + "datetime": "2024-12-30T17:33:48.146167+00:00", + "version": "5.1.0" +} \ No newline at end of file diff --git a/kstreams/middleware/udf_middleware.py b/kstreams/middleware/udf_middleware.py index 7ba4d2d..fb7bfa8 100644 --- a/kstreams/middleware/udf_middleware.py +++ b/kstreams/middleware/udf_middleware.py @@ -14,29 +14,66 @@ async def anext(async_gen: typing.AsyncGenerator): return await async_gen.__anext__() +class UdfParam(typing.NamedTuple): + annotation: type + args: typing.Tuple[typing.Any] + is_generic: bool = False + + +def build_params(signature: inspect.Signature) -> typing.List[UdfParam]: + return [ + UdfParam( + typing.get_origin(param.annotation) or param.annotation, + typing.get_args(param.annotation), + typing.get_origin(param.annotation) is not None, + ) + for param in signature.parameters.values() + ] + + class UdfHandler(BaseMiddleware): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) signature = inspect.signature(self.next_call) - self.params: typing.List[typing.Any] = [ - typing.get_origin(param.annotation) or param.annotation + self.params: typing.List[UdfParam] = [ + UdfParam( + typing.get_origin(param.annotation) or param.annotation, + typing.get_args(param.annotation), + typing.get_origin(param.annotation) is not None, + ) for param in signature.parameters.values() ] - self.type: UDFType = setup_type(self.params) - def get_type(self) -> UDFType: - return self.type + self.type: UDFType = setup_type([p.annotation for p in self.params]) - def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List: - # NOTE: When `no typing` support is deprecated then this can - # be more eficient as the CR will be always there. - ANNOTATIONS_TO_PARAMS = { - types.ConsumerRecord: cr, + self.ANNOTATIONS_TO_PARAMS: dict[type, typing.Any] = { + types.ConsumerRecord: None, Stream: self.stream, types.Send: self.send, } - return [ANNOTATIONS_TO_PARAMS[param_type] for param_type in self.params] + def get_type(self) -> UDFType: + """Used by the stream_engine to know whether to call this middleware or not.""" + return self.type + + def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List: + self.ANNOTATIONS_TO_PARAMS[types.ConsumerRecord] = cr + + args = [] + for param in self.params: + if param.annotation is types.ConsumerRecord and param.is_generic: + if len(param.args) == 2: + cr_type = param.args[1] + + # Check if it's compatible with a pydantic model + if hasattr(cr_type, "model_validate"): + pydantic_value = cr_type.model_validate(cr.value) + self.ANNOTATIONS_TO_PARAMS[types.ConsumerRecord] = cr._replace( + value=pydantic_value + ) + args.append(self.ANNOTATIONS_TO_PARAMS[param.annotation]) + + return args async def __call__(self, cr: types.ConsumerRecord) -> typing.Any: """ diff --git a/kstreams/streams_utils.py b/kstreams/streams_utils.py index d2c0841..922cdbd 100644 --- a/kstreams/streams_utils.py +++ b/kstreams/streams_utils.py @@ -7,7 +7,7 @@ # NOTE: remove this module when Stream with `no typing` support is deprecated -def setup_type(params: List[inspect.Parameter]) -> UDFType: +def setup_type(params: List[type]) -> UDFType: """ Inspect the user defined function (coroutine) to get the proper way to call it diff --git a/tests/test_streams.py b/tests/test_streams.py index 7ab253f..eb87861 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -4,6 +4,7 @@ from unittest import mock import pytest +from pydantic import BaseModel from kstreams import ConsumerRecord, Send, TopicPartition from kstreams.clients import Consumer, Producer @@ -124,6 +125,65 @@ async def stream(cr: ConsumerRecord[str, bytes]): await stream.stop() +@pytest.mark.asyncio +async def test_stream_generic_cr_with_pydantic_type( + stream_engine: StreamEngine, consumer_record_factory +): + """Allow to use Pydantic models as generic types in ConsumerRecord. + + ```python + class Customer(BaseModel): + id: int + + @stream_engine.stream("local--kstreams") + async def stream(cr: ConsumerRecord[str, Customer]): + assert cr.value.id == 1 + ``` + """ + + class Profile(BaseModel): + name: str + + class Customer(BaseModel): + id: int + profile: Profile + + data = {"id": 1, "profile": {"name": "John"}} + topic_name = "local--kstreams" + value = "John" + + async def getone(_): + return consumer_record_factory(value=data) + + with mock.patch.multiple( + Consumer, + start=mock.DEFAULT, + subscribe=mock.DEFAULT, + getone=getone, + ): + + @stream_engine.stream(topic_name) + async def stream(cr: ConsumerRecord[str, Customer]): + if cr.value is None: + raise ValueError("Value is None") + assert cr.value.profile.name == value + await asyncio.sleep(0.1) + + assert stream.consumer is None + assert stream.topics == [topic_name] + + with contextlib.suppress(TimeoutErrorException): + # now it is possible to run a stream directly, so we need + # to stop the `forever` consumption + await asyncio.wait_for(stream.start(), timeout=0.1) + + assert stream.consumer + Consumer.subscribe.assert_called_once_with( + topics=[topic_name], listener=stream.rebalance_listener, pattern=None + ) + await stream.stop() + + @pytest.mark.asyncio async def test_stream_cr_and_stream_with_typing( stream_engine: StreamEngine, consumer_record_factory From 3c474a9873e75e9feba153fe73023eaa0c23d9e0 Mon Sep 17 00:00:00 2001 From: Santiago Fraire Willemoes Date: Mon, 30 Dec 2024 18:53:50 +0100 Subject: [PATCH 3/4] ci: remove cache from bench and use same python version as current benches --- .github/workflows/bench-release.yml | 13 +------------ .github/workflows/pr-tests.yaml | 12 +----------- 2 files changed, 2 insertions(+), 23 deletions(-) diff --git a/.github/workflows/bench-release.yml b/.github/workflows/bench-release.yml index 4bea850..60a96dc 100644 --- a/.github/workflows/bench-release.yml +++ b/.github/workflows/bench-release.yml @@ -18,19 +18,8 @@ jobs: - name: Setup python uses: actions/setup-python@v5 with: - python-version: '3.13' + python-version: '3.10' architecture: x64 - - name: Set Cache - uses: actions/cache@v4 - id: cache # name for referring later - with: - path: .venv/ - # The cache key depends on poetry.lock - key: ${{ runner.os }}-cache-${{ hashFiles('poetry.lock') }} - restore-keys: | - ${{ runner.os }}-cache- - ${{ runner.os }}- - - name: Install Dependencies # if: steps.cache.outputs.cache-hit != 'true' run: | diff --git a/.github/workflows/pr-tests.yaml b/.github/workflows/pr-tests.yaml index 9b2c716..fc969d8 100644 --- a/.github/workflows/pr-tests.yaml +++ b/.github/workflows/pr-tests.yaml @@ -69,18 +69,8 @@ jobs: - name: Setup python uses: actions/setup-python@v5 with: - python-version: '3.13' + python-version: '3.10' architecture: x64 - - name: Set Cache - uses: actions/cache@v4 - id: cache # name for referring later - with: - path: .venv/ - # The cache key depends on poetry.lock - key: ${{ runner.os }}-cache-${{ hashFiles('poetry.lock') }} - restore-keys: | - ${{ runner.os }}-cache- - ${{ runner.os }}- - name: Install Dependencies # if: steps.cache.outputs.cache-hit != 'true' run: | From cd50778cc8d361039fb80ca1bb4734bff83f4e94 Mon Sep 17 00:00:00 2001 From: Santiago Fraire Willemoes Date: Mon, 6 Jan 2025 14:20:36 +0100 Subject: [PATCH 4/4] fix: improve udf code --- kstreams/middleware/udf_middleware.py | 36 +++++++++++++++------------ 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/kstreams/middleware/udf_middleware.py b/kstreams/middleware/udf_middleware.py index fb7bfa8..00dc412 100644 --- a/kstreams/middleware/udf_middleware.py +++ b/kstreams/middleware/udf_middleware.py @@ -56,24 +56,28 @@ def get_type(self) -> UDFType: """Used by the stream_engine to know whether to call this middleware or not.""" return self.type + def build_param(self, param: UdfParam, cr: types.ConsumerRecord) -> type: + if ( + param.annotation is types.ConsumerRecord + and param.is_generic + and len(param.args) == 2 # guarantees ConsumerRecord has two args + ): + cr_type = param.args[1] + + # Check if it's compatible with a pydantic model + if hasattr(cr_type, "model_validate"): + pydantic_value = cr_type.model_validate(cr.value) + self.ANNOTATIONS_TO_PARAMS[types.ConsumerRecord] = cr._replace( + value=pydantic_value + ) + return param.annotation + def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List: self.ANNOTATIONS_TO_PARAMS[types.ConsumerRecord] = cr - - args = [] - for param in self.params: - if param.annotation is types.ConsumerRecord and param.is_generic: - if len(param.args) == 2: - cr_type = param.args[1] - - # Check if it's compatible with a pydantic model - if hasattr(cr_type, "model_validate"): - pydantic_value = cr_type.model_validate(cr.value) - self.ANNOTATIONS_TO_PARAMS[types.ConsumerRecord] = cr._replace( - value=pydantic_value - ) - args.append(self.ANNOTATIONS_TO_PARAMS[param.annotation]) - - return args + return [ + self.ANNOTATIONS_TO_PARAMS[self.build_param(param, cr)] + for param in self.params + ] async def __call__(self, cr: types.ConsumerRecord) -> typing.Any: """