Skip to content

Commit

Permalink
Basic job state recovery tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterKraus committed Dec 10, 2023
1 parent a75ac33 commit 8f8030a
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/tomato/daemon/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def merge_pipelines(
if pip.jobid is not None:
ret[pname] = pip
else:
if pip == new[pname]:
if pip.devs == new[pname].devs:
ret[pname] = pip
elif pip.jobid is None:
ret[pname] = new[pname]
Expand Down
1 change: 1 addition & 0 deletions src/tomato/daemon/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ def load(daemon: Daemon):
daemon.jobs = loaded.jobs
daemon.pips = loaded.pips
daemon.devs = loaded.devs
daemon.nextjob = loaded.nextjob
4 changes: 3 additions & 1 deletion src/tomato/daemon/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import subprocess
import logging
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from threading import currentThread
Expand Down Expand Up @@ -176,7 +177,7 @@ def manager(port: int, context: zmq.Context):
poller.register(req, zmq.POLLIN)
timeout = 1000
while getattr(thread, "do_run"):
req.send_pyobj(dict(cmd="status", with_data=True))
req.send_pyobj(dict(cmd="status", with_data=True, sender=f"{__name__}.manager"))
events = dict(poller.poll(timeout))
if req not in events:
logger.warning(f"could not contact daemon in {timeout} ms")
Expand All @@ -186,4 +187,5 @@ def manager(port: int, context: zmq.Context):
manage_running_pips(daemon, req)
matched_pips = check_queued_jobs(daemon, req)
action_queued_jobs(daemon, matched_pips, req)
time.sleep(0.5)
logger.info("instructed to quit")
2 changes: 0 additions & 2 deletions src/tomato/drivers/driver_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,6 @@ def driver_worker(
log.debug(f"started 'log_listener' on pid {listener.pid}")

jobs = []
print(f"{devices=}")
print(f"{pipeline['devs']=}")
for cname, comp in pipeline["devs"].items():
dev = devices[cname]
log.info(f"device id: {cname}")
Expand Down
3 changes: 1 addition & 2 deletions src/tomato/tomato/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def load_device_file(yamlpath: Path) -> dict:
def get_pipelines(devs: dict[str, Device], pipelines: list) -> dict[str, Pipeline]:
pips = {}
for pip in pipelines:
print(f"{pip=}")
if "*" in pip["name"]:
data = {"name": pip["name"], "devs": {}}
if len(pip["devices"]) > 1:
Expand Down Expand Up @@ -106,7 +105,7 @@ def status(
logger.debug(f"checking status of tomato on port {port}")
req = context.socket(zmq.REQ)
req.connect(f"tcp://127.0.0.1:{port}")
req.send_pyobj(dict(cmd="status", with_data=with_data))
req.send_pyobj(dict(cmd="status", with_data=with_data, sender=f"{__name__}.status"))
poller = zmq.Poller()
poller.register(req, zmq.POLLIN)
events = dict(poller.poll(timeout))
Expand Down
61 changes: 61 additions & 0 deletions tests/test_03_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
from pathlib import Path
import zmq
import time

from tomato import ketchup, tomato

PORT = 12345
CTXT = zmq.Context()

kwargs = dict(port=PORT, timeout=1000, context=CTXT)


def test_recover_queued_jobs(datadir, start_tomato_daemon, stop_tomato_daemon):
os.chdir(datadir)
ketchup.submit(payload="dummy_random_5_2.yml", jobname="job-1", **kwargs)
ketchup.submit(payload="dummy_random_5_2.yml", jobname="job-2", **kwargs)
tomato.stop(**kwargs)
assert os.path.exists("tomato_state_12345.pkl")
tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0)
ret = tomato.status(**kwargs, with_data=True)
print(f"{ret=}")
assert ret.success
assert len(ret.data.jobs) == 2
assert ret.data.nextjob == 3


def test_recover_running_jobs(datadir, start_tomato_daemon, stop_tomato_daemon):
os.chdir(datadir)
ketchup.submit(payload="dummy_random_5_2.yml", jobname="job-1", **kwargs)
tomato.pipeline_load(**kwargs, pipeline="dummy-5", sampleid="dummy_random_5_2")
tomato.pipeline_ready(**kwargs, pipeline="dummy-5")
time.sleep(2)
tomato.stop(**kwargs)
assert os.path.exists("tomato_state_12345.pkl")
tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0)
ret = tomato.status(**kwargs, with_data=True)
print(f"{ret=}")
assert ret.success
assert len(ret.data.jobs) == 1
assert ret.data.nextjob == 2
assert ret.data.jobs[1].status == "r"


def test_recover_waiting_jobs(datadir, start_tomato_daemon, stop_tomato_daemon):
os.chdir(datadir)
ketchup.submit(payload="dummy_random_5_2.yml", jobname="job-1", **kwargs)
tomato.pipeline_load(**kwargs, pipeline="dummy-5", sampleid="dummy_random_5_2")
tomato.pipeline_ready(**kwargs, pipeline="dummy-5")
time.sleep(2)
tomato.stop(**kwargs)
assert os.path.exists("tomato_state_12345.pkl")
time.sleep(5)
tomato.start(**kwargs, appdir=Path(), logdir=Path(), verbosity=0)
time.sleep(2)
ret = tomato.status(**kwargs, with_data=True)
print(f"{ret=}")
assert ret.success
assert len(ret.data.jobs) == 1
assert ret.data.nextjob == 2
assert ret.data.jobs[1].status == "c"

0 comments on commit 8f8030a

Please sign in to comment.