diff --git a/src/sentry/rules/processing/delayed_processing.py b/src/sentry/rules/processing/delayed_processing.py index eef36e4ba63b28..7cc10a2fb84b0a 100644 --- a/src/sentry/rules/processing/delayed_processing.py +++ b/src/sentry/rules/processing/delayed_processing.py @@ -1,13 +1,19 @@ import logging +import os import random +import signal +import threading +import time import uuid from collections import defaultdict +from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import Any, DefaultDict, NamedTuple import sentry_sdk from celery import Task +from django.core.exceptions import SoftTimeLimitExceeded from django.db.models import OuterRef, Subquery from sentry import buffer, features, nodestore @@ -596,7 +602,8 @@ def fire_rules( # TODO(cathy): add opposite of the FF organizations:workflow-engine-trigger-actions for callback, futures in callback_and_futures: - safe_execute(callback, groupevent, futures) + with failure_context(3): + callback(groupevent, futures) if log_config.num_events_issue_debugging: logger.info( @@ -632,6 +639,64 @@ def cleanup_redis_buffer( ) +class TimeoutException(Exception): + pass + + +class timeout: + _nesting = 0 + _lock = threading.Lock() + + def __init__(self, timeout: timedelta): + self.seconds: float = timeout.total_seconds() + self._thread: threading.Thread | None = None + self._prev_handler = None + + def _handle_timeout(self, signum, frame): + raise TimeoutException(f"Timed out after {self.seconds} seconds") + + def _timeout_thread(self): + time.sleep(self.seconds) + # Send SIGUSR1 to main thread + os.kill(os.getpid(), signal.SIGUSR1) + + def __enter__(self): + with timeout._lock: + timeout._nesting += 1 + if timeout._nesting == 1: + # Install handler only at outermost level + self._prev_handler = signal.getsignal(signal.SIGUSR1) + signal.signal(signal.SIGUSR1, self._handle_timeout) + # Start local timer thread + self._thread = threading.Thread(target=self._timeout_thread) + self._thread.daemon = True + self._thread.start() + + def __exit__(self, exc_type, exc_value, traceback): + with timeout._lock: + timeout._nesting -= 1 + if timeout._nesting == 0: + # Restore original handler only at outermost level + signal.signal(signal.SIGUSR1, self._prev_handler) + + +@contextmanager +def failure_context(deadline: timedelta): + """ + Context manager for running code that you want to allow to fail without leaking + exceptions. + """ + with timeout(deadline.total_seconds()): + try: + yield + except TimeoutException: + raise + except SoftTimeLimitExceeded: + raise + except Exception as e: + logger.exception("delayed_processing.failure_context", extra={"exception": e}) + + @instrumented_task( name="sentry.rules.processing.delayed_processing", queue="delayed_rules",