Merge pull request #81 from kpn/new/initial_offsets
feat: adds ability for stream to be initiated with initial offsets
reidmeyer authored Nov 28, 2022
2 parents 79c5195 + a9e60c7 commit 6cfbe04
Showing 10 changed files with 180 additions and 24 deletions.
34 changes: 34 additions & 0 deletions docs/stream.md
@@ -79,6 +79,40 @@ if __name__ == "__main__":
stream_engine.remove_stream(stream)
```

### Starting the stream with initial offsets
If you want to start consuming from specific offsets, you can pass them when you instantiate the stream.

Use case:
This feature is useful when an application manages its own offsets (for example, in a database) rather than
committing consumed offsets to Kafka. When such an application starts a stream, the stream begins consuming
from the initial offsets read from that database.

If you try to seek on a topic or partition that is not assigned to your stream, the seek is ignored and a
warning is logged. For example, if you have two consumers consuming from different partitions and you ask
each of them to seek on all of the partitions, each consumer seeks only on the partitions it has been
assigned and logs a warning for the ones it was not assigned.

If you try to seek to an offset that is not yet present in the partition, the consumer falls back to the
`auto_offset_reset` config. No warning is logged in this case, so be aware of it.
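If you rely on that fallback, consider pinning `auto_offset_reset` explicitly instead of depending on the client default. The snippet below is only a sketch: it assumes that the entries in the stream's `config` are forwarded to the underlying aiokafka-style consumer, and `consume` is a hypothetical coroutine defined elsewhere.

```python
# Hedged sketch: pin the fallback so that seeking to a not-yet-existing offset
# starts from the earliest available record rather than the client default.
# Assumes `config` entries are handed to the underlying (aiokafka-style) consumer.
stream = Stream(
    "local--kstreams",
    name="my-stream",
    func=consume,  # hypothetical coroutine or async generator defined elsewhere
    config={"auto_offset_reset": "earliest"},
    initial_offsets=[
        TopicPartitionOffset(topic="local--kstreams", partition=0, offset=0)
    ],
)
```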

Also be aware that when your application restarts, it will most likely apply the `initial_offsets` again.
This means that hardcoding `initial_offsets` to a fixed number might not give the results you expect.

```python title="Initial Offsets from Database"

topic_name = "local--kstreams"
db_table = ExampleDatabase()  # application-managed store holding the last processed offset
initial_offsets: List[TopicPartitionOffset] = [
    TopicPartitionOffset(topic=topic_name, partition=0, offset=db_table.offset)
]

stream = Stream(
    topic_name,
    name="my-stream",
    func=stream,  # coroutine or async generator
    deserializer=MyDeserializer(),
    initial_offsets=initial_offsets,
)
```
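The same `initial_offsets` argument is accepted by the engine's `stream` decorator (this is the form used by the tests in this commit). A minimal sketch, assuming a `stream_engine` created elsewhere with `create_engine` and the same hypothetical `ExampleDatabase` as above:

```python title="Initial Offsets via the stream decorator"
from kstreams import TopicPartitionOffset

topic_name = "local--kstreams"
db_table = ExampleDatabase()  # hypothetical store holding the application's own offsets


@stream_engine.stream(
    topic_name,
    name="my-stream",
    initial_offsets=[
        TopicPartitionOffset(topic=topic_name, partition=0, offset=db_table.offset)
    ],
)
async def consume(stream):
    async for cr in stream:
        print(cr.value)
```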

## Stream crashing

If your stream `crashes` for any reason, event consumption will stop, meaning that no events will be consumed from the `topic`.
3 changes: 2 additions & 1 deletion docs/test_client.md
@@ -122,6 +122,7 @@ In some cases your stream will commit, in this situation checking the commited p
```python
import pytest
from kstreams.test_utils import TestStreamClient
from kstreams import TopicPartition

from .example import produce, stream_engine

@@ -130,7 +131,7 @@ value = b'{"message": "Hello world!"}'
name = "my-stream"
key = "1"
partition = 2
tp = structs.TopicPartition(
tp = TopicPartition(
topic=topic_name,
partition=partition,
)
5 changes: 4 additions & 1 deletion kstreams/__init__.py
@@ -1,9 +1,10 @@
from aiokafka.structs import ConsumerRecord
from aiokafka.structs import ConsumerRecord, TopicPartition

from .clients import Consumer, ConsumerType, Producer, ProducerType
from .create import StreamEngine, create_engine
from .prometheus.monitor import PrometheusMonitor, PrometheusMonitorType
from .streams import Stream, stream
from .structs import TopicPartitionOffset

__all__ = [
"Consumer",
@@ -17,4 +18,6 @@
"Stream",
"stream",
"ConsumerRecord",
"TopicPartition",
"TopicPartitionOffset",
]
9 changes: 8 additions & 1 deletion kstreams/engine.py
@@ -5,6 +5,8 @@

from aiokafka.structs import RecordMetadata

from kstreams.structs import TopicPartitionOffset

from .backends.kafka import Kafka
from .clients import ConsumerType, ProducerType
from .exceptions import DuplicateStreamException, EngineNotStartedException
@@ -165,11 +167,16 @@ def stream(
*,
name: Optional[str] = None,
deserializer: Optional[Deserializer] = None,
initial_offsets: Optional[List[TopicPartitionOffset]] = None,
**kwargs,
) -> Callable[[StreamFunc], Stream]:
def decorator(func: StreamFunc) -> Stream:
stream_from_func = stream(
topics, name=name, deserializer=deserializer, **kwargs
topics,
name=name,
deserializer=deserializer,
initial_offsets=initial_offsets,
**kwargs,
)(func)
self.add_stream(stream_from_func)

34 changes: 31 additions & 3 deletions kstreams/streams.py
@@ -11,13 +11,16 @@
Dict,
List,
Optional,
Set,
Type,
Union,
)

from aiokafka import errors, structs
from aiokafka import errors

from kstreams import ConsumerRecord, TopicPartition
from kstreams.exceptions import BackendNotSet
from kstreams.structs import TopicPartitionOffset

from .backends.kafka import Kafka
from .clients import Consumer, ConsumerType
@@ -38,6 +41,7 @@ def __init__(
config: Optional[Dict] = None,
model: Optional[Any] = None,
deserializer: Optional[Deserializer] = None,
initial_offsets: Optional[List[TopicPartitionOffset]] = None,
) -> None:
self.func = func
self.backend = backend
@@ -49,6 +53,7 @@ def __init__(
self.model = model
self.deserializer = deserializer
self.running = False
self.initial_offsets = initial_offsets

# aiokafka expects topic names as arguments, meaning that
# can receive N topics -> N arguments,
@@ -93,6 +98,8 @@ async def func_wrapper(func):
await self.consumer.start()
self.running = True

self._seek_to_initial_offsets()

if inspect.isasyncgen(func):
return func
else:
@@ -101,6 +108,25 @@ async def func_wrapper(func):
self._consumer_task = asyncio.create_task(func_wrapper(func))
return None

def _seek_to_initial_offsets(self):
assignments: Set[TopicPartition] = self.consumer.assignment()
if self.initial_offsets is not None:
topicPartitionOffset: TopicPartitionOffset
for topicPartitionOffset in self.initial_offsets:
tp = TopicPartition(
topic=topicPartitionOffset.topic,
partition=topicPartitionOffset.partition,
)
if tp in assignments:
self.consumer.seek(partition=tp, offset=topicPartitionOffset.offset)
else:
logger.warning(
f"""You are attempting to seek on an TopicPartitionOffset that isn't in the
consumer assignments. The code will simply ignore the seek request
on this partition. {tp} is not in the partition assignment.
The partition assignment is {assignments}."""
)

async def __aenter__(self) -> AsyncGenerator:
"""
Start the kafka Consumer and return an async_gen so it can be iterated
@@ -129,14 +155,14 @@ async def __aexit__(self, exc_type, exc, tb) -> None:
def __aiter__(self):
return self

async def __anext__(self) -> structs.ConsumerRecord:
async def __anext__(self) -> ConsumerRecord:
# This will be used only with async generators
if not self.running:
await self.start()

try:
# value is a ConsumerRecord, which is a dataclass
consumer_record: structs.ConsumerRecord = (
consumer_record: ConsumerRecord = (
await self.consumer.getone() # type: ignore
)

@@ -158,6 +184,7 @@ def stream(
*,
name: Optional[str] = None,
deserializer: Optional[Deserializer] = None,
initial_offsets: Optional[List[TopicPartitionOffset]] = None,
**kwargs,
) -> Callable[[StreamFunc], Stream]:
def decorator(func: StreamFunc) -> Stream:
@@ -166,6 +193,7 @@ def decorator(func: StreamFunc) -> Stream:
func=func,
name=name,
deserializer=deserializer,
initial_offsets=initial_offsets,
config=kwargs,
)
update_wrapper(s, func)
7 changes: 7 additions & 0 deletions kstreams/structs.py
@@ -0,0 +1,7 @@
from typing import NamedTuple


class TopicPartitionOffset(NamedTuple):
topic: str
partition: int
offset: int
5 changes: 0 additions & 5 deletions kstreams/test_utils/structs.py
@@ -1,11 +1,6 @@
from typing import NamedTuple


class TopicPartition(NamedTuple):
topic: str
partition: int


class RecordMetadata(NamedTuple):
offset: int
partition: int
5 changes: 2 additions & 3 deletions kstreams/test_utils/test_clients.py
@@ -1,13 +1,12 @@
from datetime import datetime
from typing import Any, Coroutine, Dict, List, Optional, Set, Tuple

from aiokafka.structs import ConsumerRecord

from kstreams import ConsumerRecord, TopicPartition
from kstreams.clients import Consumer, Producer
from kstreams.serializers import Serializer
from kstreams.types import Headers

from .structs import RecordMetadata, TopicPartition
from .structs import RecordMetadata
from .topics import Topic, TopicManager


19 changes: 9 additions & 10 deletions tests/test_client.py
@@ -2,14 +2,13 @@

import pytest

from kstreams import StreamEngine
from kstreams import StreamEngine, TopicPartition
from kstreams.streams import Stream
from kstreams.test_utils import (
TestConsumer,
TestProducer,
TestStreamClient,
TopicManager,
structs,
)


@@ -60,7 +59,7 @@ async def test_streams_consume_events(stream_engine: StreamEngine):
client = TestStreamClient(stream_engine)
topic = "local--kstreams-consumer"
event = b'{"message": "Hello world!"}'
tp = structs.TopicPartition(topic=topic, partition=0)
tp = TopicPartition(topic=topic, partition=0)
save_to_db = Mock()

@stream_engine.stream(topic, name="my-stream")
@@ -187,16 +186,16 @@ async def consume(stream):
await client.send(topic_name, value=value, key=key, partition=10)

topic_partitions = [
structs.TopicPartition(topic_name, 0),
structs.TopicPartition(topic_name, 2),
structs.TopicPartition(topic_name, 10),
TopicPartition(topic_name, 0),
TopicPartition(topic_name, 2),
TopicPartition(topic_name, 10),
]

stream = stream_engine.get_stream("my-stream")
assert (await stream.consumer.end_offsets(topic_partitions)) == {
structs.TopicPartition(topic="local--kstreams", partition=0): 2,
structs.TopicPartition(topic="local--kstreams", partition=2): 1,
structs.TopicPartition(topic="local--kstreams", partition=10): 1,
TopicPartition(topic="local--kstreams", partition=0): 2,
TopicPartition(topic="local--kstreams", partition=2): 1,
TopicPartition(topic="local--kstreams", partition=10): 1,
}


@@ -207,7 +206,7 @@ async def test_consumer_commit(stream_engine: StreamEngine):
name = "my-stream"
key = "1"
partition = 2
tp = structs.TopicPartition(
tp = TopicPartition(
topic=topic_name,
partition=partition,
)
83 changes: 83 additions & 0 deletions tests/test_stream_engine.py
@@ -1,12 +1,95 @@
import asyncio
from typing import Set
from unittest import mock

import pytest

from kstreams import TopicPartition
from kstreams.clients import Consumer, Producer
from kstreams.engine import Stream, StreamEngine
from kstreams.exceptions import DuplicateStreamException, EngineNotStartedException
from kstreams.streams import stream
from kstreams.structs import TopicPartitionOffset


@pytest.mark.asyncio
async def test_seek_to_initial_offsets_normal(stream_engine: StreamEngine):
with mock.patch("kstreams.clients.aiokafka.AIOKafkaConsumer.start"):
with mock.patch(
"kstreams.clients.aiokafka.AIOKafkaConsumer.assignment"
) as assignment:
stream_name = "example_stream"
offset = 100
partition = 100
topic_name = "example_topic"

assignments: Set[TopicPartition] = set()
assignments.add(TopicPartition(topic=topic_name, partition=partition))
assignment.return_value = assignments

@stream_engine.stream(
topic_name,
name=stream_name,
initial_offsets=[
TopicPartitionOffset(
topic=topic_name, partition=partition, offset=offset
)
],
)
async def consume(stream):
async for _ in stream:
...

stream = stream_engine.get_stream(stream_name)
with mock.patch(
"kstreams.clients.aiokafka.AIOKafkaConsumer.seek"
) as mock_seek:
await stream.start()
mock_seek.assert_called_once_with(
partition=TopicPartition(topic=topic_name, partition=partition),
offset=offset,
)


@pytest.mark.asyncio
async def test_seek_to_initial_offsets_ignores_wrong_input(stream_engine: StreamEngine):
with mock.patch("kstreams.clients.aiokafka.AIOKafkaConsumer.start"):
with mock.patch(
"kstreams.clients.aiokafka.AIOKafkaConsumer.assignment"
) as assignment:
stream_name = "example_stream"
offset = 100
partition = 100
topic_name = "example_topic"
wrong_topic = "different_topic"
wrong_partition = 1

assignments: Set[TopicPartition] = set()
assignments.add(TopicPartition(topic=topic_name, partition=partition))
assignment.return_value = assignments

@stream_engine.stream(
topic_name,
name=stream_name,
initial_offsets=[
TopicPartitionOffset(
topic=wrong_topic, partition=partition, offset=offset
),
TopicPartitionOffset(
topic=topic_name, partition=wrong_partition, offset=offset
),
],
)
async def consume(stream):
async for _ in stream:
...

stream = stream_engine.get_stream(stream_name)
with mock.patch(
"kstreams.clients.aiokafka.AIOKafkaConsumer.seek"
) as mock_seek:
await stream.start()
mock_seek.assert_not_called()


@pytest.mark.asyncio