Skip to content

Commit

Permalink
Implement job state recovery (#62)
Browse files Browse the repository at this point in the history
* add state pickler

* Basic job state recovery tests.

* More reliable state tests.

* Fix waiting test.

* Add crashed-pruning test

* Add a 100 ms delay and check false

* This is getting silly.
  • Loading branch information
PeterKraus authored Feb 3, 2024
1 parent 32f2c33 commit c5796a8
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 9 deletions.
14 changes: 13 additions & 1 deletion src/tomato/daemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@
.. codeauthor::
Peter Kraus
"""

import logging
import argparse
from threading import Thread
from pathlib import Path
import toml

import time
import zmq

from tomato.models import Reply, Daemon
import tomato.daemon.cmd as cmd
import tomato.daemon.job as job
import tomato.daemon.io as io

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -45,6 +47,10 @@ def run_daemon():
setup_logging(daemon)
logger.info(f"logging set up with verbosity {daemon.verbosity}")

logger.debug("attempting to restore daemon state")
io.load(daemon)
logger.debug(f"{daemon=}")

context = zmq.Context()
rep = context.socket(zmq.REP)
logger.debug(f"binding zmq.REP socket on port {daemon.port}")
Expand All @@ -54,6 +60,7 @@ def run_daemon():

logger.debug("entering main loop")
jmgr = None
t0 = time.process_time()
while True:
socks = dict(poller.poll(100))
if rep in socks:
Expand Down Expand Up @@ -85,4 +92,9 @@ def run_daemon():
logger.debug(f"reply with {ret=}")
rep.send_pyobj(ret)
if daemon.status == "stop":
io.store(daemon)
break
tN = time.process_time()
if tN - t0 > 10:
io.store(daemon)
t0 = tN
2 changes: 1 addition & 1 deletion src/tomato/daemon/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def merge_pipelines(
if pip.jobid is not None:
ret[pname] = pip
else:
if pip == new[pname]:
if pip.devs == new[pname].devs:
ret[pname] = pip
elif pip.jobid is None:
ret[pname] = new[pname]
Expand Down
26 changes: 26 additions & 0 deletions src/tomato/daemon/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pickle
import logging
from pathlib import Path
from tomato.models import Daemon

logger = logging.getLogger(__name__)


def store(daemon: Daemon):
    """
    Persist the state of ``daemon`` to disk so that it can be restored later.

    The whole ``daemon`` object is pickled into ``tomato_state_{port}.pkl`` inside
    the directory given by ``daemon.settings["datadir"]``.

    The pickle is first written to a temporary sibling file and only then renamed
    over the real state file. A daemon that is killed in the middle of a write
    therefore leaves the previous, complete state file in place instead of a
    truncated one that could not be unpickled on the next start.

    :param daemon: the daemon object to store; its ``settings["datadir"]`` and
        ``port`` determine the path of the state file.
    """
    outfile = Path(daemon.settings["datadir"]) / f"tomato_state_{daemon.port}.pkl"
    tmpfile = outfile.with_name(f"{outfile.name}.tmp")
    logger.debug(f"storing daemon state to {outfile}")
    with tmpfile.open("wb") as out:
        pickle.dump(daemon, out)
    # Path.replace overwrites an existing target, so readers only ever see
    # either the old or the new complete file.
    tmpfile.replace(outfile)


def load(daemon: Daemon):
    """
    Restore previously stored state into ``daemon``, in place.

    Looks for ``tomato_state_{port}.pkl`` in ``daemon.settings["datadir"]``. When
    the file is missing, a debug message is logged and ``daemon`` is left
    untouched. Otherwise the file is unpickled and the ``jobs``, ``pips``,
    ``devs`` and ``nextjob`` attributes of the stored object are copied onto
    ``daemon``; all other attributes of ``daemon`` keep their current values.

    :param daemon: the daemon object to update; its ``settings["datadir"]`` and
        ``port`` determine the path of the state file.
    """
    infile = Path(daemon.settings["datadir"]) / f"tomato_state_{daemon.port}.pkl"
    if not infile.exists():
        logger.debug(f"daemon state file {infile} does not exist")
        return
    # NOTE(review): unpickling executes arbitrary code; this is acceptable only
    # as long as the state file is one written by this daemon itself.
    with infile.open("rb") as inp:
        loaded = pickle.load(inp)
    for attr in ("jobs", "pips", "devs", "nextjob"):
        setattr(daemon, attr, getattr(loaded, attr))
4 changes: 3 additions & 1 deletion src/tomato/daemon/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import subprocess
import logging
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from threading import currentThread
Expand Down Expand Up @@ -176,7 +177,7 @@ def manager(port: int, context: zmq.Context):
poller.register(req, zmq.POLLIN)
timeout = 1000
while getattr(thread, "do_run"):
req.send_pyobj(dict(cmd="status", with_data=True))
req.send_pyobj(dict(cmd="status", with_data=True, sender=f"{__name__}.manager"))
events = dict(poller.poll(timeout))
if req not in events:
logger.warning(f"could not contact daemon in {timeout} ms")
Expand All @@ -186,4 +187,5 @@ def manager(port: int, context: zmq.Context):
manage_running_pips(daemon, req)
matched_pips = check_queued_jobs(daemon, req)
action_queued_jobs(daemon, matched_pips, req)
time.sleep(0.5)
logger.info("instructed to quit")
2 changes: 0 additions & 2 deletions src/tomato/drivers/driver_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,6 @@ def driver_worker(
log.debug(f"started 'log_listener' on pid {listener.pid}")

jobs = []
print(f"{devices=}")
print(f"{pipeline['devs']=}")
for cname, comp in pipeline["devs"].items():
dev = devices[cname]
log.info(f"device id: {cname}")
Expand Down
1 change: 1 addition & 0 deletions src/tomato/ketchup/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- :func:`search` to find a ``jobid`` of a *job* from ``jobname``
"""

import json
import logging
from pathlib import Path
Expand Down
6 changes: 4 additions & 2 deletions src/tomato/tomato/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- :func:`pipeline_ready` to mark a pipeline as ready
"""

import os
import subprocess
import textwrap
Expand Down Expand Up @@ -64,7 +65,6 @@ def load_device_file(yamlpath: Path) -> dict:
def get_pipelines(devs: dict[str, Device], pipelines: list) -> dict[str, Pipeline]:
pips = {}
for pip in pipelines:
print(f"{pip=}")
if "*" in pip["name"]:
data = {"name": pip["name"], "devs": {}}
if len(pip["devices"]) > 1:
Expand Down Expand Up @@ -106,7 +106,7 @@ def status(
logger.debug(f"checking status of tomato on port {port}")
req = context.socket(zmq.REQ)
req.connect(f"tcp://127.0.0.1:{port}")
req.send_pyobj(dict(cmd="status", with_data=with_data))
req.send_pyobj(dict(cmd="status", with_data=with_data, sender=f"{__name__}.status"))
poller = zmq.Poller()
poller.register(req, zmq.POLLIN)
events = dict(poller.poll(timeout))
Expand Down Expand Up @@ -215,6 +215,8 @@ def init(
f"""\
# Default settings for tomato-{VERSION}
# Generated on {str(datetime.now(timezone.utc))}
datadir = '{datadir.resolve()}'
[jobs]
storage = '{datadir.resolve() / 'Jobs'}'
Expand Down
103 changes: 103 additions & 0 deletions tests/test_03_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import os
from pathlib import Path
import zmq
import time
import psutil

from tomato import ketchup, tomato
from .utils import wait_until_tomato_running, wait_until_ketchup_status

# TCP port of the tomato daemon exercised by the tests in this module.
PORT = 12345
# A single zmq context shared by every request made from this module.
CTXT = zmq.Context()
# Timeout handed to the wait_until_* helpers (presumably milliseconds - confirm
# against tests/utils.py).
WAIT = 5000

# Connection arguments common to the tomato and ketchup calls below.
kwargs = {"port": PORT, "timeout": 1000, "context": CTXT}


def test_recover_queued_jobs(datadir, start_tomato_daemon, stop_tomato_daemon):
    """
    Submit two jobs, stop the daemon, restart it, and check that both jobs and
    the job counter are still present in the restored state.
    """
    assert wait_until_tomato_running(port=PORT, timeout=WAIT)
    os.chdir(datadir)
    for jobname in ("job-1", "job-2"):
        ketchup.submit(payload="dummy_random_5_2.yml", jobname=jobname, **kwargs)

    # Stopping the daemon must leave a state file behind in the working directory.
    tomato.stop(**kwargs)
    assert not wait_until_tomato_running(port=PORT, timeout=100)
    assert Path("tomato_state_12345.pkl").exists()

    # After a restart, the daemon should report the state it had when stopped.
    tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0)
    assert wait_until_tomato_running(port=PORT, timeout=WAIT)
    ret = tomato.status(**kwargs, with_data=True)
    print(f"{ret=}")
    assert ret.success
    restored = ret.data
    assert len(restored.jobs) == 2
    assert restored.nextjob == 3


def test_recover_running_jobs(datadir, start_tomato_daemon, stop_tomato_daemon):
    """
    Start a job, stop the daemon while the job has status "r", restart the
    daemon straight away, and check that the job is still reported with
    status "r" in the restored state.
    """
    assert wait_until_tomato_running(port=PORT, timeout=WAIT)
    os.chdir(datadir)
    ketchup.submit(payload="dummy_random_5_2.yml", jobname="job-1", **kwargs)
    tomato.pipeline_load(**kwargs, pipeline="dummy-5", sampleid="dummy_random_5_2")
    tomato.pipeline_ready(**kwargs, pipeline="dummy-5")
    # Fail here, not on a confusing later assertion, if the job never reaches "r".
    assert wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=WAIT)
    tomato.stop(**kwargs)
    assert not wait_until_tomato_running(port=PORT, timeout=100)
    assert os.path.exists("tomato_state_12345.pkl")

    tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0)
    assert wait_until_tomato_running(port=PORT, timeout=WAIT)
    ret = tomato.status(**kwargs, with_data=True)
    print(f"{ret=}")
    assert ret.success
    assert len(ret.data.jobs) == 1
    assert ret.data.nextjob == 2
    assert ret.data.jobs[1].status == "r"


def test_recover_waiting_jobs(datadir, start_tomato_daemon, stop_tomato_daemon):
    """
    Start a job, stop the daemon while the job has status "r", wait ten seconds
    with the daemon down, restart it, and check that the job is then reported
    with status "c".
    """
    assert wait_until_tomato_running(port=PORT, timeout=WAIT)
    os.chdir(datadir)
    ketchup.submit(payload="dummy_random_5_2.yml", jobname="job-1", **kwargs)
    tomato.pipeline_load(**kwargs, pipeline="dummy-5", sampleid="dummy_random_5_2")
    tomato.pipeline_ready(**kwargs, pipeline="dummy-5")
    # Fail here, not on a confusing later assertion, if the job never reaches "r".
    assert wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=WAIT)
    tomato.stop(**kwargs)
    assert not wait_until_tomato_running(port=PORT, timeout=100)
    assert os.path.exists("tomato_state_12345.pkl")

    # Presumably long enough for the job to finish while no daemon is running.
    time.sleep(10)

    tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0)
    assert wait_until_tomato_running(port=PORT, timeout=WAIT)
    ret = tomato.status(**kwargs, with_data=True)
    print(f"{ret=}")
    assert ret.success
    assert len(ret.data.jobs) == 1
    assert ret.data.nextjob == 2
    assert ret.data.jobs[1].status == "c"


def test_prune_crashed_jobs(datadir, start_tomato_daemon, stop_tomato_daemon):
    """
    Start a job, stop the daemon while the job has status "r", terminate the
    job's process while the daemon is down, restart the daemon, and check that
    the job is then reported with status "ce".
    """
    assert wait_until_tomato_running(port=PORT, timeout=WAIT)
    os.chdir(datadir)
    ketchup.submit(payload="dummy_random_30_1.yml", jobname="job-1", **kwargs)
    tomato.pipeline_load(**kwargs, pipeline="dummy-5", sampleid="dummy_random_30_1")
    tomato.pipeline_ready(**kwargs, pipeline="dummy-5")
    # Fail here, not on a confusing later assertion, if the job never reaches "r".
    assert wait_until_ketchup_status(jobid=1, status="r", port=PORT, timeout=WAIT)
    ret = tomato.status(**kwargs, with_data=True)
    print(f"{ret=}")
    tomato.stop(**kwargs)
    assert not wait_until_tomato_running(port=PORT, timeout=100)
    assert os.path.exists("tomato_state_12345.pkl")

    # psutil.Process(pid=None) refers to the *current* process, so terminating
    # it would kill the test runner; make sure a real job pid was reported.
    pid = ret.data.jobs[1].pid
    assert pid is not None
    proc = psutil.Process(pid=pid)
    proc.terminate()
    time.sleep(5)

    tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0)
    assert wait_until_tomato_running(port=PORT, timeout=WAIT)
    ret = tomato.status(**kwargs, with_data=True)
    print(f"{ret=}")
    assert ret.success
    assert len(ret.data.jobs) == 1
    assert ret.data.nextjob == 2
    assert ret.data.jobs[1].status == "ce"
2 changes: 0 additions & 2 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def wait_until_tomato_running(port: int, timeout: int):
data = yaml.safe_load(ret.stdout)
if data["success"]:
return True
print(f"{data=}")
time.sleep(timeout / 20000)
return False

Expand All @@ -56,6 +55,5 @@ def wait_until_ketchup_status(jobid: int, status: str, port: int, timeout: int):
data = yaml.safe_load(ret.stdout)["data"]
if data[jobid]["status"] == status:
return True
print(f"{data=}")
time.sleep(timeout / 20000)
return False

0 comments on commit c5796a8

Please sign in to comment.