-
-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit 9eabbf1
Showing
94 changed files
with
9,218 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
# Sphinx build info version 1 | ||
# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done. | ||
config: 9db9da0647afe63ce71f2b065b7306e5 | ||
tags: 645f666f9bcd5a90fca523b33c5a78b7 |
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Empty file.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
=================== | ||
Arroyo Architecture | ||
=================== | ||
|
||
Arroyo is a set of high level abstractions to interact with Kafka. | ||
These are meant to help the developer in writing performant consumers with | ||
specific delivery guarantees. | ||
|
||
Common problems addressed by Arroyo are guaranteeing at-least-once delivery, | ||
providing a dead letter queue abstraction, support parallel (multi-processing) | ||
message processing, etc. | ||
|
||
The library is divided into three layers: the basic Kafka connectivity, the | ||
streaming engine and the high level abstractions. | ||
|
||
The basic connectivity layer is a simple wrapper around the Confluent python | ||
library, which is itself based on librdkafka. Besides some cosmetic changes, | ||
this level provides a Fake in memory broker and consumer to make unit test quick | ||
to run. | ||
|
||
The streaming engine provides an asynchronous processing interface to write | ||
consumers. The consumer is written as a pipeline where each segment is an | ||
asynchronous operation. The streaming engine implements the main consumer loop | ||
and delegates the processing to the pipeline. | ||
|
||
On top of the streaming engine, the library provides high-level abstractions that | ||
are common when writing Kafka consumers like: *map*, *reduce*, *filter* together | ||
with some common messaging application patterns like the dead letter queue. | ||
|
||
Streaming Interface and Streaming Engine | ||
---------------------------------------- | ||
|
||
A Kafka consumer is built as a pipeline where each segment processes messages in | ||
an asynchronous way. The Streaming engine provides a message to a segment. The | ||
segment is not supposed to execute small CPU work in a blocking way or do IO in a | ||
non-blocking way. We generally use futures for this, and heavier CPU work in a | ||
separate process. | ||
|
||
Arroyo provides an interface to implement to write a pipeline segment. | ||
The segment interface is called *ProcessingStrategy* and is in | ||
`this module <https://github.com/getsentry/arroyo/blob/main/arroyo/processing/strategies/abstract.py>`_. | ||
(TODO: bring the docstrings to the docs and reference that). | ||
|
||
In most cases, when developing a consumer, the developer would not implement | ||
that interface directly. A higher level abstraction would be used. | ||
|
||
.. figure:: _static/diagrams/arroyo_processing.png | ||
|
||
The main consumer loop is managed by the `stream engine <https://github.com/getsentry/arroyo/blob/main/arroyo/processing/processor.py>`_. | ||
These are the phases: | ||
|
||
* Poll from the Kafka consumer through the basic library. If a message is there | ||
proceed or repeat. | ||
|
||
* Submit the message to the first *ProcessingStrategy*. This is supposed to deliver | ||
work for the strategy to do. It is not supposed to be a blocking operation. The | ||
strategy should return immediately. | ||
|
||
* Poll the strategy to execute work or to forward results to the following step | ||
in the pipeline. Ideally all IO should be done in separate threads and heavy cpu | ||
work should be done in separate processes so the *poll* method should check for | ||
completed work, dispatch to the next step and return. In practice, work is executed | ||
here in a blocking way if the overhead of offloading the work is too high. | ||
|
||
The *ProcessingStrategy* may decide not to take the message and instead apply back-pressure. | ||
This is done by raising the *MessageRejected* exception. In this case, the streaming | ||
engine pauses the consumer till the strategy is ready to accept the message. | ||
|
||
The *ProcessingStrategy* decides when it is time to commit a message. This is done | ||
through a commit callback provided to the strategy when it is instantiated. | ||
|
||
The streaming engine orchestrates the life cycle of the *ProcessingStrategy*, thus | ||
when it thinks it is time to shut the strategy down it would wait for all in-flight | ||
work to be completed and then destroy the strategy. | ||
|
||
There are two scenarios where this can happen: | ||
|
||
* The consumer is being terminated. | ||
* A rebalancing happened. A rebalancing revokes partitions and assigns new ones. | ||
After a rebalancing is complete it is impossible to commit a message from a partition | ||
that was revoked. In order to ensure the consumer behaves in a consistent way, | ||
upon rebalancing, the streaming engine destroys the strategy and builds a new one. | ||
This allows the strategy to complete all in-flight work before being terminated. | ||
|
||
High level strategies | ||
----------------------- | ||
|
||
Most consumers follow the same few patterns, so Arroyo provides abstractions that | ||
are based on the *ProcessingStrategy* but are simpler to implement for the common | ||
use cases. | ||
|
||
Common examples are: | ||
|
||
* ``run task, run task in threads, run task with multiprocessing``. The run task | ||
set of strategies are designed to be the most flexible and simple to use. They take | ||
a function provided by the user and execute it on every message, passing the output | ||
to the next step. The library includes synchronous and asynchronous versions depending | ||
on the kind of concurrency required by the user. | ||
|
||
* ``filter, map and forward``. This type of consumer inspects a message, decides | ||
whether to process it or discard it, transforms its content, and produces the result | ||
on a new topic. In this case, Arroyo provides three implementations of the | ||
*ProcessingStrategy*: *filter*, *transform*, and *produce*. The developer only needs | ||
to wire them together and provide the map and filtering logic. | ||
|
||
* ``consume, apply side effects, produce``. This is a variation of the one above. | ||
In this case, the transform operation can have side-effects like storing the content | ||
of the message somewhere. | ||
|
||
* ``high throughput cpu intensive transform``. The python GIL does not allow CPU intensive | ||
work to take advantage of parallelism. Arroyo provides an implementation of the *map* | ||
pattern that batches messages and dispatches the work to separate processes via shared | ||
memory. This is largely transparent to the developers. | ||
|
||
* ``map, reduce and store``. The reduce function is carried out by the *Collector*, which | ||
batches messages and executes some logic with side-effects when the batch is full. | ||
This is a typical way to write messages on a storages in batches to reduce the | ||
round trips. | ||
|
||
All strategies included with Arroyo are in `the strategies module <https://github.com/getsentry/arroyo/tree/main/arroyo/processing/strategies>`_. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
Backpressure | ||
============ | ||
|
||
.. py:currentmodule:: arroyo.processing.strategies | ||
Arroyo's own processing strategies internally apply backpressure by raising | ||
:py:class:`~abstract.MessageRejected`. Most | ||
consumers do not require additional work to deal with backpressure correctly. | ||
|
||
If you want to slow down the consumer based on some external signal or | ||
condition, you can achieve that most effectively by raising the same exception | ||
from within a callback passed to :py:class:`~run_task.RunTask` while the | ||
consumer is supposed to be paused | ||
|
||
.. code-block:: Python | ||
class ConsumerStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): | ||
def __init__(self): | ||
self.is_paused = False | ||
def create_with_partitions( | ||
self, | ||
commit: Commit, | ||
partitions: Mapping[Partition, int], | ||
) -> ProcessingStrategy[KafkaPayload]: | ||
def handle_message(message: Message[KafkaPayload]) -> Message[KafkaPayload]: | ||
if self.is_paused: | ||
raise MessageRejected() | ||
print(f"MSG: {message.payload}") | ||
return message | ||
return RunTask(handle_message, CommitOffsets(commit)) | ||
It is not recommended to apply backpressure by just ``sleep()``-ing in | ||
:py:class:`~abstract.ProcessingStrategy.submit` (or, in this example, | ||
``handle_message``) for more than a few milliseconds. While this definitely | ||
pauses the consumer, it will block the main thread for too long and and prevent | ||
things like consumer rebalancing from occuring. | ||
|
||
A 0.01 second sleep is applied each time :py:class:`~abstract.MessageRejected` is | ||
raised to prevent the main thread spinning at 100% CPU. However background thread | ||
performance may be impacted during this time. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
================== | ||
Dead letter queues | ||
================== | ||
|
||
.. warning:: | ||
Dead letter queues should be used with caution as they break some of the ordering guarantees | ||
otherwise offered by Arroyo and Kafka consumer code. In particular, it must be safe for the | ||
consumer to drop a message. If replaying or later re-processing of the DLQ'ed messages is done, | ||
it is critical that ordering is not a requirement in the relevant downstream code. | ||
|
||
Arroyo provides support for routing invalid messages to dead letter queues in consumers. | ||
Dead letter queues are critical in some applications because messages are ordered in Kafka | ||
and a single invalid message can cause a consumer to crash and every subsequent message to | ||
not be processed. | ||
|
||
The dead letter queue configuration is passed to the `StreamProcessor` and, if provided, any | ||
`InvalidMessage` raise by a strategy will be produced to the dead letter queue. | ||
|
||
|
||
.. automodule:: arroyo.dlq | ||
:members: InvalidMessage, DlqLimit, DlqPolicy, DlqProducer, KafkaDlqProducer, NoopDlqProducer |
Oops, something went wrong.