-
Notifications
You must be signed in to change notification settings - Fork 76
feat: Interruption and termination signals are handled in taps and targets #2620
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
CodSpeed Performance ReportMerging #2620 will not alter performanceComparing Summary
|
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #2620 +/- ##
==========================================
- Coverage 91.68% 91.65% -0.04%
==========================================
Files 62 62
Lines 5315 5318 +3
Branches 686 686
==========================================
+ Hits 4873 4874 +1
- Misses 310 312 +2
Partials 132 132 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
57ed344
to
b795525
Compare
ce5a41e
to
f87387e
Compare
af2c00e
to
aa5831a
Compare
aa5831a
to
cc7a06b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements enhanced handling of interruption and termination signals across taps and targets to ensure graceful shutdown behavior. Key changes include:
- Adding a _handle_termination method in target_base and tap_base to drain sinks and write state before termination.
- Registering signal handlers in plugin_base for SIGINT and SIGTERM with a graceful abort.
- Updating sample configurations and adding temporary simulation logic in countries_streams.
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
singer_sdk/target_base.py | Adds _handle_termination to drain sinks and delegate termination. |
singer_sdk/tap_base.py | Implements _handle_termination to write state messages upon exit. |
singer_sdk/plugin_base.py | Registers termination signal handlers and aborts on termination. |
samples/sample_target_csv/csv_target.py | Adds a default value for target_folder in the CSV target config. |
samples/sample_tap_countries/countries_streams.py | Introduces temporary slowdown simulation with a FIXME comment. |
Comments suppressed due to low confidence (2)
singer_sdk/plugin_base.py:468
- [nitpick] Consider including the actual signal number in the error message to provide more context for debugging purposes.
errmsg = "Received termination signal"
samples/sample_tap_countries/countries_streams.py:89
- Please remove the temporary slowdown simulation in the request_records method before merging as indicated by the FIXME comment.
# FIXME: revert these changes before merging
signum: Signal number. | ||
frame: Frame. | ||
""" | ||
self.write_message(StateMessage(value=self.state)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Consider adding a comment to clarify why termination handling in taps involves writing a state message before calling the super implementation.
Copilot uses AI. Check for mistakes.
cc7a06b
to
6344071
Compare
@sourcery-ai review |
Reviewer's GuideThis PR centralizes termination signal handling by registering SIGINT/SIGTERM in PluginBase and extending taps and targets to perform graceful shutdown tasks (emitting final state, draining sinks, aborting via click), and updates sample connectors to simulate a slow stream and set a default output folder. Sequence Diagram: Tap Termination Signal HandlingsequenceDiagram
actor User as OS/User
participant TapInstance as Tap
participant PB as PluginBase
User->>TapInstance: Sends SIGINT/SIGTERM
activate TapInstance
TapInstance->>TapInstance: _handle_termination(signum, frame)
TapInstance->>TapInstance: Writes final state message
TapInstance->>PB: super()._handle_termination(signum, frame)
activate PB
PB->>PB: Logs "Gracefully shutting down..."
PB->>PB: Raises click.Abort
deactivate PB
deactivate TapInstance
Sequence Diagram: Target Termination Signal HandlingsequenceDiagram
actor User as OS/User
participant TargetInstance as Target
participant PB as PluginBase
User->>TargetInstance: Sends SIGINT/SIGTERM
activate TargetInstance
TargetInstance->>TargetInstance: _handle_termination(signum, frame)
TargetInstance->>TargetInstance: Logs "Draining sinks..."
TargetInstance->>TargetInstance: Drains all sinks
TargetInstance->>PB: super()._handle_termination(signum, frame)
activate PB
PB->>PB: Logs "Gracefully shutting down..."
PB->>PB: Raises click.Abort
deactivate PB
deactivate TargetInstance
Updated Class Diagram for Termination Signal HandlingclassDiagram
class PluginBase {
+__init__() void
#_handle_termination(signum: int, frame: object) void
}
class TapBase {
#_handle_termination(signum: int, frame: object) void
}
class TargetBase {
#_handle_termination(signum: int, frame: object) void
}
PluginBase <|-- TapBase
PluginBase <|-- TargetBase
File-Level Changes
Assessment against linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @edgarrmondragon - I've reviewed your changes and found some issues that need to be addressed.
Blocking issues:
- time.sleep() call; did you mean to leave this in? (link)
- Undefined 't' used for TYPE_CHECKING import guard (link)
Here's what I looked at during the review
- 🔴 General issues: 1 blocking issue, 5 other issues
- 🔴 Security: 1 blocking issue
- 🟢 Testing: all looks good
- 🟢 Documentation: all looks good
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
# Signal handling | ||
signal.signal(signal.SIGINT, self._handle_termination) | ||
signal.signal(signal.SIGTERM, self._handle_termination) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (bug_risk): Unconditionally registering signal handlers can override other handlers and doesn't guard against non-main threads
Ensure signal handlers are registered only in the main thread and avoid overwriting existing handlers, possibly by checking current handlers or using a class-level setup flag.
# Signal handling | |
signal.signal(signal.SIGINT, self._handle_termination) | |
signal.signal(signal.SIGTERM, self._handle_termination) | |
# Signal handling | |
import signal | |
import threading | |
# Class-level flag to ensure handlers are only set up once | |
if not hasattr(self.__class__, "_signal_handlers_registered"): | |
self.__class__._signal_handlers_registered = False | |
if ( | |
threading.current_thread() is threading.main_thread() | |
and not self.__class__._signal_handlers_registered | |
): | |
for sig in (signal.SIGINT, signal.SIGTERM): | |
current_handler = signal.getsignal(sig) | |
if current_handler is signal.SIG_DFL or current_handler is None: | |
signal.signal(sig, self._handle_termination) | |
self.__class__._signal_handlers_registered = True |
self.logger.info("Gracefully shutting down...") | ||
errmsg = "Received termination signal" | ||
raise click.Abort(errmsg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (bug_risk): Raising exceptions directly in a signal handler may not work reliably
Consider using sys.exit() or triggering click's abort outside the signal handler for a more reliable shutdown.
self.logger.info("Gracefully shutting down...") | |
errmsg = "Received termination signal" | |
raise click.Abort(errmsg) | |
self.logger.info("Gracefully shutting down...") | |
errmsg = "Received termination signal" | |
import sys | |
sys.exit(errmsg) |
@@ -35,6 +35,7 @@ | |||
|
|||
if t.TYPE_CHECKING: | |||
from pathlib import PurePath | |||
from types import FrameType |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): FrameType is used in the handler signature but only imported under TYPE_CHECKING
Make sure FrameType is imported at runtime or use a string annotation to prevent NameError.
@@ -17,6 +17,9 @@ | |||
from singer_sdk import typing as th | |||
from singer_sdk.streams.graphql import GraphQLStream | |||
|
|||
if t.TYPE_CHECKING: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): Undefined 't' used for TYPE_CHECKING import guard
Please import 'typing as t' or use 'from typing import TYPE_CHECKING' to fix the reference.
@@ -83,6 +86,18 @@ class CountriesStream(CountriesAPIStream): | |||
), | |||
).to_dict() | |||
|
|||
# FIXME: revert these changes before merging |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue: Debugging code left in sample tap stream
Remove or conditionally gate this simulated slow stream code before merging.
th.Property( | ||
"target_folder", th.StringType, required=True, title="Target Folder" | ||
"target_folder", | ||
th.StringType, | ||
default="output", | ||
title="Target Folder", | ||
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: Default value added but required flag was removed implicitly
Explicitly set required=False or document that the default value will be used if not provided.
th.Property( | |
"target_folder", th.StringType, required=True, title="Target Folder" | |
"target_folder", | |
th.StringType, | |
default="output", | |
title="Target Folder", | |
), | |
th.Property( | |
"target_folder", | |
th.StringType, | |
default="output", | |
required=False, # Default value will be used if not provided | |
title="Target Folder", | |
), |
|
||
yield next(records) # Emit the first record | ||
|
||
time.sleep(60) # Simulate a slow stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
security (arbitrary-sleep): time.sleep() call; did you mean to leave this in?
Source: opengrep
Closes #129
📚 Documentation preview 📚: https://meltano-sdk--2620.org.readthedocs.build/en/2620/
Summary by Sourcery
Handle interruption and termination signals across plugins, taps, and targets to enable graceful shutdown by aborting, draining sinks, and flushing state, and refine sample plugins with default settings and test simulation.
New Features:
Enhancements:
Chores: