|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from typing import Any, Mapping, MutableMapping, TypedDict |
| 4 | + |
| 5 | +from arroyo.backends.kafka.configuration import ( |
| 6 | + build_kafka_configuration, |
| 7 | + build_kafka_consumer_configuration, |
| 8 | +) |
| 9 | +from arroyo.backends.kafka.consumer import KafkaConsumer, KafkaPayload, KafkaProducer |
| 10 | +from arroyo.processing.processor import StreamProcessor |
| 11 | +from arroyo.types import Topic |
| 12 | + |
| 13 | +from sentry_streams.adapters.arroyo.consumer import ( |
| 14 | + ArroyoConsumer, |
| 15 | + ArroyoStreamingFactory, |
| 16 | +) |
| 17 | +from sentry_streams.adapters.arroyo.routes import Route |
| 18 | +from sentry_streams.adapters.arroyo.steps import FilterStep, KafkaSinkStep, MapStep |
| 19 | +from sentry_streams.adapters.stream_adapter import PipelineConfig, StreamAdapter |
| 20 | +from sentry_streams.pipeline.pipeline import ( |
| 21 | + Filter, |
| 22 | + FlatMapStep, |
| 23 | + KafkaSink, |
| 24 | + KafkaSource, |
| 25 | + Map, |
| 26 | + Reduce, |
| 27 | + Sink, |
| 28 | + Source, |
| 29 | +) |
| 30 | + |
| 31 | + |
class KafkaConsumerConfig(TypedDict):
    """
    Settings used to build the Kafka consumer backing a pipeline source.
    """

    # Kafka bootstrap servers the consumer connects to.
    bootstrap_servers: str
    # Value for the consumer's auto offset reset setting.
    auto_offset_reset: str
    # Consumer group; handed to the consumer configuration as the group id.
    consumer_group: str
    # Extra settings used as the default configuration the values above
    # are layered on.
    additional_settings: Mapping[str, Any]
| 37 | + |
| 38 | + |
class KafkaProducerConfig(TypedDict):
    """
    Settings used to build the Kafka producer backing a pipeline sink.
    """

    # Kafka bootstrap servers the producer connects to.
    bootstrap_servers: str
    # Extra settings used as the default configuration the bootstrap
    # servers are layered on.
    additional_settings: Mapping[str, Any]
| 42 | + |
| 43 | + |
class KafkaSources:
    """
    Registry of the Kafka consumers and topics used as pipeline sources.

    Consumers and topics are keyed by the name of the source step. A
    consumer is built from ``sources_config`` the first time a source is
    added, unless one was already supplied through ``sources_override``.
    """

    def __init__(
        self,
        sources_config: Mapping[str, KafkaConsumerConfig],
        sources_override: Mapping[str, KafkaConsumer] | None = None,
    ) -> None:
        """
        :param sources_config: consumer configuration keyed by source name.
        :param sources_override: pre-built consumers keyed by source name.
            ``None`` (the default) means no overrides.
        """
        super().__init__()

        self.__sources_config = sources_config
        self.__source_topics: MutableMapping[str, Topic] = {}

        # Overrides are for unit testing purposes. The mapping is copied so
        # that consumers added later never leak into the caller's mapping.
        self.__sources: MutableMapping[str, KafkaConsumer] = {**(sources_override or {})}

    def add_source(self, step: Source) -> None:
        """
        Builds an Arroyo Kafka consumer as a stream source.
        By default it uses the configuration provided to the adapter.

        It is possible to override the configuration by providing an
        instantiated consumer for unit testing purposes.

        Raises an AssertionError if the step is not a Kafka source or if
        no consumer and no configuration are known for the step name.
        """
        # TODO: Provide a better way to get the logical stream name from
        # the Source step. We should not have to assert it is a Kafka source
        assert isinstance(step, KafkaSource), "Only Kafka Sources are supported"
        source_name = step.name
        if source_name not in self.__sources:
            config = self.__sources_config.get(source_name)
            assert config, f"Config not provided for source {source_name}"
            self.__sources[source_name] = KafkaConsumer(
                build_kafka_consumer_configuration(
                    default_config=config["additional_settings"],
                    bootstrap_servers=config["bootstrap_servers"],
                    auto_offset_reset=config["auto_offset_reset"],
                    group_id=config["consumer_group"],
                )
            )

        # The topic is recorded for overridden consumers as well, so that
        # get_topic works for every source that was added.
        self.__source_topics[source_name] = Topic(step.logical_topic)

    def get_topic(self, source: str) -> Topic:
        """
        Returns the topic recorded for the given source name.
        Raises a KeyError if the source was never added.
        """
        return self.__source_topics[source]

    def get_consumer(self, source: str) -> KafkaConsumer:
        """
        Returns the consumer registered for the given source name.
        Raises a KeyError if no consumer is known for that name.
        """
        return self.__sources[source]
| 89 | + |
| 90 | + |
class ArroyoAdapter(StreamAdapter[Route, Route]):
    """
    Stream adapter that translates pipeline steps into Arroyo consumers.

    Every source step registers an ArroyoConsumer keyed by the source name.
    The other supported steps (sink, map, filter) are appended to the
    consumer of the source the stream starts from.
    """

    def __init__(
        self,
        sources_config: Mapping[str, KafkaConsumerConfig],
        sinks_config: Mapping[str, KafkaProducerConfig],
        sources_override: Mapping[str, KafkaConsumer] | None = None,
        sinks_override: Mapping[str, KafkaProducer] | None = None,
    ) -> None:
        """
        :param sources_config: consumer configuration keyed by source name.
        :param sinks_config: producer configuration keyed by sink name.
        :param sources_override: pre-built consumers keyed by source name.
            ``None`` (the default) means no overrides.
        :param sinks_override: pre-built producers keyed by sink name.
            ``None`` (the default) means no overrides.
        """
        super().__init__()

        # Always hand a mapping to KafkaSources, even when no override
        # was provided.
        self.__sources = KafkaSources(sources_config, sources_override or {})
        self.__sinks_config = sinks_config

        # Overrides are for unit testing purposes
        self.__sinks: MutableMapping[str, Any] = {**(sinks_override or {})}

        self.__consumers: MutableMapping[str, ArroyoConsumer] = {}

    @classmethod
    def build(cls, config: PipelineConfig) -> ArroyoAdapter:
        """
        Builds the adapter from a pipeline configuration.

        ``sources_config`` and ``sinks_config`` are required keys, while
        ``sources_override`` and ``sinks_override`` default to no overrides.
        """
        return cls(
            config["sources_config"],
            config["sinks_config"],
            config.get("sources_override", {}),
            config.get("sinks_override", {}),
        )

    def __consumer_for(self, stream: Route, operator: str) -> ArroyoConsumer:
        """
        Returns the consumer registered for the source the stream starts
        from. `operator` only completes the assertion message.
        """
        assert (
            stream.source in self.__consumers
        ), f"Stream starting at source {stream.source} not found when adding {operator}"
        return self.__consumers[stream.source]

    def source(self, step: Source) -> Route:
        """
        Builds an Arroyo Kafka consumer as a stream source.
        By default it uses the configuration provided to the adapter.

        It is possible to override the configuration by providing an
        instantiated consumer for unit testing purposes.
        """
        source_name = step.name
        self.__sources.add_source(step)
        self.__consumers[source_name] = ArroyoConsumer(source_name)

        return Route(source_name, [])

    def sink(self, step: Sink, stream: Route) -> Route:
        """
        Builds an Arroyo Kafka producer as a stream sink.
        By default it uses the configuration provided to the adapter.

        It is possible to override the configuration by providing an
        instantiated producer for unit testing purposes.

        A producer built from the configuration is stored under the sink
        name, so it is only built once per sink.
        """
        # TODO: Provide a better way to get the logical stream name from
        # the Sink step. We should not have to assert it is a Kafka sink
        assert isinstance(step, KafkaSink), "Only Kafka Sinks are supported"

        sink_name = step.name
        if sink_name not in self.__sinks:
            config = self.__sinks_config.get(sink_name)
            assert config, f"Config not provided for sink {sink_name}"
            # Keep the producer around the same way KafkaSources keeps the
            # consumers it builds.
            self.__sinks[sink_name] = KafkaProducer(
                build_kafka_configuration(
                    default_config=config["additional_settings"],
                    bootstrap_servers=config["bootstrap_servers"],
                )
            )
        producer = self.__sinks[sink_name]

        self.__consumer_for(stream, "a producer").add_step(
            KafkaSinkStep(route=stream, producer=producer, topic_name=step.logical_topic)
        )

        return stream

    def map(self, step: Map, stream: Route) -> Route:
        """
        Builds a map operator for the platform the adapter supports.
        """
        self.__consumer_for(stream, "a map").add_step(
            MapStep(route=stream, pipeline_step=step)
        )
        return stream

    def flat_map(self, step: FlatMapStep, stream: Route) -> Route:
        """
        Builds a flat-map operator for the platform the adapter supports.

        Not implemented by this adapter yet.
        """
        raise NotImplementedError

    def filter(self, step: Filter, stream: Route) -> Route:
        """
        Builds a filter operator for the platform the adapter supports.
        """
        self.__consumer_for(stream, "a filter").add_step(
            FilterStep(route=stream, pipeline_step=step)
        )
        return stream

    def reduce(
        self,
        step: Reduce,
        stream: Route,
    ) -> Route:
        """
        Builds a reduce operator for the platform the adapter supports.

        Not implemented by this adapter yet.
        """
        raise NotImplementedError

    def get_processor(self, source: str) -> StreamProcessor[KafkaPayload]:
        """
        Returns the stream processor for the given source.

        Raises a KeyError if the source was never added to the adapter.
        """
        factory = ArroyoStreamingFactory(self.__consumers[source])

        return StreamProcessor(
            consumer=self.__sources.get_consumer(source),
            topic=self.__sources.get_topic(source),
            processor_factory=factory,
        )

    def run(self) -> None:
        """
        Starts the pipeline
        """
        # TODO: Support multiple consumers
        assert len(self.__consumers) == 1, "Only one consumer is supported"
        source = next(iter(self.__consumers))

        processor = self.get_processor(source)
        processor.run()
0 commit comments