From 0bf12df0aec8b13dd9d3fd84d1c54916019c7157 Mon Sep 17 00:00:00 2001 From: Marcel Johannesmann Date: Wed, 27 Dec 2023 14:57:38 +0100 Subject: [PATCH] docs: patch getting-started example (#318) * docs: added imports and req argument * docs: more imports * Update docs/source/getstarted.rst --------- Co-authored-by: Markus Unterwaditzer --- docs/source/getstarted.rst | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/docs/source/getstarted.rst b/docs/source/getstarted.rst index 1051739f..0af1cd3a 100644 --- a/docs/source/getstarted.rst +++ b/docs/source/getstarted.rst @@ -152,6 +152,18 @@ Here we are using the `RunTask` strategy which runs a custom function over each .. code-block:: Python + from typing import Mapping + + from arroyo.backends.kafka import KafkaPayload + from arroyo.processing.strategies import ( + CommitOffsets, + ProcessingStrategy, + ProcessingStrategyFactory, + RunTask, + ) + from arroyo.types import Commit, Message, Partition, Topic + + def handle_message(message: Message[KafkaPayload]) -> Message[KafkaPayload]: print(f"MSG: {message.payload}") return message @@ -174,10 +186,14 @@ The code above is orchestrated by the Arroyo runtime called `StreamProcessor`. .. code-block:: Python + from arroyo.processing import StreamProcessor + from arroyo.commit import ONCE_PER_SECOND + processor = StreamProcessor( consumer=consumer, topic=TOPIC, processor_factory=ConsumerStrategyFactory(), + commit_policy=ONCE_PER_SECOND, ) processor.run() @@ -192,6 +208,10 @@ Now we will chain the `Produce` strategy to produce messages on a second topic a .. code-block:: Python + from arroyo.backends.kafka import KafkaProducer + from arroyo.backends.kafka.configuration import build_kafka_configuration + from arroyo.processing.strategies import Produce + class ConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): """ The factory manages the lifecycle of the `ProcessingStrategy`.