Commit 721b6d0

feat(transactions): Transaction added to garantee only once paradigm. Related to #265

marcosschroh committed Mar 4, 2025
1 parent d6b2a51
Showing 31 changed files with 1,821 additions and 49 deletions.
231 changes: 231 additions & 0 deletions docs/transactions.md
`Kafka 0.11.0` includes support for `idempotent` and `transactional` capabilities in the `producer`. Idempotent delivery ensures that messages are delivered `exactly once`
to a particular topic partition during the lifetime of a `single producer`. Transactional delivery allows producers to send data to multiple partitions such that either
all messages are successfully delivered, or none of them are. Together, these capabilities enable *exactly-once semantics* in Kafka.
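Idempotent delivery can be pictured as the broker de-duplicating retried sends by producer id and sequence number. The sketch below is purely illustrative (a toy model, not the real broker logic or any `kstreams` API):

```python
# Illustrative only: broker-side dedupe by (producer_id, sequence) is what
# makes a retried send land "exactly once" in a partition.

class PartitionLog:
    def __init__(self):
        self.records = []
        self.last_seq = {}  # producer_id -> last appended sequence number

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        """Append a record unless this (producer_id, seq) was already seen."""
        if self.last_seq.get(producer_id, -1) >= seq:
            return False  # duplicate caused by a retry; silently ignored
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return True


log = PartitionLog()
log.append(producer_id=1, seq=0, value="a")
log.append(producer_id=1, seq=0, value="a")  # network retry: deduped
log.append(producer_id=1, seq=1, value="b")
print(log.records)  # ['a', 'b'] -- only one copy of "a"
```

The real producer attaches these sequence numbers automatically once idempotence is enabled; applications never manage them directly.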

It is important to note that:

- A `transaction` always starts from a `send` (producer).
- Events sent to one or more topics only become visible to consumers after the transaction is committed.
- To use transactions, a `transaction id` (a unique id per transaction) must be set before an event is sent. If you do not provide one, `kstreams` will auto-generate one for you.
- Transaction state is stored in an internal topic called `__transaction_state`. This topic is not created until the first attempt to use a transactional request API. Several settings control the topic's configuration; for example, `transaction.state.log.min.isr` controls its minimum ISR.
- `Topics` which are included in transactions should be configured for durability. In particular, the `replication.factor` should be at least `3`, and the `min.insync.replicas` for these topics should be set to `2`.
- `Transactions` always add overhead: more effort is needed to produce events, and consumers have to apply filters.
- `Streams` (consumers) must decide whether to filter out transactional events.

```plantuml
@startuml
!theme crt-amber
!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Context.puml
agent "Producer" as producer
agent "Stream A" as stream_a
agent "Stream B" as stream_b
queue "Topic A" as topic_a
producer -> topic_a: "produce with a transaction"
topic_a --> stream_a: "consume only transactional events"
topic_a --> stream_b: "consume all events"
@enduml
```

## Transaction State

A transaction has state, which is stored in the special topic `__transaction_state`. This topic can be tuned in different ways, but that is outside the scope of this guide.
The only important detail that you must be aware of is that a `transaction` can be `committed` or `aborted`. You will not manage the internal states, but you are responsible
for deciding whether to filter transactions on the `stream` side.

```plantuml
@startuml
!theme crt-amber
!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Context.puml
[*] --> Init
Init : Initialized with a `transaction_id`
Init -> Committed
Committed --> [*]
Init -> Aborted
Aborted --> [*]
@enduml
```
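The state diagram above can be mirrored in a few lines of plain Python. This is a conceptual sketch of the lifecycle only, not `kstreams` internals:

```python
from enum import Enum, auto


class TxnState(Enum):
    INIT = auto()
    COMMITTED = auto()
    ABORTED = auto()


class Transaction:
    """Toy model of the lifecycle: Init -> Committed | Aborted."""

    def __init__(self, transaction_id: str):
        self.transaction_id = transaction_id
        self.state = TxnState.INIT

    def commit(self):
        assert self.state is TxnState.INIT, "only an open transaction can commit"
        self.state = TxnState.COMMITTED

    def abort(self):
        assert self.state is TxnState.INIT, "only an open transaction can abort"
        self.state = TxnState.ABORTED


txn = Transaction("my-transaction-id")
txn.commit()
print(txn.state)  # TxnState.COMMITTED
```

Note that both `Committed` and `Aborted` are terminal: once reached, the transaction id is done and a new transaction needs a new id.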

## Usage

From the `kstreams` point of view, the `transaction pattern` is a `context manager` that will `start` a transaction and then `commit` or `abort` it. A `transaction` always starts when an event is sent. Once we have the `context`, we can send events in two ways:

Using the `StreamEngine` directly:

```python
import uuid

from kstreams import create_engine
from kstreams.backends import Kafka

from .serializers import JsonSerializer

stream_engine = create_engine(
    title="transaction-engine",
    serializer=JsonSerializer(),
    backend=Kafka(
        bootstrap_servers=["localhost:9091", "localhost:9092", "localhost:9093"],
    ),
)


transaction_id = "my-transaction-id-" + str(uuid.uuid4())
async with stream_engine.transaction(transaction_id=transaction_id) as t:
    await t.send(
        "transactional_topic",
        value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(),
    )
```

or using the annotation `Transactional`:

```python title="Transaction inside a stream"
@stream_engine.stream("a-topic", group_id="my-group")
async def stream_a(cr: ConsumerRecord, transactional: Transactional):
    transaction_id = "my-transaction-id-" + str(uuid.uuid4())
    async with transactional(transaction_id=transaction_id) as t:
        await t.send(
            "transactional_topic",
            value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(),
        )
```

Once the `context` is exited, the transaction is either `committed` or `aborted`.
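The commit-or-abort-on-exit behaviour can be sketched with a plain `contextmanager`. This is a simplified stand-in for what `kstreams` does for you, using a hypothetical `outcome` dict just to make the result observable:

```python
from contextlib import contextmanager


@contextmanager
def transaction(transaction_id: str, outcome: dict):
    """Commit on clean exit; abort on any exception (then re-raise it)."""
    try:
        yield
    except Exception:
        outcome[transaction_id] = "aborted"
        raise
    else:
        outcome[transaction_id] = "committed"


outcome = {}
with transaction("txn-1", outcome):
    pass  # all sends succeeded
print(outcome["txn-1"])  # committed

try:
    with transaction("txn-2", outcome):
        raise ValueError("boom")  # a bug inside the context
except ValueError:
    pass
print(outcome["txn-2"])  # aborted
```

The real context manager is asynchronous (`async with`), but the control flow is the same: leaving cleanly commits, leaving via an exception aborts.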

## Committing offsets

With transactions it is possible to `commit` consumer offsets. This is an important feature, as it guarantees that a consumer offset is only committed once the whole transaction has been committed, including sending events to other topics.
To commit consumer offsets we need the `offsets` and the `group_id` that we want to commit to. Example:

```python
@stream_engine.stream("a-topic", group_id="my-group", enable_auto_commit=False)
async def stream_a(cr: ConsumerRecord, transactional: Transactional):
    transaction_id = "my-transaction-id-" + str(uuid.uuid4())
    async with transactional(transaction_id=transaction_id) as t:
        metadata = await t.send(
            "transactional_topic",
            value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(),
        )

        tp = TopicPartition(topic=cr.topic, partition=cr.partition)
        await t.commit_offsets(offsets={tp: cr.offset + 1}, group_id="my-group")
        logger.info(f"Message sent inside transaction with metadata: {metadata}")
```

!!! note
    - The property `enable_auto_commit` must be set to `False`
    - When committing offsets, the `group_id` must be the same as the consumer `group_id`
    - `offsets` is a dictionary of `TopicPartition: offset`
    - If the `transaction` is `aborted`, then the offset commit has NO effect
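The `offsets` argument is just a mapping from partition to the next offset to consume. For a single processed record it is typically built as below; the `TopicPartition` here is a stand-in `NamedTuple` to keep the sketch self-contained (in real code it comes from `kstreams`):

```python
from typing import NamedTuple


class TopicPartition(NamedTuple):  # stand-in for the kstreams TopicPartition
    topic: str
    partition: int


# Pretend we just processed the record at offset 41 of partition 0:
consumed_topic, consumed_partition, consumed_offset = "a-topic", 0, 41

tp = TopicPartition(topic=consumed_topic, partition=consumed_partition)
# The committed value points at the NEXT offset to read, hence the + 1:
offsets = {tp: consumed_offset + 1}
print(offsets)
```

Committing `offset + 1` means that after a restart the consumer resumes at the first unprocessed record rather than re-reading the last one.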

## Reading committed events

It is the `stream` (consumer) responsibility to filter committed events. The property `isolation_level="read_committed"` must be used. If the `isolation_level` is NOT set to `read_committed`, then events that were sent within an `aborted` transaction are also consumed.

```plantuml
@startuml
!theme crt-amber
!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Context.puml
agent "Stream A" as stream_a
agent "Stream B" as stream_b
queue "Topic A" as topic_a
queue "Topic B" as topic_b
topic_a -> stream_a: "consume all events"
stream_a --> topic_b: "produce with a transaction and commit offsets"
topic_b -> stream_b: "filter committed events"
@enduml
```

The implementation of the above diagram is the following:

```python
@stream(
    "transactional_topic",
    group_id="another-group",
    enable_auto_commit=False,
    isolation_level="read_committed",  # <-- This will filter aborted txn's
)
async def stream_b(cr: ConsumerRecord, stream: Stream):
    logger.info(
        f"Event consumed from topic {cr.topic} with value: {cr.value} \n\n"
    )
    await stream.commit()


@stream_engine.stream("a-topic", group_id="my-group")
async def stream_a(cr: ConsumerRecord, transactional: Transactional):
    transaction_id = "my-transaction-id-" + str(uuid.uuid4())
    async with transactional(transaction_id=transaction_id) as t:
        metadata = await t.send(
            "transactional_topic",
            value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(),
        )

        tp = TopicPartition(topic=cr.topic, partition=cr.partition)
        await t.commit_offsets(offsets={tp: cr.offset + 1}, group_id="my-group")
        logger.info(f"Message sent inside transaction with metadata: {metadata}")
```

## Aborted transactions

A transaction can be aborted, for example when we have a bug in our code. Consider the following example, in which an error is `raised`:

```python
@stream_engine.stream("a-topic", group_id="my-group")
async def consume_json(cr: ConsumerRecord, transactional: Transactional):
    transaction_id = "my-transaction-id-" + str(uuid.uuid4())
    async with transactional(transaction_id=transaction_id) as t:
        metadata = await t.send(
            "transactional_topic",
            value=f"Transaction id {transaction_id} from argument in coroutine \n".encode(),
            serializer=None,
        )

        tp = TopicPartition(topic=cr.topic, partition=cr.partition)
        await t.commit_offsets(
            offsets={tp: cr.offset + 1}, group_id="my-group"
        )

        raise ValueError("This is a test error")
```

The code will crash when the line with `raise ValueError` is reached, causing the context manager to exit with an `aborted` transaction. Any error that occurs inside the `context manager` will `abort` the `transaction`.

It is also important to notice that:

- If the transaction is aborted, then the `committed offsets` have no effect
- Events are always sent regardless of whether the transaction was aborted. The consumer is responsible for filtering events
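The `read_committed` behaviour can be pictured as a filter over the log: records from aborted transactions are physically present but are skipped by the consumer. The following is a toy simulation of that filtering, not the actual broker protocol:

```python
# Each record carries the final state of the transaction that produced it.
log = [
    {"value": "a", "txn": "committed"},
    {"value": "b", "txn": "aborted"},
    {"value": "c", "txn": "committed"},
]


def consume(records, isolation_level="read_uncommitted"):
    """Return the values a consumer with the given isolation level would see."""
    if isolation_level == "read_committed":
        return [r["value"] for r in records if r["txn"] == "committed"]
    return [r["value"] for r in records]  # aborted data is delivered too


print(consume(log))                                    # ['a', 'b', 'c']
print(consume(log, isolation_level="read_committed"))  # ['a', 'c']
```

This is why a stream that does not set `isolation_level="read_committed"` will also see the events produced by the aborted transaction above.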

## Testing

## Example

A full transaction example with `kstreams` can be found in the [transaction example](https://github.com/kpn/kstreams/tree/master/examples/transactions).

```plantuml
@startuml
!theme crt-amber
!include https://raw.githubusercontent.com/plantuml-stdlib/C4-PlantUML/master/C4_Context.puml
agent "Producer" as producer
agent "Stream A" as stream_a
agent "Stream B" as stream_b
queue "Topic A" as topic_a
queue "Topic B" as topic_b
producer -> topic_a: "produce"
topic_a <-- stream_a: "consume"
stream_a -> topic_b: "produce with transaction"
topic_b <-- stream_b: "consume read_committed events"
@enduml
```
103 changes: 103 additions & 0 deletions examples/transactions/README.md
# Kstreams Transaction example

This example shows how to use `kafka transactions` with `kstreams`. For this purpose we set up a local Kafka cluster with 3 `brokers` and `kafka-ui` to explore
the `topic offsets` and `group lag`.

## Requirements

`python 3.9+`, `poetry`, `docker-compose`

### Usage

First run the local cluster:

```bash
./scripts/cluster/start
```

Second, install the project dependencies. In a different terminal, execute:

```bash
poetry install
```

Then we can run the project:

```bash
poetry run app
```

You should see something similar to the following logs:

```bash
kstreams/examples/transactions via 🐳 colima is 📦 v0.1.0 via 🐍 v3.12.4
❯ poetry run app

02/14/2025 04:40:40 PM Updating subscribed topics to: frozenset({'local--kstreams-transactional'})
02/14/2025 04:40:40 PM Subscribed to topic(s): {'local--kstreams-transactional'}
02/14/2025 04:40:40 PM Updating subscribed topics to: frozenset({'local--kstreams-json'})
02/14/2025 04:40:40 PM Subscribed to topic(s): {'local--kstreams-json'}
02/14/2025 04:40:40 PM Starting Prometheus Monitoring started...
02/14/2025 04:40:40 PM Discovered coordinator 1 for group my-group-json-data
02/14/2025 04:40:40 PM Revoking previously assigned partitions set() for group my-group-json-data
02/14/2025 04:40:40 PM (Re-)joining group my-group-json-data
02/14/2025 04:40:40 PM Discovered coordinator 2 for group my-group-raw-data
02/14/2025 04:40:40 PM Revoking previously assigned partitions set() for group my-group-raw-data
02/14/2025 04:40:40 PM (Re-)joining group my-group-raw-data
02/14/2025 04:40:40 PM Joined group 'my-group-json-data' (generation 23) with member_id aiokafka-0.12.0-1d2163ac-810d-4028-be96-0741e0589752
02/14/2025 04:40:40 PM Elected group leader -- performing partition assignments using roundrobin
02/14/2025 04:40:40 PM Joined group 'my-group-raw-data' (generation 23) with member_id aiokafka-0.12.0-fce394e0-f09e-4bfb-8133-eef590c52d7a
02/14/2025 04:40:40 PM Elected group leader -- performing partition assignments using roundrobin
02/14/2025 04:40:40 PM Successfully synced group my-group-json-data with generation 23
02/14/2025 04:40:40 PM Setting newly assigned partitions {TopicPartition(topic='local--kstreams-json', partition=0)} for group my-group-json-data
02/14/2025 04:40:40 PM Successfully synced group my-group-raw-data with generation 23
02/14/2025 04:40:40 PM Setting newly assigned partitions {TopicPartition(topic='local--kstreams-transactional', partition=0)} for group my-group-raw-data
02/14/2025 04:40:43 PM Message sent to topic local--kstreams-json


02/14/2025 04:40:43 PM Json Event consumed with offset: 25, value: {'message': 'Hello world!'}


02/14/2025 04:40:43 PM Discovered coordinator 1 for transactional id my-transaction-id-b213d4a1-7ee8-43dd-a121-8d5305391997
02/14/2025 04:40:43 PM Discovered coordinator 1 for group id my-group-json-data
02/14/2025 04:40:43 PM Commiting offsets {TopicPartition(topic='local--kstreams-json', partition=0): 26} for group my-group-json-data
02/14/2025 04:40:43 PM Message sent inside transaction with metadata: RecordMetadata(topic='local--kstreams-transactional', partition=0, topic_partition=TopicPartition(topic='local--kstreams-transactional', partition=0), offset=50, timestamp=1739547643468, timestamp_type=0, log_start_offset=0)
Error in transaction: None
02/14/2025 04:40:43 PM Transaction with id my-transaction-id-b213d4a1-7ee8-43dd-a121-8d5305391997 committed
02/14/2025 04:40:43 PM Event consumed from topic local--kstreams-transactional with value: b'Transaction id my-transaction-id-b213d4a1-7ee8-43dd-a121-8d5305391997 from argument in coroutine \n'


02/14/2025 04:40:46 PM Message sent to topic local--kstreams-json


02/14/2025 04:40:46 PM Json Event consumed with offset: 26, value: {'message': 'Hello world!'}


02/14/2025 04:40:46 PM Discovered coordinator 3 for transactional id my-transaction-id-26cd219e-cb2c-4c0f-93b1-34682c5a8a43
02/14/2025 04:40:46 PM Discovered coordinator 1 for group id my-group-json-data
02/14/2025 04:40:46 PM Commiting offsets {TopicPartition(topic='local--kstreams-json', partition=0): 27} for group my-group-json-data
02/14/2025 04:40:46 PM Message sent inside transaction with metadata: RecordMetadata(topic='local--kstreams-transactional', partition=0, topic_partition=TopicPartition(topic='local--kstreams-transactional', partition=0), offset=52, timestamp=1739547646464, timestamp_type=0, log_start_offset=0)
Error in transaction: None
02/14/2025 04:40:46 PM Transaction with id my-transaction-id-26cd219e-cb2c-4c0f-93b1-34682c5a8a43 committed
02/14/2025 04:40:46 PM Event consumed from topic local--kstreams-transactional with value: b'Transaction id my-transaction-id-26cd219e-cb2c-4c0f-93b1-34682c5a8a43 from argument in coroutine \n'


02/14/2025 04:40:49 PM Message sent to topic local--kstreams-json


02/14/2025 04:40:49 PM Json Event consumed with offset: 27, value: {'message': 'Hello world!'}


02/14/2025 04:40:49 PM Discovered coordinator 2 for transactional id my-transaction-id-e2df5b6a-2fd9-4c14-ac98-d719ad627eb6
02/14/2025 04:40:49 PM Discovered coordinator 1 for group id my-group-json-data
02/14/2025 04:40:49 PM Commiting offsets {TopicPartition(topic='local--kstreams-json', partition=0): 28} for group my-group-json-data
02/14/2025 04:40:49 PM Message sent inside transaction with metadata: RecordMetadata(topic='local--kstreams-transactional', partition=0, topic_partition=TopicPartition(topic='local--kstreams-transactional', partition=0), offset=54, timestamp=1739547649480, timestamp_type=0, log_start_offset=0)
Error in transaction: None
02/14/2025 04:40:49 PM Transaction with id my-transaction-id-e2df5b6a-2fd9-4c14-ac98-d719ad627eb6 committed
02/14/2025 04:40:49 PM Event consumed from topic local--kstreams-transactional with value: b'Transaction id my-transaction-id-e2df5b6a-2fd9-4c14-ac98-d719ad627eb6 from argument in coroutine \n'
```

## Note

If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where
`kstreams` points to the parent folder. You will have to set it to the latest released version.