diff --git a/docs/stream.md b/docs/stream.md
index 02e230e3..b2defccc 100644
--- a/docs/stream.md
+++ b/docs/stream.md
@@ -79,6 +79,40 @@ if __name__ == "__main__":
     stream_engine.remove_stream(stream)
 ```
 
+### Starting the stream with initial offsets
+If you want to start consuming from specific offsets, you can pass them when instantiating your stream.
+
+Use case:
+This feature is useful if you want to manage your own offsets rather than committing consumed offsets to Kafka.
+When an application that manages its own offsets starts a stream, the stream begins consuming from the initial
+offsets as defined in, for example, a database.
+
+If you try to seek on a partition or topic that is not assigned to your stream, the seek is ignored
+and a warning is logged. For example, if you have two consumers that consume from different partitions
+and you try to seek all of the partitions on each consumer, each consumer will seek only the partitions
+it has been assigned and will log a warning for the ones it has not.
+
+If you try to seek to an offset that is not yet present on your partition, the consumer falls back to the
+auto_offset_reset config. There will not be a warning, so be aware of this.
+
+Also be aware that when your application restarts, it will most likely apply the initial offsets again.
+This means that setting initial_offsets to a hardcoded number might not give the results you expect.
+
+```python title="Initial Offsets from Database"
+
+topic_name = "local--kstreams"
+db_table = ExampleDatabase()
+initial_offsets: List[TopicPartitionOffset] = [TopicPartitionOffset(topic=topic_name, partition=0, offset=db_table.offset)]
+
+stream = Stream(
+    topic_name,
+    name="my-stream",
+    func=stream,  # coroutine or async generator
+    deserializer=MyDeserializer(),
+    initial_offsets=initial_offsets,
+)
+```
+
 ## Stream crashing
 
 If your stream `crashes` for any reason, event consumption will stop, meaning that no events will be consumed from the `topic`.
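The restart caveat above suggests pairing `initial_offsets` with offsets persisted by the application itself. Below is a minimal sketch of that pattern, assuming a hypothetical `OffsetStore` class standing in for your own persistence layer (it is not part of kstreams):

```python
from kstreams import TopicPartitionOffset, create_engine

stream_engine = create_engine(title="my-engine")
offset_store = OffsetStore()  # hypothetical: your own database or key-value layer

topic_name = "local--kstreams"
partition = 0


@stream_engine.stream(
    topic_name,
    name="my-stream",
    initial_offsets=[
        TopicPartitionOffset(
            topic=topic_name,
            partition=partition,
            # Seek past the last processed offset so a restart
            # does not re-consume the last event.
            offset=offset_store.load(topic_name, partition) + 1,
        )
    ],
)
async def consume(stream):
    async for cr in stream:
        ...  # process the event, then persist its offset
        offset_store.save(cr.topic, cr.partition, cr.offset)
```

Because the offsets are read from storage rather than hardcoded, restarting the application resumes from wherever the stream left off.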
diff --git a/docs/test_client.md b/docs/test_client.md
index e6a8c0bd..f0c0fa79 100644
--- a/docs/test_client.md
+++ b/docs/test_client.md
@@ -122,6 +122,7 @@ In some cases your stream will commit, in this situation checking the committed p
 ```python
 import pytest
 from kstreams.test_utils import TestStreamClient
+from kstreams import TopicPartition
 
 from .example import produce, stream_engine
 
@@ -130,7 +131,7 @@ value = b'{"message": "Hello world!"}'
 name = "my-stream"
 key = "1"
 partition = 2
-tp = structs.TopicPartition(
+tp = TopicPartition(
     topic=topic_name,
     partition=partition,
 )
diff --git a/kstreams/__init__.py b/kstreams/__init__.py
index e59965d4..d0c99132 100644
--- a/kstreams/__init__.py
+++ b/kstreams/__init__.py
@@ -1,9 +1,10 @@
-from aiokafka.structs import ConsumerRecord
+from aiokafka.structs import ConsumerRecord, TopicPartition
 
 from .clients import Consumer, ConsumerType, Producer, ProducerType
 from .create import StreamEngine, create_engine
 from .prometheus.monitor import PrometheusMonitor, PrometheusMonitorType
 from .streams import Stream, stream
+from .structs import TopicPartitionOffset
 
 __all__ = [
     "Consumer",
@@ -17,4 +18,6 @@
     "Stream",
     "stream",
     "ConsumerRecord",
+    "TopicPartition",
+    "TopicPartitionOffset",
 ]
diff --git a/kstreams/engine.py b/kstreams/engine.py
index f63f2735..f2178c50 100644
--- a/kstreams/engine.py
+++ b/kstreams/engine.py
@@ -5,6 +5,8 @@
 
 from aiokafka.structs import RecordMetadata
 
+from kstreams.structs import TopicPartitionOffset
+
 from .backends.kafka import Kafka
 from .clients import ConsumerType, ProducerType
 from .exceptions import DuplicateStreamException, EngineNotStartedException
@@ -165,11 +167,16 @@ def stream(
         *,
         name: Optional[str] = None,
         deserializer: Optional[Deserializer] = None,
+        initial_offsets: Optional[List[TopicPartitionOffset]] = None,
         **kwargs,
     ) -> Callable[[StreamFunc], Stream]:
         def decorator(func: StreamFunc) -> Stream:
             stream_from_func = stream(
-                topics, name=name, deserializer=deserializer, **kwargs
+                topics,
+                name=name,
+                deserializer=deserializer,
+                initial_offsets=initial_offsets,
+                **kwargs,
             )(func)
             self.add_stream(stream_from_func)
 
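For context, the `docs/test_client.md` hunk above only shows the changed lines. Assembled, the committed-offset check could look like the sketch below; it assumes the `stream_engine` and the committing `my-stream` stream from the docs example:

```python
import pytest

from kstreams import TopicPartition
from kstreams.test_utils import TestStreamClient

from .example import produce, stream_engine

topic_name = "local--kstreams"
tp = TopicPartition(topic=topic_name, partition=2)


@pytest.mark.asyncio
async def test_consumer_commit():
    client = TestStreamClient(stream_engine)
    async with client:
        await client.send(
            topic_name, value=b'{"message": "Hello world!"}', key="1", partition=2
        )

        # The stream consumes and commits inside the client context;
        # afterwards the committed offset for the partition can be checked.
        stream = stream_engine.get_stream("my-stream")
        assert (await stream.consumer.committed(tp)) == 1
```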
diff --git a/kstreams/streams.py b/kstreams/streams.py
index 7e117f8f..4b53c609 100644
--- a/kstreams/streams.py
+++ b/kstreams/streams.py
@@ -11,13 +11,16 @@
     Dict,
     List,
     Optional,
+    Set,
     Type,
     Union,
 )
 
-from aiokafka import errors, structs
+from aiokafka import errors
 
+from kstreams import ConsumerRecord, TopicPartition
 from kstreams.exceptions import BackendNotSet
+from kstreams.structs import TopicPartitionOffset
 
 from .backends.kafka import Kafka
 from .clients import Consumer, ConsumerType
@@ -38,6 +41,7 @@ def __init__(
         config: Optional[Dict] = None,
         model: Optional[Any] = None,
         deserializer: Optional[Deserializer] = None,
+        initial_offsets: Optional[List[TopicPartitionOffset]] = None,
     ) -> None:
         self.func = func
         self.backend = backend
@@ -49,6 +53,7 @@ def __init__(
         self.model = model
         self.deserializer = deserializer
         self.running = False
+        self.initial_offsets = initial_offsets
 
         # aiokafka expects topic names as arguments, meaning that
         # can receive N topics -> N arguments,
@@ -93,6 +98,8 @@ async def func_wrapper(func):
             await self.consumer.start()
             self.running = True
 
+            self._seek_to_initial_offsets()
+
             if inspect.isasyncgen(func):
                 return func
             else:
@@ -101,6 +108,25 @@ async def func_wrapper(func):
         self._consumer_task = asyncio.create_task(func_wrapper(func))
         return None
 
+    def _seek_to_initial_offsets(self):
+        assignments: Set[TopicPartition] = self.consumer.assignment()
+        if self.initial_offsets is not None:
+            topic_partition_offset: TopicPartitionOffset
+            for topic_partition_offset in self.initial_offsets:
+                tp = TopicPartition(
+                    topic=topic_partition_offset.topic,
+                    partition=topic_partition_offset.partition,
+                )
+                if tp in assignments:
+                    self.consumer.seek(partition=tp, offset=topic_partition_offset.offset)
+                else:
+                    logger.warning(
+                        f"You are attempting to seek on a TopicPartitionOffset "
+                        f"that isn't in the consumer assignments; the seek "
+                        f"request will be ignored. {tp} is not in the partition "
+                        f"assignment. The partition assignment is {assignments}."
+                    )
+
     async def __aenter__(self) -> AsyncGenerator:
         """
         Start the kafka Consumer and return an async_gen so it can be iterated
@@ -129,14 +155,14 @@ async def __aexit__(self, exc_type, exc, tb) -> None:
     def __aiter__(self):
         return self
 
-    async def __anext__(self) -> structs.ConsumerRecord:
+    async def __anext__(self) -> ConsumerRecord:
         # This will be used only with async generators
         if not self.running:
             await self.start()
 
         try:
             # value is a ConsumerRecord, which is a dataclass
-            consumer_record: structs.ConsumerRecord = (
+            consumer_record: ConsumerRecord = (
                 await self.consumer.getone()  # type: ignore
             )
 
@@ -158,6 +184,7 @@ def stream(
     *,
     name: Optional[str] = None,
     deserializer: Optional[Deserializer] = None,
+    initial_offsets: Optional[List[TopicPartitionOffset]] = None,
     **kwargs,
 ) -> Callable[[StreamFunc], Stream]:
     def decorator(func: StreamFunc) -> Stream:
@@ -166,6 +193,7 @@ def decorator(func: StreamFunc) -> Stream:
             func=func,
             name=name,
             deserializer=deserializer,
+            initial_offsets=initial_offsets,
             config=kwargs,
         )
         update_wrapper(s, func)
diff --git a/kstreams/structs.py b/kstreams/structs.py
new file mode 100644
index 00000000..1217f719
--- /dev/null
+++ b/kstreams/structs.py
@@ -0,0 +1,7 @@
+from typing import NamedTuple
+
+
+class TopicPartitionOffset(NamedTuple):
+    topic: str
+    partition: int
+    offset: int
diff --git a/kstreams/test_utils/structs.py b/kstreams/test_utils/structs.py
index 29bb7a36..d37e3b9a 100644
--- a/kstreams/test_utils/structs.py
+++ b/kstreams/test_utils/structs.py
@@ -1,11 +1,6 @@
 from typing import NamedTuple
 
 
-class TopicPartition(NamedTuple):
-    topic: str
-    partition: int
-
-
 class RecordMetadata(NamedTuple):
     offset: int
     partition: int
diff --git a/kstreams/test_utils/test_clients.py b/kstreams/test_utils/test_clients.py
index f4f904a9..f59e203b 100644
--- a/kstreams/test_utils/test_clients.py
+++ b/kstreams/test_utils/test_clients.py
@@ -1,13 +1,12 @@
 from datetime import datetime
 from typing import Any, Coroutine, Dict, List, Optional, Set, Tuple
 
-from aiokafka.structs import ConsumerRecord
-
+from kstreams import ConsumerRecord, TopicPartition
 from kstreams.clients import Consumer, Producer
 from kstreams.serializers import Serializer
 from kstreams.types import Headers
 
-from .structs import RecordMetadata, TopicPartition
+from .structs import RecordMetadata
 from .topics import Topic, TopicManager
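`TopicPartitionOffset` is a plain `NamedTuple`, so deriving the aiokafka `TopicPartition` that the seek call needs (as `_seek_to_initial_offsets` does above) is just field access. A small sketch:

```python
from kstreams import TopicPartition, TopicPartitionOffset

tpo = TopicPartitionOffset(topic="local--kstreams", partition=0, offset=42)

# Derive the TopicPartition used for assignment checks and seeking
tp = TopicPartition(topic=tpo.topic, partition=tpo.partition)
assert tp == TopicPartition("local--kstreams", 0)

# NamedTuples also support plain tuple unpacking
topic, partition, offset = tpo
assert offset == 42
```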
event = b'{"message": "Hello world!"}' - tp = structs.TopicPartition(topic=topic, partition=0) + tp = TopicPartition(topic=topic, partition=0) save_to_db = Mock() @stream_engine.stream(topic, name="my-stream") @@ -187,16 +186,16 @@ async def consume(stream): await client.send(topic_name, value=value, key=key, partition=10) topic_partitions = [ - structs.TopicPartition(topic_name, 0), - structs.TopicPartition(topic_name, 2), - structs.TopicPartition(topic_name, 10), + TopicPartition(topic_name, 0), + TopicPartition(topic_name, 2), + TopicPartition(topic_name, 10), ] stream = stream_engine.get_stream("my-stream") assert (await stream.consumer.end_offsets(topic_partitions)) == { - structs.TopicPartition(topic="local--kstreams", partition=0): 2, - structs.TopicPartition(topic="local--kstreams", partition=2): 1, - structs.TopicPartition(topic="local--kstreams", partition=10): 1, + TopicPartition(topic="local--kstreams", partition=0): 2, + TopicPartition(topic="local--kstreams", partition=2): 1, + TopicPartition(topic="local--kstreams", partition=10): 1, } @@ -207,7 +206,7 @@ async def test_consumer_commit(stream_engine: StreamEngine): name = "my-stream" key = "1" partition = 2 - tp = structs.TopicPartition( + tp = TopicPartition( topic=topic_name, partition=partition, ) diff --git a/tests/test_stream_engine.py b/tests/test_stream_engine.py index e4c1d653..e8e566b6 100644 --- a/tests/test_stream_engine.py +++ b/tests/test_stream_engine.py @@ -1,12 +1,95 @@ import asyncio +from typing import Set from unittest import mock import pytest +from kstreams import TopicPartition from kstreams.clients import Consumer, Producer from kstreams.engine import Stream, StreamEngine from kstreams.exceptions import DuplicateStreamException, EngineNotStartedException from kstreams.streams import stream +from kstreams.structs import TopicPartitionOffset + + +@pytest.mark.asyncio +async def test_seek_to_initial_offsets_normal(stream_engine: StreamEngine): + with mock.patch("kstreams.clients.aiokafka.AIOKafkaConsumer.start"): + with mock.patch( + "kstreams.clients.aiokafka.AIOKafkaConsumer.assignment" + ) as assignment: + stream_name = "example_stream" + offset = 100 + partition = 100 + topic_name = "example_topic" + + assignments: Set[TopicPartition] = set() + assignments.add(TopicPartition(topic=topic_name, partition=partition)) + assignment.return_value = assignments + + @stream_engine.stream( + topic_name, + name=stream_name, + initial_offsets=[ + TopicPartitionOffset( + topic=topic_name, partition=partition, offset=offset + ) + ], + ) + async def consume(stream): + async for _ in stream: + ... 
diff --git a/tests/test_stream_engine.py b/tests/test_stream_engine.py
index e4c1d653..e8e566b6 100644
--- a/tests/test_stream_engine.py
+++ b/tests/test_stream_engine.py
@@ -1,12 +1,95 @@
 import asyncio
+from typing import Set
 from unittest import mock
 
 import pytest
 
+from kstreams import TopicPartition
 from kstreams.clients import Consumer, Producer
 from kstreams.engine import Stream, StreamEngine
 from kstreams.exceptions import DuplicateStreamException, EngineNotStartedException
 from kstreams.streams import stream
+from kstreams.structs import TopicPartitionOffset
+
+
+@pytest.mark.asyncio
+async def test_seek_to_initial_offsets_normal(stream_engine: StreamEngine):
+    with mock.patch("kstreams.clients.aiokafka.AIOKafkaConsumer.start"):
+        with mock.patch(
+            "kstreams.clients.aiokafka.AIOKafkaConsumer.assignment"
+        ) as assignment:
+            stream_name = "example_stream"
+            offset = 100
+            partition = 100
+            topic_name = "example_topic"
+
+            assignments: Set[TopicPartition] = set()
+            assignments.add(TopicPartition(topic=topic_name, partition=partition))
+            assignment.return_value = assignments
+
+            @stream_engine.stream(
+                topic_name,
+                name=stream_name,
+                initial_offsets=[
+                    TopicPartitionOffset(
+                        topic=topic_name, partition=partition, offset=offset
+                    )
+                ],
+            )
+            async def consume(stream):
+                async for _ in stream:
+                    ...
+
+            stream = stream_engine.get_stream(stream_name)
+            with mock.patch(
+                "kstreams.clients.aiokafka.AIOKafkaConsumer.seek"
+            ) as mock_seek:
+                await stream.start()
+                mock_seek.assert_called_once_with(
+                    partition=TopicPartition(topic=topic_name, partition=partition),
+                    offset=offset,
+                )
+
+
+@pytest.mark.asyncio
+async def test_seek_to_initial_offsets_ignores_wrong_input(stream_engine: StreamEngine):
+    with mock.patch("kstreams.clients.aiokafka.AIOKafkaConsumer.start"):
+        with mock.patch(
+            "kstreams.clients.aiokafka.AIOKafkaConsumer.assignment"
+        ) as assignment:
+            stream_name = "example_stream"
+            offset = 100
+            partition = 100
+            topic_name = "example_topic"
+            wrong_topic = "different_topic"
+            wrong_partition = 1
+
+            assignments: Set[TopicPartition] = set()
+            assignments.add(TopicPartition(topic=topic_name, partition=partition))
+            assignment.return_value = assignments
+
+            @stream_engine.stream(
+                topic_name,
+                name=stream_name,
+                initial_offsets=[
+                    TopicPartitionOffset(
+                        topic=wrong_topic, partition=partition, offset=offset
+                    ),
+                    TopicPartitionOffset(
+                        topic=topic_name, partition=wrong_partition, offset=offset
+                    ),
+                ],
+            )
+            async def consume(stream):
+                async for _ in stream:
+                    ...
+
+            stream = stream_engine.get_stream(stream_name)
+            with mock.patch(
+                "kstreams.clients.aiokafka.AIOKafkaConsumer.seek"
+            ) as mock_seek:
+                await stream.start()
+                mock_seek.assert_not_called()
 
 
 @pytest.mark.asyncio
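The second test asserts that `seek` is never called; the warning path could additionally be asserted with pytest's built-in `caplog` fixture. Below is a sketch mirroring the setup above (the asserted substring depends on the final wording of the log message):

```python
@pytest.mark.asyncio
async def test_seek_to_initial_offsets_logs_warning(
    stream_engine: StreamEngine, caplog
):
    with mock.patch("kstreams.clients.aiokafka.AIOKafkaConsumer.start"), mock.patch(
        "kstreams.clients.aiokafka.AIOKafkaConsumer.seek"
    ), mock.patch(
        "kstreams.clients.aiokafka.AIOKafkaConsumer.assignment"
    ) as assignment:
        topic_name = "example_topic"
        assignment.return_value = {TopicPartition(topic=topic_name, partition=0)}

        @stream_engine.stream(
            topic_name,
            name="warning_stream",
            initial_offsets=[
                # partition 1 is not in the mocked assignment above
                TopicPartitionOffset(topic=topic_name, partition=1, offset=10)
            ],
        )
        async def consume(stream):
            async for _ in stream:
                ...

        stream = stream_engine.get_stream("warning_stream")
        await stream.start()
        assert "is not in the partition assignment" in caplog.text
```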