Skip to content

Commit

Permalink
Merge pull request #5 from ML4GW/query-refactor
Browse files Browse the repository at this point in the history
Query refactor
  • Loading branch information
EthanMarx authored Jan 8, 2024
2 parents 6f55f47 + 2099f15 commit c71a76a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
27 changes: 24 additions & 3 deletions pycondor/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,24 @@ def __init__(self, id: int):
# the ids and condor files for each of the associated
# processes in order to avoid having to make a query
# for each process in the cluster.
configs = query(id)
self.procs = self.query_procs()

def refresh(self):
    """Re-query condor for this cluster's procs and track any new ones.

    Handles use cases where all the jobs in the cluster may not have
    been submitted up front — e.g. when the ``max_materialize`` condor
    argument is set — so procs that materialize after construction are
    appended to ``self.procs`` on subsequent refreshes.
    """
    # Hoist the known-id lookup out of the loop: ``self.proc_ids`` is a
    # property that rebuilds its list on every access, so testing it per
    # proc is O(n^2); a snapshot set gives O(1) membership instead.
    known = set(self.proc_ids)
    for proc in self.query_procs():
        if proc.id not in known:
            self.procs.append(proc)
            known.add(proc.id)

def query_procs(self):
configs = query(self.id)
procs = []
for config in configs:
proc = Proc(
Expand All @@ -59,15 +76,15 @@ def __init__(self, id: int):
err=config["err"],
)
procs.append(proc)

self.procs: list[Proc] = procs
return procs

def get_statuses(self) -> List[JobStatus]:
"""
Condor query the cluster id and parse the responses
for individual processes to get their statuses,
returned as a list.
"""
self.refresh()
configs = query(self.id)
configs = {int(c.pop("procid")): c for c in configs}
statuses = []
Expand Down Expand Up @@ -97,6 +114,10 @@ def check_status(
reducer = any if how == "any" else all
return reducer([i in status for i in statuses])

@property
def proc_ids(self):
    """List of condor ids for the procs tracked by this cluster."""
    return [p.id for p in self.procs]

@property
def id(self):
    """The condor cluster id this object represents (read-only)."""
    return self._id
Expand Down
6 changes: 3 additions & 3 deletions pycondor/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from .cluster import JobCluster
from .utils import (checkdir, string_rep, requires_command,
split_command_string)
split_command_string, decode_string)
from .basenode import BaseNode

JobArg = namedtuple('JobArg', ['arg', 'name', 'retry'])
Expand Down Expand Up @@ -446,12 +446,12 @@ def submit_job(self, submit_options=None):

# check if the job submission reported any errors
if err:
msg = err.strip().replace('ERROR: ', '')
msg = decode_string(err).strip().replace('ERROR: ', '')
raise FailedSubmitError(msg)

# otherwise, try to parse the stdout to determine
# the id of the cluster of submitted jobs
match = re.search(r'(?<=submitted\sto\scluster )[0-9]+', out)
match = re.search(r'(?<=submitted\sto\scluster )[0-9]+', decode_string(out))
if match is None:
raise ValueError(
'Something went wrong, couldn\'t retrieve cluster id '
Expand Down

0 comments on commit c71a76a

Please sign in to comment.