From 0d5588c006e90bac9c0bc1b442a56df1916a2724 Mon Sep 17 00:00:00 2001 From: marcosschroh Date: Tue, 16 Jan 2024 16:23:50 +0000 Subject: [PATCH] =?UTF-8?q?Deploy=20preview=20for=20PR=20155=20?= =?UTF-8?q?=F0=9F=9B=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pr-preview/pr-155/engine/index.html | 12 ++-- pr-preview/pr-155/middleware/index.html | 90 +++++++++++------------- pr-preview/pr-155/sitemap.xml.gz | Bin 127 -> 127 bytes pr-preview/pr-155/stream/index.html | 2 +- 4 files changed, 48 insertions(+), 56 deletions(-) diff --git a/pr-preview/pr-155/engine/index.html b/pr-preview/pr-155/engine/index.html index 409dcd30..5829e1da 100644 --- a/pr-preview/pr-155/engine/index.html +++ b/pr-preview/pr-155/engine/index.html @@ -849,9 +849,7 @@

252 253 254 -255 -256 -257
class StreamEngine:
+255
class StreamEngine:
     """
     Attributes:
         backend kstreams.backends.Kafka: Backend to connect. Default `Kafka`
@@ -893,7 +891,7 @@ 

consumer_class: typing.Type[ConsumerType], producer_class: typing.Type[ProducerType], monitor: PrometheusMonitor, - middlewares: typing.List[MiddlewareFactory], + middlewares: typing.List[Middleware], title: typing.Optional[str] = None, deserializer: typing.Optional[Deserializer] = None, serializer: typing.Optional[Serializer] = None, @@ -1041,9 +1039,7 @@

def build_stream_middleware_stack(self, stream: Stream) -> NextMiddlewareCall: udf_handler = UdfHandler(handler=stream.func, stream=stream) stream.middlewares = ( - [MiddlewareFactory(ExceptionMiddleware)] - + self.middlewares - + stream.middlewares + [Middleware(ExceptionMiddleware)] + self.middlewares + stream.middlewares ) next_call = udf_handler @@ -1066,7 +1062,7 @@

deserializer: typing.Optional[Deserializer] = None, initial_offsets: typing.Optional[typing.List[TopicPartitionOffset]] = None, rebalance_listener: typing.Optional[RebalanceListener] = None, - middlewares: typing.Optional[typing.List[MiddlewareFactory]] = None, + middlewares: typing.Optional[typing.List[Middleware]] = None, **kwargs, ) -> typing.Callable[[StreamFunc], Stream]: def decorator(func: StreamFunc) -> Stream: diff --git a/pr-preview/pr-155/middleware/index.html b/pr-preview/pr-155/middleware/index.html index 3dd27dc1..6e9e20e9 100644 --- a/pr-preview/pr-155/middleware/index.html +++ b/pr-preview/pr-155/middleware/index.html @@ -543,7 +543,7 @@

Middleware

Kstreams allows you to include middlewares for adding behavior to streams.

-

A middleware is a callable that works with every ConsumerRecord (CR) before it is processed by a specific stream. Also after the CR has been handled Middlewares also have access to the stream, send function.

+

A middleware is a callable that works with every ConsumerRecord (CR) before and after it is processed by a specific stream. Middlewares also have access to the stream, send function.

  • It takes each CR that arrives to a topic.
  • It can then do something to the CR or run any needed code.
  • @@ -560,37 +560,27 @@

    Middleware

    -

    - Bases: Protocol

    Source code in kstreams/middleware/middleware.py -
    15
    -16
    -17
    -18
    -19
    -20
    -21
    -22
    -23
    -24
    -25
    -26
    -27
    class Middleware(typing.Protocol):
    +              
    30
    +31
    +32
    +33
    +34
    +35
    +36
    +37
    +38
    class Middleware:
         def __init__(
    -        self,
    -        *,
    -        next_call: types.NextMiddlewareCall,
    -        send: types.Send,
    -        stream: "Stream",
    -        **kwargs: typing.Any,
    +        self, middleware: typing.Type[MiddlewareProtocol], **kwargs: typing.Any
         ) -> None:
    -        ...  #  pragma: no cover
    +        self.middleware = middleware
    +        self.kwargs = kwargs
     
    -    async def __call__(self, cr: ConsumerRecord) -> typing.Any:
    -        ...  #  pragma: no cover
    +    def __iter__(self) -> typing.Iterator:
    +        return iter((self.middleware, self.kwargs))
     
    @@ -647,7 +637,7 @@

    Creating a middleware

    from .engine import stream_engine -middlewares = [middleware.MiddlewareFactory(ElasticMiddleware)] +middlewares = [middleware.Middleware(ElasticMiddleware)] @stream_engine.stream("kstreams-topic", middlewares=middlewares) async def processor(cr: ConsumerRecord): @@ -657,7 +647,7 @@

    Creating a middleware

    from kstreams import ConsumerRecord, middleware, create
     
    -middlewares = [middleware.MiddlewareFactory(ElasticMiddleware)]
    +middlewares = [middleware.Middleware(ElasticMiddleware)]
     
     stream_engine = kstreams.create_engine(
         title="my-stream-engine",
    @@ -678,21 +668,12 @@ 

    Creating a middleware

    Adding extra configuration to middlewares

    If you want to provide extra configuration to middleware you should override the init method, ensuring that is contains the keywargs next_call, send and stream, then any remaining are optional keywargs.

    Let's consider that we want to send an event to a spcific topic when a ValueError is raised inside a stream (Dead Letter Queue)

    -
    from kstreams import ConsumerRecord, types, Stream
    +
    from kstreams import ConsumerRecord, types, Stream, middleware
     
     
    -class DLQMiddleware:
    -    def __init__(
    -        self,
    -        *,
    -        next_call: types.NextMiddlewareCall,
    -        send: types.Send,
    -        stream: Stream,
    -        topic: str,
    -    ) -> None:
    -        self.next_call = next_call
    -        self.send = send
    -        self.stream = stream
    +class DLQMiddleware(middleware.BaseMiddleware):
    +    def __init__(self, *, topic: str, **kwargs) -> None:
    +        super().__init__(**kwargs)
             self.topic = topic
     
         async def __call__(self, cr: ConsumerRecord):
    @@ -704,7 +685,7 @@ 

    Adding extra configuration to # Create the middlewares middlewares = [ - middleware.MiddlewareFactory( + middleware.Middleware( DLQMiddleware, topic="kstreams-dlq-topic" ) ] @@ -789,7 +770,22 @@

    Middleware by default

  • If middlewares are added to a stream and stream_engine, then the middleware stack is build first with stream middlewares and then with the stream_engines middlewares. This means the first the stream middlewares are evaluated first.
  • In the example we are adding three middelwares in the following order: DLQMiddleware, ElasticMiddleware, and S3Middleware. The code chain execution will be:

    -

    topic event --> ExceptionMiddleware --> DLQMiddleware --> ElasticMiddleware --> S3Middleware --> processor

    +
    sequenceDiagram
    +    autonumber
    +    ExceptionMiddleware->>DLQMiddleware: 
    +    Note left of ExceptionMiddleware: Event received
    +    alt No Processing Error
    +    DLQMiddleware->>ElasticMiddleware: 
    +    Note right of ElasticMiddleware: Store CR on Elastic
    +    ElasticMiddleware->>S3Middleware: 
    +    Note right of S3Middleware: Store CR on S3
    +    S3Middleware->>Stream: 
    +    Note right of Stream: CR processed
    +    Stream-->>S3Middleware: 
    +    S3Middleware-->>ElasticMiddleware: 
    +    ElasticMiddleware-->>DLQMiddleware: 
    +    DLQMiddleware-->>ExceptionMiddleware: 
    +    end
    Multiple middlewares example
    from kstreams import ConsumerRecord, Stream, middleware
     
     
    @@ -814,9 +810,9 @@ 

    Middleware by default

    middlewares = [ - middleware.MiddlewareFactory(DLQMiddleware), - middleware.MiddlewareFactory(ElasticMiddleware), - middleware.MiddlewareFactory(S3Middleware), + middleware.Middleware(DLQMiddleware), + middleware.Middleware(ElasticMiddleware), + middleware.Middleware(S3Middleware), ] @@ -851,8 +847,8 @@

    Executing Code after the CR w middlewares = [ - middleware.MiddlewareFactory(DLQMiddleware), - middleware.MiddlewareFactory(ElasticMiddleware), + middleware.Middleware(DLQMiddleware), + middleware.Middleware(ElasticMiddleware), ] diff --git a/pr-preview/pr-155/sitemap.xml.gz b/pr-preview/pr-155/sitemap.xml.gz index dc0a0a542f52a4a38c54e564b24db941bce7cac0..52d46e53df77694c49e52b9001260b5274a5bd4a 100644 GIT binary patch delta 13 Ucmb=gXP58h;F!90*+ljV03QMbxBvhE delta 13 Ucmb=gXP58h;E2^(IFY>q02>(t`Tzg` diff --git a/pr-preview/pr-155/stream/index.html b/pr-preview/pr-155/stream/index.html index 7e96ef05..289ad1f9 100644 --- a/pr-preview/pr-155/stream/index.html +++ b/pr-preview/pr-155/stream/index.html @@ -1351,7 +1351,7 @@

    deserializer: typing.Optional[Deserializer] = None, initial_offsets: typing.Optional[typing.List[TopicPartitionOffset]] = None, rebalance_listener: typing.Optional[RebalanceListener] = None, - middlewares: typing.Optional[typing.List[MiddlewareFactory]] = None, + middlewares: typing.Optional[typing.List[Middleware]] = None, ) -> None: self.func = func self.backend = backend