From c5796a8d5f7694a2ea71fddb2321d66363994b9f Mon Sep 17 00:00:00 2001 From: Peter Kraus Date: Sat, 3 Feb 2024 19:20:43 +0000 Subject: [PATCH] Implement job state recovery (#62) * add state pickler * Basic job state recovery tests. * More reliable state tests. * Fix waiting test. * Add crashed-pruning test * Add a 100 ms delay and check false * This is getting silly. --- src/tomato/daemon/__init__.py | 14 +++- src/tomato/daemon/cmd.py | 2 +- src/tomato/daemon/io.py | 26 ++++++++ src/tomato/daemon/job.py | 4 +- src/tomato/drivers/driver_funcs.py | 2 - src/tomato/ketchup/__init__.py | 1 + src/tomato/tomato/__init__.py | 6 +- tests/test_03_state.py | 103 +++++++++++++++++++++++++++++ tests/utils.py | 2 - 9 files changed, 151 insertions(+), 9 deletions(-) create mode 100644 src/tomato/daemon/io.py create mode 100644 tests/test_03_state.py diff --git a/src/tomato/daemon/__init__.py b/src/tomato/daemon/__init__.py index aedc615c..5e891294 100644 --- a/src/tomato/daemon/__init__.py +++ b/src/tomato/daemon/__init__.py @@ -4,17 +4,19 @@ .. codeauthor:: Peter Kraus """ + import logging import argparse from threading import Thread from pathlib import Path import toml - +import time import zmq from tomato.models import Reply, Daemon import tomato.daemon.cmd as cmd import tomato.daemon.job as job +import tomato.daemon.io as io logger = logging.getLogger(__name__) @@ -45,6 +47,10 @@ def run_daemon(): setup_logging(daemon) logger.info(f"logging set up with verbosity {daemon.verbosity}") + logger.debug("attempting to restore daemon state") + io.load(daemon) + logger.debug(f"{daemon=}") + context = zmq.Context() rep = context.socket(zmq.REP) logger.debug(f"binding zmq.REP socket on port {daemon.port}") @@ -54,6 +60,7 @@ def run_daemon(): logger.debug("entering main loop") jmgr = None + t0 = time.process_time() while True: socks = dict(poller.poll(100)) if rep in socks: @@ -85,4 +92,9 @@ def run_daemon(): logger.debug(f"reply with {ret=}") rep.send_pyobj(ret) if daemon.status == "stop": + io.store(daemon) break + tN = time.process_time() + if tN - t0 > 10: + io.store(daemon) + t0 = tN diff --git a/src/tomato/daemon/cmd.py b/src/tomato/daemon/cmd.py index a785a413..e21b92a1 100644 --- a/src/tomato/daemon/cmd.py +++ b/src/tomato/daemon/cmd.py @@ -15,7 +15,7 @@ def merge_pipelines( if pip.jobid is not None: ret[pname] = pip else: - if pip == new[pname]: + if pip.devs == new[pname].devs: ret[pname] = pip elif pip.jobid is None: ret[pname] = new[pname] diff --git a/src/tomato/daemon/io.py b/src/tomato/daemon/io.py new file mode 100644 index 00000000..00101a8c --- /dev/null +++ b/src/tomato/daemon/io.py @@ -0,0 +1,26 @@ +import pickle +import logging +from pathlib import Path +from tomato.models import Daemon + +logger = logging.getLogger(__name__) + + +def store(daemon: Daemon): + outfile = Path(daemon.settings["datadir"]) / f"tomato_state_{daemon.port}.pkl" + logger.debug(f"storing daemon state to {outfile}") + with outfile.open("wb") as out: + pickle.dump(daemon, out) + + +def load(daemon: Daemon): + infile = Path(daemon.settings["datadir"]) / f"tomato_state_{daemon.port}.pkl" + if infile.exists() is False: + logger.debug(f"daemon state file {infile} does not exist") + return + with infile.open("rb") as inp: + loaded = pickle.load(inp) + daemon.jobs = loaded.jobs + daemon.pips = loaded.pips + daemon.devs = loaded.devs + daemon.nextjob = loaded.nextjob diff --git a/src/tomato/daemon/job.py b/src/tomato/daemon/job.py index 1615d013..e61b74d0 100644 --- a/src/tomato/daemon/job.py +++ b/src/tomato/daemon/job.py @@ -2,6 +2,7 @@ import subprocess import logging import json +import time from datetime import datetime, timezone from pathlib import Path from threading import currentThread @@ -176,7 +177,7 @@ def manager(port: int, context: zmq.Context): poller.register(req, zmq.POLLIN) timeout = 1000 while getattr(thread, "do_run"): - req.send_pyobj(dict(cmd="status", with_data=True)) + req.send_pyobj(dict(cmd="status", with_data=True, sender=f"{__name__}.manager")) events = dict(poller.poll(timeout)) if req not in events: logger.warning(f"could not contact daemon in {timeout} ms") @@ -186,4 +187,5 @@ def manager(port: int, context: zmq.Context): manage_running_pips(daemon, req) matched_pips = check_queued_jobs(daemon, req) action_queued_jobs(daemon, matched_pips, req) + time.sleep(0.5) logger.info("instructed to quit") diff --git a/src/tomato/drivers/driver_funcs.py b/src/tomato/drivers/driver_funcs.py index 051cbf3b..a5c537e4 100644 --- a/src/tomato/drivers/driver_funcs.py +++ b/src/tomato/drivers/driver_funcs.py @@ -233,8 +233,6 @@ def driver_worker( log.debug(f"started 'log_listener' on pid {listener.pid}") jobs = [] - print(f"{devices=}") - print(f"{pipeline['devs']=}") for cname, comp in pipeline["devs"].items(): dev = devices[cname] log.info(f"device id: {cname}") diff --git a/src/tomato/ketchup/__init__.py b/src/tomato/ketchup/__init__.py index 8737c1be..a24c22c0 100644 --- a/src/tomato/ketchup/__init__.py +++ b/src/tomato/ketchup/__init__.py @@ -14,6 +14,7 @@ - :func:`search` to find a ``jobid`` of a *job* from ``jobname`` """ + import json import logging from pathlib import Path diff --git a/src/tomato/tomato/__init__.py b/src/tomato/tomato/__init__.py index a9a5c8b1..fdd98eb8 100644 --- a/src/tomato/tomato/__init__.py +++ b/src/tomato/tomato/__init__.py @@ -19,6 +19,7 @@ - :func:`pipeline_ready` to mark a pipeline as ready """ + import os import subprocess import textwrap @@ -64,7 +65,6 @@ def load_device_file(yamlpath: Path) -> dict: def get_pipelines(devs: dict[str, Device], pipelines: list) -> dict[str, Pipeline]: pips = {} for pip in pipelines: - print(f"{pip=}") if "*" in pip["name"]: data = {"name": pip["name"], "devs": {}} if len(pip["devices"]) > 1: @@ -106,7 +106,7 @@ def status( logger.debug(f"checking status of tomato on port {port}") req = context.socket(zmq.REQ) req.connect(f"tcp://127.0.0.1:{port}") - req.send_pyobj(dict(cmd="status", with_data=with_data)) + req.send_pyobj(dict(cmd="status", with_data=with_data, sender=f"{__name__}.status")) poller = zmq.Poller() poller.register(req, zmq.POLLIN) events = dict(poller.poll(timeout)) @@ -215,6 +215,8 @@ def init( f"""\ # Default settings for tomato-{VERSION} # Generated on {str(datetime.now(timezone.utc))} + datadir = '{datadir.resolve()}' + [jobs] storage = '{datadir.resolve() / 'Jobs'}' diff --git a/tests/test_03_state.py b/tests/test_03_state.py new file mode 100644 index 00000000..ade8c298 --- /dev/null +++ b/tests/test_03_state.py @@ -0,0 +1,103 @@ +import os +from pathlib import Path +import zmq +import time +import psutil + +from tomato import ketchup, tomato +from .utils import wait_until_tomato_running, wait_until_ketchup_status + +PORT = 12345 +CTXT = zmq.Context() +WAIT = 5000 + +kwargs = dict(port=PORT, timeout=1000, context=CTXT) + + +def test_recover_queued_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): + assert wait_until_tomato_running(port=PORT, timeout=WAIT) + os.chdir(datadir) + ketchup.submit(payload="dummy_random_5_2.yml", jobname="job-1", **kwargs) + ketchup.submit(payload="dummy_random_5_2.yml", jobname="job-2", **kwargs) + tomato.stop(**kwargs) + assert not wait_until_tomato_running(port=PORT, timeout=100) + assert os.path.exists("tomato_state_12345.pkl") + + tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0) + assert wait_until_tomato_running(port=PORT, timeout=WAIT) + ret = tomato.status(**kwargs, with_data=True) + print(f"{ret=}") + assert ret.success + assert len(ret.data.jobs) == 2 + assert ret.data.nextjob == 3 + + +def test_recover_running_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): + assert wait_until_tomato_running(port=PORT, timeout=WAIT) + os.chdir(datadir) + ketchup.submit(payload="dummy_random_5_2.yml", jobname="job-1", **kwargs) + tomato.pipeline_load(**kwargs, pipeline="dummy-5", sampleid="dummy_random_5_2") + tomato.pipeline_ready(**kwargs, pipeline="dummy-5") + wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=WAIT) + tomato.stop(**kwargs) + assert not wait_until_tomato_running(port=PORT, timeout=100) + assert os.path.exists("tomato_state_12345.pkl") + + tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0) + assert wait_until_tomato_running(port=PORT, timeout=WAIT) + ret = tomato.status(**kwargs, with_data=True) + print(f"{ret=}") + assert ret.success + assert len(ret.data.jobs) == 1 + assert ret.data.nextjob == 2 + assert ret.data.jobs[1].status == "r" + + +def test_recover_waiting_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): + assert wait_until_tomato_running(port=PORT, timeout=WAIT) + os.chdir(datadir) + ketchup.submit(payload="dummy_random_5_2.yml", jobname="job-1", **kwargs) + tomato.pipeline_load(**kwargs, pipeline="dummy-5", sampleid="dummy_random_5_2") + tomato.pipeline_ready(**kwargs, pipeline="dummy-5") + wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=WAIT) + tomato.stop(**kwargs) + assert not wait_until_tomato_running(port=PORT, timeout=100) + assert os.path.exists("tomato_state_12345.pkl") + + time.sleep(10) + + tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0) + assert wait_until_tomato_running(port=PORT, timeout=WAIT) + ret = tomato.status(**kwargs, with_data=True) + print(f"{ret=}") + assert ret.success + assert len(ret.data.jobs) == 1 + assert ret.data.nextjob == 2 + assert ret.data.jobs[1].status == "c" + + +def test_prune_crashed_jobs(datadir, start_tomato_daemon, stop_tomato_daemon): + assert wait_until_tomato_running(port=PORT, timeout=WAIT) + os.chdir(datadir) + ketchup.submit(payload="dummy_random_30_1.yml", jobname="job-1", **kwargs) + tomato.pipeline_load(**kwargs, pipeline="dummy-5", sampleid="dummy_random_30_1") + tomato.pipeline_ready(**kwargs, pipeline="dummy-5") + wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=WAIT) + ret = tomato.status(**kwargs, with_data=True) + print(f"{ret=}") + tomato.stop(**kwargs) + assert not wait_until_tomato_running(port=PORT, timeout=100) + assert os.path.exists("tomato_state_12345.pkl") + + proc = psutil.Process(pid=ret.data.jobs[1].pid) + proc.terminate() + time.sleep(5) + + tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0) + assert wait_until_tomato_running(port=PORT, timeout=WAIT) + ret = tomato.status(**kwargs, with_data=True) + print(f"{ret=}") + assert ret.success + assert len(ret.data.jobs) == 1 + assert ret.data.nextjob == 2 + assert ret.data.jobs[1].status == "ce" diff --git a/tests/utils.py b/tests/utils.py index ac40e16d..997880cc 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -40,7 +40,6 @@ def wait_until_tomato_running(port: int, timeout: int): data = yaml.safe_load(ret.stdout) if data["success"]: return True - print(f"{data=}") time.sleep(timeout / 20000) return False @@ -56,6 +55,5 @@ def wait_until_ketchup_status(jobid: int, status: str, port: int, timeout: int): data = yaml.safe_load(ret.stdout)["data"] if data[jobid]["status"] == status: return True - print(f"{data=}") time.sleep(timeout / 20000) return False