Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

job-info: support lookup of updated jobspec, remove manual construction of updated R / jobspec #5635

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 24 additions & 64 deletions src/bindings/python/flux/job/kvslookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@
import errno
import json

import flux.constants
from _flux._core import ffi, lib
from flux.future import WaitAllFuture
from flux.job import JobID, JobspecV1
from flux.job.event import EventLogEvent
from flux.job import JobID
from flux.rpc import RPC
from flux.util import set_treedict


def _decode_field(data, key):
Expand Down Expand Up @@ -48,26 +47,19 @@ def get_decode(self):
return data


def job_info_lookup(flux_handle, jobid, keys=["jobspec"]):
payload = {"id": int(jobid), "keys": keys, "flags": 0}
def job_info_lookup(flux_handle, jobid, keys=["jobspec"], flags=0):
payload = {"id": int(jobid), "keys": keys, "flags": flags}
rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
rpc.jobid = jobid
return rpc


def _setup_lookup_keys(keys, original, base):
def _setup_lookup_keys(keys, original):
if "jobspec" in keys:
if original:
keys.remove("jobspec")
if "J" not in keys:
keys.append("J")
elif not base:
if "eventlog" not in keys:
keys.append("eventlog")
if "R" in keys:
if not base:
if "eventlog" not in keys:
keys.append("eventlog")


def _get_original_jobspec(job_data):
Expand All @@ -78,58 +70,14 @@ def _get_original_jobspec(job_data):
return result.decode("utf-8")


def _get_updated_jobspec(job_data):
if isinstance(job_data["jobspec"], str):
data = json.loads(job_data["jobspec"])
else:
data = job_data["jobspec"]
jobspec = JobspecV1(**data)
for entry in job_data["eventlog"].splitlines():
event = EventLogEvent(entry)
if event.name == "jobspec-update":
for key, value in event.context.items():
jobspec.setattr(key, value)
return jobspec.dumps()


def _get_updated_R(job_data):
if isinstance(job_data["R"], str):
R = json.loads(job_data["R"])
else:
R = job_data["R"]
for entry in job_data["eventlog"].splitlines():
event = EventLogEvent(entry)
if event.name == "resource-update":
for key, value in event.context.items():
if key == "expiration":
set_treedict(R, "execution.expiration", value)
return json.dumps(R, ensure_ascii=False)


def _update_keys(job_data, decode, keys, original, base):
remove_eventlog = False
def _update_keys(job_data, decode, keys, original):
if "jobspec" in keys:
if original:
job_data["jobspec"] = _get_original_jobspec(job_data)
if decode:
_decode_field(job_data, "jobspec")
if "J" not in keys:
job_data.pop("J")
elif not base:
job_data["jobspec"] = _get_updated_jobspec(job_data)
if decode:
_decode_field(job_data, "jobspec")
if "eventlog" not in keys:
remove_eventlog = True
if "R" in keys:
if not base:
job_data["R"] = _get_updated_R(job_data)
if decode:
_decode_field(job_data, "R")
if "eventlog" not in keys:
remove_eventlog = True
if remove_eventlog:
job_data.pop("eventlog")


# jobs_kvs_lookup simple variant for one jobid
Expand All @@ -153,15 +101,21 @@ def job_kvs_lookup(
:base: For 'jobspec' or 'R', get base value, do not apply updates from eventlog
"""
keyslookup = list(keys)
_setup_lookup_keys(keyslookup, original, base)
payload = {"id": int(jobid), "keys": keyslookup, "flags": 0}
_setup_lookup_keys(keyslookup, original)
# N.B. JobInfoLookupRPC() has a "get_decode()" member
# function, so we will always get the non-decoded result from
# job-info.
flags = 0
if not base:
flags |= flux.constants.FLUX_JOB_LOOKUP_CURRENT
payload = {"id": int(jobid), "keys": keyslookup, "flags": flags}
rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
try:
if decode:
rsp = rpc.get_decode()
else:
rsp = rpc.get()
_update_keys(rsp, decode, keys, original, base)
_update_keys(rsp, decode, keys, original)
# The job does not exist!
except FileNotFoundError:
return None
Expand Down Expand Up @@ -242,7 +196,7 @@ def __init__(
self.original = original
self.base = base
self.errors = []
_setup_lookup_keys(self.keyslookup, self.original, self.base)
_setup_lookup_keys(self.keyslookup, self.original)

def fetch_data(self):
"""Initiate the job info lookup to the Flux job-info module
Expand All @@ -255,9 +209,15 @@ def fetch_data(self):
JobKVSLookupFuture.errors is non-empty, then it will contain a
list of errors returned via the query.
"""
flags = 0
# N.B. JobInfoLookupRPC() has a "get_decode()" member
# function, so we will always get the non-decoded result from
# job-info.
if not self.base:
flags |= flux.constants.FLUX_JOB_LOOKUP_CURRENT
listids = JobKVSLookupFuture()
for jobid in self.ids:
listids.push(job_info_lookup(self.handle, jobid, self.keyslookup))
listids.push(job_info_lookup(self.handle, jobid, self.keyslookup, flags))
return listids

def data(self):
Expand All @@ -276,5 +236,5 @@ def data(self):
if hasattr(rpc, "errors"):
self.errors = rpc.errors
for job_data in data:
_update_keys(job_data, self.decode, self.keys, self.original, self.base)
_update_keys(job_data, self.decode, self.keys, self.original)
return data
64 changes: 3 additions & 61 deletions src/cmd/flux-job.c
Original file line number Diff line number Diff line change
Expand Up @@ -3308,42 +3308,6 @@ int cmd_wait_event (optparse_t *p, int argc, char **argv)
return (0);
}

char *reconstruct_current_jobspec (const char *jobspec_str,
const char *eventlog_str)
{
json_t *jobspec;
json_t *eventlog;
size_t index;
json_t *entry;
char *result;

if (!(jobspec = json_loads (jobspec_str, 0, NULL)))
log_msg_exit ("error decoding jobspec");
if (!(eventlog = eventlog_decode (eventlog_str)))
log_msg_exit ("error decoding eventlog");
json_array_foreach (eventlog, index, entry) {
const char *name;
json_t *context;

if (eventlog_entry_parse (entry, NULL, &name, &context) < 0)
log_msg_exit ("error decoding eventlog entry");
if (streq (name, "jobspec-update")) {
const char *path;
json_t *value;

json_object_foreach (context, path, value) {
if (jpath_set (jobspec, path, value) < 0)
log_err_exit ("failed to update jobspec");
}
}
}
if (!(result = json_dumps (jobspec, JSON_COMPACT)))
log_msg_exit ("failed to encode jobspec object");
json_decref (jobspec);
json_decref (eventlog);
return result;
}

void info_usage (void)
{
fprintf (stderr,
Expand Down Expand Up @@ -3403,33 +3367,11 @@ int cmd_info (optparse_t *p, int argc, char **argv)
log_msg_exit ("Failed to unwrap J to get jobspec: %s", error.text);
val = new_val;
}
/* The current (non --base) jobspec has to be reconstructed by fetching
* the jobspec and the eventlog and updating the former with the latter.
*/
else if (!optparse_hasopt (p, "base") && streq (key, "jobspec")) {
const char *jobspec;
const char *eventlog;

// fetch the two keys in parallel
if (!(f = flux_rpc_pack (h,
"job-info.lookup",
FLUX_NODEID_ANY,
0,
"{s:I s:[ss] s:i}",
"id", id,
"keys", "jobspec", "eventlog",
"flags", 0))
|| flux_rpc_get_unpack (f,
"{s:s s:s}",
"jobspec", &jobspec,
"eventlog", &eventlog) < 0)
log_msg_exit ("%s", future_strerror (f, errno));
val = new_val = reconstruct_current_jobspec (jobspec, eventlog);
}
/* The current (non --base) R is obtained through the
/* The current (non --base) R and jobspec are obtained through the
* job-info.lookup RPC w/ the CURRENT flag.
*/
else if (!optparse_hasopt (p, "base") && streq (key, "R")) {
else if (!optparse_hasopt (p, "base")
&& (streq (key, "R") || streq (key, "jobspec"))) {
if (!(f = flux_rpc_pack (h,
"job-info.lookup",
FLUX_NODEID_ANY,
Expand Down
2 changes: 1 addition & 1 deletion src/common/libjob/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ enum job_lookup_flags {
FLUX_JOB_LOOKUP_JSON_DECODE = 1,
/* get current value of special fields by applying eventlog
* updates for those fields
* - currently works for R
* - currently works for jobspec and R
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commit message typo: "an current/update jobspec"

Also, perhaps capitalize the first "current" because it reads as "the current flag" instead of "the flag that is named current" and is a bit confusing with all the other currents in the description.

*/
FLUX_JOB_LOOKUP_CURRENT = 2,
};
Expand Down
13 changes: 11 additions & 2 deletions src/modules/job-info/lookup.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ static int lookup_current (struct lookup_ctx *l,

if (streq (key, "R"))
update_event_name = "resource-update";
else if (streq (key, "jobspec"))
update_event_name = "jobspec-update";

if (!(value_object = json_loads (value, 0, NULL))) {
errno = EINVAL;
Expand Down Expand Up @@ -205,6 +207,12 @@ static int lookup_current (struct lookup_ctx *l,
if (streq (name, update_event_name)) {
if (streq (key, "R"))
apply_updates_R (l->ctx->h, l->id, key, value_object, context);
else if (streq (key, "jobspec"))
apply_updates_jobspec (l->ctx->h,
l->id,
key,
value_object,
context);
}
}

Expand Down Expand Up @@ -288,7 +296,8 @@ static void info_lookup_continuation (flux_future_t *fall, void *arg)
}

if ((l->flags & FLUX_JOB_LOOKUP_CURRENT)
&& streq (keystr, "R")) {
&& (streq (keystr, "R")
|| streq (keystr, "jobspec"))) {
if (lookup_current (l, fall, keystr, s, &current_value) < 0)
goto error;
s = current_value;
Expand Down Expand Up @@ -442,7 +451,7 @@ static int lookup_cached (struct lookup_ctx *l)

key_str = json_string_value (key);

if (!streq (key_str, "R"))
if (!streq (key_str, "R") && !streq (key_str, "jobspec"))
return 0;

if ((ret = update_watch_get_cached (l->ctx,
Expand Down
16 changes: 15 additions & 1 deletion src/modules/job-info/update.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ static struct update_ctx *update_ctx_create (struct info_ctx *ctx,
goto error;
if (streq (key, "R"))
uc->update_name = "resource-update";
else if (streq (key, "jobspec"))
uc->update_name = "jobspec-update";
else {
errno = EINVAL;
goto error;
Expand Down Expand Up @@ -180,6 +182,12 @@ static void eventlog_continuation (flux_future_t *f, void *arg)
uc->key,
uc->update_object,
context);
else if (streq (uc->key, "jobspec"))
apply_updates_jobspec (uc->ctx->h,
uc->id,
uc->key,
uc->update_object,
context);

msg = flux_msglist_first (uc->msglist);
while (msg) {
Expand Down Expand Up @@ -315,6 +323,12 @@ static void lookup_continuation (flux_future_t *f, void *arg)
uc->key,
uc->update_object,
context);
else if (streq (uc->key, "jobspec"))
apply_updates_jobspec (uc->ctx->h,
uc->id,
uc->key,
uc->update_object,
context);
uc->initial_update_count++;
}
else if (streq (name, "clean"))
Expand Down Expand Up @@ -474,7 +488,7 @@ void update_watch_cb (flux_t *h, flux_msg_handler_t *mh,
errmsg = "update-watch request rejected without streaming RPC flag";
goto error;
}
if (!streq (key, "R")) {
if (!streq (key, "R") && !streq (key, "jobspec")) {
errno = EINVAL;
errmsg = "update-watch unsupported key specified";
goto error;
Expand Down
19 changes: 19 additions & 0 deletions src/modules/job-info/util.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,25 @@
}
}

void apply_updates_jobspec (flux_t *h,
flux_jobid_t id,
const char *key,
json_t *jobspec,
json_t *context)
{
const char *ckey;
json_t *value;

json_object_foreach (context, ckey, value) {
if (jpath_set (jobspec,
ckey,
value) < 0)
flux_log (h, LOG_INFO,

Check warning on line 141 in src/modules/job-info/util.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-info/util.c#L141

Added line #L141 was not covered by tests
"%s: failed to update job %s %s",
__FUNCTION__, idf58 (id), key);
}
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
7 changes: 7 additions & 0 deletions src/modules/job-info/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ void apply_updates_R (flux_t *h,
json_t *R,
json_t *context);

/* apply context updates to the jobspec object */
void apply_updates_jobspec (flux_t *h,
flux_jobid_t id,
const char *key,
json_t *jobspec,
json_t *context);

#endif /* ! _FLUX_JOB_INFO_UTIL_H */

/*
Expand Down
1 change: 1 addition & 0 deletions t/python/t0014-job-kvslookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def test_info_00_job_info_lookup(self):
data = rpc.get()
self.check_jobspec_str(data, self.jobid1, 0)
data = rpc.get_decode()
self.check_jobspec_decoded(data, self.jobid1, 0)
self.assertEqual(data["id"], self.jobid1, 0)

def test_info_01_job_info_lookup_keys(self):
Expand Down
Loading
Loading