From 13ef1063d7e827000bbcdf6833a885b6423fdea8 Mon Sep 17 00:00:00 2001
From: Arpad Borsos
Date: Fri, 16 Jun 2023 12:00:48 +0200
Subject: [PATCH] feat(backpressure): Add Service monitoring (#50928)

This changes the existing queue-based monitoring to a more generic
`service` monitoring. Services are organized in a hierarchy of base
services and the consumers that depend on them.
The main monitoring loop checks the health of the different base services
and aggregates that health for the consumers. The consumer health is then
persisted and queried.

The settings around monitoring and checking are also streamlined, and a
new setting for high watermarks replaces the existing queue-size-based
options.

fixes https://github.com/getsentry/team-processing/issues/56
fixes https://github.com/getsentry/team-processing/issues/55

---------

Co-authored-by: Sebastian Zivota
---
 src/sentry/conf/server.py                     |  40 ++--
 src/sentry/options/defaults.py                |  49 +++--
 src/sentry/processing/backpressure/arroyo.py  |   6 +-
 src/sentry/processing/backpressure/health.py  |  57 ++++++
 src/sentry/processing/backpressure/memory.py  |  64 ++++++
 src/sentry/processing/backpressure/monitor.py | 102 ++++++++++
 .../processing/backpressure/rabbitmq.py       | 191 ------------------
 src/sentry/processing/backpressure/redis.py   |  55 -----
 .../processing/backpressure/topology.py       |  67 ++----
 src/sentry/runner/commands/run.py             |   4 +-
 tests/sentry/monitoring/test_queues.py        | 119 -----------
 .../processing/backpressure/test_checking.py  |  94 +++++++++
 .../backpressure/test_monitoring.py           |  84 ++++++++
 .../processing/backpressure/test_redis.py     |  13 +-
 14 files changed, 477 insertions(+), 468 deletions(-)
 create mode 100644 src/sentry/processing/backpressure/health.py
 create mode 100644 src/sentry/processing/backpressure/memory.py
 create mode 100644 src/sentry/processing/backpressure/monitor.py
 delete mode 100644 src/sentry/processing/backpressure/rabbitmq.py
 delete mode 100644 src/sentry/processing/backpressure/redis.py
 delete mode 100644 tests/sentry/monitoring/test_queues.py
 create mode 100644 tests/sentry/processing/backpressure/test_checking.py
 create mode 100644 tests/sentry/processing/backpressure/test_monitoring.py

diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py
index 19033dff8e19d..45f7bbc64c195 100644
--- a/src/sentry/conf/server.py
+++ b/src/sentry/conf/server.py
@@ -16,7 +16,6 @@
     Callable,
     Dict,
     Iterable,
-    List,
     Mapping,
     Optional,
     Tuple,
@@ -3528,24 +3527,27 @@ class TopicDefinition(TypedDict):
 # Once we start detecting crashes for other SDKs, this will be a mapping of SDK name to project ID or something similar.
 SDK_CRASH_DETECTION_PROJECT_ID: Optional[int] = None
 
-# The Redis cluster to use for monitoring the health of
-# Celery queues.
-SENTRY_QUEUE_MONITORING_REDIS_CLUSTER = "default"
-
-# The RabbitMQ hosts whose health should be monitored by the backpressure system.
-# This should be a list of dictionaries with keys "url" and "vhost".
-# E.g. for local testing: [{"url": "https://guest:guest@localhost:15672", "vhost": "%2F"}]
-SENTRY_QUEUE_MONITORING_RABBITMQ_HOSTS: List[Dict[str, str]] = []
-
-# This is a mapping between the various processing stores,
-# and the redis `cluster` they are using.
-# This setting needs to be appropriately synched across the various deployments
-# for automatic backpressure management to properly work.
-SENTRY_PROCESSING_REDIS_CLUSTERS = {
-    "attachments": "rc-short",
-    # "processing": "processing",
-    "locks": "default",
-    "post_process_locks": "default",
+# The Redis cluster to use for monitoring the service / consumer health.
+SENTRY_SERVICE_MONITORING_REDIS_CLUSTER = "default"
+
+# This is a view of which abstract processing service is backed by which infrastructure.
+# Right now, the infrastructure can be `redis` or `rabbitmq`.
+#
+# For `redis`, one has to provide the cluster id.
+# It has to match a cluster defined in `redis.redis_clusters`.
+#
+# For `rabbitmq`, one has to provide a list of server URLs.
+# The URL is in the format `http://{user}:{password}@{hostname}:{port}/`.
+#
+# The definition can also be empty, in which case nothing is checked and
+# the service is assumed to be healthy.
+# However, the service *must* be defined.
+SENTRY_PROCESSING_SERVICES: Mapping[str, Any] = {
+    "celery": {"redis": "default"},
+    "attachments-store": {"redis": "rc-short"},
+    "processing-store": {},  # "redis": "processing"},
+    "processing-locks": {"redis": "default"},
+    "post-process-locks": {"redis": "default"},
 }
diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py
index 2a550de55afc8..da2d0611f42fc 100644
--- a/src/sentry/options/defaults.py
+++ b/src/sentry/options/defaults.py
@@ -1305,24 +1305,37 @@
 register("span_descs.bump-lifetime-sample-rate", default=0.25, flags=FLAG_AUTOMATOR_MODIFIABLE)
 # Decides whether artifact bundles asynchronous renewal is enabled.
 register("sourcemaps.artifact-bundles.enable-renewal", default=0.0, flags=FLAG_AUTOMATOR_MODIFIABLE)
-# Enables reporting status of queues for backpressure management.
-register(
-    "backpressure.monitor_queues.enable_status", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE
-)
-# Enables checking queue health in consumers for backpressure management.
-register("backpressure.monitor_queues.enable_check", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE)
+
+# === Backpressure-related runtime options ===
+
+# Enables monitoring of services for backpressure management.
+register("backpressure.monitoring.enabled", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE)
+# How often the monitor will check service health.
+register("backpressure.monitoring.interval", default=5, flags=FLAG_AUTOMATOR_MODIFIABLE)
+
+# Enables checking consumer health for backpressure management.
+register("backpressure.checking.enabled", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE)
+# How often a consumer will check its health, in a debounced fashion.
+register("backpressure.checking.interval", default=5, flags=FLAG_AUTOMATOR_MODIFIABLE)
+
+
+# How long a health status is persisted. Updates to the health status can be
+# paused for at most this long before consumers assume things are unhealthy.
+register("backpressure.status_ttl", default=60, flags=FLAG_AUTOMATOR_MODIFIABLE)
+
+# The per-service high-watermark levels above which a service is marked as unhealthy.
+# This should mirror the `SENTRY_PROCESSING_SERVICES` setting.
+# If this option is modified downstream, and a new service is later added to
+# these defaults, the downstream override has to be updated as well; otherwise
+# the code will throw. This is intentional: we want to throw fast and loud on
+# misconfiguration.
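+#
+# An illustrative downstream override (the values are made up for this
+# example): a deployment that can tolerate more memory pressure on the
+# attachments store could set
+#   {"celery": 0.5, "attachments-store": 0.8, "processing-store": 0.5,
+#    "processing-locks": 0.5, "post-process-locks": 0.5}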
 register(
-    "backpressure.monitor_queues.check_interval_in_seconds",
-    default=5,
+    "backpressure.high_watermarks",
+    default={
+        "celery": 0.5,
+        "attachments-store": 0.5,
+        "processing-store": 0.5,
+        "processing-locks": 0.5,
+        "post-process-locks": 0.5,
+    },
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
-register(
-    "backpressure.monitor_queues.unhealthy_threshold", default=1000, flags=FLAG_AUTOMATOR_MODIFIABLE
-)
-# How often we check queue health.
-register("backpressure.monitor_queues.check_interval", default=5, flags=FLAG_AUTOMATOR_MODIFIABLE)
-# How many times in a row a queue must be unhealthy before it is
-# recorded in Redis. 12 * 5sec = unhealthy for 1 minute.
-register(
-    "backpressure.monitor_queues.strike_threshold", default=12, flags=FLAG_AUTOMATOR_MODIFIABLE
-)
diff --git a/src/sentry/processing/backpressure/arroyo.py b/src/sentry/processing/backpressure/arroyo.py
index 741e28eb0cd7d..cb9b80bdf3c9a 100644
--- a/src/sentry/processing/backpressure/arroyo.py
+++ b/src/sentry/processing/backpressure/arroyo.py
@@ -7,7 +7,7 @@
 from arroyo.types import FilteredPayload, Message
 
 from sentry import options
-from sentry.processing.backpressure.rabbitmq import is_consumer_healthy
+from sentry.processing.backpressure.health import is_consumer_healthy
 
 # As arroyo would otherwise busy-wait, we will sleep for a short time
 # when a message is rejected.
@@ -24,9 +24,7 @@ def __init__(self, consumer_name: str = "default"):
     def is_healthy(self) -> bool:
         now = time.time()
         # Check queue health if it's been more than the interval
-        if now - self.last_check >= options.get(
-            "backpressure.monitor_queues.check_interval_in_seconds"
-        ):
+        if now - self.last_check >= options.get("backpressure.checking.interval"):
             # TODO: We would want to at first monitor everything all at once,
             # and make it more fine-grained later on.
             self.is_queue_healthy = is_consumer_healthy(self.consumer_name)
diff --git a/src/sentry/processing/backpressure/health.py b/src/sentry/processing/backpressure/health.py
new file mode 100644
index 0000000000000..6c6762bcf7802
--- /dev/null
+++ b/src/sentry/processing/backpressure/health.py
@@ -0,0 +1,57 @@
+from typing import Mapping
+
+import sentry_sdk
+from django.conf import settings
+
+from sentry import options
+from sentry.processing.backpressure.topology import CONSUMERS
+from sentry.utils import redis
+
+
+def _prefix_key(key_name: str) -> str:
+    return f"bp1:{key_name}"
+
+
+HEALTHY_KEY_NAME = "consumer_is_healthy"
+
+
+def _unhealthy_consumer_key(name: str) -> str:
+    return _prefix_key(f"{HEALTHY_KEY_NAME}:{name}")
+
+
+service_monitoring_cluster = redis.redis_clusters.get(
+    settings.SENTRY_SERVICE_MONITORING_REDIS_CLUSTER
+)
+
+
+def is_consumer_healthy(consumer_name: str = "default") -> bool:
+    """Checks whether the given consumer is healthy by looking it up in Redis.
+
+    NB: If the consumer is not found in Redis, it is assumed to be unhealthy.
+    This behavior might change in the future.
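+
+    A minimal usage sketch (the consumer name here is just an example; the
+    arroyo strategy in this patch calls this with its own `consumer_name`):
+
+        if not is_consumer_healthy("profiles"):
+            ...  # e.g. reject the message and back off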
+    """
+
+    if not options.get("backpressure.checking.enabled"):
+        return True
+    # check whether the consumer is healthy by looking up its status in Redis
+    try:
+        return service_monitoring_cluster.get(_unhealthy_consumer_key(consumer_name)) == "true"
+    except Exception as e:
+        sentry_sdk.capture_exception(e)
+        # By default it's considered unhealthy
+        return False
+
+
+def record_consumer_health(service_health: Mapping[str, bool]) -> None:
+    with service_monitoring_cluster.pipeline() as pipeline:
+        key_ttl = options.get("backpressure.status_ttl")
+        for name, dependencies in CONSUMERS.items():
+            # NOTE: `and` short-circuits, so a dependency missing from
+            # `service_health` only raises a `KeyError` if all dependencies
+            # before it were healthy.
+            is_healthy = True
+            for dependency in dependencies:
+                is_healthy = is_healthy and service_health[dependency]
+
+            pipeline.set(
+                _unhealthy_consumer_key(name), "true" if is_healthy else "false", ex=key_ttl
+            )
+
+        pipeline.execute()
diff --git a/src/sentry/processing/backpressure/memory.py b/src/sentry/processing/backpressure/memory.py
new file mode 100644
index 0000000000000..dda75053ca145
--- /dev/null
+++ b/src/sentry/processing/backpressure/memory.py
@@ -0,0 +1,64 @@
+from dataclasses import dataclass
+from typing import Any, Generator, Mapping, Union
+
+import requests
+from redis import Redis
+from rediscluster import RedisCluster
+
+
+@dataclass
+class ServiceMemory:
+    used: int
+    available: int
+    percentage: float
+
+    def __init__(self, used: int, available: int):
+        self.used = used
+        self.available = available
+        self.percentage = used / available
+
+
+def query_rabbitmq_memory_usage(host: str) -> ServiceMemory:
+    """Returns the currently used memory and the memory limit of a
+    RabbitMQ host.
+    """
+
+    if not host.endswith("/"):
+        host += "/"
+    url = f"{host}api/nodes"
+
+    response = requests.get(url)
+    response.raise_for_status()
+    json = response.json()
+    return ServiceMemory(json[0]["mem_used"], json[0]["mem_limit"])
+
+
+# Based on configuration, this could be:
+# - a `rediscluster` Cluster (actually `RetryingRedisCluster`)
+# - a straight `Redis` client (actually `FailoverRedis`)
+# - or any class configured via `client_class`.
+# It could in theory also be a `rb` (aka redis blaster) Cluster, but we
+# intentionally do not support these.
+Cluster = Union[RedisCluster, Redis]
+
+
+def get_memory_usage(info: Mapping[str, Any]) -> ServiceMemory:
+    # or alternatively: `used_memory_rss`?
+    memory_used = info.get("used_memory", 0)
+    # `maxmemory` might be 0 in development
+    memory_available = info.get("maxmemory", 0) or info["total_system_memory"]
+
+    return ServiceMemory(memory_used, memory_available)
+
+
+def iter_cluster_memory_usage(cluster: Cluster) -> Generator[ServiceMemory, None, None]:
+    """
+    A generator that yields a [`ServiceMemory`] (derived from the redis `INFO`
+    command) for each of the nodes in the `cluster`.
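+
+    A usage sketch (the aggregation is illustrative; it mirrors what the old
+    `RedisMemoryUsageMetrics` did):
+
+        highest = max(m.percentage for m in iter_cluster_memory_usage(cluster))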
+    """
+    if isinstance(cluster, RedisCluster):
+        # `RedisCluster` returns these as a dictionary, with the node-id as key
+        for info in cluster.info().values():
+            yield get_memory_usage(info)
+    else:
+        # otherwise, let's just hope that `info()` does the right thing
+        yield get_memory_usage(cluster.info())
diff --git a/src/sentry/processing/backpressure/monitor.py b/src/sentry/processing/backpressure/monitor.py
new file mode 100644
index 0000000000000..25ec7bc3b1229
--- /dev/null
+++ b/src/sentry/processing/backpressure/monitor.py
@@ -0,0 +1,102 @@
+import time
+from dataclasses import dataclass
+from typing import Dict, Generator, List, Mapping, Union
+
+from django.conf import settings
+
+from sentry import options
+from sentry.processing.backpressure.health import record_consumer_health
+from sentry.processing.backpressure.memory import (
+    Cluster,
+    ServiceMemory,
+    iter_cluster_memory_usage,
+    query_rabbitmq_memory_usage,
+)
+from sentry.processing.backpressure.topology import PROCESSING_SERVICES
+from sentry.utils import redis
+
+
+@dataclass
+class Redis:
+    cluster: Cluster
+
+
+@dataclass
+class RabbitMq:
+    servers: List[str]
+
+
+Service = Union[Redis, RabbitMq, None]
+
+
+def check_service_memory(service: Service) -> Generator[ServiceMemory, None, None]:
+    """
+    This queries the given [`Service`] and yields the [`ServiceMemory`]
+    for each of the individual servers that comprise the service.
+    """
+
+    if isinstance(service, Redis):
+        yield from iter_cluster_memory_usage(service.cluster)
+
+    elif isinstance(service, RabbitMq):
+        for server in service.servers:
+            yield query_rabbitmq_memory_usage(server)
+
+
+def load_service_definitions() -> Dict[str, Service]:
+    services: Dict[str, Service] = {}
+    for name, definition in settings.SENTRY_PROCESSING_SERVICES.items():
+        if cluster_id := definition.get("redis"):
+            cluster = redis.redis_clusters.get(cluster_id)
+            services[name] = Redis(cluster)
+
+        elif rabbitmq_urls := definition.get("rabbitmq"):
+            services[name] = RabbitMq(rabbitmq_urls)
+
+        else:
+            services[name] = None
+
+    return services
+
+
+def assert_all_services_defined(services: Dict[str, Service]) -> None:
+    for name in PROCESSING_SERVICES:
+        if name not in services:
+            raise ValueError(
+                f"The `{name}` Service is missing from `settings.SENTRY_PROCESSING_SERVICES`."
+            )
+
+
+def check_service_health(services: Mapping[str, Service]) -> Mapping[str, bool]:
+    service_health = {}
+    high_watermarks = options.get("backpressure.high_watermarks")
+
+    for name, service in services.items():
+        # NOTE: this intentionally throws a `KeyError` if the service is
+        # missing from the `backpressure.high_watermarks` option.
+        high_watermark = high_watermarks[name]
+        is_healthy = True
+        for memory in check_service_memory(service):
+            is_healthy = is_healthy and memory.percentage < high_watermark
+
+        service_health[name] = is_healthy
+
+    return service_health
+
+
+def start_service_monitoring() -> None:
+    services = load_service_definitions()
+    assert_all_services_defined(services)
+
+    while True:
+        if not options.get("backpressure.monitoring.enabled"):
+            time.sleep(options.get("backpressure.monitoring.interval"))
+            continue
+
+        # first, check the health of all the base services
+        service_health = check_service_health(services)
+
+        # then, aggregate the health of the derived consumers and persist it
+        record_consumer_health(service_health)
+
+        time.sleep(options.get("backpressure.monitoring.interval"))
diff --git a/src/sentry/processing/backpressure/rabbitmq.py b/src/sentry/processing/backpressure/rabbitmq.py
deleted file mode 100644
index 6f6568063837b..0000000000000
--- a/src/sentry/processing/backpressure/rabbitmq.py
+++ /dev/null
@@ -1,191 +0,0 @@
-from dataclasses import dataclass
-from time import sleep
-from typing import Dict, List
-from urllib.parse import urlparse
-
-import requests
-import sentry_sdk
-from django.conf import settings
-
-from sentry import options
-from sentry.processing.backpressure.topology import ALL_QUEUES, CONSUMERS, Queue, Services
-from sentry.utils import redis
-
-QUEUES = ["profiles.process"]
-
-UNHEALTHY_KEY_NAME = "unhealthy-consumers"
-DEBUG_KEY_NAME = "queue-debug"
-
-
-queue_monitoring_cluster = redis.redis_clusters.get(settings.SENTRY_QUEUE_MONITORING_REDIS_CLUSTER)
-
-
-@dataclass(frozen=True)
-class RabbitMqHost:
-    hostname: str
-    port: int
-    vhost: str
-    username: str
-    password: str
-
-
-def _parse_rabbitmq(host: Dict[str, str]) -> RabbitMqHost:
-    if "url" not in host:
-        raise ValueError("missing url")
-
-    if "vhost" not in host:
-        raise ValueError("missing vhost")
-
-    url = host["url"]
-    vhost = host["vhost"]
-    dsn = urlparse(url)
-    hostname, port = dsn.hostname, dsn.port
-    username, password = dsn.username, dsn.password
-    if port is None:
-        port = 15672
-
-    if hostname is None:
-        raise ValueError("missing hostname")
-
-    if username is None:
-        raise ValueError("missing username")
-
-    if password is None:
-        raise ValueError("missing password")
-
-    return RabbitMqHost(hostname, port, vhost, username, password)
-
-
-def _prefix_key(key_name: str) -> str:
-    return f"bp1:{key_name}"
-
-
-def _unhealthy_consumer_key(consumer_name: str) -> str:
-    return _prefix_key(f"{UNHEALTHY_KEY_NAME}:{consumer_name}")
-
-
-def is_consumer_healthy(consumer_name: str = "default") -> bool:
-    """Checks whether the given consumer is healthy by looking it up in Redis.
-
-    NB: If the consumer is not found in Redis, it is assumed to be healthy.
-    This behavior might change in the future.
-    """
-
-    if not options.get("backpressure.monitor_queues.enable_check"):
-        return True
-    # check if queue is healthy by pinging Redis
-    try:
-        # We set the key if the queue is unhealthy. If the key exists,
-        # the queue is unhealthy and we need to return False.
-        healthy = not queue_monitoring_cluster.exists(_unhealthy_consumer_key(consumer_name))
-        # TODO: do we want to also check the `default` consumer as a catch-all?
- except Exception as e: - sentry_sdk.capture_exception(e) - # By default it's considered healthy - healthy = True - return healthy - - -def _get_queue_sizes(hosts: List[RabbitMqHost], queues: List[str]) -> Dict[str, int]: - new_sizes = {queue: 0 for queue in queues} - - for host in hosts: - url = f"http://{host.hostname}:{host.port}/api/queues/{host.vhost}" - response = requests.get(url, auth=(host.username, host.password)) - response.raise_for_status() - - for queue in response.json(): - name = queue["name"] - size = queue["messages"] - if name in queues: - new_sizes[name] = max(new_sizes[name], size) - - return new_sizes - - -def _update_queue_stats(queue_history: Dict[str, int]) -> None: - strike_threshold = options.get("backpressure.monitor_queues.strike_threshold") - queue_health = _list_queues_over_threshold(strike_threshold, queue_history) - - unhealthy_queues = [queue for (queue, is_unhealthy) in queue_health.items() if is_unhealthy] - if unhealthy_queues: - # Report list of unhealthy queues to sentry - with sentry_sdk.push_scope() as scope: - scope.set_extra("unhealthy_queues", unhealthy_queues) - sentry_sdk.capture_message("RabbitMQ queues are exceeding size threshold") - - with queue_monitoring_cluster.pipeline() as pipeline: - # write the queue history to redis, for debugging purposes - pipeline.hmset(_prefix_key("queue-history"), queue_history) - - # update health markers for services - for (consumer_name, services) in CONSUMERS.items(): - unhealthy = _check_consumer_health(services, queue_health) - - if unhealthy: - pipeline.set(_unhealthy_consumer_key(consumer_name), "1", ex=60) - else: - pipeline.delete(_unhealthy_consumer_key(consumer_name)) - - pipeline.execute() - - -def _check_consumer_health(services: Services, queue_health: Dict[str, bool]) -> bool: - """ - Checks all the queues in `services` for their health. 
- """ - for service in services: - # TODO: we want to eventually also check the redis stores - if isinstance(service, Queue): - if queue_health.get(service.name, False): - return False - return True - - -def _is_healthy(queue_size) -> bool: - return queue_size < options.get("backpressure.monitor_queues.unhealthy_threshold") - - -def run_queue_stats_updater() -> None: - hosts = [] - for host in settings.SENTRY_QUEUE_MONITORING_RABBITMQ_HOSTS: - try: - hosts.append(_parse_rabbitmq(host)) - except ValueError as e: - with sentry_sdk.push_scope() as scope: - scope.set_extra("invalid_host", host) - sentry_sdk.capture_exception(e) - - queue_history = {queue: 0 for queue in ALL_QUEUES} - - while True: - if not options.get("backpressure.monitor_queues.enable_status"): - sleep(10) - continue - - try: - new_sizes = _get_queue_sizes(hosts, ALL_QUEUES) - for (queue, size) in new_sizes.items(): - if _is_healthy(size): - queue_history[queue] = 0 - else: - queue_history[queue] += 1 - except Exception as e: - sentry_sdk.capture_exception(e) - # If there was an error getting queue sizes from RabbitMQ, assume - # all queues are unhealthy - for queue in ALL_QUEUES: - queue_history[queue] += 1 - - try: - _update_queue_stats(queue_history) - except Exception as e: - sentry_sdk.capture_exception(e) - - sleep(options.get("backpressure.monitor_queues.check_interval")) - - -def _list_queues_over_threshold( - strike_threshold: int, queue_history: Dict[str, int] -) -> Dict[str, bool]: - return {queue: count >= strike_threshold for (queue, count) in queue_history.items()} diff --git a/src/sentry/processing/backpressure/redis.py b/src/sentry/processing/backpressure/redis.py deleted file mode 100644 index 7f35340f3016a..0000000000000 --- a/src/sentry/processing/backpressure/redis.py +++ /dev/null @@ -1,55 +0,0 @@ -from typing import Any, Generator, Mapping, Sequence, Union - -from redis import Redis -from rediscluster import RedisCluster - -# Based on configuration, this could be: -# - a `rediscluster` Cluster (actually `RetryingRedisCluster`) -# - a straight `Redis` client (actually `FailoverRedis`) -# - or any class configured via `client_class`. -# It could in theory also be a `rb` (aka redis blaster) Cluster, but we -# intentionally do not support these. -Cluster = Union[RedisCluster, Redis] - - -class RedisMemoryUsageMetrics: - """ - This class allows querying for the memory usage percentage of a number of - redis clusters, each consisting of a number of nodes. - """ - - def __init__(self, clusters: Sequence[Cluster]) -> None: - self.clusters = clusters - - def query_usage_percentage(self) -> float: - """ - Queries the memory usage (using the `INFO` command) of all the cluster nodes in the - given clusters, and returns the *highest* usage percentage of any participating node. - """ - - highest_usage = 0.0 - - infos = (info for cluster in self.clusters for info in iter_cluster_node_infos(cluster)) - for info in infos: - # or alternatively: `used_memory_rss`? - memory_used = info.get("used_memory", 0) - # `maxmemory` might be 0 in development - memory_available = info.get("maxmemory", 0) or info.get("total_system_memory", 0) - - if memory_available: - node_usage = min(memory_used / memory_available, 1.0) - highest_usage = max(highest_usage, node_usage) - - return highest_usage - - -def iter_cluster_node_infos(cluster: Cluster) -> Generator[Mapping[str, Any], None, None]: - """ - A generator that yields redis `INFO` results for each of the nodes in the `cluster`. 
- """ - if isinstance(cluster, RedisCluster): - # `RedisCluster` returns these as a dictionary, with the node-id as key - yield from cluster.info().values() - else: - # otherwise, lets just hope that `info()` does the right thing - yield cluster.info() diff --git a/src/sentry/processing/backpressure/topology.py b/src/sentry/processing/backpressure/topology.py index 1c79ce575f969..27059d0330e66 100644 --- a/src/sentry/processing/backpressure/topology.py +++ b/src/sentry/processing/backpressure/topology.py @@ -4,62 +4,19 @@ In other words, which service (consumer) depends on which other services (queues, processing store). """ -from dataclasses import dataclass -from typing import Mapping, Set, Union - -from django.conf import settings - -PROFILING_QUEUES = ["profiles.process"] -PROCESSING_QUEUES = [ - "events.preprocess_event", - "events.process_event", - "events.save_event", - "events.save_event_transaction", - "events.save_event_attachments", -] -SYMBOLICATION_QUEUES = [ - "events.symbolicate_event", - "events.symbolicate_js_event", - "events.symbolicate_event_low_priority", - "events.symbolicate_js_event_low_priority", -] -REPROCESSING_QUEUES = [ - "events.reprocess_events", - "events.reprocessing.preprocess_event", - "events.reprocessing.process_event", - "events.reprocessing.symbolicate_event", - "events.reprocessing.symbolicate_event_low_priority", +PROCESSING_SERVICES = [ + "celery", + "attachments-store", + "processing-store", + "processing-locks", + "post-process-locks", ] -INGEST_QUEUES = PROCESSING_QUEUES + SYMBOLICATION_QUEUES - -ALL_QUEUES = PROFILING_QUEUES + PROCESSING_QUEUES + SYMBOLICATION_QUEUES + REPROCESSING_QUEUES - - -@dataclass(frozen=True) -class Queue: - name: str - - -@dataclass(frozen=True) -class Redis: - name: str - - -ALL_REDIS_STORES = { - Redis(cluster) for cluster in settings.SENTRY_PROCESSING_REDIS_CLUSTERS.values() -} - - -Services = Set[Union[Queue, Redis]] - -CONSUMERS: Mapping[str, Services] = { +CONSUMERS = { # fallback if no explicit consumer was defined - "default": {Queue(name) for name in ALL_QUEUES}.union(ALL_REDIS_STORES), - "profiles": {Queue(name) for name in PROFILING_QUEUES}, - # TODO: - # We might want to eventually make this more fine-grained for different - # consumer types. For example, normal `ingest-events` does not depend on the - # `attachments` store, and other ingest - "ingest": {Queue(name) for name in INGEST_QUEUES}.union(ALL_REDIS_STORES), + "default": PROCESSING_SERVICES, + "profiles": ["celery"], + # We might want to eventually make this more fine-grained for different consumer types. + # For example, normal `ingest-events` does not depend on `attachments-store`. 
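+    # (Illustratively, such a finer-grained entry could look like
+    # `"ingest-events": ["celery", "processing-store"]`, with names drawn from
+    # `PROCESSING_SERVICES` above; that mapping is hypothetical, not part of
+    # the current topology.)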
+ "ingest": PROCESSING_SERVICES, } diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index 404d7db44acc0..953d71dcd0105 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -820,6 +820,6 @@ def last_seen_updater(**options): @log_options() @configuration def backpressure_monitor(): - from sentry.processing.backpressure.rabbitmq import run_queue_stats_updater + from sentry.processing.backpressure.monitor import start_service_monitoring - run_queue_stats_updater() + start_service_monitoring() diff --git a/tests/sentry/monitoring/test_queues.py b/tests/sentry/monitoring/test_queues.py deleted file mode 100644 index 9d59aa25a2d63..0000000000000 --- a/tests/sentry/monitoring/test_queues.py +++ /dev/null @@ -1,119 +0,0 @@ -from datetime import datetime -from unittest.mock import Mock, patch - -import msgpack -from arroyo.backends.kafka import KafkaPayload -from arroyo.processing.strategies.abstract import MessageRejected -from arroyo.types import BrokerValue, Message, Partition, Topic -from pytest import raises - -from sentry import options -from sentry.processing.backpressure.rabbitmq import ( - _list_queues_over_threshold, - _unhealthy_consumer_key, - queue_monitoring_cluster, -) -from sentry.profiles.consumers.process.factory import ProcessProfileStrategyFactory -from sentry.testutils.cases import TestCase -from sentry.utils import json - - -class TestMonitoringQueues(TestCase): - @staticmethod - def processing_factory(): - return ProcessProfileStrategyFactory() - - def test_list_queues_over_threshold(self): - strike_threshold = 10 - with self.options( - { - "backpressure.monitor_queues.strike_threshold": strike_threshold, - } - ): - queue_history = { - "replays.process": strike_threshold - 1, - "profiles.process": strike_threshold + 1, - } - strike_threshold = options.get("backpressure.monitor_queues.strike_threshold") - under_threshold = _list_queues_over_threshold(strike_threshold, queue_history) - - assert under_threshold == { - "replays.process": False, - "profiles.process": True, - } - - def test_backpressure_unhealthy(self): - queue_name = _unhealthy_consumer_key("profiles") - - # Set the queue as unhealthy so it shouldn't process messages - queue_monitoring_cluster.set(queue_name, "1") - with self.options( - { - "backpressure.monitor_queues.enable_check": True, - "backpressure.monitor_queues.check_interval_in_seconds": 0, - "backpressure.monitor_queues.unhealthy_threshold": 0, - "backpressure.monitor_queues.strike_threshold": 1, - } - ): - with raises(MessageRejected): - self.process_one_message() - - @patch("sentry.profiles.consumers.process.factory.process_profile_task.s") - def test_backpressure_healthy(self, process_profile_task): - queue_name = _unhealthy_consumer_key("profiles") - - # Set the queue as healthy - queue_monitoring_cluster.delete(queue_name) - with self.options( - { - "backpressure.monitor_queues.enable_check": True, - "backpressure.monitor_queues.check_interval_in_seconds": 0, - "backpressure.monitor_queues.unhealthy_threshold": 1000, - "backpressure.monitor_queues.strike_threshold": 1, - } - ): - self.process_one_message() - - process_profile_task.assert_called_once() - - @patch("sentry.profiles.consumers.process.factory.process_profile_task.s") - def test_backpressure_not_enabled(self, process_profile_task): - with self.options( - { - "backpressure.monitor_queues.enable_check": False, - } - ): - self.process_one_message() - - process_profile_task.assert_called_once() - - def 
process_one_message(self): - processing_strategy = self.processing_factory().create_with_partitions( - commit=Mock(), partitions=None - ) - message_dict = { - "organization_id": 1, - "project_id": 1, - "key_id": 1, - "received": int(datetime.utcnow().timestamp()), - "payload": json.dumps({"platform": "android", "profile": ""}), - } - payload = msgpack.packb(message_dict) - - processing_strategy.submit( - Message( - BrokerValue( - KafkaPayload( - b"key", - payload, - [], - ), - Partition(Topic("profiles"), 1), - 1, - datetime.now(), - ) - ) - ) - processing_strategy.poll() - processing_strategy.join(1) - processing_strategy.terminate() diff --git a/tests/sentry/processing/backpressure/test_checking.py b/tests/sentry/processing/backpressure/test_checking.py new file mode 100644 index 0000000000000..6c4fcd5257c82 --- /dev/null +++ b/tests/sentry/processing/backpressure/test_checking.py @@ -0,0 +1,94 @@ +from datetime import datetime +from unittest.mock import Mock, patch + +import msgpack +from arroyo.backends.kafka import KafkaPayload +from arroyo.processing.strategies.abstract import MessageRejected +from arroyo.types import BrokerValue, Message, Partition, Topic +from pytest import raises + +from sentry.processing.backpressure.health import record_consumer_health +from sentry.profiles.consumers.process.factory import ProcessProfileStrategyFactory +from sentry.testutils.helpers.options import override_options +from sentry.utils import json + + +@override_options( + { + "backpressure.checking.enabled": True, + "backpressure.checking.interval": 5, + "backpressure.status_ttl": 60, + } +) +def test_backpressure_unhealthy(): + record_consumer_health({"celery": False}) + with raises(MessageRejected): + process_one_message() + + +@patch("sentry.profiles.consumers.process.factory.process_profile_task.s") +@override_options( + { + "backpressure.checking.enabled": True, + "backpressure.checking.interval": 5, + "backpressure.status_ttl": 60, + } +) +def test_backpressure_healthy(process_profile_task): + record_consumer_health( + { + "celery": True, + "attachments-store": True, + "processing-store": True, + "processing-locks": True, + "post-process-locks": True, + } + ) + process_one_message() + + process_profile_task.assert_called_once() + + +@patch("sentry.profiles.consumers.process.factory.process_profile_task.s") +@override_options( + { + "backpressure.checking.enabled": False, + "backpressure.checking.interval": 5, + } +) +def test_backpressure_not_enabled(process_profile_task): + process_one_message() + + process_profile_task.assert_called_once() + + +def process_one_message(): + processing_strategy = ProcessProfileStrategyFactory().create_with_partitions( + commit=Mock(), partitions={} + ) + message_dict = { + "organization_id": 1, + "project_id": 1, + "key_id": 1, + "received": int(datetime.utcnow().timestamp()), + "payload": json.dumps({"platform": "android", "profile": ""}), + } + payload = msgpack.packb(message_dict) + + processing_strategy.submit( + Message( + BrokerValue( + KafkaPayload( + b"key", + payload, + [], + ), + Partition(Topic("profiles"), 1), + 1, + datetime.now(), + ) + ) + ) + processing_strategy.poll() + processing_strategy.join(1) + processing_strategy.terminate() diff --git a/tests/sentry/processing/backpressure/test_monitoring.py b/tests/sentry/processing/backpressure/test_monitoring.py new file mode 100644 index 0000000000000..616154cc33f1b --- /dev/null +++ b/tests/sentry/processing/backpressure/test_monitoring.py @@ -0,0 +1,84 @@ +import pytest +from django.test.utils 
import override_settings

+from sentry.processing.backpressure.health import is_consumer_healthy, record_consumer_health
+from sentry.processing.backpressure.monitor import (
+    Redis,
+    assert_all_services_defined,
+    check_service_health,
+    load_service_definitions,
+)
+from sentry.testutils.helpers.options import override_options
+from sentry.utils import redis
+
+
+def test_loading_definitions() -> None:
+    with override_settings(SENTRY_PROCESSING_SERVICES={"redis": {"redis": "default"}}):
+        services = load_service_definitions()
+        assert "redis" in services
+
+    with pytest.raises(ValueError):
+        assert_all_services_defined(
+            {
+                "sellerie": None,  # oops
+                "attachments-store": None,
+                "processing-store": None,
+                "processing-locks": None,
+                "post-process-locks": None,
+            }
+        )
+
+
+def test_check_redis_health() -> None:
+    cluster = redis.redis_clusters.get("default")
+    services = {"redis": Redis(cluster)}
+
+    with override_options(
+        {
+            "backpressure.high_watermarks": {"redis": 1.0},
+        }
+    ):
+        service_health = check_service_health(services)
+        assert service_health["redis"] is True
+
+    with override_options(
+        {
+            # NOTE: the default cluster for local testing will return *some* kind of used and available memory
+            "backpressure.high_watermarks": {"redis": 0.0},
+        }
+    ):
+        service_health = check_service_health(services)
+        assert service_health["redis"] is False
+
+
+@override_options(
+    {
+        "backpressure.checking.enabled": True,
+        "backpressure.status_ttl": 60,
+    }
+)
+def test_record_consumer_health() -> None:
+    service_health = {
+        "celery": True,
+        "attachments-store": True,
+        "processing-store": True,
+        "processing-locks": True,
+        "post-process-locks": True,
+    }
+    record_consumer_health(service_health)
+    assert is_consumer_healthy() is True
+
+    service_health["celery"] = False
+    record_consumer_health(service_health)
+    assert is_consumer_healthy() is False
+
+    with pytest.raises(KeyError):
+        record_consumer_health(
+            {
+                "sellerie": True,  # oops
+                "attachments-store": True,
+                "processing-store": True,
+                "processing-locks": True,
+                "post-process-locks": True,
+            }
+        )
diff --git a/tests/sentry/processing/backpressure/test_redis.py b/tests/sentry/processing/backpressure/test_redis.py
index 54747b6b79fcb..3b37735066e6a 100644
--- a/tests/sentry/processing/backpressure/test_redis.py
+++ b/tests/sentry/processing/backpressure/test_redis.py
@@ -1,10 +1,13 @@
-from sentry.processing.backpressure.redis import RedisMemoryUsageMetrics
+from sentry.processing.backpressure.memory import iter_cluster_memory_usage
 from sentry.utils import redis
 
 
 def test_returns_some_usage() -> None:
-    client = redis.redis_clusters.get("default")
-    metrics = RedisMemoryUsageMetrics([client])
+    cluster = redis.redis_clusters.get("default")
 
-    usage = metrics.query_usage_percentage()
-    assert 0 < usage < 1
+    usage = list(iter_cluster_memory_usage(cluster))
+    assert len(usage) > 0
+    memory = usage[0]
+    assert memory.used > 0
+    assert memory.available > 0
+    assert 0.0 < memory.percentage < 1.0