feat(backpressure): Add Service monitoring (#50928)
This changes the existing queue-based monitoring to a more generic
`service` monitoring.
    
Services are organized in a hierarchy: base services at the bottom, and
consumers that depend on them.
    
The main monitoring loop checks the health of the individual base services
and aggregates that health for the consumers. The aggregated consumer health
is then persisted and can be queried.
    
The settings around monitoring and checking are also streamlined, and a new
high-watermark setting replaces the existing queue-size-based options.

fixes https://github.com/getsentry/team-processing/issues/56
fixes https://github.com/getsentry/team-processing/issues/55
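
For context, the hierarchy might look roughly like the sketch below. The
`topology` module that defines it is part of this commit but not shown in
this diff, so the consumer entry here is illustrative, not the real mapping:

# Hypothetical shape of src/sentry/processing/backpressure/topology.py;
# only `PROCESSING_SERVICES` and `CONSUMERS` are imported by the code below.

# Base services whose memory usage is monitored directly:
PROCESSING_SERVICES = [
    "celery",
    "attachments-store",
    "processing-store",
    "processing-locks",
    "post-process-locks",
]

# Consumers, mapped to the base services they depend on. A consumer is
# healthy only if *all* of its dependencies are healthy.
CONSUMERS = {
    "profiles": ["celery", "attachments-store"],  # invented example entry
}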

---------

Co-authored-by: Sebastian Zivota <[email protected]>
Swatinem and loewenheim authored Jun 16, 2023
1 parent 904a049 commit 13ef106
Showing 14 changed files with 477 additions and 468 deletions.
40 changes: 21 additions & 19 deletions src/sentry/conf/server.py
@@ -16,7 +16,6 @@
Callable,
Dict,
Iterable,
-List,
Mapping,
Optional,
Tuple,
@@ -3528,24 +3527,27 @@ class TopicDefinition(TypedDict):
# Once we start detecting crashes for other SDKs, this will be a mapping of SDK name to project ID or something similar.
SDK_CRASH_DETECTION_PROJECT_ID: Optional[int] = None

-# The Redis cluster to use for monitoring the health of
-# Celery queues.
-SENTRY_QUEUE_MONITORING_REDIS_CLUSTER = "default"
-
-# The RabbitMQ hosts whose health should be monitored by the backpressure system.
-# This should be a list of dictionaries with keys "url" and "vhost".
-# E.g. for local testing: [{"url": "https://guest:guest@localhost:15672", "vhost": "%2F"}]
-SENTRY_QUEUE_MONITORING_RABBITMQ_HOSTS: List[Dict[str, str]] = []
-
-# This is a mapping between the various processing stores,
-# and the redis `cluster` they are using.
-# This setting needs to be appropriately synched across the various deployments
-# for automatic backpressure management to properly work.
-SENTRY_PROCESSING_REDIS_CLUSTERS = {
-    "attachments": "rc-short",
-    # "processing": "processing",
-    "locks": "default",
-    "post_process_locks": "default",
+# The Redis cluster to use for monitoring the service / consumer health.
+SENTRY_SERVICE_MONITORING_REDIS_CLUSTER = "default"
+
+# This is a view of which abstract processing service is backed by which infrastructure.
+# Right now, the infrastructure can be `redis` or `rabbitmq`.
+#
+# For `redis`, one has to provide the cluster id.
+# It has to match a cluster defined in `redis.redis_clusters`.
+#
+# For `rabbitmq`, one has to provide a list of server URLs.
+# The URL is in the format `http://{user}:{password}@{hostname}:{port}/`.
+#
+# The definition can also be empty, in which case nothing is checked and
+# the service is assumed to be healthy.
+# However, the service *must* be defined.
+SENTRY_PROCESSING_SERVICES: Mapping[str, Any] = {
+    "celery": {"redis": "default"},
+    "attachments-store": {"redis": "rc-short"},
+    "processing-store": {},  # "redis": "processing"},
+    "processing-locks": {"redis": "default"},
+    "post-process-locks": {"redis": "default"},
}
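
As an illustration of the format described above, a hypothetical local
override (e.g. in `sentry.conf.py`) that checks Celery through a locally
running RabbitMQ management API could look like the following; the URL is a
placeholder, not a shipped default:

SENTRY_PROCESSING_SERVICES = {
    # RabbitMQ-backed: a list of management-API URLs is checked.
    "celery": {"rabbitmq": ["http://guest:guest@127.0.0.1:15672/"]},
    # Redis-backed: the cluster id must exist in `redis.redis_clusters`.
    "attachments-store": {"redis": "rc-short"},
    # Empty definition: nothing is checked and the service counts as healthy,
    # but the key itself must still be present.
    "processing-store": {},
    "processing-locks": {"redis": "default"},
    "post-process-locks": {"redis": "default"},
}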


49 changes: 31 additions & 18 deletions src/sentry/options/defaults.py
@@ -1305,24 +1305,37 @@
register("span_descs.bump-lifetime-sample-rate", default=0.25, flags=FLAG_AUTOMATOR_MODIFIABLE)
# Decides whether artifact bundles asynchronous renewal is enabled.
register("sourcemaps.artifact-bundles.enable-renewal", default=0.0, flags=FLAG_AUTOMATOR_MODIFIABLE)
-# Enables reporting status of queues for backpressure management.
-register(
-    "backpressure.monitor_queues.enable_status", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE
-)
-# Enables checking queue health in consumers for backpressure management.
-register("backpressure.monitor_queues.enable_check", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE)

+# === Backpressure related runtime options ===
+
+# Enables monitoring of services for backpressure management.
+register("backpressure.monitoring.enabled", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE)
+# How often the monitor will check service health.
+register("backpressure.monitoring.interval", default=5, flags=FLAG_AUTOMATOR_MODIFIABLE)
+
+# Enables checking consumer health for backpressure management.
+register("backpressure.checking.enabled", default=False, flags=FLAG_AUTOMATOR_MODIFIABLE)
+# How often a consumer will check its health, in a debounced fashion.
+register("backpressure.checking.interval", default=5, flags=FLAG_AUTOMATOR_MODIFIABLE)
+
+
+# How long a health status is persisted. Updates to the status can pause for
+# this long before consumers assume things are unhealthy.
+register("backpressure.status_ttl", default=60, flags=FLAG_AUTOMATOR_MODIFIABLE)
+
+# The per-service high-watermark levels above which a service is marked as unhealthy.
+# This should mirror the `SENTRY_PROCESSING_SERVICES` setting.
+# If this option is overridden downstream and a new service is later added to
+# the defaults, the override has to be updated as well; otherwise the code
+# will throw. This is intentional: we want to fail fast and loudly on
+# misconfiguration.
register(
-    "backpressure.monitor_queues.check_interval_in_seconds",
-    default=5,
+    "backpressure.high_watermarks",
+    default={
+        "celery": 0.5,
+        "attachments-store": 0.5,
+        "processing-store": 0.5,
+        "processing-locks": 0.5,
+        "post-process-locks": 0.5,
+    },
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)
-register(
-    "backpressure.monitor_queues.unhealthy_threshold", default=1000, flags=FLAG_AUTOMATOR_MODIFIABLE
-)
-# How often we check queue health.
-register("backpressure.monitor_queues.check_interval", default=5, flags=FLAG_AUTOMATOR_MODIFIABLE)
-# How many times in a row a queue must be unhealthy before it is
-# recorded in Redis. 12 * 5sec = unhealthy for 1 minute.
-register(
-    "backpressure.monitor_queues.strike_threshold", default=12, flags=FLAG_AUTOMATOR_MODIFIABLE
-)
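
To make the fail-fast behavior concrete: `check_service_health()` in
`monitor.py` (added below) indexes this mapping by service name for every
monitored service, so a name that is missing from a downstream override
raises immediately. A minimal sketch, with an invented service name:

from sentry import options

high_watermarks = options.get("backpressure.high_watermarks")
# Raises KeyError if "some-new-service" was added to the defaults but the
# downstream override of this option was never updated:
high_watermark = high_watermarks["some-new-service"]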
6 changes: 2 additions & 4 deletions src/sentry/processing/backpressure/arroyo.py
@@ -7,7 +7,7 @@
from arroyo.types import FilteredPayload, Message

from sentry import options
-from sentry.processing.backpressure.rabbitmq import is_consumer_healthy
+from sentry.processing.backpressure.health import is_consumer_healthy

# As arroyo would otherwise busy-wait, we will sleep for a short time
# when a message is rejected.
@@ -24,9 +24,7 @@ def __init__(self, consumer_name: str = "default"):
def is_healthy(self) -> bool:
now = time.time()
# Check queue health if it's been more than the interval
-if now - self.last_check >= options.get(
-    "backpressure.monitor_queues.check_interval_in_seconds"
-):
+if now - self.last_check >= options.get("backpressure.checking.interval"):
# TODO: We would want to at first monitor everything all at once,
# and make it more fine-grained later on.
self.is_queue_healthy = is_consumer_healthy(self.consumer_name)
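
The replacement above also debounces the Redis lookup: `is_healthy()` only
consults `is_consumer_healthy()` once per `backpressure.checking.interval`
seconds and otherwise reuses the cached result. A standalone sketch of that
pattern (illustrative names, independent of arroyo):

import time

class DebouncedCheck:
    """Caches the result of an expensive health check for `interval` seconds."""

    def __init__(self, interval: float = 5.0):
        self.interval = interval
        self.last_check = 0.0
        self.cached = True  # assume healthy until the first real check

    def is_healthy(self, check) -> bool:
        now = time.time()
        if now - self.last_check >= self.interval:
            # Only hit the backing store once per interval.
            self.cached = check()
            self.last_check = now
        return self.cached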
57 changes: 57 additions & 0 deletions src/sentry/processing/backpressure/health.py
@@ -0,0 +1,57 @@
from typing import Mapping

import sentry_sdk
from django.conf import settings

from sentry import options
from sentry.processing.backpressure.topology import CONSUMERS
from sentry.utils import redis


def _prefix_key(key_name: str) -> str:
return f"bp1:{key_name}"


HEALTHY_KEY_NAME = "consumer_is_healthy"


def _unhealthy_consumer_key(name: str) -> str:
return _prefix_key(f"{HEALTHY_KEY_NAME}:{name}")


service_monitoring_cluster = redis.redis_clusters.get(
settings.SENTRY_SERVICE_MONITORING_REDIS_CLUSTER
)


def is_consumer_healthy(consumer_name: str = "default") -> bool:
"""Checks whether the given consumer is healthy by looking it up in Redis.
NB: If the consumer is not found in Redis, it is considered unhealthy.
This behavior might change in the future.
"""

if not options.get("backpressure.checking.enabled"):
return True
# check if the consumer is healthy by reading its status from Redis
try:
return service_monitoring_cluster.get(_unhealthy_consumer_key(consumer_name)) == "true"
except Exception as e:
sentry_sdk.capture_exception(e)
# By default it's considered unhealthy
return False


def record_consumer_health(service_health: Mapping[str, bool]) -> None:
with service_monitoring_cluster.pipeline() as pipeline:
key_ttl = options.get("backpressure.status_ttl")
for name, dependencies in CONSUMERS.items():
is_healthy = True
for dependency in dependencies:
is_healthy = is_healthy and service_health[dependency]

pipeline.set(
_unhealthy_consumer_key(name), "true" if is_healthy else "false", ex=key_ttl
)

pipeline.execute()
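
Taken together, the two halves of this module form a small Redis protocol:
the monitor writes one `bp1:consumer_is_healthy:<name>` key per consumer
with a TTL of `backpressure.status_ttl`, and the consumers read it back. A
sketch with an invented consumer name (real names come from
`topology.CONSUMERS`):

# Key written by record_consumer_health() for a consumer named "profiles":
key = _unhealthy_consumer_key("profiles")  # "bp1:consumer_is_healthy:profiles"

# The monitor stores "true"/"false" with an expiry:
service_monitoring_cluster.set(key, "true", ex=60)

# is_consumer_healthy("profiles") now returns True (assuming
# `backpressure.checking.enabled` is set). Once the TTL lapses, e.g. because
# the monitor is paused, the key disappears and the check returns False.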
64 changes: 64 additions & 0 deletions src/sentry/processing/backpressure/memory.py
@@ -0,0 +1,64 @@
from dataclasses import dataclass
from typing import Any, Generator, Mapping, Union

import requests
from redis import Redis
from rediscluster import RedisCluster


@dataclass
class ServiceMemory:
used: int
available: int
percentage: float

def __init__(self, used: int, available: int):
self.used = used
self.available = available
self.percentage = used / available


def query_rabbitmq_memory_usage(host: str) -> ServiceMemory:
"""Returns the currently used memory and the memory limit of a
RabbitMQ host.
"""

if not host.endswith("/"):
host += "/"
url = f"{host}api/nodes"

response = requests.get(url)
response.raise_for_status()
json = response.json()
return ServiceMemory(json[0]["mem_used"], json[0]["mem_limit"])


# Based on configuration, this could be:
# - a `rediscluster` Cluster (actually `RetryingRedisCluster`)
# - a straight `Redis` client (actually `FailoverRedis`)
# - or any class configured via `client_class`.
# It could in theory also be a `rb` (aka redis blaster) Cluster, but we
# intentionally do not support these.
Cluster = Union[RedisCluster, Redis]


def get_memory_usage(info: Mapping[str, Any]) -> ServiceMemory:
# or alternatively: `used_memory_rss`?
memory_used = info.get("used_memory", 0)
# `maxmemory` might be 0 in development
memory_available = info.get("maxmemory", 0) or info["total_system_memory"]

return ServiceMemory(memory_used, memory_available)


def iter_cluster_memory_usage(cluster: Cluster) -> Generator[ServiceMemory, None, None]:
"""
A generator that yields the memory usage of each node in the `cluster`, derived from its redis `INFO` output.
"""
if isinstance(cluster, RedisCluster):
# `RedisCluster` returns these as a dictionary, with the node-id as key
for info in cluster.info().values():
yield get_memory_usage(info)
else:
# otherwise, let's just hope that `info()` does the right thing
yield get_memory_usage(cluster.info())
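
A quick worked example of `get_memory_usage()` with a hand-written `INFO`
payload (values invented):

info = {
    "used_memory": 512 * 1024 * 1024,  # 512 MiB currently in use
    "maxmemory": 2 * 1024 * 1024 * 1024,  # 2 GiB configured limit
    "total_system_memory": 8 * 1024 * 1024 * 1024,  # ignored, maxmemory is set
}
memory = get_memory_usage(info)
assert memory.percentage == 0.25  # 512 MiB of 2 GiB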
102 changes: 102 additions & 0 deletions src/sentry/processing/backpressure/monitor.py
@@ -0,0 +1,102 @@
import time
from dataclasses import dataclass
from typing import Dict, Generator, List, Mapping, Union

from django.conf import settings

from sentry import options
from sentry.processing.backpressure.health import record_consumer_health

# from sentry import options
from sentry.processing.backpressure.memory import (
Cluster,
ServiceMemory,
iter_cluster_memory_usage,
query_rabbitmq_memory_usage,
)
from sentry.processing.backpressure.topology import PROCESSING_SERVICES
from sentry.utils import redis


@dataclass
class Redis:
cluster: Cluster


@dataclass
class RabbitMq:
servers: List[str]


Service = Union[Redis, RabbitMq, None]


def check_service_memory(service: Service) -> Generator[ServiceMemory, None, None]:
"""
This queries the given [`Service`] and yields the [`ServiceMemory`]
for each of the individual servers that comprise the service.
"""

if isinstance(service, Redis):
yield from iter_cluster_memory_usage(service.cluster)

elif isinstance(service, RabbitMq):
for server in service.servers:
yield query_rabbitmq_memory_usage(server)


def load_service_definitions() -> Dict[str, Service]:
services: Dict[str, Service] = {}
for name, definition in settings.SENTRY_PROCESSING_SERVICES.items():
if cluster_id := definition.get("redis"):
cluster = redis.redis_clusters.get(cluster_id)
services[name] = Redis(cluster)

elif rabbitmq_urls := definition.get("rabbitmq"):
services[name] = RabbitMq(rabbitmq_urls)

else:
services[name] = None

return services


def assert_all_services_defined(services: Dict[str, Service]) -> None:
for name in PROCESSING_SERVICES:
if name not in services:
raise ValueError(
f"The `{name}` Service is missing from `settings.SENTRY_PROCESSING_SERVICES`."
)


def check_service_health(services: Mapping[str, Service]) -> Mapping[str, bool]:
service_health = {}
high_watermarks = options.get("backpressure.high_watermarks")

for name, service in services.items():
high_watermark = high_watermarks[name]
is_healthy = True
for memory in check_service_memory(service):
is_healthy = is_healthy and memory.percentage < high_watermark

service_health[name] = is_healthy

return service_health


def start_service_monitoring() -> None:
services = load_service_definitions()
assert_all_services_defined(services)

while True:
if not options.get("backpressure.monitoring.enabled"):
time.sleep(options.get("backpressure.monitoring.interval"))
continue

# first, check each base service and record its health
service_health = check_service_health(services)

# then, check the derived services and record their health
record_consumer_health(service_health)

time.sleep(options.get("backpressure.monitoring.interval"))
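
One subtlety worth noting: a service with an empty definition (like
`processing-store` in the default settings) is loaded as `None`,
`check_service_memory()` yields no samples for it, and `check_service_health()`
therefore reports it healthy by construction. A sketch (this assumes Django
settings and options are configured, so it will not run standalone):

services = load_service_definitions()
assert_all_services_defined(services)

health = check_service_health(services)
# `processing-store` has the empty definition `{}` in the defaults, so no
# Redis or RabbitMQ was queried for it:
assert health["processing-store"] is True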