diff --git a/snuba/cli/multistorage_consumer.py b/snuba/cli/multistorage_consumer.py
deleted file mode 100644
index 343782b139d..00000000000
--- a/snuba/cli/multistorage_consumer.py
+++ /dev/null
@@ -1,315 +0,0 @@
-import signal
-from typing import Any, Optional, Sequence
-
-import click
-from arroyo import Topic, configure_metrics
-from arroyo.backends.kafka import (
-    KafkaConsumer,
-    KafkaPayload,
-    KafkaProducer,
-    build_kafka_configuration,
-    build_kafka_consumer_configuration,
-)
-from arroyo.commit import IMMEDIATE
-from arroyo.dlq import DlqLimit, DlqPolicy, KafkaDlqProducer
-from arroyo.processing import StreamProcessor
-from arroyo.processing.strategies import ProcessingStrategyFactory
-from arroyo.utils.profiler import ProcessingStrategyProfilerWrapperFactory
-from confluent_kafka import Producer as ConfluentKafkaProducer
-
-from snuba import environment, settings
-from snuba.consumers.consumer import (
-    CommitLogConfig,
-    MultistorageConsumerProcessingStrategyFactory,
-)
-from snuba.consumers.consumer_config import resolve_consumer_config
-from snuba.datasets.storage import WritableTableStorage
-from snuba.datasets.storages.factory import (
-    get_writable_storage,
-    get_writable_storage_keys,
-)
-from snuba.datasets.storages.storage_key import StorageKey
-from snuba.environment import setup_logging, setup_sentry
-from snuba.utils.metrics.backends.abstract import MetricsBackend
-from snuba.utils.metrics.wrapper import MetricsWrapper
-from snuba.utils.streams.metrics_adapter import StreamMetricsAdapter
-
-
-@click.command(hidden=True)
-@click.option(
-    "--storage",
-    "storage_names",
-    type=click.Choice(
-        [storage_key.value for storage_key in get_writable_storage_keys()]
-    ),
-    multiple=True,
-    required=True,
-)
-@click.option("--raw-events-topic", help="Topic to consume raw events from.")
-@click.option(
-    "--replacements-topic",
-    help="Topic to produce replacement messages info.",
-)
-@click.option(
-    "--commit-log-topic",
-    help="Topic for committed offsets to be written to, triggering post-processing task(s)",
-)
-@click.option(
-    "--consumer-group",
-    default="snuba-consumers",
-)
-@click.option(
-    "--bootstrap-server",
-    multiple=True,
-    help="Kafka bootstrap server to use.",
-)
-@click.option(
-    "--slice-id",
-    "slice_id",
-    type=int,
-    help="The slice id for the storage",
-)
-@click.option(
-    "--max-batch-size",
-    default=settings.DEFAULT_MAX_BATCH_SIZE,
-    type=int,
-    help="Max number of messages to batch in memory before writing to Kafka.",
-)
-@click.option(
-    "--max-batch-time-ms",
-    default=settings.DEFAULT_MAX_BATCH_TIME_MS,
-    type=int,
-    help="Max length of time to buffer messages in memory before writing to Kafka.",
-)
-@click.option(
-    "--auto-offset-reset",
-    default="error",
-    type=click.Choice(["error", "earliest", "latest"]),
-    help="Kafka consumer auto offset reset.",
-)
-@click.option(
-    "--no-strict-offset-reset",
-    is_flag=True,
-    help="Forces the kafka consumer auto offset reset.",
-)
-@click.option(
-    "--queued-max-messages-kbytes",
-    default=settings.DEFAULT_QUEUED_MAX_MESSAGE_KBYTES,
-    type=int,
-    help="Maximum number of kilobytes per topic+partition in the local consumer queue.",
-)
-@click.option(
-    "--queued-min-messages",
-    default=settings.DEFAULT_QUEUED_MIN_MESSAGES,
-    type=int,
-    help="Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue.",
-)
-@click.option("--processes", type=int)
-@click.option(
-    "--input-block-size",
-    type=int,
-)
-@click.option(
-    "--output-block-size",
-    type=int,
-)
-@click.option("--log-level") -@click.option( - "--profile-path", type=click.Path(dir_okay=True, file_okay=False, exists=True) -) -def multistorage_consumer( - storage_names: Sequence[str], - raw_events_topic: Optional[str], - replacements_topic: Optional[str], - commit_log_topic: Optional[str], - consumer_group: str, - bootstrap_server: Sequence[str], - slice_id: Optional[int], - max_batch_size: int, - max_batch_time_ms: int, - auto_offset_reset: str, - no_strict_offset_reset: bool, - queued_max_messages_kbytes: int, - queued_min_messages: int, - processes: Optional[int], - input_block_size: Optional[int], - output_block_size: Optional[int], - log_level: Optional[str] = None, - profile_path: Optional[str] = None, -) -> None: - - DEFAULT_BLOCK_SIZE = int(32 * 1e6) - - if processes is not None: - if input_block_size is None: - input_block_size = DEFAULT_BLOCK_SIZE - - if output_block_size is None: - output_block_size = DEFAULT_BLOCK_SIZE - - setup_logging(log_level) - setup_sentry() - - consumer_config = resolve_consumer_config( - storage_names=storage_names, - raw_topic=raw_events_topic, - commit_log_topic=commit_log_topic, - replacements_topic=replacements_topic, - bootstrap_servers=bootstrap_server, - # Unlike the main consumer, commit log and replacement bootstrap servers are not - # currently able to be specified independently in the multistorage consumer. - commit_log_bootstrap_servers=[], - replacement_bootstrap_servers=[], - slice_id=slice_id, - max_batch_size=max_batch_size, - max_batch_time_ms=max_batch_time_ms, - queued_max_messages_kbytes=queued_max_messages_kbytes, - queued_min_messages=queued_min_messages, - ) - - storages = { - key: get_writable_storage(key) - for key in (getattr(StorageKey, name.upper()) for name in storage_names) - } - writable_storages = [*storages.values()] - - topic = Topic(consumer_config.raw_topic.physical_topic_name) - - replacements = ( - Topic(consumer_config.replacements_topic.physical_topic_name) - if consumer_config.replacements_topic is not None - else None - ) - - # XXX: This requires that all storages are associated with the same Kafka - # cluster so that they can be consumed by the same consumer instance. - # Unfortunately, we don't have the concept of independently configurable - # Kafka clusters in settings, only consumer configurations that are - # associated with storages and/or global default configurations. To avoid - # implementing yet another method of configuring Kafka clusters, this just - # piggybacks on the existing configuration method(s), with the assumption - # that most deployments are going to be using the default configuration. - storage_keys = [*storages.keys()] - - configuration = build_kafka_consumer_configuration( - consumer_config.raw_topic.broker_config, - group_id=consumer_group, - auto_offset_reset=auto_offset_reset, - strict_offset_reset=not no_strict_offset_reset, - queued_max_messages_kbytes=queued_max_messages_kbytes, - queued_min_messages=queued_min_messages, - ) - - metrics_tags = { - "group": consumer_group, - "storage": "_".join([storage_keys[0].value, "m"]), - } - - if slice_id: - metrics_tags["slice_id"] = str(slice_id) - - metrics = MetricsWrapper( - environment.metrics, - "consumer", - tags=metrics_tags, - ) - - consumer = KafkaConsumer(configuration) - - if consumer_config.commit_log_topic is None: - commit_log_config = None - else: - # XXX: This relies on the assumptions that a.) all storages are - # located on the same Kafka cluster (validated above.) 
-        commit_log_producer = ConfluentKafkaProducer(
-            build_kafka_configuration(consumer_config.commit_log_topic.broker_config)
-        )
-
-        commit_log_config = CommitLogConfig(
-            commit_log_producer,
-            Topic(consumer_config.commit_log_topic.physical_topic_name),
-            consumer_group,
-        )
-
-    strategy_factory = build_multistorage_streaming_strategy_factory(
-        writable_storages,
-        max_batch_size,
-        max_batch_time_ms,
-        processes,
-        input_block_size,
-        output_block_size,
-        metrics,
-        commit_log_config,
-        replacements,
-        slice_id,
-        profile_path,
-    )
-
-    configure_metrics(StreamMetricsAdapter(metrics))
-
-    if consumer_config.dlq_topic is not None:
-        dlq_producer = KafkaProducer(
-            build_kafka_configuration(consumer_config.dlq_topic.broker_config)
-        )
-
-        dlq_policy = DlqPolicy(
-            KafkaDlqProducer(
-                dlq_producer, Topic(consumer_config.dlq_topic.physical_topic_name)
-            ),
-            DlqLimit(
-                max_invalid_ratio=0.01,
-                max_consecutive_count=1000,
-            ),
-            None,
-        )
-    else:
-        dlq_policy = None
-
-    processor = StreamProcessor(
-        consumer, topic, strategy_factory, IMMEDIATE, dlq_policy=dlq_policy
-    )
-
-    def handler(signum: int, frame: Any) -> None:
-        processor.signal_shutdown()
-
-    signal.signal(signal.SIGINT, handler)
-    signal.signal(signal.SIGTERM, handler)
-    processor.run()
-
-
-def build_multistorage_streaming_strategy_factory(
-    storages: Sequence[WritableTableStorage],
-    max_batch_size: int,
-    max_batch_time_ms: int,
-    processes: Optional[int],
-    input_block_size: Optional[int],
-    output_block_size: Optional[int],
-    metrics: MetricsBackend,
-    commit_log_config: Optional[CommitLogConfig],
-    replacements: Optional[Topic],
-    slice_id: Optional[int],
-    profile_path: Optional[str],
-) -> ProcessingStrategyFactory[KafkaPayload]:
-
-    strategy_factory: ProcessingStrategyFactory[
-        KafkaPayload
-    ] = MultistorageConsumerProcessingStrategyFactory(
-        storages,
-        max_batch_size,
-        max_batch_time_ms / 1000.0,
-        processes=processes,
-        input_block_size=input_block_size,
-        output_block_size=output_block_size,
-        metrics=metrics,
-        slice_id=slice_id,
-        commit_log_config=commit_log_config,
-        replacements=replacements,
-    )
-
-    if profile_path is not None:
-        strategy_factory = ProcessingStrategyProfilerWrapperFactory(
-            strategy_factory,
-            profile_path,
-        )
-
-    return strategy_factory
diff --git a/snuba/consumers/consumer.py b/snuba/consumers/consumer.py
index 34b806b9b62..d72b2f5141e 100644
--- a/snuba/consumers/consumer.py
+++ b/snuba/consumers/consumer.py
@@ -4,7 +4,6 @@
 import time
 from collections import defaultdict
 from datetime import datetime
-from functools import partial
 from pickle import PickleBuffer
 from typing import (
     Any,
@@ -29,28 +28,7 @@
 from arroyo.backends.kafka.commit import CommitCodec
 from arroyo.commit import Commit as CommitLogCommit
 from arroyo.dlq import InvalidMessage
-from arroyo.processing.strategies import (
-    CommitOffsets,
-    FilterStep,
-    ProcessingStrategy,
-    ProcessingStrategyFactory,
-    Reduce,
-    RunTask,
-    RunTaskInThreads,
-    RunTaskWithMultiprocessing,
-)
-from arroyo.processing.strategies.run_task_with_multiprocessing import (
-    MultiprocessingPool,
-)
-from arroyo.types import (
-    BaseValue,
-    BrokerValue,
-    Commit,
-    FilteredPayload,
-    Message,
-    Partition,
-    Topic,
-)
+from arroyo.types import BrokerValue, Message, Partition, Topic
 from confluent_kafka import KafkaError
 from confluent_kafka import Message as ConfluentMessage
 from confluent_kafka import Producer as ConfluentKafkaProducer
@@ -60,19 +38,11 @@
 from snuba.clickhouse.http import JSONRow, JSONRowEncoder, ValuesRowEncoder
 from snuba.consumers.schemas import _NOOP_CODEC, get_json_codec
 from snuba.consumers.types import KafkaMessageMetadata
-from snuba.datasets.storage import WritableTableStorage
-from snuba.datasets.storages.factory import get_writable_storage
 from snuba.datasets.storages.storage_key import StorageKey
 from snuba.datasets.table_storage import TableWriter
-from snuba.processor import (
-    AggregateInsertBatch,
-    InsertBatch,
-    MessageProcessor,
-    ReplacementBatch,
-)
+from snuba.processor import InsertBatch, MessageProcessor, ReplacementBatch
 from snuba.utils.metrics import MetricsBackend
 from snuba.utils.metrics.wrapper import MetricsWrapper
-from snuba.utils.streams.configuration_builder import build_kafka_producer_configuration
 from snuba.utils.streams.topics import Topic as SnubaTopic
 from snuba.writer import BatchWriter
 
@@ -614,253 +584,3 @@ def process_message(
         )
     else:
         return result
-
-
-def _process_message_multistorage_work(
-    metadata: KafkaMessageMetadata, storage_key: StorageKey, storage_message: Any
-) -> Union[None, BytesInsertBatch, ReplacementBatch]:
-    result = (
-        get_writable_storage(storage_key)
-        .get_table_writer()
-        .get_stream_loader()
-        .get_processor()
-        .process_message(storage_message, metadata)
-    )
-
-    if isinstance(result, AggregateInsertBatch):
-        values_row_encoder = get_values_row_encoder(storage_key)
-        return BytesInsertBatch(
-            [values_row_encoder.encode(row) for row in result.rows],
-            result.origin_timestamp,
-            result.sentry_received_timestamp,
-        )
-    elif isinstance(result, InsertBatch):
-        return BytesInsertBatch(
-            [json_row_encoder.encode(row) for row in result.rows],
-            result.origin_timestamp,
-            result.sentry_received_timestamp,
-        )
-    else:
-        return result
-
-
-def process_message_multistorage(
-    message: Message[MultistorageKafkaPayload],
-) -> MultistorageProcessedMessage:
-    assert isinstance(message.value, BrokerValue)
-    value = rapidjson.loads(message.payload.payload.value)
-    metadata = KafkaMessageMetadata(
-        message.value.offset, message.value.partition.index, message.value.timestamp
-    )
-
-    results: MutableSequence[
-        Tuple[StorageKey, Union[None, BytesInsertBatch, ReplacementBatch]]
-    ] = []
-
-    for index, storage_key in enumerate(message.payload.storage_keys):
-        result = _process_message_multistorage_work(
-            metadata=metadata, storage_key=storage_key, storage_message=value
-        )
-        results.append((storage_key, result))
-
-    return results
-
-
-def has_destination_storages(message: Message[MultistorageKafkaPayload]) -> bool:
-    return len(message.payload.storage_keys) > 0
-
-
-def find_destination_storages(
-    storages: Sequence[WritableTableStorage], message: Message[KafkaPayload]
-) -> MultistorageKafkaPayload:
-    storage_keys: MutableSequence[StorageKey] = []
-    for storage in storages:
-        filter = storage.get_table_writer().get_stream_loader().get_pre_filter()
-        if filter is None or not filter.should_drop(message):
-            storage_keys.append(storage.get_storage_key())
-    return MultistorageKafkaPayload(storage_keys, message.payload)
-
-
-def build_multistorage_batch_writer(
-    metrics: MetricsBackend,
-    storage: WritableTableStorage,
-    replacements: Optional[Topic],
-    slice_id: Optional[int],
-) -> ProcessedMessageBatchWriter:
-    replacement_batch_writer: Optional[ReplacementBatchWriter]
-    stream_loader = storage.get_table_writer().get_stream_loader()
-    replacement_topic_spec = stream_loader.get_replacement_topic_spec()
-    if replacements is not None:
-        assert replacement_topic_spec is not None
-        # XXX: The producer is flushed when closed on strategy teardown
-        # after an assignment is revoked, but never explicitly closed.
-        replacement_batch_writer = ReplacementBatchWriter(
-            ConfluentKafkaProducer(
-                build_kafka_producer_configuration(
-                    replacement_topic_spec.topic,
-                    override_params={
-                        "partitioner": "consistent",
-                        "message.max.bytes": 10000000,  # 10MB, default is 1MB
-                    },
-                )
-            ),
-            replacements,
-        )
-    else:
-        replacement_batch_writer = None
-
-    insertion_metrics = MetricsWrapper(
-        metrics,
-        "insertions",
-        {"storage": storage.get_storage_key().value},
-    )
-
-    return ProcessedMessageBatchWriter(
-        InsertBatchWriter(
-            storage.get_table_writer().get_batch_writer(
-                metrics,
-                {"load_balancing": "in_order", "insert_distributed_sync": 1},
-                slice_id=slice_id,
-            ),
-            insertion_metrics,
-        ),
-        replacement_batch_writer,
-        metrics=insertion_metrics,
-    )
-
-
-def build_collector(
-    metrics: MetricsBackend,
-    storages: Sequence[WritableTableStorage],
-    commit_log_config: Optional[CommitLogConfig],
-    replacements: Optional[Topic],
-    slice_id: Optional[int],
-) -> MultistorageCollector:
-    return MultistorageCollector(
-        {
-            storage.get_storage_key(): build_multistorage_batch_writer(
-                metrics, storage, replacements, slice_id
-            )
-            for storage in storages
-        },
-        commit_log_config,
-        ignore_errors={
-            storage.get_storage_key()
-            for storage in storages
-            if storage.get_is_write_error_ignorable() is True
-        },
-    )
-
-
-class MultistorageConsumerProcessingStrategyFactory(
-    ProcessingStrategyFactory[KafkaPayload]
-):
-    def __init__(
-        self,
-        storages: Sequence[WritableTableStorage],
-        max_batch_size: int,
-        max_batch_time: float,
-        processes: Optional[int],
-        input_block_size: Optional[int],
-        output_block_size: Optional[int],
-        metrics: MetricsBackend,
-        slice_id: Optional[int],
-        commit_log_config: Optional[CommitLogConfig] = None,
-        replacements: Optional[Topic] = None,
-        initialize_parallel_transform: Optional[Callable[[], None]] = None,
-        parallel_collect_timeout: float = 10.0,
-    ) -> None:
-        if processes is not None:
-            assert input_block_size is not None, "input block size required"
-            assert output_block_size is not None, "output block size required"
-        else:
-            assert (
-                input_block_size is None
-            ), "input block size cannot be used without processes"
-            assert (
-                output_block_size is None
-            ), "output block size cannot be used without processes"
-
-        self.__input_block_size = input_block_size
-        self.__output_block_size = output_block_size
-        self.__initialize_parallel_transform = initialize_parallel_transform
-
-        self.__max_batch_size = max_batch_size
-        self.__max_batch_time = max_batch_time
-        self.__processes = processes
-
-        self.__storages = storages
-        self.__metrics = metrics
-
-        self.__process_message_fn = process_message_multistorage
-
-        self.__collector = partial(
-            build_collector,
-            self.__metrics,
-            self.__storages,
-            commit_log_config,
-            replacements,
-            slice_id,
-        )
-        self.__pool = (
-            MultiprocessingPool(self.__processes, initialize_parallel_transform)
-            if self.__processes is not None
-            else None
-        )
-
-    def create_with_partitions(
-        self,
-        commit: Commit,
-        partitions: Mapping[Partition, int],
-    ) -> ProcessingStrategy[KafkaPayload]:
-        def accumulator(
-            batch_writer: MultistorageCollector,
-            message: BaseValue[MultistorageProcessedMessage],
-        ) -> MultistorageCollector:
-            batch_writer.submit(Message(message))
-            return batch_writer
-
-        def flush_batch(
-            message: Message[MultistorageCollector],
-        ) -> Message[MultistorageCollector]:
-            message.payload.close()
-            return message
-
-        collect = Reduce[MultistorageProcessedMessage, MultistorageCollector](
-            self.__max_batch_size,
-            self.__max_batch_time,
-            accumulator,
-            self.__collector,
-            RunTaskInThreads(flush_batch, 1, 1, CommitOffsets(commit)),
-        )
-
-        transform_function = self.__process_message_fn
-
-        inner_strategy: ProcessingStrategy[
-            Union[FilteredPayload, MultistorageKafkaPayload]
-        ]
-
-        if self.__pool is None:
-            inner_strategy = RunTask(transform_function, collect)
-        else:
-            inner_strategy = RunTaskWithMultiprocessing(
-                transform_function,
-                collect,
-                max_batch_size=self.__max_batch_size,
-                max_batch_time=self.__max_batch_time,
-                pool=self.__pool,
-                input_block_size=self.__input_block_size,
-                output_block_size=self.__output_block_size,
-            )
-
-        return RunTask(
-            partial(find_destination_storages, self.__storages),
-            FilterStep(
-                has_destination_storages,
-                inner_strategy,
-            ),
-        )
-
-    def shutdown(self) -> None:
-        if self.__pool:
-            self.__pool.close()
diff --git a/tests/test_consumer.py b/tests/test_consumer.py
index 49865fe4f1d..33bce2bf77f 100644
--- a/tests/test_consumer.py
+++ b/tests/test_consumer.py
@@ -4,7 +4,7 @@
 import pickle
 from datetime import datetime
 from pickle import PickleBuffer
-from typing import MutableSequence, Optional
+from typing import MutableSequence
 from unittest.mock import Mock, call
 
 import pytest
@@ -17,19 +17,15 @@
     BytesInsertBatch,
     InsertBatchWriter,
     LatencyRecorder,
-    MultistorageConsumerProcessingStrategyFactory,
     ProcessedMessageBatchWriter,
     ReplacementBatchWriter,
+    build_batch_writer,
     process_message,
 )
 from snuba.consumers.strategy_factory import KafkaConsumerStrategyFactory
 from snuba.datasets.schemas.tables import TableSchema
 from snuba.datasets.storage import Storage
-from snuba.datasets.storages.factory import (
-    get_cdc_storage,
-    get_storage,
-    get_writable_storage,
-)
+from snuba.datasets.storages.factory import get_storage, get_writable_storage
 from snuba.datasets.storages.storage_key import StorageKey
 from snuba.processor import InsertBatch, ReplacementBatch
 from snuba.utils.metrics.wrapper import MetricsWrapper
@@ -159,74 +155,6 @@ def get_row_count(storage: Storage) -> int:
     )
 
 
-@pytest.mark.clickhouse_db
-@pytest.mark.parametrize(
-    "processes, input_block_size, output_block_size",
-    [
-        pytest.param(1, int(32 * 1e6), int(64 * 1e6), id="multiprocessing"),
-        pytest.param(None, None, None, id="no multiprocessing"),
-    ],
-)
-def test_multistorage_strategy(
-    processes: Optional[int],
-    input_block_size: Optional[int],
-    output_block_size: Optional[int],
-) -> None:
-    from tests.datasets.cdc.test_groupassignee import TestGroupassignee
-    from tests.datasets.cdc.test_groupedmessage import TestGroupedMessage
-
-    groupassignees_storage = get_cdc_storage(StorageKey.GROUPASSIGNEES)
-    groupedmessages_storage = get_cdc_storage(StorageKey.GROUPEDMESSAGES)
-
-    commit = Mock()
-    partitions = Mock()
-
-    storages = [groupassignees_storage, groupedmessages_storage]
-
-    strategy = MultistorageConsumerProcessingStrategyFactory(
-        storages,
-        10,
-        10,
-        processes,
-        input_block_size,
-        output_block_size,
-        TestingMetricsBackend(),
-        None,
-        None,
-    ).create_with_partitions(commit, partitions)
-
-    payloads = [
-        KafkaPayload(None, b"{}", [("table", b"ignored")]),
-        KafkaPayload(
-            None,
-            json.dumps(TestGroupassignee.INSERT_MSG).encode("utf8"),
-            [("table", groupassignees_storage.get_postgres_table().encode("utf8"))],
-        ),
-        KafkaPayload(
-            None,
-            json.dumps(TestGroupedMessage.INSERT_MSG).encode("utf8"),
-            [("table", groupedmessages_storage.get_postgres_table().encode("utf8"))],
-        ),
-    ]
-
-    now = datetime.now()
-
-    messages = [
-        Message(BrokerValue(payload, Partition(Topic("topic"), 0), offset, now))
-        for offset, payload in enumerate(payloads)
-    ]
-
-    with assert_changes(
-        lambda: get_row_count(groupedmessages_storage), 0, 1
-    ), assert_changes(lambda: get_row_count(groupedmessages_storage), 0, 1):
-
-        for message in messages:
-            strategy.submit(message)
-
-        strategy.close()
-        strategy.join()
-
-
 @pytest.mark.clickhouse_db
 def test_metrics_writing_e2e() -> None:
     distributions_storage = get_storage(StorageKey.METRICS_DISTRIBUTIONS)
@@ -239,22 +167,43 @@ def test_metrics_writing_e2e() -> None:
             "unit": "ms",
             "type": "d",
             "value": [24.0, 80.0, 119.0, 146.0, 182.0],
-            "timestamp": datetime.now().timestamp(),
+            "timestamp": int(datetime.now().timestamp()),
             "tags": {"6": 91, "9": 134, "4": 117, "5": 7},
            "metric_id": 8,
             "retention_days": 90,
             "sentry_received_timestamp": datetime.now().timestamp(),
+            "use_case_id": "performance",
+            "mapping_meta": {},
         }
     )
 
     commit = Mock()
-    partitions = Mock()
 
-    storages = [polymorphic_bucket]
+    table_writer = polymorphic_bucket.get_table_writer()
+    stream_loader = table_writer.get_stream_loader()
 
-    strategy = MultistorageConsumerProcessingStrategyFactory(
-        storages, 10, 10, None, None, None, TestingMetricsBackend(), None, None
-    ).create_with_partitions(commit, partitions)
+    metrics = TestingMetricsBackend()
+
+    strategy = KafkaConsumerStrategyFactory(
+        None,
+        functools.partial(
+            process_message,
+            stream_loader.get_processor(),
+            "consumer_group",
+            SnubaTopic.METRICS,
+            True,
+        ),
+        build_batch_writer(table_writer, metrics=metrics),
+        max_batch_size=10,
+        max_batch_time=60,
+        max_insert_batch_size=None,
+        max_insert_batch_time=None,
+        processes=None,
+        input_block_size=None,
+        output_block_size=None,
+        health_check_file=None,
+        metrics_tags={},
+    ).create_with_partitions(commit, {})
 
     payloads = [KafkaPayload(None, dist_message.encode("utf-8"), [])]
     now = datetime.now()