From 77510f0664c76e30d1b4ba1f008c27ca378a6e73 Mon Sep 17 00:00:00 2001
From: Madhur Tandon
Date: Mon, 4 Nov 2024 13:00:21 +0530
Subject: [PATCH] sigterm to workers

---
 metaflow/runtime.py | 23 ++++++++++++++++++++---
 1 file changed, 20 insertions(+), 3 deletions(-)

diff --git a/metaflow/runtime.py b/metaflow/runtime.py
index d5fbc0b6837..2d6c556b778 100644
--- a/metaflow/runtime.py
+++ b/metaflow/runtime.py
@@ -519,12 +519,16 @@ def _killall(self):
         # If we are here, all children have received a signal and are shutting down.
         # We want to give them an opportunity to do so and then kill
         live_workers = set(self._workers.values())
-        now = int(time.time())
         self._logger(
-            "Terminating %d active tasks..." % len(live_workers),
+            "Attempting graceful shutdown of %d active tasks..." % len(live_workers),
             system_msg=True,
             bad=True,
         )
+
+        for each_worker in live_workers:
+            each_worker.shutdown()
+
+        now = int(time.time())
         while live_workers and int(time.time()) - now < 5:
             # While not all workers are dead and we have waited less than 5 seconds
             live_workers = [worker for worker in live_workers if not worker.clean()]
@@ -1523,6 +1527,8 @@ def __init__(self, task, max_logs_size):
         }
 
         self._encoding = sys.stdout.encoding or "UTF-8"
+        self.terminated = False
+        # Terminated indicates that SIGTERM was sent to the task
         self.killed = False  # Killed indicates that the task was forcibly killed
         # with SIGKILL by the master process.
         # A killed task is always considered cleaned
@@ -1622,11 +1628,22 @@ def clean(self):
             return True
         if not self.cleaned:
             for fileobj, buf in self._logs.values():
-                msg = b"[KILLED BY ORCHESTRATOR]\n"
+                if self.terminated:
+                    msg = b"[TERMINATED BY ORCHESTRATOR]\n"
+                else:
+                    msg = b"[KILLED BY ORCHESTRATOR]\n"
                 self.emit_log(msg, buf, system_msg=True)
             self.cleaned = True
         return self._proc.poll() is not None
 
+    def shutdown(self):
+        if not self.terminated:
+            try:
+                self._proc.terminate()
+            except:
+                pass
+            self.terminated = True
+
     def kill(self):
         if not self.killed:
             try: