Skip to content

Commit 6344071

Browse files
feat: Interruption and termination signals in taps and targets
1 parent 8adf452 commit 6344071

File tree

5 files changed

+79
-2
lines changed

5 files changed

+79
-2
lines changed

samples/sample_tap_countries/countries_streams.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
from singer_sdk import typing as th
1818
from singer_sdk.streams.graphql import GraphQLStream
1919

20+
if t.TYPE_CHECKING:
21+
from collections.abc import Iterable
22+
2023
SCHEMAS_DIR = importlib.resources.files(__package__) / "schemas"
2124

2225

@@ -83,6 +86,18 @@ class CountriesStream(CountriesAPIStream):
8386
),
8487
).to_dict()
8588

89+
# FIXME: revert these changes before merging
90+
def request_records(self, context) -> Iterable[dict]:
91+
import time # noqa: PLC0415
92+
93+
records = super().request_records(context)
94+
95+
yield next(records) # Emit the first record
96+
97+
time.sleep(60) # Simulate a slow stream
98+
99+
yield from records # Emit the rest of the records
100+
86101

87102
class ContinentsStream(CountriesAPIStream):
88103
"""Continents stream from the Countries API."""

samples/sample_target_csv/csv_target.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ class SampleTargetCSV(Target):
1313
name = "target-csv"
1414
config_jsonschema = th.PropertiesList(
1515
th.Property(
16-
"target_folder", th.StringType, required=True, title="Target Folder"
16+
"target_folder",
17+
th.StringType,
18+
default="output",
19+
title="Target Folder",
1720
),
1821
th.Property("file_naming_scheme", th.StringType, title="File Naming Scheme"),
1922
).to_dict()

singer_sdk/plugin_base.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55
import abc
66
import logging
77
import os
8+
import signal
89
import sys
910
import time
1011
import typing as t
1112
import warnings
1213
from importlib import metadata
1314
from pathlib import Path, PurePath
14-
from types import MappingProxyType
15+
from types import FrameType, MappingProxyType
1516

1617
import click
1718

@@ -217,6 +218,10 @@ def __init__(
217218
# Initialization timestamp
218219
self.__initialized_at = int(time.time() * 1000)
219220

221+
# Signal handling
222+
signal.signal(signal.SIGINT, self._handle_termination)
223+
signal.signal(signal.SIGTERM, self._handle_termination)
224+
220225
def setup_mapper(self) -> None:
221226
"""Initialize the plugin mapper for this tap."""
222227
self._mapper = PluginMapper(
@@ -445,6 +450,24 @@ def _validate_config(self, *, raise_errors: bool = True) -> list[str]:
445450

446451
return errors
447452

453+
def _handle_termination( # pragma: no cover
454+
self,
455+
signum: int, # noqa: ARG002
456+
frame: FrameType | None, # noqa: ARG002
457+
) -> None:
458+
"""Handle termination signal.
459+
460+
Args:
461+
signum: Signal number.
462+
frame: Frame.
463+
464+
Raises:
465+
click.Abort: If the termination signal is received.
466+
"""
467+
self.logger.info("Gracefully shutting down...")
468+
errmsg = "Received termination signal"
469+
raise click.Abort(errmsg)
470+
448471
@classmethod
449472
def print_version(
450473
cls: type[PluginBase],

singer_sdk/tap_base.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
if t.TYPE_CHECKING:
3737
from pathlib import PurePath
38+
from types import FrameType
3839

3940
from singer_sdk.connectors import SQLConnector
4041
from singer_sdk.mapper import PluginMapper
@@ -498,6 +499,22 @@ def sync_all(self) -> None:
498499

499500
# Command Line Execution
500501

502+
def _handle_termination( # pragma: no cover
503+
self,
504+
signum: int,
505+
frame: FrameType | None,
506+
) -> None:
507+
"""Handle termination signal.
508+
509+
Args:
510+
signum: Signal number.
511+
frame: Frame.
512+
"""
513+
# Emit a final state message to ensure the state is written to the output
514+
# even if the process is terminated by a signal.
515+
self.write_message(StateMessage(value=self.state))
516+
super()._handle_termination(signum, frame)
517+
501518
@classmethod
502519
def invoke( # type: ignore[override]
503520
cls: type[Tap],

singer_sdk/target_base.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
if t.TYPE_CHECKING:
3535
from collections.abc import Iterable
3636
from pathlib import PurePath
37+
from types import FrameType
3738

3839
from singer_sdk.connectors import SQLConnector
3940
from singer_sdk.mapper import PluginMapper
@@ -537,6 +538,24 @@ def _write_state_message(self, state: dict) -> None:
537538

538539
# CLI handler
539540

541+
def _handle_termination( # pragma: no cover
542+
self,
543+
signum: int,
544+
frame: FrameType | None,
545+
) -> None:
546+
"""Handle termination signals.
547+
548+
Args:
549+
signum: Signal number.
550+
frame: Frame object.
551+
"""
552+
self.logger.info(
553+
"Received termination signal %d, draining all sinks...",
554+
signum,
555+
)
556+
self.drain_all(is_endofpipe=True)
557+
super()._handle_termination(signum, frame)
558+
540559
@classmethod
541560
def invoke( # type: ignore[override]
542561
cls: type[Target],

0 commit comments

Comments
 (0)