Skip to content

Commit

Permalink
Reset signals on exiting SchedulerJob loop (#44370)
Browse files Browse the repository at this point in the history
Under normal use (i.e. running `airflow scheduler`) this doesn't matter as the
process is about to exit.

However, this can come up when running tests -- for instance if you try to run
`pytest tests/jobs/test_scheduler_job.py tests/executors/` it will fail as the
signal handler from running the scheduler is still installed in the main
pytest process.

Since the fix is easy and doesn't significantly complicate anything it is
worth making the behaviour under pytest more "correct".
  • Loading branch information
ashb authored Nov 25, 2024
1 parent 28ce656 commit c9484fc
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import time
from collections import Counter, defaultdict, deque
from collections.abc import Collection, Iterable, Iterator
from contextlib import suppress
from contextlib import ExitStack, suppress
from datetime import timedelta
from functools import lru_cache, partial
from itertools import groupby
Expand Down Expand Up @@ -219,14 +219,22 @@ def __init__(
def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
Stats.incr("scheduler_heartbeat", 1, 1)

def register_signals(self) -> None:
def register_signals(self) -> ExitStack:
"""Register signals that stop child processes."""
signal.signal(signal.SIGINT, self._exit_gracefully)
signal.signal(signal.SIGTERM, self._exit_gracefully)
signal.signal(signal.SIGUSR2, self._debug_dump)
resetter = ExitStack()
prev_int = signal.signal(signal.SIGINT, self._exit_gracefully)
prev_term = signal.signal(signal.SIGTERM, self._exit_gracefully)
prev_usr2 = signal.signal(signal.SIGUSR2, self._debug_dump)

resetter.callback(signal.signal, signal.SIGINT, prev_int)
resetter.callback(signal.signal, signal.SIGTERM, prev_term)
resetter.callback(signal.signal, signal.SIGUSR2, prev_usr2)

if self._enable_tracemalloc:
signal.signal(signal.SIGUSR1, self._log_memory_usage)
prev = signal.signal(signal.SIGUSR1, self._log_memory_usage)
resetter.callback(signal.signal, signal.SIGUSR1, prev)

return resetter

def _exit_gracefully(self, signum: int, frame: FrameType | None) -> None:
"""Clean up processor_agent to avoid leaving orphan processes."""
Expand Down Expand Up @@ -927,6 +935,7 @@ def _execute(self) -> int | None:
async_mode=async_mode,
)

reset_signals = self.register_signals()
try:
callback_sink: PipeCallbackSink | DatabaseCallbackSink

Expand All @@ -944,8 +953,6 @@ def _execute(self) -> int | None:
executor.callback_sink = callback_sink
executor.start()

self.register_signals()

if self.processor_agent:
self.processor_agent.start()

Expand Down Expand Up @@ -981,6 +988,10 @@ def _execute(self) -> int | None:
self.processor_agent.end()
except Exception:
self.log.exception("Exception when executing DagFileProcessorAgent.end")

# Under normal execution, this doesn't matter, but by resetting signals it lets us run more things
# in the same process under testing without leaking global state
reset_signals.close()
self.log.info("Exited execute loop")
return None

Expand Down

0 comments on commit c9484fc

Please sign in to comment.