Skip to content

Commit

Permalink
Merge pull request #72 from kpn/fix/add-end-offsets-to-consumer-test-…
Browse files Browse the repository at this point in the history
…client

fix(test_clients.py): adds end_offsets to consumer test client
  • Loading branch information
reidmeyer authored Oct 13, 2022
2 parents faed11f + 4d58d77 commit f603215
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
12 changes: 12 additions & 0 deletions kstreams/test_utils/test_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,18 @@ async def commit(self, offsets: Optional[Dict[TopicPartition, int]] = None) -> N
async def committed(self, topic_partition: TopicPartition) -> Optional[int]:
return self.partitions_committed.get(topic_partition)

async def end_offsets(
self, partitions: List[TopicPartition]
) -> Dict[TopicPartition, int]:
topic = TopicManager.get(partitions[0].topic)
end_offsets = {
topic_partition: topic.get_total_partition_events(
partition=topic_partition.partition
)
for topic_partition in partitions
}
return end_offsets

def partitions_for_topic(self, topic: str) -> Set:
"""
Return the partitions of all assigned topics. The `topic` argument is not used
Expand Down
35 changes: 34 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,47 @@ async def consume(stream):

async with client:
# produce to events and consume only one in the client context
await client.send(topic_name, value=value, key=key)
await client.send(topic_name, value=value, key=key, partition=0)
await client.send(topic_name, value=value, key=key, partition=2)
await client.send(topic_name, value=value, key=key, partition=10)

stream = stream_engine.get_stream("my-stream")
assert stream.consumer.partitions_for_topic(topic_name) == set([0, 2, 10])


@pytest.mark.asyncio
async def test_end_offsets(stream_engine: StreamEngine):
topic_name = "local--kstreams"
value = b'{"message": "Hello world!"}'
key = "1"
client = TestStreamClient(stream_engine)

@stream_engine.stream(topic_name, name="my-stream")
async def consume(stream):
async for cr in stream:
...

async with client:
# produce to events and consume only one in the client context
await client.send(topic_name, value=value, key=key, partition=0)
await client.send(topic_name, value=value, key=key, partition=0)
await client.send(topic_name, value=value, key=key, partition=2)
await client.send(topic_name, value=value, key=key, partition=10)

topic_partitions = [
structs.TopicPartition(topic_name, 0),
structs.TopicPartition(topic_name, 2),
structs.TopicPartition(topic_name, 10),
]

stream = stream_engine.get_stream("my-stream")
assert (await stream.consumer.end_offsets(topic_partitions)) == {
structs.TopicPartition(topic="local--kstreams", partition=0): 2,
structs.TopicPartition(topic="local--kstreams", partition=2): 1,
structs.TopicPartition(topic="local--kstreams", partition=10): 1,
}


@pytest.mark.asyncio
async def test_consumer_commit(stream_engine: StreamEngine):
topic_name = "local--kstreams-consumer-commit"
Expand Down

0 comments on commit f603215

Please sign in to comment.