Skip to content

Commit

Permalink
docs: patch getting-started example (#318)
Browse files Browse the repository at this point in the history
* docs: added imports and req argument

* docs: more imports

* Update docs/source/getstarted.rst

---------

Co-authored-by: Markus Unterwaditzer <[email protected]>
  • Loading branch information
mj0nez and untitaker authored Dec 27, 2023
1 parent 0b3be1b commit 0bf12df
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions docs/source/getstarted.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ Here we are using the `RunTask` strategy which runs a custom function over each

.. code-block:: Python
from typing import Mapping
from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import (
CommitOffsets,
ProcessingStrategy,
ProcessingStrategyFactory,
RunTask,
)
from arroyo.types import Commit, Message, Partition, Topic
def handle_message(message: Message[KafkaPayload]) -> Message[KafkaPayload]:
print(f"MSG: {message.payload}")
return message
Expand All @@ -174,10 +186,14 @@ The code above is orchestrated by the Arroyo runtime called `StreamProcessor`.

.. code-block:: Python
from arroyo.processing import StreamProcessor
from arroyo.commit import ONCE_PER_SECOND
processor = StreamProcessor(
consumer=consumer,
topic=TOPIC,
processor_factory=ConsumerStrategyFactory(),
commit_policy=ONCE_PER_SECOND,
)
processor.run()
Expand All @@ -192,6 +208,10 @@ Now we will chain the `Produce` strategy to produce messages on a second topic a

.. code-block:: Python
from arroyo.backends.kafka import KafkaProducer
from arroyo.backends.kafka.configuration import build_kafka_configuration
from arroyo.processing.strategies import Produce
class ConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
"""
The factory manages the lifecycle of the `ProcessingStrategy`.
Expand Down

0 comments on commit 0bf12df

Please sign in to comment.