Skip to content

feat: Interruption and termination signals are handled in taps and targets #2620

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

edgarrmondragon
Copy link
Collaborator

@edgarrmondragon edgarrmondragon commented Aug 22, 2024

Closes #129


📚 Documentation preview 📚: https://meltano-sdk--2620.org.readthedocs.build/en/2620/

Summary by Sourcery

Handle interruption and termination signals across plugins, taps, and targets to enable graceful shutdown by aborting, draining sinks, and flushing state, and refine sample plugins with default settings and test simulation.

New Features:

  • Register SIGINT and SIGTERM handlers in PluginBase to initiate graceful shutdown.

Enhancements:

  • Extend TapBase to emit a final state message on termination.
  • Extend TargetBase to drain all sinks before aborting on termination.
  • Set a default value for the target_folder configuration in the sample CSV target.

Chores:

  • Add simulated slow-stream behavior with a time delay in the sample CountriesStream (marked FIXME).

Copy link

codspeed-hq bot commented Aug 22, 2024

CodSpeed Performance Report

Merging #2620 will not alter performance

Comparing edgarrmondragon/feat/signals (6344071) with main (8adf452)

Summary

✅ 8 untouched benchmarks

Copy link

codecov bot commented Aug 22, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 91.65%. Comparing base (8adf452) to head (6344071).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2620      +/-   ##
==========================================
- Coverage   91.68%   91.65%   -0.04%     
==========================================
  Files          62       62              
  Lines        5315     5318       +3     
  Branches      686      686              
==========================================
+ Hits         4873     4874       +1     
- Misses        310      312       +2     
  Partials      132      132              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@edgarrmondragon edgarrmondragon force-pushed the edgarrmondragon/feat/signals branch 2 times, most recently from 57ed344 to b795525 Compare August 22, 2024 02:45
@edgarrmondragon edgarrmondragon changed the title feat: Interruption and termination signals in taps and targets feat: Interruption and termination signals are handled in taps and targets Aug 22, 2024
@edgarrmondragon edgarrmondragon self-assigned this Aug 22, 2024
@edgarrmondragon edgarrmondragon added kind/Feature New feature or request Release Highlight Call this out in the release notes labels Aug 22, 2024
@edgarrmondragon edgarrmondragon force-pushed the edgarrmondragon/feat/signals branch from ce5a41e to f87387e Compare August 28, 2024 16:13
@edgarrmondragon edgarrmondragon force-pushed the edgarrmondragon/feat/signals branch 2 times, most recently from af2c00e to aa5831a Compare November 15, 2024 23:27
@edgarrmondragon edgarrmondragon marked this pull request as ready for review November 15, 2024 23:31
@edgarrmondragon edgarrmondragon force-pushed the edgarrmondragon/feat/signals branch from aa5831a to cc7a06b Compare May 22, 2025 16:25
@edgarrmondragon edgarrmondragon requested a review from Copilot May 22, 2025 16:26
@edgarrmondragon edgarrmondragon added this to the v0.47 milestone May 22, 2025
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements enhanced handling of interruption and termination signals across taps and targets to ensure graceful shutdown behavior. Key changes include:

  • Adding a _handle_termination method in target_base and tap_base to drain sinks and write state before termination.
  • Registering signal handlers in plugin_base for SIGINT and SIGTERM with a graceful abort.
  • Updating sample configurations and adding temporary simulation logic in countries_streams.

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
singer_sdk/target_base.py Adds _handle_termination to drain sinks and delegate termination.
singer_sdk/tap_base.py Implements _handle_termination to write state messages upon exit.
singer_sdk/plugin_base.py Registers termination signal handlers and aborts on termination.
samples/sample_target_csv/csv_target.py Adds a default value for target_folder in the CSV target config.
samples/sample_tap_countries/countries_streams.py Introduces temporary slowdown simulation with a FIXME comment.
Comments suppressed due to low confidence (2)

singer_sdk/plugin_base.py:468

  • [nitpick] Consider including the actual signal number in the error message to provide more context for debugging purposes.
errmsg = "Received termination signal"

samples/sample_tap_countries/countries_streams.py:89

  • Please remove the temporary slowdown simulation in the request_records method before merging as indicated by the FIXME comment.
# FIXME: revert these changes before merging

signum: Signal number.
frame: Frame.
"""
self.write_message(StateMessage(value=self.state))
Copy link
Preview

Copilot AI May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider adding a comment to clarify why termination handling in taps involves writing a state message before calling the super implementation.

Copilot uses AI. Check for mistakes.

@edgarrmondragon edgarrmondragon force-pushed the edgarrmondragon/feat/signals branch from cc7a06b to 6344071 Compare May 22, 2025 16:37
@edgarrmondragon
Copy link
Collaborator Author

@sourcery-ai review

Copy link
Contributor

sourcery-ai bot commented May 22, 2025

Reviewer's Guide

This PR centralizes termination signal handling by registering SIGINT/SIGTERM in PluginBase and extending taps and targets to perform graceful shutdown tasks (emitting final state, draining sinks, aborting via click), and updates sample connectors to simulate a slow stream and set a default output folder.

Sequence Diagram: Tap Termination Signal Handling

sequenceDiagram
    actor User as OS/User
    participant TapInstance as Tap
    participant PB as PluginBase

    User->>TapInstance: Sends SIGINT/SIGTERM
    activate TapInstance
    TapInstance->>TapInstance: _handle_termination(signum, frame)
    TapInstance->>TapInstance: Writes final state message
    TapInstance->>PB: super()._handle_termination(signum, frame)
    activate PB
    PB->>PB: Logs "Gracefully shutting down..."
    PB->>PB: Raises click.Abort
    deactivate PB
    deactivate TapInstance
Loading

Sequence Diagram: Target Termination Signal Handling

sequenceDiagram
    actor User as OS/User
    participant TargetInstance as Target
    participant PB as PluginBase

    User->>TargetInstance: Sends SIGINT/SIGTERM
    activate TargetInstance
    TargetInstance->>TargetInstance: _handle_termination(signum, frame)
    TargetInstance->>TargetInstance: Logs "Draining sinks..."
    TargetInstance->>TargetInstance: Drains all sinks
    TargetInstance->>PB: super()._handle_termination(signum, frame)
    activate PB
    PB->>PB: Logs "Gracefully shutting down..."
    PB->>PB: Raises click.Abort
    deactivate PB
    deactivate TargetInstance
Loading

Updated Class Diagram for Termination Signal Handling

classDiagram
    class PluginBase {
        +__init__() void
        #_handle_termination(signum: int, frame: object) void
    }

    class TapBase {
        #_handle_termination(signum: int, frame: object) void
    }

    class TargetBase {
        #_handle_termination(signum: int, frame: object) void
    }

    PluginBase <|-- TapBase
    PluginBase <|-- TargetBase
Loading

File-Level Changes

Change Details Files
Signal handling added to PluginBase to catch termination signals
  • Register SIGINT and SIGTERM handlers in init
  • Implement _handle_termination to log shutdown and raise click.Abort
singer_sdk/plugin_base.py
Targets override termination handler to drain sinks before aborting
  • Implement _handle_termination to log signal receipt and invoke drain_all(is_endofpipe=True)
  • Call super()._handle_termination after draining sinks
singer_sdk/target_base.py
Taps override termination handler to emit final state before aborting
  • Implement _handle_termination to write a final StateMessage
  • Call super()._handle_termination after emitting state
singer_sdk/tap_base.py
Sample connector code tweaked for demonstration and defaults
  • Override request_records in sample tap to yield first record, sleep, then yield the rest
  • Set default value for target_folder to "output" in sample CSV target config
samples/sample_tap_countries/countries_streams.py
samples/sample_target_csv/csv_target.py

Assessment against linked issues

Issue Objective Addressed Explanation
#129 Handle SIGTERM messages gracefully in SDK-based taps.
#129 For sorted incremental streams, flush the current stream and cancel syncing any other records upon receiving SIGTERM.
#129 Avoid starting any other streams once the running streams are handled, as specified above.

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @edgarrmondragon - I've reviewed your changes and found some issues that need to be addressed.

Blocking issues:

  • time.sleep() call; did you mean to leave this in? (link)
  • Undefined 't' used for TYPE_CHECKING import guard (link)
Here's what I looked at during the review
  • 🔴 General issues: 1 blocking issue, 5 other issues
  • 🔴 Security: 1 blocking issue
  • 🟢 Testing: all looks good
  • 🟢 Documentation: all looks good

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +221 to +223
# Signal handling
signal.signal(signal.SIGINT, self._handle_termination)
signal.signal(signal.SIGTERM, self._handle_termination)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Unconditionally registering signal handlers can override other handlers and doesn't guard against non-main threads

Ensure signal handlers are registered only in the main thread and avoid overwriting existing handlers, possibly by checking current handlers or using a class-level setup flag.

Suggested change
# Signal handling
signal.signal(signal.SIGINT, self._handle_termination)
signal.signal(signal.SIGTERM, self._handle_termination)
# Signal handling
import signal
import threading
# Class-level flag to ensure handlers are only set up once
if not hasattr(self.__class__, "_signal_handlers_registered"):
self.__class__._signal_handlers_registered = False
if (
threading.current_thread() is threading.main_thread()
and not self.__class__._signal_handlers_registered
):
for sig in (signal.SIGINT, signal.SIGTERM):
current_handler = signal.getsignal(sig)
if current_handler is signal.SIG_DFL or current_handler is None:
signal.signal(sig, self._handle_termination)
self.__class__._signal_handlers_registered = True

Comment on lines +467 to +469
self.logger.info("Gracefully shutting down...")
errmsg = "Received termination signal"
raise click.Abort(errmsg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Raising exceptions directly in a signal handler may not work reliably

Consider using sys.exit() or triggering click's abort outside the signal handler for a more reliable shutdown.

Suggested change
self.logger.info("Gracefully shutting down...")
errmsg = "Received termination signal"
raise click.Abort(errmsg)
self.logger.info("Gracefully shutting down...")
errmsg = "Received termination signal"
import sys
sys.exit(errmsg)

@@ -35,6 +35,7 @@

if t.TYPE_CHECKING:
from pathlib import PurePath
from types import FrameType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): FrameType is used in the handler signature but only imported under TYPE_CHECKING

Make sure FrameType is imported at runtime or use a string annotation to prevent NameError.

@@ -17,6 +17,9 @@
from singer_sdk import typing as th
from singer_sdk.streams.graphql import GraphQLStream

if t.TYPE_CHECKING:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Undefined 't' used for TYPE_CHECKING import guard

Please import 'typing as t' or use 'from typing import TYPE_CHECKING' to fix the reference.

@@ -83,6 +86,18 @@ class CountriesStream(CountriesAPIStream):
),
).to_dict()

# FIXME: revert these changes before merging
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: Debugging code left in sample tap stream

Remove or conditionally gate this simulated slow stream code before merging.

Comment on lines 15 to 20
th.Property(
"target_folder", th.StringType, required=True, title="Target Folder"
"target_folder",
th.StringType,
default="output",
title="Target Folder",
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Default value added but required flag was removed implicitly

Explicitly set required=False or document that the default value will be used if not provided.

Suggested change
th.Property(
"target_folder", th.StringType, required=True, title="Target Folder"
"target_folder",
th.StringType,
default="output",
title="Target Folder",
),
th.Property(
"target_folder",
th.StringType,
default="output",
required=False, # Default value will be used if not provided
title="Target Folder",
),


yield next(records) # Emit the first record

time.sleep(60) # Simulate a slow stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (arbitrary-sleep): time.sleep() call; did you mean to leave this in?

Source: opengrep

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/Feature New feature or request Release Highlight Call this out in the release notes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Handle SIGTERM messages to shut down gracefully
1 participant