diff --git a/metaflow/task.py b/metaflow/task.py index a28fd25a24d..17746caed9b 100644 --- a/metaflow/task.py +++ b/metaflow/task.py @@ -4,6 +4,7 @@ import sys import os import time +import signal from types import MethodType, FunctionType @@ -39,6 +40,42 @@ MAX_FOREACH_PATH_LENGTH = 256 +class SystemSignalHandler: + def __init__(self, run_id, step_name, task_id, metadata_service, metadata_tags): + self.run_id = run_id + self.step_name = (step_name,) + self.task_id = task_id + self.metadata_service = metadata_service + self.metadata_tags = metadata_tags + + signal.signal(signal.SIGINT, self.exit_sigint_gracefully) + signal.signal(signal.SIGTERM, self.exit_sigterm_gracefully) + print("SystemSignalHandler initialized...") + + def log_exit_signal(self, signal): + metadata_info = [ + MetaDatum( + field="signal", + value=signal, + type="signal", + tags=self.metadata_tags, + ) + ] + self.metadata_service.register_metadata( + self.run_id, self.step_name, self.task_id, metadata_info + ) + + def exit_sigint_gracefully(self, signum, frame): + print("SIGINT received... ") + self.log_exit_signal("SIGINT") + self.status = signal.SIGINT + + def exit_sigterm_gracefully(self, signum, frame): + print("SIGTERM received... ") + self.log_exit_signal("SIGTERM") + self.status = signal.SIGTERM + + class MetaflowTask(object): """ MetaflowTask prepares a Flow instance for execution of a single step. @@ -412,6 +449,7 @@ def run_step( ) metadata_tags = ["attempt_id:{0}".format(retry_count)] + SystemSignalHandler(run_id, step_name, task_id, self.metadata, metadata_tags) metadata = [ MetaDatum(