# Transactions

`Kafka 0.11.0` includes support for `idempotent` and `transactional` capabilities in the `producer`. Idempotent delivery ensures that messages are delivered *exactly once* to a particular topic partition during the lifetime of a `single producer`. Transactional delivery allows producers to send data to multiple partitions such that either all messages are successfully delivered, or none of them are. Together, these capabilities enable *exactly once semantics* in Kafka.

It is important to notice that:

- A `transaction` always starts from a `send` (producer).
- Events sent to one or more topics will only be visible to consumers after the transaction is committed.
- To use transactions, a `transaction id` (a unique id per transaction) must be set before an event is sent. If you do not provide one, `kstreams` will auto-generate one for you.
- Transaction state is stored in a new internal topic, `__transaction_state`. This topic is not created until the first attempt to use a transactional request API. Several settings control the topic's configuration; for example, `transaction.state.log.min.isr` controls its minimum ISR.
- `Topics` included in transactions should be configured for durability. In particular, the `replication.factor` should be at least `3`, and the `min.insync.replicas` for these topics should be set to `2`.
- `Transactions` always add overhead: more effort is needed to produce events, and consumers have to apply filters.
- `Streams` (consumers) have to decide whether or not to filter transactional events.

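Idempotent delivery can be pictured as the broker remembering the highest sequence number seen from each producer and silently dropping retries it has already appended. The following broker-free sketch illustrates that idea only; `FakeBroker` and its `append` method are hypothetical and not part of Kafka's or kstreams' API:

```python
# Conceptual model of idempotent delivery: the broker keeps the highest
# sequence number seen per producer and drops duplicates on retry.
class FakeBroker:
    def __init__(self) -> None:
        self.log: list[bytes] = []
        self.last_seq: dict[int, int] = {}  # producer_id -> last sequence seen

    def append(self, producer_id: int, seq: int, value: bytes) -> bool:
        if self.last_seq.get(producer_id, -1) >= seq:
            return False  # duplicate (e.g. a retried send), dropped
        self.last_seq[producer_id] = seq
        self.log.append(value)
        return True


broker = FakeBroker()
assert broker.append(producer_id=1, seq=0, value=b"event-0") is True
assert broker.append(producer_id=1, seq=0, value=b"event-0") is False  # retry deduped
assert broker.append(producer_id=1, seq=1, value=b"event-1") is True
assert broker.log == [b"event-0", b"event-1"]  # each message lands exactly once
```

This is why retries after a network timeout do not produce duplicates within a single producer session.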
```plantuml
@startuml
!theme crt-amber
!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Context.puml
agent "Producer" as producer
agent "Stream A" as stream_a
agent "Stream B" as stream_b
queue "Topic A" as topic_a
producer -> topic_a: "produce with a transaction"
topic_a --> stream_a: "consume only transactional events"
topic_a --> stream_b: "consume all events"
@enduml
```

## Transaction State

A transaction has state, which is stored in the special topic `__transaction_state`. This topic can be tuned in different ways, but that is outside the scope of this guide. The only important detail you must be aware of is that a `transaction` can be `committed` or `aborted`. You will not manage the internal states, but you are responsible for deciding whether to filter transactions on the `stream` side.

```plantuml
@startuml
!theme crt-amber
!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Context.puml
[*] --> Init
Init : Initialized with a `transaction_id`
Init -> Committed
Committed --> [*]
Init -> Aborted
Aborted --> [*]
@enduml
```

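The lifecycle above can be sketched as a tiny state machine. The `TxnState` enum and transition table below are illustrative only; the real state lives in `__transaction_state` and is managed for you by Kafka:

```python
from enum import Enum, auto


class TxnState(Enum):
    INIT = auto()       # initialized with a transaction_id
    COMMITTED = auto()  # the context manager exited cleanly
    ABORTED = auto()    # an exception escaped the context manager


# The only transitions allowed by the diagram above:
VALID_TRANSITIONS = {
    TxnState.INIT: {TxnState.COMMITTED, TxnState.ABORTED},
    TxnState.COMMITTED: set(),  # terminal state
    TxnState.ABORTED: set(),    # terminal state
}


def can_transition(src: TxnState, dst: TxnState) -> bool:
    return dst in VALID_TRANSITIONS[src]


assert can_transition(TxnState.INIT, TxnState.COMMITTED)
assert not can_transition(TxnState.COMMITTED, TxnState.ABORTED)  # terminal
```

Note that both `COMMITTED` and `ABORTED` are terminal: a finished transaction is never reopened, which is why each transaction needs its own unique id.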
## Usage

From the `kstreams` point of view, the `transaction pattern` is a `context manager` that will `start` a transaction and then `commit` or `abort` it. A `transaction` always starts when an event is sent. Once we have the `context`, we can send events in two ways:

Using the `StreamEngine` directly:

```python
import uuid

from kstreams import create_engine
from kstreams.backends import Kafka

from .serializers import JsonSerializer

stream_engine = create_engine(
    title="transaction-engine",
    serializer=JsonSerializer(),
    backend=Kafka(
        bootstrap_servers=["localhost:9091", "localhost:9092", "localhost:9093"],
    ),
)


transaction_id = "my-transaction-id-" + str(uuid.uuid4())
async with stream_engine.transaction(transaction_id=transaction_id) as t:
    await t.send(
        "transactional_topic",
        value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(),
    )
```

or using the `Transactional` annotation:

```python title="Transaction inside a stream"
@stream_engine.stream("a-topic", group_id="my-group")
async def stream_a(cr: ConsumerRecord, transactional: Transactional):
    transaction_id = "my-transaction-id-" + str(uuid.uuid4())
    async with transactional(transaction_id=transaction_id) as t:
        await t.send(
            "transactional_topic",
            value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(),
        )
```

Once the `context` is exited, the transaction is `committed` or `aborted`.

## Committing offsets

With transactions it is possible to `commit` consumer offsets. This is an important feature, as it guarantees that a consumer offset is committed only once the whole transaction has been committed, including sending events to other topics. To commit consumer offsets we need the `offsets` and the `group_id` that we want to commit to. Example:

```python
@stream_engine.stream("a-topic", group_id="my-group", enable_auto_commit=False)
async def stream_a(cr: ConsumerRecord, transactional: Transactional):
    transaction_id = "my-transaction-id-" + str(uuid.uuid4())
    async with transactional(transaction_id=transaction_id) as t:
        metadata = await t.send(
            "transactional_topic",
            value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(),
        )

        tp = TopicPartition(topic=cr.topic, partition=cr.partition)
        await t.commit_offsets(offsets={tp: cr.offset + 1}, group_id="my-group")
        logger.info(f"Message sent inside transaction with metadata: {metadata}")
```

!!! note
    - The property `enable_auto_commit` must be set to `False`
    - When committing offsets, the `group_id` must be the same as the consumer `group_id`
    - `offsets` is a dictionary of `TopicPartition: offset`
    - If the `transaction` is `aborted`, then the offset commit has NO effect

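The `offsets` argument is a plain mapping from `TopicPartition` to the next offset the group should read, i.e. `cr.offset + 1`. A broker-free sketch of building that mapping; `FakeRecord` and the local `TopicPartition` stand-in are hypothetical, so the sketch runs without a broker (in real code use the types `kstreams` provides):

```python
from dataclasses import dataclass
from typing import NamedTuple


# Stand-in for the real TopicPartition type so the sketch is self-contained.
class TopicPartition(NamedTuple):
    topic: str
    partition: int


# Stand-in for ConsumerRecord, carrying only the fields we need here.
@dataclass
class FakeRecord:
    topic: str
    partition: int
    offset: int


def offsets_to_commit(cr: FakeRecord) -> dict[TopicPartition, int]:
    tp = TopicPartition(topic=cr.topic, partition=cr.partition)
    # Commit the *next* offset to read, not the offset just processed.
    return {tp: cr.offset + 1}


cr = FakeRecord(topic="a-topic", partition=0, offset=41)
assert offsets_to_commit(cr) == {TopicPartition("a-topic", 0): 42}
```

The `+ 1` matters: committing `cr.offset` itself would cause the last processed record to be redelivered after a rebalance or restart.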
## Reading committed events

It is the `stream` (consumer) responsibility to filter committed events. The property `isolation_level="read_committed"` must be used. If the `isolation_level` is NOT set to `read_committed`, then events that were sent within an `aborted` transaction are also consumed.

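Conceptually, `read_committed` hides records whose transaction was aborted, while the default `read_uncommitted` exposes everything. A minimal model of that filtering; the record shape and the `visible` helper are illustrative only, not the Kafka wire format:

```python
# Each record carries the outcome of the transaction that produced it.
records = [
    {"value": b"e1", "committed": True},
    {"value": b"e2", "committed": False},  # produced inside an aborted txn
    {"value": b"e3", "committed": True},
]


def visible(records: list[dict], isolation_level: str) -> list[bytes]:
    if isolation_level == "read_committed":
        return [r["value"] for r in records if r["committed"]]
    return [r["value"] for r in records]  # read_uncommitted sees everything


assert visible(records, "read_committed") == [b"e1", b"e3"]
assert visible(records, "read_uncommitted") == [b"e1", b"e2", b"e3"]
```

In real Kafka this filtering happens inside the consumer, driven by transaction markers in the log, so your handler never sees the aborted records.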
```plantuml
@startuml
!theme crt-amber
!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Context.puml
agent "Stream A" as stream_a
agent "Stream B" as stream_b
queue "Topic A" as topic_a
queue "Topic B" as topic_b
topic_a -> stream_a: "consume all events"
stream_a --> topic_b: "produce with a transaction and commit offsets"
topic_b -> stream_b: "filter committed events"
@enduml
```

The implementation of the diagram above is the following:

```python
@stream(
    "transactional_topic",
    group_id="another-group",
    enable_auto_commit=False,
    isolation_level="read_committed",  # <-- This will filter aborted txn's
)
async def stream_b(cr: ConsumerRecord, stream: Stream):
    logger.info(
        f"Event consumed from topic {cr.topic} with value: {cr.value} \n\n"
    )
    await stream.commit()


@stream_engine.stream("a-topic", group_id="my-group")
async def stream_a(cr: ConsumerRecord, transactional: Transactional):
    transaction_id = "my-transaction-id-" + str(uuid.uuid4())
    async with transactional(transaction_id=transaction_id) as t:
        metadata = await t.send(
            "transactional_topic",
            value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(),
        )

        tp = TopicPartition(topic=cr.topic, partition=cr.partition)
        await t.commit_offsets(offsets={tp: cr.offset + 1}, group_id="my-group")
        logger.info(f"Message sent inside transaction with metadata: {metadata}")
```

## Aborted transactions

A transaction can be aborted, for example when we have a bug in our code. Consider the following example, in which an error is `raised`:

```python
@stream_engine.stream("a-topic", group_id="my-group-json-data")
async def consume_json(cr: ConsumerRecord, transactional: Transactional):
    transaction_id = "my-transaction-id-" + str(uuid.uuid4())
    async with transactional(transaction_id=transaction_id) as t:
        metadata = await t.send(
            "transactional_topic",
            value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(),
            serializer=None,
        )

        tp = TopicPartition(topic=cr.topic, partition=cr.partition)
        await t.commit_offsets(
            offsets={tp: cr.offset + 1}, group_id="my-group-json-data"
        )

        raise ValueError("This is a test error")
```

The code will crash when the line with `raise ValueError` is reached, causing the context manager to exit with an `aborted` transaction. Any error that occurs inside the `context manager` will `abort` the `transaction`.

It is also important to notice that:

- If the transaction is aborted, then the `committed offsets` have no effect.
- Events are always sent regardless of whether the transaction was aborted. The consumer is responsible for filtering events.

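This commit-or-abort behaviour is exactly what an async context manager provides: `__aexit__` sees whether an exception escaped the block. A broker-free sketch of the idea; `FakeTransaction` is hypothetical, not kstreams' implementation:

```python
import asyncio


class FakeTransaction:
    def __init__(self, transaction_id: str) -> None:
        self.transaction_id = transaction_id
        self.state = "init"

    async def __aenter__(self) -> "FakeTransaction":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # Abort when an exception escaped the block; commit otherwise.
        self.state = "aborted" if exc_type is not None else "committed"
        return False  # do not swallow the exception


async def main() -> tuple[str, str]:
    ok = FakeTransaction("txn-1")
    async with ok:
        pass  # no error: the transaction commits

    bad = FakeTransaction("txn-2")
    try:
        async with bad:
            raise ValueError("This is a test error")
    except ValueError:
        pass  # the error still propagates, but the txn was aborted
    return ok.state, bad.state


assert asyncio.run(main()) == ("committed", "aborted")
```

Because `__aexit__` returns `False`, the original exception keeps propagating after the abort, so your application still sees the failure.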
## Testing

## Example

A full transaction example with `kstreams` can be found in the [transaction example](https://github.com/kpn/kstreams/tree/master/examples/transactions).

```plantuml
@startuml
!theme crt-amber
!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Context.puml
agent "Producer" as producer
agent "Stream A" as stream_a
agent "Stream B" as stream_b
queue "Topic A" as topic_a
queue "Topic B" as topic_b
producer -> topic_a: "produce"
topic_a <-- stream_a: "consume"
stream_a -> topic_b: "produce with transaction"
topic_b <-- stream_b: "consume read_committed events"
@enduml
```
# Kstreams Transaction example

This example shows how to use `kafka transactions` with `kstreams`. For this purpose we set up a local kafka cluster with 3 `brokers`, plus `kafka-ui` to explore the `topic offsets` and `group lag`.

## Requirements

`python 3.9+`, `poetry`, `docker-compose`

### Usage

First, run the local cluster:

```bash
./scripts/cluster/start
```

Second, install the project dependencies. In a different terminal, execute:

```bash
poetry install
```

Then we can run the project:

```bash
poetry run app
```

You should see something similar to the following logs:

```bash
kstreams/examples/transactions via 🐳 colima is 📦 v0.1.0 via 🐍 v3.12.4
❯ poetry run app

02/14/2025 04:40:40 PM Updating subscribed topics to: frozenset({'local--kstreams-transactional'})
02/14/2025 04:40:40 PM Subscribed to topic(s): {'local--kstreams-transactional'}
02/14/2025 04:40:40 PM Updating subscribed topics to: frozenset({'local--kstreams-json'})
02/14/2025 04:40:40 PM Subscribed to topic(s): {'local--kstreams-json'}
02/14/2025 04:40:40 PM Starting Prometheus Monitoring started...
02/14/2025 04:40:40 PM Discovered coordinator 1 for group my-group-json-data
02/14/2025 04:40:40 PM Revoking previously assigned partitions set() for group my-group-json-data
02/14/2025 04:40:40 PM (Re-)joining group my-group-json-data
02/14/2025 04:40:40 PM Discovered coordinator 2 for group my-group-raw-data
02/14/2025 04:40:40 PM Revoking previously assigned partitions set() for group my-group-raw-data
02/14/2025 04:40:40 PM (Re-)joining group my-group-raw-data
02/14/2025 04:40:40 PM Joined group 'my-group-json-data' (generation 23) with member_id aiokafka-0.12.0-1d2163ac-810d-4028-be96-0741e0589752
02/14/2025 04:40:40 PM Elected group leader -- performing partition assignments using roundrobin
02/14/2025 04:40:40 PM Joined group 'my-group-raw-data' (generation 23) with member_id aiokafka-0.12.0-fce394e0-f09e-4bfb-8133-eef590c52d7a
02/14/2025 04:40:40 PM Elected group leader -- performing partition assignments using roundrobin
02/14/2025 04:40:40 PM Successfully synced group my-group-json-data with generation 23
02/14/2025 04:40:40 PM Setting newly assigned partitions {TopicPartition(topic='local--kstreams-json', partition=0)} for group my-group-json-data
02/14/2025 04:40:40 PM Successfully synced group my-group-raw-data with generation 23
02/14/2025 04:40:40 PM Setting newly assigned partitions {TopicPartition(topic='local--kstreams-transactional', partition=0)} for group my-group-raw-data
02/14/2025 04:40:43 PM Message sent to topic local--kstreams-json

02/14/2025 04:40:43 PM Json Event consumed with offset: 25, value: {'message': 'Hello world!'}

02/14/2025 04:40:43 PM Discovered coordinator 1 for transactional id my-transaction-id-b213d4a1-7ee8-43dd-a121-8d5305391997
02/14/2025 04:40:43 PM Discovered coordinator 1 for group id my-group-json-data
02/14/2025 04:40:43 PM Commiting offsets {TopicPartition(topic='local--kstreams-json', partition=0): 26} for group my-group-json-data
02/14/2025 04:40:43 PM Message sent inside transaction with metadata: RecordMetadata(topic='local--kstreams-transactional', partition=0, topic_partition=TopicPartition(topic='local--kstreams-transactional', partition=0), offset=50, timestamp=1739547643468, timestamp_type=0, log_start_offset=0)
Error in transaction: None
02/14/2025 04:40:43 PM Transaction with id my-transaction-id-b213d4a1-7ee8-43dd-a121-8d5305391997 committed
02/14/2025 04:40:43 PM Event consumed from topic local--kstreams-transactional with value: b'Transaction id my-transaction-id-b213d4a1-7ee8-43dd-a121-8d5305391997 from argument in coroutine \n'

02/14/2025 04:40:46 PM Message sent to topic local--kstreams-json

02/14/2025 04:40:46 PM Json Event consumed with offset: 26, value: {'message': 'Hello world!'}

02/14/2025 04:40:46 PM Discovered coordinator 3 for transactional id my-transaction-id-26cd219e-cb2c-4c0f-93b1-34682c5a8a43
02/14/2025 04:40:46 PM Discovered coordinator 1 for group id my-group-json-data
02/14/2025 04:40:46 PM Commiting offsets {TopicPartition(topic='local--kstreams-json', partition=0): 27} for group my-group-json-data
02/14/2025 04:40:46 PM Message sent inside transaction with metadata: RecordMetadata(topic='local--kstreams-transactional', partition=0, topic_partition=TopicPartition(topic='local--kstreams-transactional', partition=0), offset=52, timestamp=1739547646464, timestamp_type=0, log_start_offset=0)
Error in transaction: None
02/14/2025 04:40:46 PM Transaction with id my-transaction-id-26cd219e-cb2c-4c0f-93b1-34682c5a8a43 committed
02/14/2025 04:40:46 PM Event consumed from topic local--kstreams-transactional with value: b'Transaction id my-transaction-id-26cd219e-cb2c-4c0f-93b1-34682c5a8a43 from argument in coroutine \n'

02/14/2025 04:40:49 PM Message sent to topic local--kstreams-json

02/14/2025 04:40:49 PM Json Event consumed with offset: 27, value: {'message': 'Hello world!'}

02/14/2025 04:40:49 PM Discovered coordinator 2 for transactional id my-transaction-id-e2df5b6a-2fd9-4c14-ac98-d719ad627eb6
02/14/2025 04:40:49 PM Discovered coordinator 1 for group id my-group-json-data
02/14/2025 04:40:49 PM Commiting offsets {TopicPartition(topic='local--kstreams-json', partition=0): 28} for group my-group-json-data
02/14/2025 04:40:49 PM Message sent inside transaction with metadata: RecordMetadata(topic='local--kstreams-transactional', partition=0, topic_partition=TopicPartition(topic='local--kstreams-transactional', partition=0), offset=54, timestamp=1739547649480, timestamp_type=0, log_start_offset=0)
Error in transaction: None
02/14/2025 04:40:49 PM Transaction with id my-transaction-id-e2df5b6a-2fd9-4c14-ac98-d719ad627eb6 committed
02/14/2025 04:40:49 PM Event consumed from topic local--kstreams-transactional with value: b'Transaction id my-transaction-id-e2df5b6a-2fd9-4c14-ac98-d719ad627eb6 from argument in coroutine \n'
```

## Note

If you plan to use this example, pay attention to the `pyproject.toml` dependencies, where `kstreams` points to the parent folder. You will have to set it to the latest released version.