Skip to content

Commit

Permalink
Merge pull request #5586 from chu11/pr5467_fixes
Browse files Browse the repository at this point in the history
job-info: misc cleanup
  • Loading branch information
mergify[bot] authored Nov 22, 2023
2 parents 420e8e7 + df00e22 commit 5a0b0b5
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 125 deletions.
108 changes: 66 additions & 42 deletions src/modules/job-info/guest_watch.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ struct guest_watch_ctx {
flux_jobid_t id;
char *path;
int flags;
bool cancel;
bool eventlog_watch_canceled;
bool cancel; /* cancel or disconnect */

/* transition possibilities
*
Expand Down Expand Up @@ -178,12 +179,16 @@ static struct guest_watch_ctx *guest_watch_ctx_create (struct info_ctx *ctx,
return NULL;
}

static int send_cancel (struct guest_watch_ctx *gw, flux_future_t *f)
static int send_eventlog_watch_cancel (struct guest_watch_ctx *gw,
flux_future_t *f,
bool cancel)
{
if (!gw->cancel) {
if (!gw->eventlog_watch_canceled) {
flux_future_t *f2;
int matchtag;

gw->cancel = cancel;

if (!f) {
if (gw->state == GUEST_WATCH_STATE_WAIT_GUEST_NAMESPACE)
f = gw->wait_guest_namespace_f;
Expand All @@ -192,20 +197,23 @@ static int send_cancel (struct guest_watch_ctx *gw, flux_future_t *f)
else if (gw->state == GUEST_WATCH_STATE_MAIN_NAMESPACE_LOOKUP) {
/* Since this is a lookup, we don't need to perform an actual
* cancel to "job-info.eventlog-watch-cancel". Just return
* ENODATA to the caller.
* ENODATA to the caller if necessary.
*/
gw->cancel = true;
if (flux_respond_error (gw->ctx->h,
gw->msg,
ENODATA,
NULL) < 0)
flux_log_error (gw->ctx->h, "%s: flux_respond_error",
__FUNCTION__);
gw->eventlog_watch_canceled = true;
if (gw->cancel) {
if (flux_respond_error (gw->ctx->h,
gw->msg,
ENODATA,
NULL) < 0)
flux_log_error (gw->ctx->h, "%s: flux_respond_error",
__FUNCTION__);
}
return 0;
}
else {
/* gw->state == GUEST_WATCH_STATE_INIT */
gw->cancel = true;
/* gw->state == GUEST_WATCH_STATE_INIT, eventlog-watch
* never started so sort of "auto-canceled" */
gw->eventlog_watch_canceled = true;
return 0;
}
}
Expand All @@ -222,7 +230,7 @@ static int send_cancel (struct guest_watch_ctx *gw, flux_future_t *f)
return -1;
}
flux_future_destroy (f2);
gw->cancel = true;
gw->eventlog_watch_canceled = true;
}
return 0;
}
Expand Down Expand Up @@ -323,9 +331,11 @@ static void get_main_eventlog_continuation (flux_future_t *f, void *arg)
goto error;
}

if (gw->cancel) {
if (flux_respond_error (ctx->h, gw->msg, ENODATA, NULL) < 0)
flux_log_error (ctx->h, "%s: flux_respond_error", __FUNCTION__);
if (gw->eventlog_watch_canceled) {
if (gw->cancel) {
if (flux_respond_error (ctx->h, gw->msg, ENODATA, NULL) < 0)
flux_log_error (ctx->h, "%s: flux_respond_error", __FUNCTION__);
}
goto done;
}

Expand Down Expand Up @@ -455,9 +465,11 @@ static void wait_guest_namespace_continuation (flux_future_t *f, void *arg)
if (gw->guest_started) {
/* check for racy cancel - user canceled while this
* error was in transit */
if (gw->cancel) {
if (gw->eventlog_watch_canceled) {
errno = ENODATA;
goto error;
if (gw->cancel)
goto error;
goto cleanup;
}
if (guest_namespace_watch (gw) < 0)
goto error;
Expand All @@ -470,9 +482,11 @@ static void wait_guest_namespace_continuation (flux_future_t *f, void *arg)
goto error;
}

if (gw->cancel) {
if (gw->eventlog_watch_canceled) {
errno = ENODATA;
goto error;
if (gw->cancel)
goto error;
goto cleanup;
}

if (flux_job_event_watch_get (f, &event) < 0) {
Expand All @@ -488,8 +502,8 @@ static void wait_guest_namespace_continuation (flux_future_t *f, void *arg)
flux_future_t *f2;

/* cancel this watcher, and once it's canceled, watch the guest
* namespace. Don't call send_cancel(), this is not an error
* or "full" cancel */
* namespace. Don't call send_eventlog_watch_cancel(), this
* is not an error or "full" cancel */
if (!(f2 = flux_rpc_pack (gw->ctx->h,
"job-info.eventlog-watch-cancel",
FLUX_NODEID_ANY,
Expand All @@ -508,16 +522,18 @@ static void wait_guest_namespace_continuation (flux_future_t *f, void *arg)
error_cancel:
/* If we haven't sent a cancellation yet, must do so so that
* the future's matchtag will eventually be freed */
if (!gw->cancel) {
if (!gw->eventlog_watch_canceled) {
int save_errno = errno;
(void) send_cancel (gw, gw->wait_guest_namespace_f);
(void) send_eventlog_watch_cancel (gw,
gw->wait_guest_namespace_f,
false);
errno = save_errno;
}

error:
if (flux_respond_error (ctx->h, gw->msg, errno, NULL) < 0)
flux_log_error (ctx->h, "%s: flux_respond_error", __FUNCTION__);

cleanup:
/* flux future destroyed in guest_watch_ctx_destroy, which is
* called via zlist_remove() */
zlist_remove (ctx->guest_watchers, gw);
Expand Down Expand Up @@ -591,28 +607,33 @@ static void guest_namespace_watch_continuation (flux_future_t *f, void *arg)
*/
/* check for racy cancel - user canceled while this
* error was in transit */
if (gw->cancel) {
if (gw->eventlog_watch_canceled) {
errno = ENODATA;
goto error;
if (gw->cancel)
goto error;
goto cleanup;
}
if (main_namespace_lookup (gw) < 0)
goto error;
return;
}
else {
/* We assume ENODATA always comes from a user cancellation
* or similar error. There is no circumstance where we would
* desire to ENODATA this stream.
/* Generally speaking we assume ENODATA always comes from
* a user cancellation or similar error. There is no
* circumstance where we would desire to ENODATA this stream.
*/
if (errno != ENOENT && errno != ENODATA)
flux_log_error (ctx->h, "%s: flux_rpc_get", __FUNCTION__);
goto error;
}
}

if (gw->cancel) {
errno = ENODATA;
goto error;
if (gw->eventlog_watch_canceled) {
if (gw->cancel) {
errno = ENODATA;
goto error;
}
goto cleanup;
}

if (flux_respond_pack (ctx->h, gw->msg, "{s:s}", "event", event) < 0) {
Expand All @@ -628,16 +649,18 @@ static void guest_namespace_watch_continuation (flux_future_t *f, void *arg)
error_cancel:
/* If we haven't sent a cancellation yet, must do so so that
* the future's matchtag will eventually be freed */
if (!gw->cancel) {
if (!gw->eventlog_watch_canceled) {
int save_errno = errno;
(void) send_cancel (gw, gw->guest_namespace_watch_f);
(void) send_eventlog_watch_cancel (gw,
gw->guest_namespace_watch_f,
false);
errno = save_errno;
}

error:
if (flux_respond_error (ctx->h, gw->msg, errno, NULL) < 0)
flux_log_error (ctx->h, "%s: flux_respond_error", __FUNCTION__);

cleanup:
/* flux future destroyed in guest_watch_ctx_destroy, which is called
* via zlist_remove() */
zlist_remove (ctx->guest_watchers, gw);
Expand Down Expand Up @@ -727,13 +750,14 @@ static void main_namespace_lookup_continuation (flux_future_t *f, void *arg)
goto error;
}

if (gw->cancel) {
/* already sent ENODATA via send_cancel(), so just cleanup */
if (gw->eventlog_watch_canceled) {
/* already sent ENODATA via send_eventlog_watch_cancel(), so
* just cleanup */
goto cleanup;
}

input = s + gw->offset;
while (eventlog_parse_next (&input, &tok, &toklen)) {
while (get_next_eventlog_entry (&input, &tok, &toklen)) {
if (flux_respond_pack (ctx->h, gw->msg,
"{s:s#}",
"event", tok, toklen) < 0) {
Expand Down Expand Up @@ -797,7 +821,7 @@ static void guest_watch_cancel (struct info_ctx *ctx,
else
match = flux_disconnect_match (msg, gw->msg);
if (match)
send_cancel (gw, NULL);
send_eventlog_watch_cancel (gw, NULL, cancel);
}

void guest_watchers_cancel (struct info_ctx *ctx,
Expand All @@ -818,7 +842,7 @@ void guest_watch_cleanup (struct info_ctx *ctx)
struct guest_watch_ctx *gw;

while ((gw = zlist_pop (ctx->guest_watchers))) {
send_cancel (gw, NULL);
send_eventlog_watch_cancel (gw, NULL, false);

if (flux_respond_error (ctx->h, gw->msg, ENOSYS, NULL) < 0)
flux_log_error (ctx->h, "%s: flux_respond_error",
Expand Down
63 changes: 25 additions & 38 deletions src/modules/job-info/update.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,8 @@ static struct update_ctx *update_ctx_create (struct info_ctx *ctx,
uc->ctx = ctx;
uc->type = type;
uc->id = id;
if (!(uc->key = strdup (key))) {
errno = ENOMEM;
if (!(uc->key = strdup (key)))
goto error;
}
if (streq (key, "R"))
uc->update_name = "resource-update";
else {
Expand Down Expand Up @@ -291,9 +289,9 @@ static void lookup_continuation (flux_future_t *f, void *arg)
struct info_ctx *ctx = uc->ctx;
const char *key_str;
const char *eventlog_str;
const char *input;
const char *tok;
size_t toklen;
json_t *eventlog = NULL;
size_t index;
json_t *entry;
const char *errmsg = NULL;
bool job_ended = false;
bool submit_parsed = false;
Expand All @@ -317,17 +315,15 @@ static void lookup_continuation (flux_future_t *f, void *arg)
goto error;
}

input = eventlog_str;
while (eventlog_parse_next (&input, &tok, &toklen)) {
json_t *entry = NULL;
if (!(eventlog = eventlog_decode (eventlog_str))) {
errno = EINVAL;
errmsg = "lookup eventlog cannot be parsed";
goto error;
}
json_array_foreach (eventlog, index, entry) {
const char *name;
json_t *context = NULL;
if (eventlog_parse_entry_chunk (uc->ctx->h,
tok,
toklen,
&entry,
&name,
&context) < 0) {
if (eventlog_entry_parse (entry, NULL, &name, &context) < 0) {
errmsg = "error parsing eventlog";
goto error;
}
Expand All @@ -349,7 +345,6 @@ static void lookup_continuation (flux_future_t *f, void *arg)
}
else if (streq (name, "clean"))
job_ended = true;
json_decref (entry);
}

/* double check, generally speaking should be impossible */
Expand Down Expand Up @@ -403,6 +398,7 @@ static void lookup_continuation (flux_future_t *f, void *arg)
if (eventlog_watch (uc) < 0)
goto error;

json_decref (eventlog);
return;

error:
Expand All @@ -422,6 +418,7 @@ static void lookup_continuation (flux_future_t *f, void *arg)
}
else
zlist_remove (ctx->update_lookups, uc);
json_decref (eventlog);
}

static int update_lookup (struct info_ctx *ctx,
Expand Down Expand Up @@ -639,34 +636,24 @@ static void update_watch_cancel (struct update_ctx *uc,
const flux_msg_t *msg,
bool cancel)
{
const flux_msg_t *cmpmsg;

cmpmsg = flux_msglist_first (uc->msglist);
while (cmpmsg) {
bool match;

if (cancel)
match = flux_cancel_match (msg, cmpmsg);
else
match = flux_disconnect_match (msg, cmpmsg);

if (match) {
if (flux_respond_error (uc->ctx->h, cmpmsg, ENODATA, NULL) < 0)
flux_log_error (uc->ctx->h,
"%s: flux_respond_error",
__FUNCTION__);
/* deletes at cursor */
flux_msglist_delete (uc->msglist);
}

cmpmsg = flux_msglist_next (uc->msglist);
if (cancel) {
if (flux_msglist_cancel (uc->ctx->h, uc->msglist, msg) < 0)
flux_log_error (uc->ctx->h,
"error handling job-info.update-watch-cancel");
}
else {
if (flux_msglist_disconnect (uc->msglist, msg) < 0)
flux_log_error (uc->ctx->h,
"error handling job-info.update-watch disconnect");
}

if (flux_msglist_count (uc->msglist) == 0)
eventlog_watch_cancel (uc);
}

void update_watchers_cancel (struct info_ctx *ctx, const flux_msg_t *msg, bool cancel)
void update_watchers_cancel (struct info_ctx *ctx,
const flux_msg_t *msg,
bool cancel)
{
struct update_ctx *uc;

Expand Down
Loading

0 comments on commit 5a0b0b5

Please sign in to comment.