From 068220fc15d8c7b9634c8051bf21d7aa6a414c26 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Sun, 26 Nov 2023 15:18:31 -0800 Subject: [PATCH] python: remove manual construction of jobspec/R Problem: In the kvslookup.py module, the current jobspecs and R values are done manually. They could instead take advantage of the job-info.lookup service and the recently added CURRENT flag. Update kvslookup.py to use job-info.lookup with the CURRENT flag. Remove code that manually constructs current jobspecs and R. --- src/bindings/python/flux/job/kvslookup.py | 87 ++++++----------------- 1 file changed, 23 insertions(+), 64 deletions(-) diff --git a/src/bindings/python/flux/job/kvslookup.py b/src/bindings/python/flux/job/kvslookup.py index 276ec08635b5..d6766d819cd0 100644 --- a/src/bindings/python/flux/job/kvslookup.py +++ b/src/bindings/python/flux/job/kvslookup.py @@ -10,12 +10,10 @@ import errno import json +import flux.constants from _flux._core import ffi, lib from flux.future import WaitAllFuture -from flux.job import JobID, JobspecV1 -from flux.job.event import EventLogEvent from flux.rpc import RPC -from flux.util import set_treedict def _decode_field(data, key): @@ -48,26 +46,19 @@ def get_decode(self): return data -def job_info_lookup(flux_handle, jobid, keys=["jobspec"]): - payload = {"id": int(jobid), "keys": keys, "flags": 0} +def job_info_lookup(flux_handle, jobid, keys=["jobspec"], flags=0): + payload = {"id": int(jobid), "keys": keys, "flags": flags} rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload) rpc.jobid = jobid return rpc -def _setup_lookup_keys(keys, original, base): +def _setup_lookup_keys(keys, original): if "jobspec" in keys: if original: keys.remove("jobspec") if "J" not in keys: keys.append("J") - elif not base: - if "eventlog" not in keys: - keys.append("eventlog") - if "R" in keys: - if not base: - if "eventlog" not in keys: - keys.append("eventlog") def _get_original_jobspec(job_data): @@ -78,36 +69,7 @@ def _get_original_jobspec(job_data): return result.decode("utf-8") -def _get_updated_jobspec(job_data): - if isinstance(job_data["jobspec"], str): - data = json.loads(job_data["jobspec"]) - else: - data = job_data["jobspec"] - jobspec = JobspecV1(**data) - for entry in job_data["eventlog"].splitlines(): - event = EventLogEvent(entry) - if event.name == "jobspec-update": - for key, value in event.context.items(): - jobspec.setattr(key, value) - return jobspec.dumps() - - -def _get_updated_R(job_data): - if isinstance(job_data["R"], str): - R = json.loads(job_data["R"]) - else: - R = job_data["R"] - for entry in job_data["eventlog"].splitlines(): - event = EventLogEvent(entry) - if event.name == "resource-update": - for key, value in event.context.items(): - if key == "expiration": - set_treedict(R, "execution.expiration", value) - return json.dumps(R, ensure_ascii=False) - - -def _update_keys(job_data, decode, keys, original, base): - remove_eventlog = False +def _update_keys(job_data, decode, keys, original): if "jobspec" in keys: if original: job_data["jobspec"] = _get_original_jobspec(job_data) @@ -115,21 +77,6 @@ def _update_keys(job_data, decode, keys, original, base): _decode_field(job_data, "jobspec") if "J" not in keys: job_data.pop("J") - elif not base: - job_data["jobspec"] = _get_updated_jobspec(job_data) - if decode: - _decode_field(job_data, "jobspec") - if "eventlog" not in keys: - remove_eventlog = True - if "R" in keys: - if not base: - job_data["R"] = _get_updated_R(job_data) - if decode: - _decode_field(job_data, "R") - if "eventlog" not in keys: - remove_eventlog = True - if remove_eventlog: - job_data.pop("eventlog") # jobs_kvs_lookup simple variant for one jobid @@ -153,15 +100,21 @@ def job_kvs_lookup( :base: For 'jobspec' or 'R', get base value, do not apply updates from eventlog """ keyslookup = list(keys) - _setup_lookup_keys(keyslookup, original, base) - payload = {"id": int(jobid), "keys": keyslookup, "flags": 0} + _setup_lookup_keys(keyslookup, original) + # N.B. JobInfoLookupRPC() has a "get_decode()" member + # function, so we will always get the non-decoded result from + # job-info. + flags = 0 + if not base: + flags |= flux.constants.FLUX_JOB_LOOKUP_CURRENT + payload = {"id": int(jobid), "keys": keyslookup, "flags": flags} rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload) try: if decode: rsp = rpc.get_decode() else: rsp = rpc.get() - _update_keys(rsp, decode, keys, original, base) + _update_keys(rsp, decode, keys, original) # The job does not exist! except FileNotFoundError: return None @@ -242,7 +195,7 @@ def __init__( self.original = original self.base = base self.errors = [] - _setup_lookup_keys(self.keyslookup, self.original, self.base) + _setup_lookup_keys(self.keyslookup, self.original) def fetch_data(self): """Initiate the job info lookup to the Flux job-info module @@ -255,9 +208,15 @@ def fetch_data(self): JobKVSLookupFuture.errors is non-empty, then it will contain a list of errors returned via the query. """ + flags = 0 + # N.B. JobInfoLookupRPC() has a "get_decode()" member + # function, so we will always get the non-decoded result from + # job-info. + if not self.base: + flags |= flux.constants.FLUX_JOB_LOOKUP_CURRENT listids = JobKVSLookupFuture() for jobid in self.ids: - listids.push(job_info_lookup(self.handle, jobid, self.keyslookup)) + listids.push(job_info_lookup(self.handle, jobid, self.keyslookup, flags)) return listids def data(self): @@ -276,5 +235,5 @@ def data(self): if hasattr(rpc, "errors"): self.errors = rpc.errors for job_data in data: - _update_keys(job_data, self.decode, self.keys, self.original, self.base) + _update_keys(job_data, self.decode, self.keys, self.original) return data