diff --git a/pr-preview/pr-155/engine/index.html b/pr-preview/pr-155/engine/index.html index 409dcd30..5829e1da 100644 --- a/pr-preview/pr-155/engine/index.html +++ b/pr-preview/pr-155/engine/index.html @@ -849,9 +849,7 @@
class StreamEngine:
+255
class StreamEngine:
"""
Attributes:
backend kstreams.backends.Kafka: Backend to connect. Default `Kafka`
@@ -893,7 +891,7 @@
consumer_class: typing.Type[ConsumerType],
producer_class: typing.Type[ProducerType],
monitor: PrometheusMonitor,
- middlewares: typing.List[MiddlewareFactory],
+ middlewares: typing.List[Middleware],
title: typing.Optional[str] = None,
deserializer: typing.Optional[Deserializer] = None,
serializer: typing.Optional[Serializer] = None,
@@ -1041,9 +1039,7 @@
def build_stream_middleware_stack(self, stream: Stream) -> NextMiddlewareCall:
udf_handler = UdfHandler(handler=stream.func, stream=stream)
stream.middlewares = (
- [MiddlewareFactory(ExceptionMiddleware)]
- + self.middlewares
- + stream.middlewares
+ [Middleware(ExceptionMiddleware)] + self.middlewares + stream.middlewares
)
next_call = udf_handler
@@ -1066,7 +1062,7 @@
deserializer: typing.Optional[Deserializer] = None,
initial_offsets: typing.Optional[typing.List[TopicPartitionOffset]] = None,
rebalance_listener: typing.Optional[RebalanceListener] = None,
- middlewares: typing.Optional[typing.List[MiddlewareFactory]] = None,
+ middlewares: typing.Optional[typing.List[Middleware]] = None,
**kwargs,
) -> typing.Callable[[StreamFunc], Stream]:
def decorator(func: StreamFunc) -> Stream:
diff --git a/pr-preview/pr-155/middleware/index.html b/pr-preview/pr-155/middleware/index.html
index 3dd27dc1..6e9e20e9 100644
--- a/pr-preview/pr-155/middleware/index.html
+++ b/pr-preview/pr-155/middleware/index.html
@@ -543,7 +543,7 @@
Middleware
Kstreams allows you to include middlewares for adding behavior to streams.
-A middleware is a callable
that works with every ConsumerRecord
(CR) before it is processed by a specific stream
. Also after the CR
has been handled Middlewares
also have access to the stream
, send
function.
+A middleware is a callable
that works with every ConsumerRecord
(CR) before and after it is processed by a specific stream
. Middlewares
also have access to the stream
, send
function.
- It takes each
CR
that arrives to a topic.
- It can then do something to the
CR
or run any needed code.
@@ -560,37 +560,27 @@ Middleware
-
- Bases: Protocol
Source code in kstreams/middleware/middleware.py
- 15
-16
-17
-18
-19
-20
-21
-22
-23
-24
-25
-26
-27
class Middleware(typing.Protocol):
+ 30
+31
+32
+33
+34
+35
+36
+37
+38
class Middleware:
def __init__(
- self,
- *,
- next_call: types.NextMiddlewareCall,
- send: types.Send,
- stream: "Stream",
- **kwargs: typing.Any,
+ self, middleware: typing.Type[MiddlewareProtocol], **kwargs: typing.Any
) -> None:
- ... # pragma: no cover
+ self.middleware = middleware
+ self.kwargs = kwargs
- async def __call__(self, cr: ConsumerRecord) -> typing.Any:
- ... # pragma: no cover
+ def __iter__(self) -> typing.Iterator:
+ return iter((self.middleware, self.kwargs))
@@ -647,7 +637,7 @@ Creating a middleware
from .engine import stream_engine
-middlewares = [middleware.MiddlewareFactory(ElasticMiddleware)]
+middlewares = [middleware.Middleware(ElasticMiddleware)]
@stream_engine.stream("kstreams-topic", middlewares=middlewares)
async def processor(cr: ConsumerRecord):
@@ -657,7 +647,7 @@ Creating a middleware
from kstreams import ConsumerRecord, middleware, create
-middlewares = [middleware.MiddlewareFactory(ElasticMiddleware)]
+middlewares = [middleware.Middleware(ElasticMiddleware)]
stream_engine = kstreams.create_engine(
title="my-stream-engine",
@@ -678,21 +668,12 @@ Creating a middleware
Adding extra configuration to middlewares
If you want to provide extra configuration to middleware you should override the init method, ensuring that is contains the keywargs next_call
, send
and stream
, then any remaining are optional keywargs
.
Let's consider that we want to send an event to a spcific topic when a ValueError
is raised inside a stream
(Dead Letter Queue)
-from kstreams import ConsumerRecord, types, Stream
+from kstreams import ConsumerRecord, types, Stream, middleware
-class DLQMiddleware:
- def __init__(
- self,
- *,
- next_call: types.NextMiddlewareCall,
- send: types.Send,
- stream: Stream,
- topic: str,
- ) -> None:
- self.next_call = next_call
- self.send = send
- self.stream = stream
+class DLQMiddleware(middleware.BaseMiddleware):
+ def __init__(self, *, topic: str, **kwargs) -> None:
+ super().__init__(**kwargs)
self.topic = topic
async def __call__(self, cr: ConsumerRecord):
@@ -704,7 +685,7 @@ Adding extra configuration to
# Create the middlewares
middlewares = [
- middleware.MiddlewareFactory(
+ middleware.Middleware(
DLQMiddleware, topic="kstreams-dlq-topic"
)
]
@@ -789,7 +770,22 @@ Middleware by default
- If middlewares are added to a
stream
and stream_engine
, then the middleware stack is build first with stream
middlewares and then with the stream_engines
middlewares. This means the first the stream
middlewares are evaluated first.
In the example we are adding three middelwares in the following order: DLQMiddleware
, ElasticMiddleware
, and S3Middleware
. The code chain execution will be:
-topic event
--> ExceptionMiddleware
--> DLQMiddleware
--> ElasticMiddleware
--> S3Middleware
--> processor
+sequenceDiagram
+ autonumber
+ ExceptionMiddleware->>DLQMiddleware:
+ Note left of ExceptionMiddleware: Event received
+ alt No Processing Error
+ DLQMiddleware->>ElasticMiddleware:
+ Note right of ElasticMiddleware: Store CR on Elastic
+ ElasticMiddleware->>S3Middleware:
+ Note right of S3Middleware: Store CR on S3
+ S3Middleware->>Stream:
+ Note right of Stream: CR processed
+ Stream-->>S3Middleware:
+ S3Middleware-->>ElasticMiddleware:
+ ElasticMiddleware-->>DLQMiddleware:
+ DLQMiddleware-->>ExceptionMiddleware:
+ end
Multiple middlewares examplefrom kstreams import ConsumerRecord, Stream, middleware
@@ -814,9 +810,9 @@ Middleware by default
middlewares = [
- middleware.MiddlewareFactory(DLQMiddleware),
- middleware.MiddlewareFactory(ElasticMiddleware),
- middleware.MiddlewareFactory(S3Middleware),
+ middleware.Middleware(DLQMiddleware),
+ middleware.Middleware(ElasticMiddleware),
+ middleware.Middleware(S3Middleware),
]
@@ -851,8 +847,8 @@ Executing Code after the CR w
middlewares = [
- middleware.MiddlewareFactory(DLQMiddleware),
- middleware.MiddlewareFactory(ElasticMiddleware),
+ middleware.Middleware(DLQMiddleware),
+ middleware.Middleware(ElasticMiddleware),
]
diff --git a/pr-preview/pr-155/sitemap.xml.gz b/pr-preview/pr-155/sitemap.xml.gz
index dc0a0a54..52d46e53 100644
Binary files a/pr-preview/pr-155/sitemap.xml.gz and b/pr-preview/pr-155/sitemap.xml.gz differ
diff --git a/pr-preview/pr-155/stream/index.html b/pr-preview/pr-155/stream/index.html
index 7e96ef05..289ad1f9 100644
--- a/pr-preview/pr-155/stream/index.html
+++ b/pr-preview/pr-155/stream/index.html
@@ -1351,7 +1351,7 @@
deserializer: typing.Optional[Deserializer] = None,
initial_offsets: typing.Optional[typing.List[TopicPartitionOffset]] = None,
rebalance_listener: typing.Optional[RebalanceListener] = None,
- middlewares: typing.Optional[typing.List[MiddlewareFactory]] = None,
+ middlewares: typing.Optional[typing.List[Middleware]] = None,
) -> None:
self.func = func
self.backend = backend