From 721b6d0dbed583825b22af5b9eed850d4f9e9cee Mon Sep 17 00:00:00 2001 From: marcosschroh Date: Tue, 11 Feb 2025 13:29:10 +0100 Subject: [PATCH] feat(transactions): Transaction added to garantee only once paradigm. Related to #265 --- docs/transactions.md | 231 ++++++++++++ examples/transactions/README.md | 103 +++++ examples/transactions/docker-compose.yaml | 104 +++++ examples/transactions/poetry.lock | 355 ++++++++++++++++++ examples/transactions/pyproject.toml | 19 + examples/transactions/scripts/cluster/start | 3 + examples/transactions/scripts/cluster/stop | 4 + examples/transactions/tests/__init__.py | 0 .../transactions/transactions/__init__.py | 0 examples/transactions/transactions/app.py | 39 ++ examples/transactions/transactions/engine.py | 12 + .../transactions/transactions/producer.py | 19 + .../transactions/transactions/serializers.py | 36 ++ examples/transactions/transactions/streams.py | 67 ++++ kstreams/__init__.py | 5 +- kstreams/engine.py | 44 ++- kstreams/middleware/middleware.py | 5 + kstreams/middleware/udf_middleware.py | 1 + kstreams/test_utils/structs.py | 8 + kstreams/test_utils/test_clients.py | 76 +++- kstreams/test_utils/test_utils.py | 12 +- kstreams/test_utils/topics.py | 67 +++- kstreams/transaction.py | 80 ++++ kstreams/types.py | 12 + mkdocs.yml | 3 + poetry.lock | 16 + pyproject.toml | 1 + tests/conftest.py | 24 +- tests/test_client.py | 137 ++++++- tests/test_serialization.py | 41 +- tests/test_transactions.py | 346 +++++++++++++++++ 31 files changed, 1821 insertions(+), 49 deletions(-) create mode 100644 docs/transactions.md create mode 100644 examples/transactions/README.md create mode 100644 examples/transactions/docker-compose.yaml create mode 100644 examples/transactions/poetry.lock create mode 100644 examples/transactions/pyproject.toml create mode 100755 examples/transactions/scripts/cluster/start create mode 100755 examples/transactions/scripts/cluster/stop create mode 100644 examples/transactions/tests/__init__.py create mode 100644 examples/transactions/transactions/__init__.py create mode 100644 examples/transactions/transactions/app.py create mode 100644 examples/transactions/transactions/engine.py create mode 100644 examples/transactions/transactions/producer.py create mode 100644 examples/transactions/transactions/serializers.py create mode 100644 examples/transactions/transactions/streams.py create mode 100644 kstreams/transaction.py create mode 100644 tests/test_transactions.py diff --git a/docs/transactions.md b/docs/transactions.md new file mode 100644 index 00000000..28597922 --- /dev/null +++ b/docs/transactions.md @@ -0,0 +1,231 @@ +`Kafka 0.11.0` includes support for `idempotent` and `transactional` capabilities in the `producer`. Idempotent delivery ensures that messages are delivered `exactly once` +to a particular topic partition during the lifetime of a `single producer`. Transactional delivery allows producers to send data to multiple partitions such that either +all messages are successfully delivered, or none of them are. Together, these capabilities enable `*exactly once semantics*` in Kafka. + +It is important to notice that: + +- `Transaction` always start from a `send` (producer) +- Events sent to one or more topics will only be visible on consumers after the transaction is committed. +- To use transactions, a `transaction id` (unique id per transaction) must be set prior an event is sent. If you do not provide one `kstreams` will auto generate one for you. +- Transaction state is stored in a new internal topic `__transaction_state`. This topic is not created until the the first attempt to use a transactional request API. There are several settings to control the topic's configuration. +- `Topics` which are included in transactions should be configured for durability. In particular, the `replication.factor` should be at least `3`, and the `min.insync.replicas` for these topics should be set to `2` +- `Transactions` always add overhead, meaning that more effort is needed to produce events and the consumer have to apply filters +For example, `transaction.state.log.min.isr` controls the minimum ISR for this topic. +- `Streams` (consumers) will have to filter or not transactional events + +```plantuml +@startuml +!theme crt-amber + +!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Context.puml + +agent "Producer" as producer +agent "Stream A" as stream_a +agent "Stream B" as stream_b +queue "Topic A" as topic_a + +producer -> topic_a: "produce with a transaction" +topic_a --> stream_a: "consume only transactional events" +topic_a --> stream_b: "consume all events" +@enduml +``` + +## Transaction State + +A transaction have state, which is store in the special topic `__transaction_state`. This topic can be tuned in different ways, but it is outside of this guide scope. +The only important detailt that you must be aware of is that a `transaction` can be `commited` or `aborted`. You will not manage the internal states but you are responsible +of filtering or not transactions on the `stream` side. + +```plantuml +@startuml +!theme crt-amber + +!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Context.puml + +[*] --> Init +Init : Initialized with a `transaction_id` + +Init -> Commited +Commited --> [*] + +Init -> Aborted +Aborted --> [*] + +@enduml +``` + +## Usage + +From the `kstreams` point of view, the `transaction pattern` is a `context manager` that will `start` a transaction and then `commit` or `aboort` it. Always a `transaction` starts when an event is send. Once that we have the `context` we can send events in two ways: + +Using the `StreamEngine` directly: + +```python +from kstreams import create_engine +from kstreams.backends import Kafka + +from .serializers import JsonSerializer + +stream_engine = create_engine( + title="transaction-engine", + serializer=JsonSerializer(), + backend=Kafka( + bootstrap_servers=["localhost:9091", "localhost:9092", "localhost:9093"], + ), +) + + +transaction_id = "my-transaction-id-" + str(uuid.uuid4()) +async with stream_engine.transaction(transaction_id=transaction_id) as t: + await t.send( + "transactional_topic", + value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(), + ) +``` + +or using the annotation `Transactional`: + +```python title="Transaction inside a stream" +@stream_engine.stream("a-topic", group_id="my-group") +async def stream_a(cr: ConsumerRecord, transactional: Transactional): + transaction_id = "my-transaction-id-" + str(uuid.uuid4()) + async with transactional(transaction_id=transaction_id) as t: + await t.send( + "transactional_topic", + value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(), + ) +``` + +Once the `context` is left then the transaction is `commit` or `aborted`. + +## Commiting offsets + +With transactions it is possible to `commit` consumer offsets. This is quite important feature as it will guarantee that only a consumer offset is commit once the whole transaction as has been commited, including sending events to other topics. +To commit consumer offsets we need the `offsetes` and the `group_id` that we want to commit to. Example: + +```python +@stream_engine.stream("a-topic", group_id="my-group", enable_auto_commit=False) +async def stream_a(cr: ConsumerRecord, transactional: Transactional): + transaction_id = "my-transaction-id-" + str(uuid.uuid4()) + async with transactional(transaction_id=transaction_id) as t: + metadata = await t.send( + transactional_topic, + value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(), + ) + + tp = TopicPartition(topic=cr.topic, partition=cr.partition) + await t.commit_offsets(offsets={tp: cr.offset + 1}, group_id="my-group") + logger.info(f"Message sent inside transaction with metadata: {metadata}") +``` + +!!! note + - The property `enable_auto_commit` must be set to `False` + - When commiting offsets the `group_id` must be the same as the `consumer group_id` + - Offsets is a dictionary of TopicPartition: offset + - If the `transaction` is `aborted` then the commit offset has NOT effect + +## Reading committed events + +It is the `stream` (consumer) responsability to filter commited events. The property `isolation_level="read_committed"` must be used. If the `isolation_level` is NOT set to `read_committed` then events that were sent with an `aborted` transaction are also consumed + +```plantuml +@startuml +!theme crt-amber + +!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Context.puml + +agent "Stream A" as stream_a +agent "Stream B" as stream_b +queue "Topic A" as topic_a +queue "Topic B" as topic_b + +topic_a -> stream_a: "consume all events" +stream_a --> topic_b: "produce with a transaction and commit offsets" +topic_b -> stream_b: "filter commited events" +@enduml +``` + +The above diagram implementation is the following: + +```python +@stream( + "transactional_topic", + group_id="another-group", + enable_auto_commit=False, + isolation_level="read_committed", # <-- This will filter aborted txn's +) +async def stream_b(cr: ConsumerRecord, stream: Stream): + logger.info( + f"Event consumed from topic {transactional_topic} with value: {cr.value} \n\n" + ) + await stream.commit() + + +@stream_engine.stream("a-topic", group_id="my-group") +async def stream_a(cr: ConsumerRecord, transactional: Transactional): + transaction_id = "my-transaction-id-" + str(uuid.uuid4()) + async with transactional(transaction_id=transaction_id) as t: + metadata = await t.send( + "transactional_topic", + value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(), + ) + + tp = TopicPartition(topic=cr.topic, partition=cr.partition) + await t.commit_offsets(offsets={tp: cr.offset + 1}, group_id="my-group") + logger.info(f"Message sent inside transaction with metadata: {metadata}") +``` + +## Aborted transactions + +A transaction can be aborted, for example when we have a bug in our code. Consired the following example in which an error is `raised`: + +```python +@stream_engine.stream("a-topic", group_id="my-group") +async def consume_json(cr: ConsumerRecord, transactional: Transactional): + transaction_id = "my-transaction-id-" + str(uuid.uuid4()) + async with transactional(transaction_id=transaction_id) as t: + metadata = await t.send( + transactional_topic, + value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(), + serializer=None, + ) + + tp = TopicPartition(topic=cr.topic, partition=cr.partition) + await t.commit_offsets( + offsets={tp: cr.offset + 1}, group_id="my-group-json-data" + ) + + raise ValueError("This is a test error") +``` + +The code will crash when the line with `raise ValueError` is reached, causing to leave the context manager with an `aborted` transaction. Any error that ocuurs inside the `context manager` will `abort` the `transaction`. + +It is also important to notice that: + +- If the transaction is aborted then `commited offsets` has not effect +- Events are always sent regarless whether the transaction was aborted. The consumer is responsible to filter events + +## Testing + +## Example + +A full transaction example with `kstreams` can be found in [transaction example](https://github.com/kpn/kstreams/tree/master/examples/transactions) + +```plantuml +@startuml +!theme crt-amber + +!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Context.puml + +agent "Producer" as producer +agent "Stream A" as stream_a +agent "Stream B" as stream_b +queue "Topic A" as topic_a +queue "Topic B" as topic_b + +producer -> topic_a: "produce" +topic_a <-- stream_a: "consume" +stream_a -> topic_b: "produce with transaction" +topic_b <-- stream_b: "consume read_committed events" +@enduml +``` diff --git a/examples/transactions/README.md b/examples/transactions/README.md new file mode 100644 index 00000000..07afbb88 --- /dev/null +++ b/examples/transactions/README.md @@ -0,0 +1,103 @@ +# Kstreams Transaction example + +This example shows how to use `kafka transactions` with `kstreams`. For this purpose we have to setup a local kafka with 3 `brokers` and `kafka-ui` to explore +the `topics offsets` and `groups lag`. + +## Requirements + +`python 3.9+`, `poetry`, `docker-compose` + +### Usage + +First run the local cluster: + +```bash +./scripts/cluster/start +``` + +Second, you need to install the project dependencies dependencies. In a different terminal execute: + +```bash +poetry install +``` + +Then we can run the project + +```bash +poetry run app +``` + +You should see something similar to the following logs: + +```bash +kstreams/examples/transactions via 🐳 colima is 📦 v0.1.0 via 🐍 v3.12.4 +❯ poetry run app + +02/14/2025 04:40:40 PM Updating subscribed topics to: frozenset({'local--kstreams-transactional'}) +02/14/2025 04:40:40 PM Subscribed to topic(s): {'local--kstreams-transactional'} +02/14/2025 04:40:40 PM Updating subscribed topics to: frozenset({'local--kstreams-json'}) +02/14/2025 04:40:40 PM Subscribed to topic(s): {'local--kstreams-json'} +02/14/2025 04:40:40 PM Starting Prometheus Monitoring started... +02/14/2025 04:40:40 PM Discovered coordinator 1 for group my-group-json-data +02/14/2025 04:40:40 PM Revoking previously assigned partitions set() for group my-group-json-data +02/14/2025 04:40:40 PM (Re-)joining group my-group-json-data +02/14/2025 04:40:40 PM Discovered coordinator 2 for group my-group-raw-data +02/14/2025 04:40:40 PM Revoking previously assigned partitions set() for group my-group-raw-data +02/14/2025 04:40:40 PM (Re-)joining group my-group-raw-data +02/14/2025 04:40:40 PM Joined group 'my-group-json-data' (generation 23) with member_id aiokafka-0.12.0-1d2163ac-810d-4028-be96-0741e0589752 +02/14/2025 04:40:40 PM Elected group leader -- performing partition assignments using roundrobin +02/14/2025 04:40:40 PM Joined group 'my-group-raw-data' (generation 23) with member_id aiokafka-0.12.0-fce394e0-f09e-4bfb-8133-eef590c52d7a +02/14/2025 04:40:40 PM Elected group leader -- performing partition assignments using roundrobin +02/14/2025 04:40:40 PM Successfully synced group my-group-json-data with generation 23 +02/14/2025 04:40:40 PM Setting newly assigned partitions {TopicPartition(topic='local--kstreams-json', partition=0)} for group my-group-json-data +02/14/2025 04:40:40 PM Successfully synced group my-group-raw-data with generation 23 +02/14/2025 04:40:40 PM Setting newly assigned partitions {TopicPartition(topic='local--kstreams-transactional', partition=0)} for group my-group-raw-data +02/14/2025 04:40:43 PM Message sent to topic local--kstreams-json + + +02/14/2025 04:40:43 PM Json Event consumed with offset: 25, value: {'message': 'Hello world!'} + + +02/14/2025 04:40:43 PM Discovered coordinator 1 for transactional id my-transaction-id-b213d4a1-7ee8-43dd-a121-8d5305391997 +02/14/2025 04:40:43 PM Discovered coordinator 1 for group id my-group-json-data +02/14/2025 04:40:43 PM Commiting offsets {TopicPartition(topic='local--kstreams-json', partition=0): 26} for group my-group-json-data +02/14/2025 04:40:43 PM Message sent inside transaction with metadata: RecordMetadata(topic='local--kstreams-transactional', partition=0, topic_partition=TopicPartition(topic='local--kstreams-transactional', partition=0), offset=50, timestamp=1739547643468, timestamp_type=0, log_start_offset=0) +Error in transaction: None +02/14/2025 04:40:43 PM Transaction with id my-transaction-id-b213d4a1-7ee8-43dd-a121-8d5305391997 committed +02/14/2025 04:40:43 PM Event consumed from topic local--kstreams-transactional with value: b'Transaction id my-transaction-id-b213d4a1-7ee8-43dd-a121-8d5305391997 from argument in coroutine \n' + + +02/14/2025 04:40:46 PM Message sent to topic local--kstreams-json + + +02/14/2025 04:40:46 PM Json Event consumed with offset: 26, value: {'message': 'Hello world!'} + + +02/14/2025 04:40:46 PM Discovered coordinator 3 for transactional id my-transaction-id-26cd219e-cb2c-4c0f-93b1-34682c5a8a43 +02/14/2025 04:40:46 PM Discovered coordinator 1 for group id my-group-json-data +02/14/2025 04:40:46 PM Commiting offsets {TopicPartition(topic='local--kstreams-json', partition=0): 27} for group my-group-json-data +02/14/2025 04:40:46 PM Message sent inside transaction with metadata: RecordMetadata(topic='local--kstreams-transactional', partition=0, topic_partition=TopicPartition(topic='local--kstreams-transactional', partition=0), offset=52, timestamp=1739547646464, timestamp_type=0, log_start_offset=0) +Error in transaction: None +02/14/2025 04:40:46 PM Transaction with id my-transaction-id-26cd219e-cb2c-4c0f-93b1-34682c5a8a43 committed +02/14/2025 04:40:46 PM Event consumed from topic local--kstreams-transactional with value: b'Transaction id my-transaction-id-26cd219e-cb2c-4c0f-93b1-34682c5a8a43 from argument in coroutine \n' + + +02/14/2025 04:40:49 PM Message sent to topic local--kstreams-json + + +02/14/2025 04:40:49 PM Json Event consumed with offset: 27, value: {'message': 'Hello world!'} + + +02/14/2025 04:40:49 PM Discovered coordinator 2 for transactional id my-transaction-id-e2df5b6a-2fd9-4c14-ac98-d719ad627eb6 +02/14/2025 04:40:49 PM Discovered coordinator 1 for group id my-group-json-data +02/14/2025 04:40:49 PM Commiting offsets {TopicPartition(topic='local--kstreams-json', partition=0): 28} for group my-group-json-data +02/14/2025 04:40:49 PM Message sent inside transaction with metadata: RecordMetadata(topic='local--kstreams-transactional', partition=0, topic_partition=TopicPartition(topic='local--kstreams-transactional', partition=0), offset=54, timestamp=1739547649480, timestamp_type=0, log_start_offset=0) +Error in transaction: None +02/14/2025 04:40:49 PM Transaction with id my-transaction-id-e2df5b6a-2fd9-4c14-ac98-d719ad627eb6 committed +02/14/2025 04:40:49 PM Event consumed from topic local--kstreams-transactional with value: b'Transaction id my-transaction-id-e2df5b6a-2fd9-4c14-ac98-d719ad627eb6 from argument in coroutine \n' +``` + +## Note + +If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where +`kstreams` is pointing to the parent folder. You will have to set the latest version. diff --git a/examples/transactions/docker-compose.yaml b/examples/transactions/docker-compose.yaml new file mode 100644 index 00000000..cd502512 --- /dev/null +++ b/examples/transactions/docker-compose.yaml @@ -0,0 +1,104 @@ +version: '3' +services: + zookeeper: + image: "confluentinc/cp-zookeeper:7.7.0" + hostname: zookeeper + container_name: kstream_zookeeper + ports: + - 32181:32181 + networks: + - kafka + environment: + - ZOOKEEPER_CLIENT_PORT=32181 + broker-1: + image: confluentinc/cp-kafka:7.7.0 + hostname: broker-1 + container_name: broker-1 + ports: + - 9091:9091 + depends_on: + - zookeeper + networks: + - kafka + environment: + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181 + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT + - KAFKA_ADVERTISED_LISTENERS=EXTERNAL://127.0.0.1:9091,INTERNAL://broker-1:9010 + - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL + - KAFKA_BROKER_ID=1 + - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 + - CONFLUENT_METRICS_ENABLE=true + - CONFLUENT_SUPPORT_CUSTOMER_ID=anonymous + - KAFKA_AUTO_CREATE_TOPICS_ENABL="true" + - KAFKA_JMX_PORT=19101 + broker-2: + image: confluentinc/cp-kafka:7.7.0 + hostname: broker-2 + container_name: broker-2 + ports: + - 9092:9092 + depends_on: + - zookeeper + networks: + - kafka + environment: + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181 + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT + - KAFKA_ADVERTISED_LISTENERS=EXTERNAL://127.0.0.1:9092,INTERNAL://broker-2:9010 + - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL + - KAFKA_BROKER_ID=2 + - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 + - CONFLUENT_METRICS_ENABLE=true + - CONFLUENT_SUPPORT_CUSTOMER_ID=anonymous + - KAFKA_AUTO_CREATE_TOPICS_ENABL="true" + - KAFKA_JMX_PORT=19102 + broker-3: + image: confluentinc/cp-kafka:7.7.0 + hostname: broker-3 + container_name: broker-3 + ports: + - 9093:9093 + depends_on: + - zookeeper + networks: + - kafka + environment: + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:32181 + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 + - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT + - KAFKA_ADVERTISED_LISTENERS=EXTERNAL://127.0.0.1:9093,INTERNAL://broker-3:9010 + - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL + - KAFKA_BROKER_ID=3 + - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 + - CONFLUENT_METRICS_ENABLE=true + - CONFLUENT_SUPPORT_CUSTOMER_ID=anonymous + - KAFKA_AUTO_CREATE_TOPICS_ENABL="true" + - KAFKA_JMX_PORT=19103 + kafka-ui: + container_name: kafka-ui + image: kafbat/kafka-ui:latest + ports: + - 8080:8080 + depends_on: + - broker-1 + - broker-2 + - broker-3 + environment: + KAFKA_CLUSTERS_0_NAME: broker-1 + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker-1:9010 + KAFKA_CLUSTERS_0_METRICS_PORT: 19101 + KAFKA_CLUSTERS_1_NAME: broker-2 + KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: broker-2:9010 + KAFKA_CLUSTERS_1_METRICS_PORT: 19102 + KAFKA_CLUSTERS_2_NAME: broker-3 + KAFKA_CLUSTERS_2_BOOTSTRAPSERVERS: broker-3:9010 + KAFKA_CLUSTERS_2_METRICS_PORT: 19103 + DYNAMIC_CONFIG_ENABLED: 'true' + networks: + - kafka + +networks: + kafka: + name: kafka diff --git a/examples/transactions/poetry.lock b/examples/transactions/poetry.lock new file mode 100644 index 00000000..38cc6c52 --- /dev/null +++ b/examples/transactions/poetry.lock @@ -0,0 +1,355 @@ +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. + +[[package]] +name = "aiokafka" +version = "0.12.0" +description = "Kafka integration with asyncio" +optional = false +python-versions = ">=3.9" +files = [ + {file = "aiokafka-0.12.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:da8938eac2153ca767ac0144283b3df7e74bb4c0abc0c9a722f3ae63cfbf3a42"}, + {file = "aiokafka-0.12.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:a5c827c8883cfe64bc49100de82862225714e1853432df69aba99f135969bb1b"}, + {file = "aiokafka-0.12.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bea5710f7707ed12a7f8661ab38dfa80f5253a405de5ba228f457cc30404eb51"}, + {file = "aiokafka-0.12.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d87b1a45c57bbb1c17d1900a74739eada27e4f4a0b0932ab3c5a8cbae8bbfe1e"}, + {file = "aiokafka-0.12.0-cp310-cp310-win32.whl", hash = "sha256:1158e630664d9abc74d8a7673bc70dc10737ff758e1457bebc1c05890f29ce2c"}, + {file = "aiokafka-0.12.0-cp310-cp310-win_amd64.whl", hash = "sha256:06f5889acf8e1a81d6e14adf035acb29afd1f5836447fa8fa23d3cbe8f7e8608"}, + {file = "aiokafka-0.12.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ddc5308c43d48af883667e2f950a0a9739ce2c9bfe69a0b55dc234f58b1b42d6"}, + {file = "aiokafka-0.12.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ff63689cafcd6dd642a15de75b7ae121071d6162cccba16d091bcb28b3886307"}, + {file = "aiokafka-0.12.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:24633931e05a9dc80555a2f845572b6845d2dcb1af12de27837b8602b1b8bc74"}, + {file = "aiokafka-0.12.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:42b2436c7c69384d210e9169fbfe339d9f49dbdcfddd8d51c79b9877de545e33"}, + {file = "aiokafka-0.12.0-cp311-cp311-win32.whl", hash = "sha256:90511a2c4cf5f343fc2190575041fbc70171654ab0dae64b3bbabd012613bfa7"}, + {file = "aiokafka-0.12.0-cp311-cp311-win_amd64.whl", hash = "sha256:04c8ad27d04d6c53a1859687015a5f4e58b1eb221e8a7342d6c6b04430def53e"}, + {file = "aiokafka-0.12.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:b01947553ff1120fa1cb1a05f2c3e5aa47a5378c720bafd09e6630ba18af02aa"}, + {file = "aiokafka-0.12.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:e3c8ec1c0606fa645462c7353dc3e4119cade20c4656efa2031682ffaad361c0"}, + {file = "aiokafka-0.12.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:577c1c48b240e9eba57b3d2d806fb3d023a575334fc3953f063179170cc8964f"}, + {file = "aiokafka-0.12.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d7b815b2e5fed9912f1231be6196547a367b9eb3380b487ff5942f0c73a3fb5c"}, + {file = "aiokafka-0.12.0-cp312-cp312-win32.whl", hash = "sha256:5a907abcdf02430df0829ac80f25b8bb849630300fa01365c76e0ae49306f512"}, + {file = "aiokafka-0.12.0-cp312-cp312-win_amd64.whl", hash = "sha256:fdbd69ec70eea4a8dfaa5c35ff4852e90e1277fcc426b9380f0b499b77f13b16"}, + {file = "aiokafka-0.12.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:f9e8ab97b935ca681a5f28cf22cf2b5112be86728876b3ec07e4ed5fc6c21f2d"}, + {file = "aiokafka-0.12.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:ed991c120fe19fd9439f564201dd746c4839700ef270dd4c3ee6d4895f64fe83"}, + {file = "aiokafka-0.12.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2c01abf9787b1c3f3af779ad8e76d5b74903f590593bc26f33ed48750503e7f7"}, + {file = "aiokafka-0.12.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:08c84b3894d97fd02fcc8886f394000d0f5ce771fab5c498ea2b0dd2f6b46d5b"}, + {file = "aiokafka-0.12.0-cp313-cp313-win32.whl", hash = "sha256:63875fed922c8c7cf470d9b2a82e1b76b4a1baf2ae62e07486cf516fd09ff8f2"}, + {file = "aiokafka-0.12.0-cp313-cp313-win_amd64.whl", hash = "sha256:bdc0a83eb386d2384325d6571f8ef65b4cfa205f8d1c16d7863e8d10cacd995a"}, + {file = "aiokafka-0.12.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:a9590554fae68ec80099beae5366f2494130535a1a3db0c4fa5ccb08f37f6e46"}, + {file = "aiokafka-0.12.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:6c77f5953ff4b25c889aef26df1f28df66c58db7abb7f34ecbe48502e9a6d273"}, + {file = "aiokafka-0.12.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f96d7fd8fdb5f439f7e7860fd8ec37870265d0578475e82049bce60ab07ca045"}, + {file = "aiokafka-0.12.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b8ddff02b1e981083dff6d1a80d4502e0e83e0e480faf1f881766ca6f23e8d22"}, + {file = "aiokafka-0.12.0-cp39-cp39-win32.whl", hash = "sha256:4aab2767dcc8923626d8d60c314f9ba633563249cff71750db5d70b6ec813da2"}, + {file = "aiokafka-0.12.0-cp39-cp39-win_amd64.whl", hash = "sha256:7a57fda053acd1b88c87803ad0381a1d2a29d36ec561550d11ce9154972b8e23"}, + {file = "aiokafka-0.12.0.tar.gz", hash = "sha256:62423895b866f95b5ed8d88335295a37cc5403af64cb7cb0e234f88adc2dff94"}, +] + +[package.dependencies] +async-timeout = "*" +packaging = "*" +typing-extensions = ">=4.10.0" + +[package.extras] +all = ["cramjam (>=2.8.0)", "gssapi"] +gssapi = ["gssapi"] +lz4 = ["cramjam (>=2.8.0)"] +snappy = ["cramjam"] +zstd = ["cramjam"] + +[[package]] +name = "aiorun" +version = "2025.1.1" +description = "Boilerplate for asyncio applications" +optional = false +python-versions = ">=3.7" +files = [ + {file = "aiorun-2025.1.1-py3-none-any.whl", hash = "sha256:46d6fa7ac4bfe93ff8385fa17941e4dbe0452d0353497196be25b000571fe3e1"}, + {file = "aiorun-2025.1.1.tar.gz", hash = "sha256:86d1075a034ce2671ab532db06e9204fe784cdd0c66ca7b8cc47a7527d0d50a3"}, +] + +[package.extras] +dev = ["pytest", "pytest-cov"] + +[[package]] +name = "annotated-types" +version = "0.7.0" +description = "Reusable constraint types to use with typing.Annotated" +optional = false +python-versions = ">=3.8" +files = [ + {file = "annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53"}, + {file = "annotated_types-0.7.0.tar.gz", hash = "sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89"}, +] + +[[package]] +name = "async-timeout" +version = "5.0.1" +description = "Timeout context manager for asyncio programs" +optional = false +python-versions = ">=3.8" +files = [ + {file = "async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c"}, + {file = "async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3"}, +] + +[[package]] +name = "future" +version = "1.0.0" +description = "Clean single-source support for Python 3 and 2" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" +files = [ + {file = "future-1.0.0-py3-none-any.whl", hash = "sha256:929292d34f5872e70396626ef385ec22355a1fae8ad29e1a734c3e43f9fbc216"}, + {file = "future-1.0.0.tar.gz", hash = "sha256:bd2968309307861edae1458a4f8a4f3598c03be43b97521076aebf5d94c07b05"}, +] + +[[package]] +name = "kstreams" +version = "0.26.6" +description = "Build simple kafka streams applications" +optional = false +python-versions = "^3.9" +files = [] +develop = true + +[package.dependencies] +aiokafka = "<1.0" +future = "^1.0.0" +prometheus-client = "<1.0" +pydantic = ">=2.0.0,<3.0.0" +PyYAML = ">=5.4,<7.0.0" + +[package.source] +type = "directory" +url = "../.." + +[[package]] +name = "packaging" +version = "24.2" +description = "Core utilities for Python packages" +optional = false +python-versions = ">=3.8" +files = [ + {file = "packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759"}, + {file = "packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f"}, +] + +[[package]] +name = "prometheus-client" +version = "0.21.1" +description = "Python client for the Prometheus monitoring system." +optional = false +python-versions = ">=3.8" +files = [ + {file = "prometheus_client-0.21.1-py3-none-any.whl", hash = "sha256:594b45c410d6f4f8888940fe80b5cc2521b305a1fafe1c58609ef715a001f301"}, + {file = "prometheus_client-0.21.1.tar.gz", hash = "sha256:252505a722ac04b0456be05c05f75f45d760c2911ffc45f2a06bcaed9f3ae3fb"}, +] + +[package.extras] +twisted = ["twisted"] + +[[package]] +name = "pydantic" +version = "2.10.6" +description = "Data validation using Python type hints" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pydantic-2.10.6-py3-none-any.whl", hash = "sha256:427d664bf0b8a2b34ff5dd0f5a18df00591adcee7198fbd71981054cef37b584"}, + {file = "pydantic-2.10.6.tar.gz", hash = "sha256:ca5daa827cce33de7a42be142548b0096bf05a7e7b365aebfa5f8eeec7128236"}, +] + +[package.dependencies] +annotated-types = ">=0.6.0" +pydantic-core = "2.27.2" +typing-extensions = ">=4.12.2" + +[package.extras] +email = ["email-validator (>=2.0.0)"] +timezone = ["tzdata"] + +[[package]] +name = "pydantic-core" +version = "2.27.2" +description = "Core functionality for Pydantic validation and serialization" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pydantic_core-2.27.2-cp310-cp310-macosx_10_12_x86_64.whl", hash = "sha256:2d367ca20b2f14095a8f4fa1210f5a7b78b8a20009ecced6b12818f455b1e9fa"}, + {file = "pydantic_core-2.27.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:491a2b73db93fab69731eaee494f320faa4e093dbed776be1a829c2eb222c34c"}, + {file = "pydantic_core-2.27.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7969e133a6f183be60e9f6f56bfae753585680f3b7307a8e555a948d443cc05a"}, + {file = "pydantic_core-2.27.2-cp310-cp310-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:3de9961f2a346257caf0aa508a4da705467f53778e9ef6fe744c038119737ef5"}, + {file = "pydantic_core-2.27.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e2bb4d3e5873c37bb3dd58714d4cd0b0e6238cebc4177ac8fe878f8b3aa8e74c"}, + {file = "pydantic_core-2.27.2-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:280d219beebb0752699480fe8f1dc61ab6615c2046d76b7ab7ee38858de0a4e7"}, + {file = "pydantic_core-2.27.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:47956ae78b6422cbd46f772f1746799cbb862de838fd8d1fbd34a82e05b0983a"}, + {file = "pydantic_core-2.27.2-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:14d4a5c49d2f009d62a2a7140d3064f686d17a5d1a268bc641954ba181880236"}, + {file = "pydantic_core-2.27.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:337b443af21d488716f8d0b6164de833e788aa6bd7e3a39c005febc1284f4962"}, + {file = "pydantic_core-2.27.2-cp310-cp310-musllinux_1_1_armv7l.whl", hash = "sha256:03d0f86ea3184a12f41a2d23f7ccb79cdb5a18e06993f8a45baa8dfec746f0e9"}, + {file = "pydantic_core-2.27.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:7041c36f5680c6e0f08d922aed302e98b3745d97fe1589db0a3eebf6624523af"}, + {file = "pydantic_core-2.27.2-cp310-cp310-win32.whl", hash = "sha256:50a68f3e3819077be2c98110c1f9dcb3817e93f267ba80a2c05bb4f8799e2ff4"}, + {file = "pydantic_core-2.27.2-cp310-cp310-win_amd64.whl", hash = "sha256:e0fd26b16394ead34a424eecf8a31a1f5137094cabe84a1bcb10fa6ba39d3d31"}, + {file = "pydantic_core-2.27.2-cp311-cp311-macosx_10_12_x86_64.whl", hash = "sha256:8e10c99ef58cfdf2a66fc15d66b16c4a04f62bca39db589ae8cba08bc55331bc"}, + {file = "pydantic_core-2.27.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:26f32e0adf166a84d0cb63be85c562ca8a6fa8de28e5f0d92250c6b7e9e2aff7"}, + {file = "pydantic_core-2.27.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8c19d1ea0673cd13cc2f872f6c9ab42acc4e4f492a7ca9d3795ce2b112dd7e15"}, + {file = "pydantic_core-2.27.2-cp311-cp311-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:5e68c4446fe0810e959cdff46ab0a41ce2f2c86d227d96dc3847af0ba7def306"}, + {file = "pydantic_core-2.27.2-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d9640b0059ff4f14d1f37321b94061c6db164fbe49b334b31643e0528d100d99"}, + {file = "pydantic_core-2.27.2-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:40d02e7d45c9f8af700f3452f329ead92da4c5f4317ca9b896de7ce7199ea459"}, + {file = "pydantic_core-2.27.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1c1fd185014191700554795c99b347d64f2bb637966c4cfc16998a0ca700d048"}, + {file = "pydantic_core-2.27.2-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:d81d2068e1c1228a565af076598f9e7451712700b673de8f502f0334f281387d"}, + {file = "pydantic_core-2.27.2-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:1a4207639fb02ec2dbb76227d7c751a20b1a6b4bc52850568e52260cae64ca3b"}, + {file = "pydantic_core-2.27.2-cp311-cp311-musllinux_1_1_armv7l.whl", hash = "sha256:3de3ce3c9ddc8bbd88f6e0e304dea0e66d843ec9de1b0042b0911c1663ffd474"}, + {file = "pydantic_core-2.27.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:30c5f68ded0c36466acede341551106821043e9afaad516adfb6e8fa80a4e6a6"}, + {file = "pydantic_core-2.27.2-cp311-cp311-win32.whl", hash = "sha256:c70c26d2c99f78b125a3459f8afe1aed4d9687c24fd677c6a4436bc042e50d6c"}, + {file = "pydantic_core-2.27.2-cp311-cp311-win_amd64.whl", hash = "sha256:08e125dbdc505fa69ca7d9c499639ab6407cfa909214d500897d02afb816e7cc"}, + {file = "pydantic_core-2.27.2-cp311-cp311-win_arm64.whl", hash = "sha256:26f0d68d4b235a2bae0c3fc585c585b4ecc51382db0e3ba402a22cbc440915e4"}, + {file = "pydantic_core-2.27.2-cp312-cp312-macosx_10_12_x86_64.whl", hash = "sha256:9e0c8cfefa0ef83b4da9588448b6d8d2a2bf1a53c3f1ae5fca39eb3061e2f0b0"}, + {file = "pydantic_core-2.27.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:83097677b8e3bd7eaa6775720ec8e0405f1575015a463285a92bfdfe254529ef"}, + {file = "pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:172fce187655fece0c90d90a678424b013f8fbb0ca8b036ac266749c09438cb7"}, + {file = "pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:519f29f5213271eeeeb3093f662ba2fd512b91c5f188f3bb7b27bc5973816934"}, + {file = "pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:05e3a55d124407fffba0dd6b0c0cd056d10e983ceb4e5dbd10dda135c31071d6"}, + {file = "pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9c3ed807c7b91de05e63930188f19e921d1fe90de6b4f5cd43ee7fcc3525cb8c"}, + {file = "pydantic_core-2.27.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6fb4aadc0b9a0c063206846d603b92030eb6f03069151a625667f982887153e2"}, + {file = "pydantic_core-2.27.2-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:28ccb213807e037460326424ceb8b5245acb88f32f3d2777427476e1b32c48c4"}, + {file = "pydantic_core-2.27.2-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:de3cd1899e2c279b140adde9357c4495ed9d47131b4a4eaff9052f23398076b3"}, + {file = "pydantic_core-2.27.2-cp312-cp312-musllinux_1_1_armv7l.whl", hash = "sha256:220f892729375e2d736b97d0e51466252ad84c51857d4d15f5e9692f9ef12be4"}, + {file = "pydantic_core-2.27.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:a0fcd29cd6b4e74fe8ddd2c90330fd8edf2e30cb52acda47f06dd615ae72da57"}, + {file = "pydantic_core-2.27.2-cp312-cp312-win32.whl", hash = "sha256:1e2cb691ed9834cd6a8be61228471d0a503731abfb42f82458ff27be7b2186fc"}, + {file = "pydantic_core-2.27.2-cp312-cp312-win_amd64.whl", hash = "sha256:cc3f1a99a4f4f9dd1de4fe0312c114e740b5ddead65bb4102884b384c15d8bc9"}, + {file = "pydantic_core-2.27.2-cp312-cp312-win_arm64.whl", hash = "sha256:3911ac9284cd8a1792d3cb26a2da18f3ca26c6908cc434a18f730dc0db7bfa3b"}, + {file = "pydantic_core-2.27.2-cp313-cp313-macosx_10_12_x86_64.whl", hash = "sha256:7d14bd329640e63852364c306f4d23eb744e0f8193148d4044dd3dacdaacbd8b"}, + {file = "pydantic_core-2.27.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:82f91663004eb8ed30ff478d77c4d1179b3563df6cdb15c0817cd1cdaf34d154"}, + {file = "pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:71b24c7d61131bb83df10cc7e687433609963a944ccf45190cfc21e0887b08c9"}, + {file = "pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:fa8e459d4954f608fa26116118bb67f56b93b209c39b008277ace29937453dc9"}, + {file = "pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:ce8918cbebc8da707ba805b7fd0b382816858728ae7fe19a942080c24e5b7cd1"}, + {file = "pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:eda3f5c2a021bbc5d976107bb302e0131351c2ba54343f8a496dc8783d3d3a6a"}, + {file = "pydantic_core-2.27.2-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bd8086fa684c4775c27f03f062cbb9eaa6e17f064307e86b21b9e0abc9c0f02e"}, + {file = "pydantic_core-2.27.2-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:8d9b3388db186ba0c099a6d20f0604a44eabdeef1777ddd94786cdae158729e4"}, + {file = "pydantic_core-2.27.2-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:7a66efda2387de898c8f38c0cf7f14fca0b51a8ef0b24bfea5849f1b3c95af27"}, + {file = "pydantic_core-2.27.2-cp313-cp313-musllinux_1_1_armv7l.whl", hash = "sha256:18a101c168e4e092ab40dbc2503bdc0f62010e95d292b27827871dc85450d7ee"}, + {file = "pydantic_core-2.27.2-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:ba5dd002f88b78a4215ed2f8ddbdf85e8513382820ba15ad5ad8955ce0ca19a1"}, + {file = "pydantic_core-2.27.2-cp313-cp313-win32.whl", hash = "sha256:1ebaf1d0481914d004a573394f4be3a7616334be70261007e47c2a6fe7e50130"}, + {file = "pydantic_core-2.27.2-cp313-cp313-win_amd64.whl", hash = "sha256:953101387ecf2f5652883208769a79e48db18c6df442568a0b5ccd8c2723abee"}, + {file = "pydantic_core-2.27.2-cp313-cp313-win_arm64.whl", hash = "sha256:ac4dbfd1691affb8f48c2c13241a2e3b60ff23247cbcf981759c768b6633cf8b"}, + {file = "pydantic_core-2.27.2-cp38-cp38-macosx_10_12_x86_64.whl", hash = "sha256:d3e8d504bdd3f10835468f29008d72fc8359d95c9c415ce6e767203db6127506"}, + {file = "pydantic_core-2.27.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:521eb9b7f036c9b6187f0b47318ab0d7ca14bd87f776240b90b21c1f4f149320"}, + {file = "pydantic_core-2.27.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:85210c4d99a0114f5a9481b44560d7d1e35e32cc5634c656bc48e590b669b145"}, + {file = "pydantic_core-2.27.2-cp38-cp38-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:d716e2e30c6f140d7560ef1538953a5cd1a87264c737643d481f2779fc247fe1"}, + {file = "pydantic_core-2.27.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f66d89ba397d92f840f8654756196d93804278457b5fbede59598a1f9f90b228"}, + {file = "pydantic_core-2.27.2-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:669e193c1c576a58f132e3158f9dfa9662969edb1a250c54d8fa52590045f046"}, + {file = "pydantic_core-2.27.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9fdbe7629b996647b99c01b37f11170a57ae675375b14b8c13b8518b8320ced5"}, + {file = "pydantic_core-2.27.2-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:d262606bf386a5ba0b0af3b97f37c83d7011439e3dc1a9298f21efb292e42f1a"}, + {file = "pydantic_core-2.27.2-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:cabb9bcb7e0d97f74df8646f34fc76fbf793b7f6dc2438517d7a9e50eee4f14d"}, + {file = "pydantic_core-2.27.2-cp38-cp38-musllinux_1_1_armv7l.whl", hash = "sha256:d2d63f1215638d28221f664596b1ccb3944f6e25dd18cd3b86b0a4c408d5ebb9"}, + {file = "pydantic_core-2.27.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:bca101c00bff0adb45a833f8451b9105d9df18accb8743b08107d7ada14bd7da"}, + {file = "pydantic_core-2.27.2-cp38-cp38-win32.whl", hash = "sha256:f6f8e111843bbb0dee4cb6594cdc73e79b3329b526037ec242a3e49012495b3b"}, + {file = "pydantic_core-2.27.2-cp38-cp38-win_amd64.whl", hash = "sha256:fd1aea04935a508f62e0d0ef1f5ae968774a32afc306fb8545e06f5ff5cdf3ad"}, + {file = "pydantic_core-2.27.2-cp39-cp39-macosx_10_12_x86_64.whl", hash = "sha256:c10eb4f1659290b523af58fa7cffb452a61ad6ae5613404519aee4bfbf1df993"}, + {file = "pydantic_core-2.27.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ef592d4bad47296fb11f96cd7dc898b92e795032b4894dfb4076cfccd43a9308"}, + {file = "pydantic_core-2.27.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c61709a844acc6bf0b7dce7daae75195a10aac96a596ea1b776996414791ede4"}, + {file = "pydantic_core-2.27.2-cp39-cp39-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:42c5f762659e47fdb7b16956c71598292f60a03aa92f8b6351504359dbdba6cf"}, + {file = "pydantic_core-2.27.2-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:4c9775e339e42e79ec99c441d9730fccf07414af63eac2f0e48e08fd38a64d76"}, + {file = "pydantic_core-2.27.2-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:57762139821c31847cfb2df63c12f725788bd9f04bc2fb392790959b8f70f118"}, + {file = "pydantic_core-2.27.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0d1e85068e818c73e048fe28cfc769040bb1f475524f4745a5dc621f75ac7630"}, + {file = "pydantic_core-2.27.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:097830ed52fd9e427942ff3b9bc17fab52913b2f50f2880dc4a5611446606a54"}, + {file = "pydantic_core-2.27.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:044a50963a614ecfae59bb1eaf7ea7efc4bc62f49ed594e18fa1e5d953c40e9f"}, + {file = "pydantic_core-2.27.2-cp39-cp39-musllinux_1_1_armv7l.whl", hash = "sha256:4e0b4220ba5b40d727c7f879eac379b822eee5d8fff418e9d3381ee45b3b0362"}, + {file = "pydantic_core-2.27.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5e4f4bb20d75e9325cc9696c6802657b58bc1dbbe3022f32cc2b2b632c3fbb96"}, + {file = "pydantic_core-2.27.2-cp39-cp39-win32.whl", hash = "sha256:cca63613e90d001b9f2f9a9ceb276c308bfa2a43fafb75c8031c4f66039e8c6e"}, + {file = "pydantic_core-2.27.2-cp39-cp39-win_amd64.whl", hash = "sha256:77d1bca19b0f7021b3a982e6f903dcd5b2b06076def36a652e3907f596e29f67"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:2bf14caea37e91198329b828eae1618c068dfb8ef17bb33287a7ad4b61ac314e"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:b0cb791f5b45307caae8810c2023a184c74605ec3bcbb67d13846c28ff731ff8"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:688d3fd9fcb71f41c4c015c023d12a79d1c4c0732ec9eb35d96e3388a120dcf3"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3d591580c34f4d731592f0e9fe40f9cc1b430d297eecc70b962e93c5c668f15f"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:82f986faf4e644ffc189a7f1aafc86e46ef70372bb153e7001e8afccc6e54133"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-musllinux_1_1_aarch64.whl", hash = "sha256:bec317a27290e2537f922639cafd54990551725fc844249e64c523301d0822fc"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-musllinux_1_1_armv7l.whl", hash = "sha256:0296abcb83a797db256b773f45773da397da75a08f5fcaef41f2044adec05f50"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-musllinux_1_1_x86_64.whl", hash = "sha256:0d75070718e369e452075a6017fbf187f788e17ed67a3abd47fa934d001863d9"}, + {file = "pydantic_core-2.27.2-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:7e17b560be3c98a8e3aa66ce828bdebb9e9ac6ad5466fba92eb74c4c95cb1151"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:c33939a82924da9ed65dab5a65d427205a73181d8098e79b6b426bdf8ad4e656"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:00bad2484fa6bda1e216e7345a798bd37c68fb2d97558edd584942aa41b7d278"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c817e2b40aba42bac6f457498dacabc568c3b7a986fc9ba7c8d9d260b71485fb"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:251136cdad0cb722e93732cb45ca5299fb56e1344a833640bf93b2803f8d1bfd"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:d2088237af596f0a524d3afc39ab3b036e8adb054ee57cbb1dcf8e09da5b29cc"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-musllinux_1_1_aarch64.whl", hash = "sha256:d4041c0b966a84b4ae7a09832eb691a35aec90910cd2dbe7a208de59be77965b"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-musllinux_1_1_armv7l.whl", hash = "sha256:8083d4e875ebe0b864ffef72a4304827015cff328a1be6e22cc850753bfb122b"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-musllinux_1_1_x86_64.whl", hash = "sha256:f141ee28a0ad2123b6611b6ceff018039df17f32ada8b534e6aa039545a3efb2"}, + {file = "pydantic_core-2.27.2-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:7d0c8399fcc1848491f00e0314bd59fb34a9c008761bcb422a057670c3f65e35"}, + {file = "pydantic_core-2.27.2.tar.gz", hash = "sha256:eb026e5a4c1fee05726072337ff51d1efb6f59090b7da90d30ea58625b1ffb39"}, +] + +[package.dependencies] +typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0" + +[[package]] +name = "pyyaml" +version = "6.0.2" +description = "YAML parser and emitter for Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "PyYAML-6.0.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:0a9a2848a5b7feac301353437eb7d5957887edbf81d56e903999a75a3d743086"}, + {file = "PyYAML-6.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:29717114e51c84ddfba879543fb232a6ed60086602313ca38cce623c1d62cfbf"}, + {file = "PyYAML-6.0.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8824b5a04a04a047e72eea5cec3bc266db09e35de6bdfe34c9436ac5ee27d237"}, + {file = "PyYAML-6.0.2-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:7c36280e6fb8385e520936c3cb3b8042851904eba0e58d277dca80a5cfed590b"}, + {file = "PyYAML-6.0.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ec031d5d2feb36d1d1a24380e4db6d43695f3748343d99434e6f5f9156aaa2ed"}, + {file = "PyYAML-6.0.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:936d68689298c36b53b29f23c6dbb74de12b4ac12ca6cfe0e047bedceea56180"}, + {file = "PyYAML-6.0.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:23502f431948090f597378482b4812b0caae32c22213aecf3b55325e049a6c68"}, + {file = "PyYAML-6.0.2-cp310-cp310-win32.whl", hash = "sha256:2e99c6826ffa974fe6e27cdb5ed0021786b03fc98e5ee3c5bfe1fd5015f42b99"}, + {file = "PyYAML-6.0.2-cp310-cp310-win_amd64.whl", hash = "sha256:a4d3091415f010369ae4ed1fc6b79def9416358877534caf6a0fdd2146c87a3e"}, + {file = "PyYAML-6.0.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:cc1c1159b3d456576af7a3e4d1ba7e6924cb39de8f67111c735f6fc832082774"}, + {file = "PyYAML-6.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:1e2120ef853f59c7419231f3bf4e7021f1b936f6ebd222406c3b60212205d2ee"}, + {file = "PyYAML-6.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5d225db5a45f21e78dd9358e58a98702a0302f2659a3c6cd320564b75b86f47c"}, + {file = "PyYAML-6.0.2-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:5ac9328ec4831237bec75defaf839f7d4564be1e6b25ac710bd1a96321cc8317"}, + {file = "PyYAML-6.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3ad2a3decf9aaba3d29c8f537ac4b243e36bef957511b4766cb0057d32b0be85"}, + {file = "PyYAML-6.0.2-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:ff3824dc5261f50c9b0dfb3be22b4567a6f938ccce4587b38952d85fd9e9afe4"}, + {file = "PyYAML-6.0.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:797b4f722ffa07cc8d62053e4cff1486fa6dc094105d13fea7b1de7d8bf71c9e"}, + {file = "PyYAML-6.0.2-cp311-cp311-win32.whl", hash = "sha256:11d8f3dd2b9c1207dcaf2ee0bbbfd5991f571186ec9cc78427ba5bd32afae4b5"}, + {file = "PyYAML-6.0.2-cp311-cp311-win_amd64.whl", hash = "sha256:e10ce637b18caea04431ce14fabcf5c64a1c61ec9c56b071a4b7ca131ca52d44"}, + {file = "PyYAML-6.0.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:c70c95198c015b85feafc136515252a261a84561b7b1d51e3384e0655ddf25ab"}, + {file = "PyYAML-6.0.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:ce826d6ef20b1bc864f0a68340c8b3287705cae2f8b4b1d932177dcc76721725"}, + {file = "PyYAML-6.0.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1f71ea527786de97d1a0cc0eacd1defc0985dcf6b3f17bb77dcfc8c34bec4dc5"}, + {file = "PyYAML-6.0.2-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9b22676e8097e9e22e36d6b7bda33190d0d400f345f23d4065d48f4ca7ae0425"}, + {file = "PyYAML-6.0.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:80bab7bfc629882493af4aa31a4cfa43a4c57c83813253626916b8c7ada83476"}, + {file = "PyYAML-6.0.2-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:0833f8694549e586547b576dcfaba4a6b55b9e96098b36cdc7ebefe667dfed48"}, + {file = "PyYAML-6.0.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8b9c7197f7cb2738065c481a0461e50ad02f18c78cd75775628afb4d7137fb3b"}, + {file = "PyYAML-6.0.2-cp312-cp312-win32.whl", hash = "sha256:ef6107725bd54b262d6dedcc2af448a266975032bc85ef0172c5f059da6325b4"}, + {file = "PyYAML-6.0.2-cp312-cp312-win_amd64.whl", hash = "sha256:7e7401d0de89a9a855c839bc697c079a4af81cf878373abd7dc625847d25cbd8"}, + {file = "PyYAML-6.0.2-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:efdca5630322a10774e8e98e1af481aad470dd62c3170801852d752aa7a783ba"}, + {file = "PyYAML-6.0.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:50187695423ffe49e2deacb8cd10510bc361faac997de9efef88badc3bb9e2d1"}, + {file = "PyYAML-6.0.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0ffe8360bab4910ef1b9e87fb812d8bc0a308b0d0eef8c8f44e0254ab3b07133"}, + {file = "PyYAML-6.0.2-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:17e311b6c678207928d649faa7cb0d7b4c26a0ba73d41e99c4fff6b6c3276484"}, + {file = "PyYAML-6.0.2-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:70b189594dbe54f75ab3a1acec5f1e3faa7e8cf2f1e08d9b561cb41b845f69d5"}, + {file = "PyYAML-6.0.2-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:41e4e3953a79407c794916fa277a82531dd93aad34e29c2a514c2c0c5fe971cc"}, + {file = "PyYAML-6.0.2-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:68ccc6023a3400877818152ad9a1033e3db8625d899c72eacb5a668902e4d652"}, + {file = "PyYAML-6.0.2-cp313-cp313-win32.whl", hash = "sha256:bc2fa7c6b47d6bc618dd7fb02ef6fdedb1090ec036abab80d4681424b84c1183"}, + {file = "PyYAML-6.0.2-cp313-cp313-win_amd64.whl", hash = "sha256:8388ee1976c416731879ac16da0aff3f63b286ffdd57cdeb95f3f2e085687563"}, + {file = "PyYAML-6.0.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:24471b829b3bf607e04e88d79542a9d48bb037c2267d7927a874e6c205ca7e9a"}, + {file = "PyYAML-6.0.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d7fded462629cfa4b685c5416b949ebad6cec74af5e2d42905d41e257e0869f5"}, + {file = "PyYAML-6.0.2-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d84a1718ee396f54f3a086ea0a66d8e552b2ab2017ef8b420e92edbc841c352d"}, + {file = "PyYAML-6.0.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9056c1ecd25795207ad294bcf39f2db3d845767be0ea6e6a34d856f006006083"}, + {file = "PyYAML-6.0.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:82d09873e40955485746739bcb8b4586983670466c23382c19cffecbf1fd8706"}, + {file = "PyYAML-6.0.2-cp38-cp38-win32.whl", hash = "sha256:43fa96a3ca0d6b1812e01ced1044a003533c47f6ee8aca31724f78e93ccc089a"}, + {file = "PyYAML-6.0.2-cp38-cp38-win_amd64.whl", hash = "sha256:01179a4a8559ab5de078078f37e5c1a30d76bb88519906844fd7bdea1b7729ff"}, + {file = "PyYAML-6.0.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:688ba32a1cffef67fd2e9398a2efebaea461578b0923624778664cc1c914db5d"}, + {file = "PyYAML-6.0.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:a8786accb172bd8afb8be14490a16625cbc387036876ab6ba70912730faf8e1f"}, + {file = "PyYAML-6.0.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d8e03406cac8513435335dbab54c0d385e4a49e4945d2909a581c83647ca0290"}, + {file = "PyYAML-6.0.2-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f753120cb8181e736c57ef7636e83f31b9c0d1722c516f7e86cf15b7aa57ff12"}, + {file = "PyYAML-6.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3b1fdb9dc17f5a7677423d508ab4f243a726dea51fa5e70992e59a7411c89d19"}, + {file = "PyYAML-6.0.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:0b69e4ce7a131fe56b7e4d770c67429700908fc0752af059838b1cfb41960e4e"}, + {file = "PyYAML-6.0.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:a9f8c2e67970f13b16084e04f134610fd1d374bf477b17ec1599185cf611d725"}, + {file = "PyYAML-6.0.2-cp39-cp39-win32.whl", hash = "sha256:6395c297d42274772abc367baaa79683958044e5d3835486c16da75d2a694631"}, + {file = "PyYAML-6.0.2-cp39-cp39-win_amd64.whl", hash = "sha256:39693e1f8320ae4f43943590b49779ffb98acb81f788220ea932a6b6c51004d8"}, + {file = "pyyaml-6.0.2.tar.gz", hash = "sha256:d584d9ec91ad65861cc08d42e834324ef890a082e591037abe114850ff7bbc3e"}, +] + +[[package]] +name = "typing-extensions" +version = "4.12.2" +description = "Backported and Experimental Type Hints for Python 3.8+" +optional = false +python-versions = ">=3.8" +files = [ + {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, + {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, +] + +[metadata] +lock-version = "2.0" +python-versions = "^3.12" +content-hash = "4d7d42d972410e2a090c46ac43f94a0728854addba96f2ad3808cad8ac1b5f72" diff --git a/examples/transactions/pyproject.toml b/examples/transactions/pyproject.toml new file mode 100644 index 00000000..a7d38d57 --- /dev/null +++ b/examples/transactions/pyproject.toml @@ -0,0 +1,19 @@ +[tool.poetry] +name = "transactions" +version = "0.1.0" +description = "" +authors = ["marcosschroh "] +readme = "README.md" + +[tool.poetry.dependencies] +python = "^3.12" +kstreams = { path = "../../.", develop = true } +aiorun = "^2025.1.1" + + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry.scripts] +app = "transactions.app:main" diff --git a/examples/transactions/scripts/cluster/start b/examples/transactions/scripts/cluster/start new file mode 100755 index 00000000..41c582ed --- /dev/null +++ b/examples/transactions/scripts/cluster/start @@ -0,0 +1,3 @@ +#!/bin/sh -e + +docker-compose up diff --git a/examples/transactions/scripts/cluster/stop b/examples/transactions/scripts/cluster/stop new file mode 100755 index 00000000..e2613397 --- /dev/null +++ b/examples/transactions/scripts/cluster/stop @@ -0,0 +1,4 @@ +#!/bin/sh -e + +docker-compose stop +docker-compose rm --force -v diff --git a/examples/transactions/tests/__init__.py b/examples/transactions/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/transactions/transactions/__init__.py b/examples/transactions/transactions/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/transactions/transactions/app.py b/examples/transactions/transactions/app.py new file mode 100644 index 00000000..919c8b5f --- /dev/null +++ b/examples/transactions/transactions/app.py @@ -0,0 +1,39 @@ +import asyncio +import logging + +import aiorun + +from .engine import stream_engine +from .producer import produce +from .streams import consume_from_transaction, consume_json + +logger = logging.getLogger(__name__) + + +async def start(): + stream_engine.add_stream(consume_from_transaction) + stream_engine.add_stream(consume_json) + + await stream_engine.start() + await produce( + topic="local--kstreams-json", + data={"message": "Hello world!"}, + send=stream_engine.send, + ) + + +async def shutdown(loop: asyncio.AbstractEventLoop): + await stream_engine.stop() + + +def main(): + logging.basicConfig( + format="%(asctime)s %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", + level=logging.INFO, + ) + aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown) + + +if __name__ == "__main__": + start() diff --git a/examples/transactions/transactions/engine.py b/examples/transactions/transactions/engine.py new file mode 100644 index 00000000..6c4a7c5e --- /dev/null +++ b/examples/transactions/transactions/engine.py @@ -0,0 +1,12 @@ +from kstreams import create_engine +from kstreams.backends import Kafka + +from .serializers import JsonSerializer + +stream_engine = create_engine( + title="transaction-engine", + serializer=JsonSerializer(), + backend=Kafka( + bootstrap_servers=["localhost:9091", "localhost:9092", "localhost:9093"], + ), +) diff --git a/examples/transactions/transactions/producer.py b/examples/transactions/transactions/producer.py new file mode 100644 index 00000000..e5f548c8 --- /dev/null +++ b/examples/transactions/transactions/producer.py @@ -0,0 +1,19 @@ +import asyncio +import logging + +from kstreams import Send, consts + +logger = logging.getLogger(__name__) + + +async def produce(*, topic: str, data: dict, send: Send, total_events: int = 3) -> None: + for _ in range(total_events): + await asyncio.sleep(3) + await send( + topic, + value=data, + headers={ + "content-type": consts.APPLICATION_JSON, + }, + ) + logger.info(f"Message sent to topic {topic} \n\n") diff --git a/examples/transactions/transactions/serializers.py b/examples/transactions/transactions/serializers.py new file mode 100644 index 00000000..dad2bbd1 --- /dev/null +++ b/examples/transactions/transactions/serializers.py @@ -0,0 +1,36 @@ +import json +import logging +from typing import Any, Dict, Optional + +from kstreams import ConsumerRecord, middleware +from kstreams.types import Headers + +logger = logging.getLogger(__name__) + + +json_data = {"message": "Hello world!"} +raw_data = b"Hello world!" +raw_topic = "local--kstreams" +json_topic = "local--kstreams-json" + + +class JsonSerializer: + async def serialize( + self, + payload: Any, + headers: Optional[Headers] = None, + serializer_kwargs: Optional[Dict] = None, + ) -> bytes: + """ + Serialize a payload to json + """ + value = json.dumps(payload) + return value.encode() + + +class JsonDeserializerMiddleware(middleware.BaseMiddleware): + async def __call__(self, cr: ConsumerRecord): + if cr.value is not None: + data = json.loads(cr.value.decode()) + cr.value = data + return await self.next_call(cr) diff --git a/examples/transactions/transactions/streams.py b/examples/transactions/transactions/streams.py new file mode 100644 index 00000000..b845999b --- /dev/null +++ b/examples/transactions/transactions/streams.py @@ -0,0 +1,67 @@ +import logging +import uuid + +from kstreams import ( + ConsumerRecord, + Stream, + TopicPartition, + middleware, + stream, +) +from kstreams.types import Send, Transactional + +from .serializers import JsonDeserializerMiddleware + +logger = logging.getLogger(__name__) + + +json_data = {"message": "Hello world!"} +raw_data = b"Hello world!" +transactional_topic = "local--kstreams-transactional" +json_topic = "local--kstreams-json" + + +@stream( + transactional_topic, + group_id="my-group-raw-data", + enable_auto_commit=False, + isolation_level="read_committed", # <-- This will filter aborted txn's +) +async def consume_from_transaction(cr: ConsumerRecord, stream: Stream): + logger.info( + f"Event consumed from topic {transactional_topic} with value: {cr.value} \n\n" + ) + await stream.commit() + + +@stream( + json_topic, + group_id="my-group-json-data", + enable_auto_commit=False, + middlewares=[middleware.Middleware(JsonDeserializerMiddleware)], +) +async def consume_json(cr: ConsumerRecord, send: Send, transactional: Transactional): + logger.info(f"Json Event consumed with offset: {cr.offset}, value: {cr.value}\n\n") + transaction_id = "my-transaction-id-" + str(uuid.uuid4()) + async with transactional(transaction_id=transaction_id) as t: + # send raw data to show that it is possible to send data without serialization + metadata = await t.send( + transactional_topic, + value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(), + serializer=None, + ) + + tp = TopicPartition(topic=cr.topic, partition=cr.partition) + await t.commit_offsets( + offsets={tp: cr.offset + 1}, group_id="my-group-json-data" + ) + + import asyncio + + await asyncio.sleep(4) + raise ValueError("This is a test error") + logger.info(f"Message sent inside transaction with metadata: {metadata}") + + # The same can be achied using the STREAM ENGINE di + # async with stream_engine.transaction(transaction_id=transaction_id) as t: + # ... diff --git a/kstreams/__init__.py b/kstreams/__init__.py index 54e82471..a7efc058 100644 --- a/kstreams/__init__.py +++ b/kstreams/__init__.py @@ -11,7 +11,8 @@ from .streams import Stream, stream from .structs import TopicPartitionOffset from .test_utils import TestStreamClient -from .types import ConsumerRecord, Send +from .transaction import Transaction +from .types import ConsumerRecord, Send, Transactional __all__ = [ "Consumer", @@ -31,4 +32,6 @@ "TestStreamClient", "TopicPartition", "TopicPartitionOffset", + "Transaction", + "Transactional", ] diff --git a/kstreams/engine.py b/kstreams/engine.py index 1ba85d20..72fd85ac 100644 --- a/kstreams/engine.py +++ b/kstreams/engine.py @@ -18,7 +18,13 @@ from .serializers import NO_DEFAULT, Deserializer, Serializer from .streams import Stream, StreamFunc from .streams import stream as stream_func -from .types import Deprecated, EngineHooks, Headers, NextMiddlewareCall +from .transaction import Transaction, TransactionManager +from .types import ( + Deprecated, + EngineHooks, + Headers, + NextMiddlewareCall, +) from .utils import encode_headers, execute_hooks logger = logging.getLogger(__name__) @@ -81,8 +87,11 @@ def __init__( self.deserializer = deserializer self.serializer = serializer self.monitor = monitor - self._producer: typing.Optional[typing.Type[Producer]] = None + self._producer: typing.Optional[Producer] = None self._streams: typing.List[Stream] = [] + self._transaction_manager = TransactionManager( + producer_class=self.producer_class, backend=self.backend, send=self.send + ) self._on_startup = [] if on_startup is None else list(on_startup) self._on_stop = [] if on_stop is None else list(on_stop) self._after_startup = [] if after_startup is None else list(after_startup) @@ -98,6 +107,7 @@ async def send( headers: typing.Optional[Headers] = None, serializer: typing.Optional[Serializer] = NO_DEFAULT, serializer_kwargs: typing.Optional[typing.Dict] = None, + producer: typing.Optional[Producer] = None, ): """ Attributes: @@ -111,7 +121,9 @@ async def send( encode the event serializer_kwargs Dict[str, Any] | None: Serializer kwargs """ - if self._producer is None: + producer = producer or self._producer + + if producer is None: raise EngineNotStartedException() if serializer is NO_DEFAULT: @@ -127,7 +139,7 @@ async def send( if headers is not None: encoded_headers = encode_headers(headers) - fut = await self._producer.send( + fut = await producer.send( topic, value=value, key=key, @@ -142,6 +154,23 @@ async def send( return metadata + def transaction( + self, + transaction_id: typing.Optional[str] = None, + ) -> Transaction: + """ + Provides a context manager to send messages transactionally. + It creates a transactional producer, starts a transaction and + commits or aborts the transaction based on the context manager + + Attributes: + transaction_id str | None: The transaction unique identifier. + if None, a new transaction id will be generated + """ + return self._transaction_manager.get_or_create_transaction( + transaction_id=transaction_id + ) + async def start(self) -> None: # Execute on_startup hooks await execute_hooks(self._on_startup) @@ -394,6 +423,7 @@ def add_stream( next_call=stream.func, send=self.send, stream=stream, + transactional=self.transaction, ) # NOTE: When `no typing` support is deprecated this check can @@ -408,7 +438,11 @@ def _build_stream_middleware_stack(self, *, stream: Stream) -> NextMiddlewareCal next_call = stream.udf_handler for middleware, options in reversed(middlewares): next_call = middleware( - next_call=next_call, send=self.send, stream=stream, **options + next_call=next_call, + send=self.send, + stream=stream, + transactional=self.transaction, + **options, ) return next_call diff --git a/kstreams/middleware/middleware.py b/kstreams/middleware/middleware.py index 8338a09c..fa25ee3c 100644 --- a/kstreams/middleware/middleware.py +++ b/kstreams/middleware/middleware.py @@ -17,6 +17,7 @@ class MiddlewareProtocol(typing.Protocol): next_call: types.NextMiddlewareCall send: types.Send stream: "Stream" + transactional: types.Transactional def __init__( self, @@ -24,6 +25,7 @@ def __init__( next_call: types.NextMiddlewareCall, send: types.Send, stream: "Stream", + transactional: types.Transactional, **kwargs: typing.Any, ) -> None: ... # pragma: no cover @@ -52,6 +54,7 @@ class BaseMiddleware: next_call: types.NextMiddlewareCall send: types.Send stream: "Stream" + transactional: types.Transactional def __init__( self, @@ -59,10 +62,12 @@ def __init__( next_call: types.NextMiddlewareCall, send: types.Send, stream: "Stream", + transactional: types.Transactional, ) -> None: self.next_call = next_call self.send = send self.stream = stream + self.transactional = transactional async def __call__(self, cr: types.ConsumerRecord) -> typing.Any: raise NotImplementedError diff --git a/kstreams/middleware/udf_middleware.py b/kstreams/middleware/udf_middleware.py index 12b48f4e..447dea25 100644 --- a/kstreams/middleware/udf_middleware.py +++ b/kstreams/middleware/udf_middleware.py @@ -39,6 +39,7 @@ def __init__(self, *args, **kwargs) -> None: types.ConsumerRecord: None, Stream: self.stream, types.Send: self.send, + types.Transactional: self.transactional, } def get_type(self) -> UDFType: diff --git a/kstreams/test_utils/structs.py b/kstreams/test_utils/structs.py index d37e3b9a..8c422eaf 100644 --- a/kstreams/test_utils/structs.py +++ b/kstreams/test_utils/structs.py @@ -1,3 +1,4 @@ +from enum import Enum from typing import NamedTuple @@ -6,3 +7,10 @@ class RecordMetadata(NamedTuple): partition: int topic: str timestamp: int + + +class TransactionStatus(str, Enum): + NOT_STARTED = "NOT_STARTED" + INITIALIZED = "INITIALIZED" + ABORTED = "ABORTED" + COMMITTED = "COMMITTED" diff --git a/kstreams/test_utils/test_clients.py b/kstreams/test_utils/test_clients.py index 799192bf..9c03d391 100644 --- a/kstreams/test_utils/test_clients.py +++ b/kstreams/test_utils/test_clients.py @@ -1,12 +1,12 @@ from datetime import datetime -from typing import Any, Coroutine, Dict, List, Optional, Sequence, Set, Union +from typing import Any, Coroutine, Dict, List, Literal, Optional, Sequence, Set, Union from kstreams import RebalanceListener, TopicPartition from kstreams.clients import Consumer, Producer from kstreams.serializers import Serializer from kstreams.types import ConsumerRecord, EncodedHeaders -from .structs import RecordMetadata +from .structs import RecordMetadata, TransactionStatus from .topics import TopicManager @@ -62,6 +62,66 @@ async def fut(): return fut() +class TransactionalManager: + def is_fatal_error(self) -> Literal[False]: + return False + + +class TestTransactionalProducer(TestProducer): + __test__ = False + + def __init__(self, *args, **kwargs): + self.topics = [] + self._txn_manager = TransactionalManager() + + async def send( + self, + topic_name: str, + value: Any = None, + key: Any = None, + partition: int = 0, + timestamp_ms: Optional[int] = None, + headers: Optional[EncodedHeaders] = None, + serializer: Optional[Serializer] = None, + serializer_kwargs: Optional[Dict] = None, + ) -> Coroutine: + topic, _ = TopicManager.get_or_create(topic_name, transactional=True) + self.topics.append(topic) + + return await super().send( + topic_name=topic_name, + value=value, + key=key, + partition=partition, + timestamp_ms=timestamp_ms, + headers=headers, + serializer=serializer, + serializer_kwargs=serializer_kwargs, + ) + + async def stop(self): ... + + async def begin_transaction(self) -> None: + for topic in self.topics: + topic.transaction_status = TransactionStatus.INITIALIZED + + async def abort_transaction(self) -> None: + """ + Abort all TransactionalTopics + """ + for topic in self.topics: + topic.transaction_status = TransactionStatus.ABORTED + + async def commit_transaction(self) -> None: + """ + Commit all TransactionalTopics then they are ready to be consumed + """ + for topic in self.topics: + topic.transaction_status = TransactionStatus.COMMITTED + + async def send_offsets_to_transaction(self, offsets, group_id) -> None: ... + + class TestConsumer(Base, Consumer): __test__ = False @@ -74,7 +134,10 @@ def __init__(self, group_id: Optional[str] = None, **kwargs) -> None: # Called to make sure that has all the kafka attributes like _coordinator # so it will behave like an real Kafka Consumer - super().__init__() + super().__init__(**kwargs) + self.transactional = ( + True if self._isolation_level == "read_committed" else False + ) def subscribe( self, @@ -84,6 +147,7 @@ def subscribe( pattern: Optional[str] = None, ) -> None: self.topics = topics + if topics is None: # then it is a pattern subscription, we need to get the current # topics (pre created) from the topic manager @@ -91,7 +155,11 @@ def subscribe( topics = TopicManager.get_topics_by_pattern(pattern=pattern) for topic_name in topics: - topic, created = TopicManager.get_or_create(topic_name, consumer=self) + topic, created = TopicManager.get_or_create( + topic_name, + consumer=self, + transactional=self.transactional, + ) if not created: # It means that the topic already exist, so we are in diff --git a/kstreams/test_utils/test_utils.py b/kstreams/test_utils/test_utils.py index 69d1725b..2813828a 100644 --- a/kstreams/test_utils/test_utils.py +++ b/kstreams/test_utils/test_utils.py @@ -9,7 +9,7 @@ from kstreams.types import ConsumerRecord, Headers from .structs import RecordMetadata -from .test_clients import TestConsumer, TestProducer +from .test_clients import TestConsumer, TestProducer, TestTransactionalProducer from .topics import Topic, TopicManager @@ -39,10 +39,14 @@ def __init__( topics: Optional[List[str]] = None, test_producer_class: Type[Producer] = TestProducer, test_consumer_class: Type[Consumer] = TestConsumer, + test_transactional_producer: Type[ + TestTransactionalProducer + ] = TestTransactionalProducer, ) -> None: self.stream_engine = stream_engine self.test_producer_class = test_producer_class self.test_consumer_class = test_consumer_class + self.test_transactional_producer = test_transactional_producer # Extra topics' names defined by the end user which must be created # before the cycle test starts @@ -54,6 +58,9 @@ def __init__( self.engine_consumer_class = self.stream_engine.consumer_class self.stream_engine.producer_class = self.test_producer_class + self.stream_engine._transaction_manager.producer_class = ( + self.test_transactional_producer + ) self.stream_engine.consumer_class = self.test_consumer_class if not monitoring_enabled: @@ -89,6 +96,9 @@ async def stop(self) -> None: # restore original config self.stream_engine.producer_class = self.engine_producer_class + self.stream_engine._transaction_manager.producer_class = ( + self.engine_producer_class + ) self.stream_engine.consumer_class = self.engine_consumer_class self.stream_engine.monitor = self.monitor diff --git a/kstreams/test_utils/topics.py b/kstreams/test_utils/topics.py index c9c8e842..79343fc4 100644 --- a/kstreams/test_utils/topics.py +++ b/kstreams/test_utils/topics.py @@ -1,4 +1,5 @@ import asyncio +import logging import re from collections import defaultdict from dataclasses import dataclass, field @@ -7,6 +8,9 @@ from kstreams.types import ConsumerRecord from . import test_clients +from .structs import TransactionStatus + +logger = logging.getLogger(__name__) @dataclass @@ -18,7 +22,7 @@ class Topic: ) total_events: int = 0 # for now we assumed that 1 streams is connected to 1 topic - consumer: Optional[test_clients.Consumer] = None + consumer: Optional["test_clients.TestConsumer"] = None async def put(self, event: ConsumerRecord) -> None: await self.queue.put(event) @@ -27,6 +31,7 @@ async def put(self, event: ConsumerRecord) -> None: async def get(self) -> ConsumerRecord: cr = await self.queue.get() self.task_done() + return cr def get_nowait(self) -> ConsumerRecord: @@ -77,6 +82,39 @@ def __repr__(self) -> str: ) +@dataclass +class TransactionalTopic(Topic): + transaction_status: TransactionStatus = TransactionStatus.NOT_STARTED + + def __hash__(self): + return hash(self.transaction_status.value) + + async def get(self) -> ConsumerRecord: + while self.transaction_status in ( + TransactionStatus.NOT_STARTED, + TransactionStatus.INITIALIZED, + ): + # wait for the transaction to end + await asyncio.sleep(1e-10) + logger.debug( + f"Waiting for transaction to end. Status: {self.transaction_status}" + ) + + if ( + self.consumer is not None + and self.transaction_status == TransactionStatus.ABORTED + and self.consumer.transactional + ): + # The transaction was aborted and the consumer wants only events + # that were committed, then we should empty the queue + # then we do a normal get to wait for the next event + while not self.queue.empty(): + await super().get() + + # Here the transaction was committed or the consumer is not transactional + return await super().get() + + @dataclass class TopicManager: # The queues will represent the kafka topics during testing @@ -96,6 +134,12 @@ def get(cls, name: str) -> Topic: "and the topic has events." ) + @classmethod + def get_transactional_topic(cls, name: str) -> TransactionalTopic: + topic = cls.get(name) + assert isinstance(topic, TransactionalTopic) + return topic + @classmethod def get_topics_by_pattern(cls, pattern: str) -> Sequence[str]: compile_expression = re.compile(pattern) @@ -108,15 +152,28 @@ def get_topics_by_pattern(cls, pattern: str) -> Sequence[str]: @classmethod def create( - cls, name: str, consumer: Optional["test_clients.Consumer"] = None + cls, + name: str, + consumer: Optional["test_clients.TestConsumer"] = None, + transactional: bool = False, ) -> Topic: - topic = Topic(name=name, queue=asyncio.Queue(), consumer=consumer) + topic_class = TransactionalTopic if transactional else Topic + + topic = topic_class( + name=name, + queue=asyncio.Queue(), + consumer=consumer, + ) + cls.topics[name] = topic return topic @classmethod def get_or_create( - cls, name: str, consumer: Optional["test_clients.Consumer"] = None + cls, + name: str, + consumer: Optional["test_clients.TestConsumer"] = None, + transactional: bool = False, ) -> Tuple[Topic, bool]: """ A convenience method for looking up Topic by name. @@ -130,7 +187,7 @@ def get_or_create( topic = cls.get(name) return topic, False except ValueError: - topic = cls.create(name, consumer=consumer) + topic = cls.create(name, consumer=consumer, transactional=transactional) return topic, True @classmethod diff --git a/kstreams/transaction.py b/kstreams/transaction.py new file mode 100644 index 00000000..4490db44 --- /dev/null +++ b/kstreams/transaction.py @@ -0,0 +1,80 @@ +import logging +import typing +import uuid +from functools import partial + +from .backends.kafka import Kafka +from .clients import Producer +from .types import Send + +logger = logging.getLogger(__name__) + + +class Transaction: + def __init__(self, transaction_id: str, producer: Producer, send: Send) -> None: + self.transaction_id = transaction_id + self.producer = producer + # inject the unique producer per transaction_id + self.send = partial(send, producer=self.producer) + + async def commit_offsets(self, *, offsets: dict[str, int], group_id: str) -> None: + await self.producer.send_offsets_to_transaction(offsets, group_id) + logger.info(f"Commiting offsets {offsets} for group {group_id}") + + async def __aenter__(self): + await self.producer.start() + await self.producer.begin_transaction() + return self + + async def __aexit__(self, exc_type, exc, tb): + if exc: + if self.producer._txn_manager.is_fatal_error(): + logger.error(f"Error in transaction: {exc}") + # TODO: check if it is possible to reach this line + return + await self.producer.abort_transaction() + logger.error(f"Transaction with id {self.transaction_id} aborted") + else: + await self.producer.commit_transaction() + logger.info(f"Transaction with id {self.transaction_id} committed") + + await self.producer.stop() + + +class TransactionManager: + def __init__( + self, producer_class: typing.Type[Producer], backend: Kafka, send: Send + ) -> None: + self.backend = backend + self.producer_class = producer_class + self.send = send + + # map active transaction ids to transaction objects + # TODO: check if a transaction can be reused + self._transactions: dict[str, Transaction] = {} + + def get_or_create_transaction( + self, transaction_id: typing.Optional[str] = None + ) -> Transaction: + # transaction_id must be unique and it can not be reused, otherwise + # it will raise an error aiokafka.errors.KafkaError: + # KafkaError: Unexpected error during batch delivery + transaction_id = transaction_id or str(uuid.uuid4()) + + if ( + transaction_id in self._transactions + ): # do .get() instead and fallback to function + return self._transactions[transaction_id] + + producer = self._create_producer(transaction_id=transaction_id) + transaction = Transaction( + transaction_id=transaction_id, producer=producer, send=self.send + ) + self._transactions[transaction_id] = transaction + return transaction + + def _create_producer(self, transaction_id: str, **kwargs) -> Producer: + config = {**self.backend.model_dump(), **kwargs} + config["transactional_id"] = transaction_id + producer = self.producer_class(**config) + return producer diff --git a/kstreams/types.py b/kstreams/types.py index 90107722..c0cc75ea 100644 --- a/kstreams/types.py +++ b/kstreams/types.py @@ -1,10 +1,14 @@ import typing +from contextlib import AbstractAsyncContextManager from dataclasses import dataclass from aiokafka.structs import RecordMetadata +from .clients import Producer + if typing.TYPE_CHECKING: from .serializers import Serializer # pragma: no cover + from .transaction import Transaction # pragma: no cover Headers = typing.Dict[str, str] EncodedHeaders = typing.Sequence[typing.Tuple[str, bytes]] @@ -23,9 +27,17 @@ def __call__( headers: typing.Optional[Headers] = None, serializer: typing.Optional["Serializer"] = None, serializer_kwargs: typing.Optional[typing.Dict] = None, + producer: typing.Optional[Producer] = None, ) -> typing.Awaitable[RecordMetadata]: ... +class Transactional(typing.Protocol): + def __call__( + self, + transaction_id: typing.Optional[str] = None, + ) -> AbstractAsyncContextManager["Transaction", None]: ... + + D = typing.TypeVar("D") Deprecated = typing.Annotated[D, "deprecated"] diff --git a/mkdocs.yml b/mkdocs.yml index 1f863e78..c7acd173 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -33,6 +33,7 @@ nav: - Metrics: 'metrics.md' - Monitoring: 'monitoring.md' - Serialization: 'serialization.md' + - Transactions: 'transactions.md' - Testing: 'test_client.md' - Middleware: 'middleware.md' - Utils: 'utils.md' @@ -59,6 +60,8 @@ markdown_extensions: - admonition - pymdownx.tabbed: alternate_style: true + - plantuml_markdown: + server: http://www.plantuml.com/plantuml plugins: - autorefs diff --git a/poetry.lock b/poetry.lock index 37b051c2..32de6bd3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1001,6 +1001,22 @@ files = [ {file = "pathspec-0.12.1.tar.gz", hash = "sha256:a482d51503a1ab33b1c67a6c3813a26953dbdc71c31dacaef9a838c4e29f5712"}, ] +[[package]] +name = "plantuml-markdown" +version = "3.11.1" +description = "A PlantUML plugin for Markdown" +optional = false +python-versions = "*" +files = [ + {file = "plantuml_markdown-3.11.1-py3-none-any.whl", hash = "sha256:52c8507be5b4e5b57e3c4c1cfbe8894b6204f4b6bac2279896efb03d3b79bacf"}, + {file = "plantuml_markdown-3.11.1.tar.gz", hash = "sha256:0ac39c5d13d3ea8b84ec88f57369a85d058ff15c0c225591dd649b327586142c"}, +] + +[package.dependencies] +Markdown = "*" +requests = "*" +six = "*" + [[package]] name = "platformdirs" version = "4.3.6" diff --git a/pyproject.toml b/pyproject.toml index 220b24c4..cc9ebcf6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ mkdocs-material = "^9.5.39" starlette-prometheus = "^0.10.0" codecov = "^2.1.12" mkdocstrings = { version = "^0.27", extras = ["python"] } +plantuml-markdown = "^3.11.1" [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/tests/conftest.py b/tests/conftest.py index 39fcfc1c..482e3aac 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import asyncio +import json from collections import namedtuple from dataclasses import field from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Tuple @@ -7,7 +8,7 @@ import pytest_asyncio from pytest_httpserver import HTTPServer -from kstreams import clients, create_engine +from kstreams import ConsumerRecord, clients, create_engine, types from kstreams.utils import create_ssl_context_from_mem @@ -81,6 +82,27 @@ async def committed(self, _: TopicPartition): return 10 +class JsonSerializer: + async def serialize( + self, + payload: Any, + headers: Optional[types.Headers] = None, + serializer_kwargs: Optional[Dict] = None, + ) -> bytes: + """ + Serialize paylod to json + """ + value = json.dumps(payload) + return value.encode() + + +class JsonDeserializer: + async def deserialize(self, consumer_record: ConsumerRecord, **kwargs) -> Any: + if consumer_record.value is not None: + data = consumer_record.value.decode() + return json.loads(data) + + @pytest.fixture def record_metadata(): return RecordMetadata() diff --git a/tests/test_client.py b/tests/test_client.py index eb0370ca..f0988aaa 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -6,7 +6,13 @@ import pytest -from kstreams import ConsumerRecord, StreamEngine, TopicPartition, TopicPartitionOffset +from kstreams import ( + ConsumerRecord, + StreamEngine, + TopicPartition, + TopicPartitionOffset, + Transactional, +) from kstreams.streams import Stream from kstreams.test_utils import ( TestConsumer, @@ -455,6 +461,135 @@ async def my_stream(stream: Stream): assert (await my_stream.consumer.committed(tp)) == total_events - 1 +@pytest.mark.asyncio +async def test_multiple_transactions(stream_engine: StreamEngine): + topic_name = "local--kstreams-consumer-commit" + transactional_topic = "transactional-topic" + value = b'{"message": "Hello world!"}' + total_events = 5 + save_to_db = Mock() + + @stream_engine.stream(transactional_topic, isolation_level="read_committed") + async def stream_transaction(cr: ConsumerRecord): + assert cr.value == value + save_to_db(cr.value) + + @stream_engine.stream(topic_name, group_id="my-group-id") + async def my_stream(cr: ConsumerRecord, transaction: Transactional): + for transaction_id in range(0, total_events): + async with transaction(transaction_id=str(transaction_id)) as t: + await t.send(transactional_topic, value=cr.value) + tp = TopicPartition( + topic=topic_name, + partition=cr.partition, + ) + await t.commit_offsets(offsets={tp: cr.offset}, group_id="my-group-id") + + client = TestStreamClient(stream_engine) + async with client: + for _ in range(0, total_events): + await client.send(topic_name, value=value) + + await asyncio.sleep(1e-10) + + assert my_stream.consumer is not None + assert stream_transaction.consumer is not None + + save_to_db.assert_has_calls( + [call(value) for _ in range(0, total_events)], any_order=True + ) + + +@pytest.mark.asyncio +async def test_single_transaction_multiple_events(stream_engine: StreamEngine): + topic_name = "local--kstreams-consumer-commit" + transactional_topic = "transactional-topic" + value = b'{"message": "Hello world!"}' + total_events = 5 + save_to_db = Mock() + + @stream_engine.stream(transactional_topic, isolation_level="read_committed") + async def stream_transaction(cr: ConsumerRecord): + assert cr.value == value + save_to_db(cr.value) + + @stream_engine.stream(topic_name, group_id="my-group-id") + async def my_stream(cr: ConsumerRecord, transaction: Transactional): + async with transaction(transaction_id="transaction_id") as t: + for _ in range(0, total_events): + await t.send(transactional_topic, value=cr.value) + tp = TopicPartition( + topic=topic_name, + partition=cr.partition, + ) + await t.commit_offsets(offsets={tp: cr.offset}, group_id="my-group-id") + + client = TestStreamClient(stream_engine) + async with client: + await client.send(topic_name, value=value) + await asyncio.sleep(1e-10) + + assert my_stream.consumer is not None + assert stream_transaction.consumer is not None + + save_to_db.assert_has_calls( + [call(value) for _ in range(0, total_events)], any_order=True + ) + + +@pytest.mark.asyncio +async def test_not_consume_on_abort_transaction(stream_engine: StreamEngine): + topic_name = "local--kstreams-consumer-commit" + transactional_topic = "transactional-topic" + save_to_db = Mock() + + @stream_engine.stream(transactional_topic, isolation_level="read_committed") + async def stream_transaction(cr: ConsumerRecord): + save_to_db(cr.value) + + @stream_engine.stream(topic_name) + async def my_stream(cr: ConsumerRecord, transaction: Transactional): + async with transaction(transaction_id="my-transaction-id") as t: + await t.send(transactional_topic, value=cr.value) + await t.send(transactional_topic, value=cr.value) + raise ValueError("This is an arror....") + + client = TestStreamClient(stream_engine) + async with client: + await client.send(topic_name, value=b'{"message": "Hello world!"}') + await asyncio.sleep(1e-10) + + save_to_db.assert_not_called() + + +@pytest.mark.asyncio +async def test_consume_on_abort_transaction(stream_engine: StreamEngine): + topic_name = "local--kstreams-consumer-commit" + transactional_topic = "transactional-topic" + value = b'{"message": "Hello world!"}' + save_to_db = Mock() + + # the streams has the isolation_level="read_uncommitted" + @stream_engine.stream(transactional_topic) + async def stream_transaction(cr: ConsumerRecord): + assert cr.value == value + save_to_db(cr.value) + + @stream_engine.stream(topic_name, group_id="my-group-id") + async def my_stream(cr: ConsumerRecord, transaction: Transactional): + async with transaction(transaction_id="my-transaction-id") as t: + await t.send(transactional_topic, value=cr.value) + await t.send(transactional_topic, value=cr.value) + raise ValueError("This is an arror....") + + client = TestStreamClient(stream_engine) + async with client: + await client.send(topic_name, value=value) + await asyncio.sleep(1e-10) + + save_to_db.assert_has_calls([call(value), call(value)], any_order=True) + + @pytest.mark.asyncio async def test_e2e_example(): """ diff --git a/tests/test_serialization.py b/tests/test_serialization.py index 5b05a718..5e42f0e9 100644 --- a/tests/test_serialization.py +++ b/tests/test_serialization.py @@ -1,41 +1,20 @@ -import json -from typing import Any, Dict, Optional +from typing import Dict, Optional from unittest import mock import pytest -from kstreams import ConsumerRecord, StreamEngine, consts +from kstreams import StreamEngine, consts from kstreams.clients import Producer from kstreams.streams import Stream from kstreams.test_utils.test_utils import TestStreamClient -from kstreams.types import Headers from kstreams.utils import encode_headers - -class MyJsonSerializer: - async def serialize( - self, - payload: Any, - headers: Optional[Headers] = None, - serializer_kwargs: Optional[Dict] = None, - ) -> bytes: - """ - Serialize paylod to json - """ - value = json.dumps(payload) - return value.encode() - - -class MyJsonDeserializer: - async def deserialize(self, consumer_record: ConsumerRecord, **kwargs) -> Any: - if consumer_record.value is not None: - data = consumer_record.value.decode() - return json.loads(data) +from .conftest import JsonDeserializer, JsonSerializer @pytest.mark.asyncio async def test_send_global_serializer(stream_engine: StreamEngine, record_metadata): - serializer = MyJsonSerializer() + serializer = JsonSerializer() stream_engine.serializer = serializer async def async_func(): @@ -87,7 +66,7 @@ async def async_func(): topic, value=value, headers=headers, - serializer=MyJsonSerializer(), + serializer=JsonSerializer(), ) assert metadata @@ -104,7 +83,7 @@ async def async_func(): @pytest.mark.asyncio async def test_not_serialize_value(stream_engine: StreamEngine, record_metadata): # even if a serializer is set, we can send the value as is - stream_engine.serializer = MyJsonSerializer() + stream_engine.serializer = JsonSerializer() async def async_func(): return record_metadata @@ -153,7 +132,7 @@ async def test_consume_global_deserialization( Even though deserialzers are deprecated, we still support them. """ topic = "local--hello-kpn" - stream_engine.deserializer = MyJsonDeserializer() + stream_engine.deserializer = JsonDeserializer() client = TestStreamClient(stream_engine) save_to_db = mock.Mock() @@ -169,7 +148,7 @@ async def hello_stream(stream: Stream): value=value, headers=headers, key="1", - serializer=MyJsonSerializer(), + serializer=JsonSerializer(), ) # The payload as been encoded with json, @@ -197,7 +176,7 @@ async def test_consume_custom_deserialization( save_to_db = mock.Mock() - @stream_engine.stream(topic, deserializer=MyJsonDeserializer()) + @stream_engine.stream(topic, deserializer=JsonDeserializer()) async def hello_stream(stream: Stream): async for event in stream: save_to_db(event) @@ -209,7 +188,7 @@ async def hello_stream(stream: Stream): value=value, headers=headers, key="1", - serializer=MyJsonSerializer(), + serializer=JsonSerializer(), ) # The payload as been encoded with json, diff --git a/tests/test_transactions.py b/tests/test_transactions.py new file mode 100644 index 00000000..90a5c170 --- /dev/null +++ b/tests/test_transactions.py @@ -0,0 +1,346 @@ +import asyncio +import contextlib +import typing +from unittest import mock + +import pytest + +from kstreams import ( + Consumer, + ConsumerRecord, + Producer, + RecordMetadata, + StreamEngine, + TopicPartition, + types, +) +from tests import TimeoutErrorException + +from .conftest import JsonSerializer + + +@pytest.mark.parametrize("transaction_id", [None, "my-transaction-id"]) +@pytest.mark.asyncio +async def test_create_transaction_with_stream_engine( + stream_engine: StreamEngine, + transaction_id: typing.Optional[str], +): + with mock.patch.multiple( + Producer, + start=mock.DEFAULT, + begin_transaction=mock.DEFAULT, + commit_transaction=mock.DEFAULT, + ): + async with stream_engine.transaction(transaction_id=transaction_id) as t: + ... + + if transaction_id is None: + assert t.transaction_id is not None + assert t.producer._txn_manager.transactional_id is not None + else: + assert t.transaction_id == "my-transaction-id" + assert t.producer._txn_manager.transactional_id == "my-transaction-id" + + assert t.send is not None + + t.producer.start.assert_awaited() + t.producer.begin_transaction.assert_awaited() + t.producer.commit_transaction.assert_awaited() + + assert t.producer._closed + + +@pytest.mark.asyncio +async def test_create_transaction_with_stream( + stream_engine: StreamEngine, consumer_record_factory +): + value = b"test" + transaction_id = "my-transaction-id" + + async def getone(_): + return consumer_record_factory(value=value) + + with ( + mock.patch.multiple( + Consumer, + start=mock.DEFAULT, + subscribe=mock.DEFAULT, + getone=getone, + ), + mock.patch.multiple( + Producer, + start=mock.DEFAULT, + stop=mock.DEFAULT, + begin_transaction=mock.DEFAULT, + commit_transaction=mock.DEFAULT, + ), + ): + + @stream_engine.stream("local--kstreams") + async def stream( + cr: ConsumerRecord[str, bytes], transaction: types.Transactional + ): + async with transaction(transaction_id=transaction_id) as t: + assert cr.value == value + + assert t.transaction_id == transaction_id + assert t.producer._txn_manager.transactional_id == transaction_id + + t.producer.start.assert_awaited() + t.producer.begin_transaction.assert_awaited() + t.producer.commit_transaction.assert_awaited() + t.producer.stop.assert_awaited() + + await asyncio.sleep(0.1) + + with contextlib.suppress(TimeoutErrorException): + await asyncio.wait_for(stream.start(), timeout=0.1) + + await stream.stop() + + +@pytest.mark.asyncio +async def test_send_event_in_transaction( + stream_engine: StreamEngine, record_metadata: RecordMetadata +): + async def async_func(): + return record_metadata + + with mock.patch.multiple( + Producer, + start=mock.DEFAULT, + begin_transaction=mock.DEFAULT, + commit_transaction=mock.DEFAULT, + send=mock.AsyncMock(return_value=async_func()), + ): + async with stream_engine.transaction() as t: + await t.send("sink-topic", value=b"1", key="1") + + t.producer.begin_transaction.assert_awaited() + t.producer.commit_transaction.assert_awaited() + t.producer.send.assert_awaited_once_with( + "sink-topic", + value=b"1", + key="1", + partition=None, + timestamp_ms=None, + headers=None, + ) + + +@pytest.mark.asyncio +async def test_send_event_with_global_serializer_in_transaction( + stream_engine: StreamEngine, record_metadata: RecordMetadata +): + stream_engine.serializer = JsonSerializer() + value = {"value": "Hello world!!"} + + async def async_func(): + return record_metadata + + with mock.patch.multiple( + Producer, + start=mock.DEFAULT, + begin_transaction=mock.DEFAULT, + commit_transaction=mock.DEFAULT, + send=mock.AsyncMock(return_value=async_func()), + ): + # Check that we send json data + async with stream_engine.transaction() as t: + await t.send("sink-topic", value=value, key="1") + + t.producer.begin_transaction.assert_awaited() + t.producer.commit_transaction.assert_awaited() + t.producer.send.assert_awaited_once_with( + "sink-topic", + value=b'{"value": "Hello world!!"}', + key="1", + partition=None, + timestamp_ms=None, + headers=None, + ) + + +@pytest.mark.asyncio +async def test_override_global_serializer_in_transaction( + stream_engine: StreamEngine, record_metadata: RecordMetadata +): + stream_engine.serializer = JsonSerializer() + + async def async_func(): + return record_metadata + + with mock.patch.multiple( + Producer, + start=mock.DEFAULT, + begin_transaction=mock.DEFAULT, + commit_transaction=mock.DEFAULT, + send=mock.AsyncMock(return_value=async_func()), + ): + # We have to set serializer=None otherwise we will get + # TypeError: Object of type bytes is not JSON serializable + async with stream_engine.transaction() as t: + await t.send( + "sink-topic", value=b"Helloooo", key="2", partition=10, serializer=None + ) + + t.producer.begin_transaction.assert_awaited() + t.producer.commit_transaction.assert_awaited() + t.producer.send.assert_awaited_once_with( + "sink-topic", + value=b"Helloooo", + key="2", + partition=10, + timestamp_ms=None, + headers=None, + ) + + +@pytest.mark.asyncio +async def test_commit_event_in_transaction( + stream_engine: StreamEngine, + record_metadata: RecordMetadata, + consumer_record_factory: typing.Callable[..., ConsumerRecord], +): + async def async_func(): + return record_metadata + + async def getone(_): + return consumer_record_factory(value=b"1") + + with ( + mock.patch.multiple( + Producer, + start=mock.DEFAULT, + begin_transaction=mock.DEFAULT, + commit_transaction=mock.DEFAULT, + send=mock.AsyncMock(return_value=async_func()), + send_offsets_to_transaction=mock.DEFAULT, + ), + mock.patch.multiple( + Consumer, + start=mock.DEFAULT, + subscribe=mock.DEFAULT, + unsubscribe=mock.DEFAULT, + getone=getone, + ), + ): + + @stream_engine.stream("local--kstreams", group_id="test-group") + async def stream( + cr: ConsumerRecord[str, bytes], transaction: types.Transactional + ): + async with transaction() as t: + assert cr.value == b"1" + await t.send("sink-topic", value=b"1", key="1") + + tp = TopicPartition(topic=cr.topic, partition=cr.partition) + await t.commit_offsets( + offsets={tp: cr.offset + 1}, group_id="test-group" + ) + + t.producer.begin_transaction.assert_awaited() + t.producer.commit_transaction.assert_awaited() + t.producer.send.assert_awaited_once_with( + "sink-topic", + value=b"1", + key="1", + partition=None, + timestamp_ms=None, + headers=None, + ) + t.producer.send_offsets_to_transaction.assert_awaited_once_with( + {tp: cr.offset + 1}, "test-group" + ) + + assert t.producer._closed + await asyncio.sleep(0.1) + + with contextlib.suppress(TimeoutErrorException): + await asyncio.wait_for(stream.start(), timeout=0.1) + + await stream.stop() + + +@pytest.mark.asyncio +async def test_abort_transaction( + stream_engine: StreamEngine, + record_metadata: RecordMetadata, + consumer_record_factory: typing.Callable[..., ConsumerRecord], +): + async def async_func(): + return record_metadata + + async def getone(_): + return consumer_record_factory(value=b"1") + + with ( + mock.patch.multiple( + Producer, + start=mock.DEFAULT, + begin_transaction=mock.DEFAULT, + commit_transaction=mock.DEFAULT, + abort_transaction=mock.DEFAULT, + send=mock.AsyncMock(return_value=async_func()), + send_offsets_to_transaction=mock.DEFAULT, + ), + mock.patch.multiple( + Consumer, + start=mock.DEFAULT, + subscribe=mock.DEFAULT, + unsubscribe=mock.DEFAULT, + getone=getone, + ), + pytest.raises(ValueError) as exc, + ): + + @stream_engine.stream("local--kstreams", group_id="test-group") + async def stream( + cr: ConsumerRecord[str, bytes], transaction: types.Transactional + ) -> typing.NoReturn: + async with transaction() as t: + assert cr.value == b"1" + await t.send("sink-topic", value=b"1", key="1") + + tp = TopicPartition(topic=cr.topic, partition=cr.partition) + await t.commit_offsets( + offsets={tp: cr.offset + 1}, group_id="test-group" + ) + + # raise exception to abort the transaction + raise ValueError("This is a test error") + + t.producer.begin_transaction.assert_awaited() + + # commit_transaction should not be called + t.producer.commit_transaction.assert_not_awaited() + + # abort_transaction should be called + t.producer.abort_transaction.assert_awaited() + + # The event is always produced even if the transaction is aborted + # the consumer should filter the event using + # isolation_level="read_committed" + t.producer.send.assert_awaited_once_with( + "sink-topic", + value=b"1", + key="1", + partition=None, + timestamp_ms=None, + headers=None, + ) + + # The commit is always called even if the transaction is aborted + t.producer.send_offsets_to_transaction.assert_awaited_once_with( + {tp: cr.offset + 1}, "test-group" + ) + + assert t.producer._closed + await asyncio.sleep(0.1) + + with contextlib.suppress(TimeoutErrorException): + await asyncio.wait_for(stream.start(), timeout=0.1) + + await stream.stop() + + assert exc.type is ValueError