Skip to content

Commit

Permalink
python: remove manual jobspec/R updates
Browse files Browse the repository at this point in the history
Problem: In the kvslookup.py module "updated" jobspecs and R values
are done manually.  They could instead take advantage of the
job-info.lookup service and the recently added UPDATE flag.

Update kvslookup.py to use job-info.lookup with the UPDATE flag.  Remove
code that manually constructs updated jobspecs and R.
  • Loading branch information
chu11 committed Dec 20, 2023
1 parent e2d5a71 commit d65bc7e
Showing 1 changed file with 23 additions and 61 deletions.
84 changes: 23 additions & 61 deletions src/bindings/python/flux/job/kvslookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import errno
import json

import flux.constants
from _flux._core import ffi, lib
from flux.future import WaitAllFuture
from flux.job import JobID, JobspecV1
Expand Down Expand Up @@ -48,26 +49,19 @@ def get_decode(self):
return data


def job_info_lookup(flux_handle, jobid, keys=["jobspec"]):
payload = {"id": int(jobid), "keys": keys, "flags": 0}
def job_info_lookup(flux_handle, jobid, keys=["jobspec"], flags=0):
payload = {"id": int(jobid), "keys": keys, "flags": flags}
rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
rpc.jobid = jobid
return rpc


def _setup_lookup_keys(keys, original, base):
def _setup_lookup_keys(keys, original):
if "jobspec" in keys:
if original:
keys.remove("jobspec")
if "J" not in keys:
keys.append("J")
elif not base:
if "eventlog" not in keys:
keys.append("eventlog")
if "R" in keys:
if not base:
if "eventlog" not in keys:
keys.append("eventlog")


def _get_original_jobspec(job_data):
Expand All @@ -78,58 +72,14 @@ def _get_original_jobspec(job_data):
return result.decode("utf-8")


def _get_updated_jobspec(job_data):
if isinstance(job_data["jobspec"], str):
data = json.loads(job_data["jobspec"])
else:
data = job_data["jobspec"]
jobspec = JobspecV1(**data)
for entry in job_data["eventlog"].splitlines():
event = EventLogEvent(entry)
if event.name == "jobspec-update":
for key, value in event.context.items():
jobspec.setattr(key, value)
return jobspec.dumps()


def _get_updated_R(job_data):
if isinstance(job_data["R"], str):
R = json.loads(job_data["R"])
else:
R = job_data["R"]
for entry in job_data["eventlog"].splitlines():
event = EventLogEvent(entry)
if event.name == "resource-update":
for key, value in event.context.items():
if key == "expiration":
set_treedict(R, "execution.expiration", value)
return json.dumps(R, ensure_ascii=False)


def _update_keys(job_data, decode, keys, original, base):
remove_eventlog = False
def _update_keys(job_data, decode, keys, original):
if "jobspec" in keys:
if original:
job_data["jobspec"] = _get_original_jobspec(job_data)
if decode:
_decode_field(job_data, "jobspec")
if "J" not in keys:
job_data.pop("J")
elif not base:
job_data["jobspec"] = _get_updated_jobspec(job_data)
if decode:
_decode_field(job_data, "jobspec")
if "eventlog" not in keys:
remove_eventlog = True
if "R" in keys:
if not base:
job_data["R"] = _get_updated_R(job_data)
if decode:
_decode_field(job_data, "R")
if "eventlog" not in keys:
remove_eventlog = True
if remove_eventlog:
job_data.pop("eventlog")


# jobs_kvs_lookup simple variant for one jobid
Expand All @@ -153,15 +103,21 @@ def job_kvs_lookup(
:base: For 'jobspec' or 'R', get base value, do not apply updates from eventlog
"""
keyslookup = list(keys)
_setup_lookup_keys(keyslookup, original, base)
payload = {"id": int(jobid), "keys": keyslookup, "flags": 0}
_setup_lookup_keys(keyslookup, original)
# N.B. JobInfoLookupRPC() has a "get_decode()" member
# function, so we will always get the non-decoded result from
# job-info.
flags = 0
if not base:
flags |= flux.constants.FLUX_JOB_LOOKUP_UPDATE
payload = {"id": int(jobid), "keys": keyslookup, "flags": flags}
rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
try:
if decode:
rsp = rpc.get_decode()
else:
rsp = rpc.get()
_update_keys(rsp, decode, keys, original, base)
_update_keys(rsp, decode, keys, original)
# The job does not exist!
except FileNotFoundError:
return None
Expand Down Expand Up @@ -242,7 +198,7 @@ def __init__(
self.original = original
self.base = base
self.errors = []
_setup_lookup_keys(self.keyslookup, self.original, self.base)
_setup_lookup_keys(self.keyslookup, self.original)

def fetch_data(self):
"""Initiate the job info lookup to the Flux job-info module
Expand All @@ -255,9 +211,15 @@ def fetch_data(self):
JobKVSLookupFuture.errors is non-empty, then it will contain a
list of errors returned via the query.
"""
flags = 0
# N.B. JobInfoLookupRPC() has a "get_decode()" member
# function, so we will always get the non-decoded result from
# job-info.
if not self.base:
flags |= flux.constants.FLUX_JOB_LOOKUP_UPDATE
listids = JobKVSLookupFuture()
for jobid in self.ids:
listids.push(job_info_lookup(self.handle, jobid, self.keyslookup))
listids.push(job_info_lookup(self.handle, jobid, self.keyslookup, flags))
return listids

def data(self):
Expand All @@ -276,5 +238,5 @@ def data(self):
if hasattr(rpc, "errors"):
self.errors = rpc.errors
for job_data in data:
_update_keys(job_data, self.decode, self.keys, self.original, self.base)
_update_keys(job_data, self.decode, self.keys, self.original)
return data

0 comments on commit d65bc7e

Please sign in to comment.