From 0ec054105b51b8ca39d0b534e677b967a6f3f273 Mon Sep 17 00:00:00 2001 From: Darin Yu Date: Wed, 14 Feb 2024 00:16:54 +0000 Subject: [PATCH] handle sigterm gracefully by adding status to metadata --- metaflow/task.py | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/metaflow/task.py b/metaflow/task.py index a28fd25a24d..17746caed9b 100644 --- a/metaflow/task.py +++ b/metaflow/task.py @@ -4,6 +4,7 @@ import sys import os import time +import signal from types import MethodType, FunctionType @@ -39,6 +40,42 @@ MAX_FOREACH_PATH_LENGTH = 256 +class SystemSignalHandler: + def __init__(self, run_id, step_name, task_id, metadata_service, metadata_tags): + self.run_id = run_id + self.step_name = (step_name,) + self.task_id = task_id + self.metadata_service = metadata_service + self.metadata_tags = metadata_tags + + signal.signal(signal.SIGINT, self.exit_sigint_gracefully) + signal.signal(signal.SIGTERM, self.exit_sigterm_gracefully) + print("SystemSignalHandler initialized...") + + def log_exit_signal(self, signal): + metadata_info = [ + MetaDatum( + field="signal", + value=signal, + type="signal", + tags=self.metadata_tags, + ) + ] + self.metadata_service.register_metadata( + self.run_id, self.step_name, self.task_id, metadata_info + ) + + def exit_sigint_gracefully(self, signum, frame): + print("SIGINT received... ") + self.log_exit_signal("SIGINT") + self.status = signal.SIGINT + + def exit_sigterm_gracefully(self, signum, frame): + print("SIGTERM received... ") + self.log_exit_signal("SIGTERM") + self.status = signal.SIGTERM + + class MetaflowTask(object): """ MetaflowTask prepares a Flow instance for execution of a single step. @@ -412,6 +449,7 @@ def run_step( ) metadata_tags = ["attempt_id:{0}".format(retry_count)] + SystemSignalHandler(run_id, step_name, task_id, self.metadata, metadata_tags) metadata = [ MetaDatum(