From 5213d08503ce022e5f04362a184480a586a8b519 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Wed, 21 Aug 2024 20:22:07 -0600 Subject: [PATCH 1/5] feat: Interruption and termination signals in taps and targets --- singer_sdk/plugin_base.py | 25 ++++++++++++++++++++++++- singer_sdk/tap_base.py | 17 +++++++++++++++++ singer_sdk/target_base.py | 19 +++++++++++++++++++ 3 files changed, 60 insertions(+), 1 deletion(-) diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index 155b7423f..f15a77796 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -5,13 +5,14 @@ import abc import logging import os +import signal import sys import time import typing as t import warnings from importlib import metadata from pathlib import Path, PurePath -from types import MappingProxyType +from types import FrameType, MappingProxyType import click @@ -217,6 +218,10 @@ def __init__( # Initialization timestamp self.__initialized_at = int(time.time() * 1000) + # Signal handling + signal.signal(signal.SIGINT, self._handle_termination) + signal.signal(signal.SIGTERM, self._handle_termination) + def setup_mapper(self) -> None: """Initialize the plugin mapper for this tap.""" self._mapper = PluginMapper( @@ -445,6 +450,24 @@ def _validate_config(self, *, raise_errors: bool = True) -> list[str]: return errors + def _handle_termination( # pragma: no cover + self, + signum: int, # noqa: ARG002 + frame: FrameType | None, # noqa: ARG002 + ) -> None: + """Handle termination signal. + + Args: + signum: Signal number. + frame: Frame. + + Raises: + click.Abort: If the termination signal is received. + """ + self.logger.info("Gracefully shutting down...") + errmsg = "Received termination signal" + raise click.Abort(errmsg) + @classmethod def print_version( cls: type[PluginBase], diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index 7eb313063..11785fa06 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -35,6 +35,7 @@ if t.TYPE_CHECKING: from pathlib import PurePath + from types import FrameType from singer_sdk.connectors import SQLConnector from singer_sdk.mapper import PluginMapper @@ -498,6 +499,22 @@ def sync_all(self) -> None: # Command Line Execution + def _handle_termination( # pragma: no cover + self, + signum: int, + frame: FrameType | None, + ) -> None: + """Handle termination signal. + + Args: + signum: Signal number. + frame: Frame. + """ + # Emit a final state message to ensure the state is written to the output + # even if the process is terminated by a signal. + self.write_message(StateMessage(value=self.state)) + super()._handle_termination(signum, frame) + @classmethod def invoke( # type: ignore[override] cls: type[Tap], diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 494979678..7a8cfa294 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -34,6 +34,7 @@ if t.TYPE_CHECKING: from collections.abc import Iterable from pathlib import PurePath + from types import FrameType from singer_sdk.connectors import SQLConnector from singer_sdk.mapper import PluginMapper @@ -537,6 +538,24 @@ def _write_state_message(self, state: dict) -> None: # CLI handler + def _handle_termination( # pragma: no cover + self, + signum: int, + frame: FrameType | None, + ) -> None: + """Handle termination signals. + + Args: + signum: Signal number. + frame: Frame object. + """ + self.logger.info( + "Received termination signal %d, draining all sinks...", + signum, + ) + self.drain_all(is_endofpipe=True) + super()._handle_termination(signum, frame) + @classmethod def invoke( # type: ignore[override] cls: type[Target], From bc1b48a67fd34aa35449418d4b192c9b66288964 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Thu, 22 May 2025 14:25:24 -0600 Subject: [PATCH 2/5] refactor: Ensure termination is always handled Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> --- singer_sdk/tap_base.py | 6 ++++-- singer_sdk/target_base.py | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index 11785fa06..b77e079f0 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -512,8 +512,10 @@ def _handle_termination( # pragma: no cover """ # Emit a final state message to ensure the state is written to the output # even if the process is terminated by a signal. - self.write_message(StateMessage(value=self.state)) - super()._handle_termination(signum, frame) + try: + self.write_message(StateMessage(value=self.state)) + finally: + super()._handle_termination(signum, frame) @classmethod def invoke( # type: ignore[override] diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 7a8cfa294..ce51595d9 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -553,8 +553,10 @@ def _handle_termination( # pragma: no cover "Received termination signal %d, draining all sinks...", signum, ) - self.drain_all(is_endofpipe=True) - super()._handle_termination(signum, frame) + try: + self.drain_all(is_endofpipe=True) + finally: + super()._handle_termination(signum, frame) @classmethod def invoke( # type: ignore[override] From d20169e37a5a08a5711bac011ca467d708a3e096 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Thu, 22 May 2025 14:33:53 -0600 Subject: [PATCH 3/5] refactor: Use `sys.exit(0)` for a graceful shutdown --- singer_sdk/plugin_base.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index f15a77796..f777e2c7b 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -460,13 +460,9 @@ def _handle_termination( # pragma: no cover Args: signum: Signal number. frame: Frame. - - Raises: - click.Abort: If the termination signal is received. """ self.logger.info("Gracefully shutting down...") - errmsg = "Received termination signal" - raise click.Abort(errmsg) + sys.exit(0) @classmethod def print_version( From b2cdd66ac0e1b4b4dbfd8ebaed82d01b03d88dc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Thu, 22 May 2025 14:34:42 -0600 Subject: [PATCH 4/5] refactor: Only register handlers in the main thread --- singer_sdk/plugin_base.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index f777e2c7b..82debf1f2 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -7,6 +7,7 @@ import os import signal import sys +import threading import time import typing as t import warnings @@ -219,8 +220,14 @@ def __init__( self.__initialized_at = int(time.time() * 1000) # Signal handling - signal.signal(signal.SIGINT, self._handle_termination) - signal.signal(signal.SIGTERM, self._handle_termination) + self._setup_signal_handlers() + + def _setup_signal_handlers(self) -> None: + if threading.current_thread() == threading.main_thread(): + if hasattr(signal, "SIGINT"): + signal.signal(signal.SIGINT, self._handle_termination) + if hasattr(signal, "SIGTERM"): + signal.signal(signal.SIGTERM, self._handle_termination) def setup_mapper(self) -> None: """Initialize the plugin mapper for this tap.""" From 2d05622210367320383867b79ff3474040df19de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Thu, 22 May 2025 14:38:34 -0600 Subject: [PATCH 5/5] chore: Ignore coverage --- singer_sdk/plugin_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index 82debf1f2..0f73c60a1 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -222,7 +222,7 @@ def __init__( # Signal handling self._setup_signal_handlers() - def _setup_signal_handlers(self) -> None: + def _setup_signal_handlers(self) -> None: # pragma: no cover if threading.current_thread() == threading.main_thread(): if hasattr(signal, "SIGINT"): signal.signal(signal.SIGINT, self._handle_termination)