Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

job-info: misc cleanup #5586

Merged
merged 9 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 66 additions & 42 deletions src/modules/job-info/guest_watch.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ struct guest_watch_ctx {
flux_jobid_t id;
char *path;
int flags;
bool cancel;
bool eventlog_watch_canceled;
bool cancel; /* cancel or disconnect */

/* transition possibilities
*
Expand Down Expand Up @@ -178,12 +179,16 @@ static struct guest_watch_ctx *guest_watch_ctx_create (struct info_ctx *ctx,
return NULL;
}

static int send_cancel (struct guest_watch_ctx *gw, flux_future_t *f)
static int send_eventlog_watch_cancel (struct guest_watch_ctx *gw,
flux_future_t *f,
bool cancel)
{
if (!gw->cancel) {
if (!gw->eventlog_watch_canceled) {
flux_future_t *f2;
int matchtag;

gw->cancel = cancel;

if (!f) {
if (gw->state == GUEST_WATCH_STATE_WAIT_GUEST_NAMESPACE)
f = gw->wait_guest_namespace_f;
Expand All @@ -192,20 +197,23 @@ static int send_cancel (struct guest_watch_ctx *gw, flux_future_t *f)
else if (gw->state == GUEST_WATCH_STATE_MAIN_NAMESPACE_LOOKUP) {
/* Since this is a lookup, we don't need to perform an actual
* cancel to "job-info.eventlog-watch-cancel". Just return
* ENODATA to the caller.
* ENODATA to the caller if necessary.
*/
gw->cancel = true;
if (flux_respond_error (gw->ctx->h,
gw->msg,
ENODATA,
NULL) < 0)
flux_log_error (gw->ctx->h, "%s: flux_respond_error",
__FUNCTION__);
gw->eventlog_watch_canceled = true;
if (gw->cancel) {
if (flux_respond_error (gw->ctx->h,
gw->msg,
ENODATA,
NULL) < 0)
flux_log_error (gw->ctx->h, "%s: flux_respond_error",
__FUNCTION__);
}
return 0;
}
else {
/* gw->state == GUEST_WATCH_STATE_INIT */
gw->cancel = true;
/* gw->state == GUEST_WATCH_STATE_INIT, eventlog-watch
* never started so sort of "auto-canceled" */
gw->eventlog_watch_canceled = true;
return 0;
}
}
Expand All @@ -222,7 +230,7 @@ static int send_cancel (struct guest_watch_ctx *gw, flux_future_t *f)
return -1;
}
flux_future_destroy (f2);
gw->cancel = true;
gw->eventlog_watch_canceled = true;
}
return 0;
}
Expand Down Expand Up @@ -323,9 +331,11 @@ static void get_main_eventlog_continuation (flux_future_t *f, void *arg)
goto error;
}

if (gw->cancel) {
if (flux_respond_error (ctx->h, gw->msg, ENODATA, NULL) < 0)
flux_log_error (ctx->h, "%s: flux_respond_error", __FUNCTION__);
if (gw->eventlog_watch_canceled) {
if (gw->cancel) {
if (flux_respond_error (ctx->h, gw->msg, ENODATA, NULL) < 0)
flux_log_error (ctx->h, "%s: flux_respond_error", __FUNCTION__);
}
goto done;
}

Expand Down Expand Up @@ -455,9 +465,11 @@ static void wait_guest_namespace_continuation (flux_future_t *f, void *arg)
if (gw->guest_started) {
/* check for racy cancel - user canceled while this
* error was in transit */
if (gw->cancel) {
if (gw->eventlog_watch_canceled) {
errno = ENODATA;
goto error;
if (gw->cancel)
goto error;
goto cleanup;
}
if (guest_namespace_watch (gw) < 0)
goto error;
Expand All @@ -470,9 +482,11 @@ static void wait_guest_namespace_continuation (flux_future_t *f, void *arg)
goto error;
}

if (gw->cancel) {
if (gw->eventlog_watch_canceled) {
errno = ENODATA;
goto error;
if (gw->cancel)
goto error;
goto cleanup;
}

if (flux_job_event_watch_get (f, &event) < 0) {
Expand All @@ -488,8 +502,8 @@ static void wait_guest_namespace_continuation (flux_future_t *f, void *arg)
flux_future_t *f2;

/* cancel this watcher, and once it's canceled, watch the guest
* namespace. Don't call send_cancel(), this is not an error
* or "full" cancel */
* namespace. Don't call send_eventlog_watch_cancel(), this
* is not an error or "full" cancel */
if (!(f2 = flux_rpc_pack (gw->ctx->h,
"job-info.eventlog-watch-cancel",
FLUX_NODEID_ANY,
Expand All @@ -508,16 +522,18 @@ static void wait_guest_namespace_continuation (flux_future_t *f, void *arg)
error_cancel:
/* If we haven't sent a cancellation yet, must do so so that
* the future's matchtag will eventually be freed */
if (!gw->cancel) {
if (!gw->eventlog_watch_canceled) {
int save_errno = errno;
(void) send_cancel (gw, gw->wait_guest_namespace_f);
(void) send_eventlog_watch_cancel (gw,
gw->wait_guest_namespace_f,
false);
errno = save_errno;
}

error:
if (flux_respond_error (ctx->h, gw->msg, errno, NULL) < 0)
flux_log_error (ctx->h, "%s: flux_respond_error", __FUNCTION__);

cleanup:
/* flux future destroyed in guest_watch_ctx_destroy, which is
* called via zlist_remove() */
zlist_remove (ctx->guest_watchers, gw);
Expand Down Expand Up @@ -591,28 +607,33 @@ static void guest_namespace_watch_continuation (flux_future_t *f, void *arg)
*/
/* check for racy cancel - user canceled while this
* error was in transit */
if (gw->cancel) {
if (gw->eventlog_watch_canceled) {
errno = ENODATA;
goto error;
if (gw->cancel)
goto error;
goto cleanup;
}
if (main_namespace_lookup (gw) < 0)
goto error;
return;
}
else {
/* We assume ENODATA always comes from a user cancellation,
* or similar error. There is no circumstance where would
* desire to ENODATA this stream.
/* Generally speaking we assume ENODATA always comes from
* a user cancellation, or similar error. There is no
* circumstance where we would desire to ENODATA this stream.
*/
if (errno != ENOENT && errno != ENODATA)
flux_log_error (ctx->h, "%s: flux_rpc_get", __FUNCTION__);
goto error;
}
}

if (gw->cancel) {
errno = ENODATA;
goto error;
if (gw->eventlog_watch_canceled) {
if (gw->cancel) {
errno = ENODATA;
goto error;
}
goto cleanup;
}

if (flux_respond_pack (ctx->h, gw->msg, "{s:s}", "event", event) < 0) {
Expand All @@ -628,16 +649,18 @@ static void guest_namespace_watch_continuation (flux_future_t *f, void *arg)
error_cancel:
/* If we haven't sent a cancellation yet, must do so so that
* the future's matchtag will eventually be freed */
if (!gw->cancel) {
if (!gw->eventlog_watch_canceled) {
int save_errno = errno;
(void) send_cancel (gw, gw->guest_namespace_watch_f);
(void) send_eventlog_watch_cancel (gw,
gw->guest_namespace_watch_f,
false);
errno = save_errno;
}

error:
if (flux_respond_error (ctx->h, gw->msg, errno, NULL) < 0)
flux_log_error (ctx->h, "%s: flux_respond_error", __FUNCTION__);

cleanup:
/* flux future destroyed in guest_watch_ctx_destroy, which is called
* via zlist_remove() */
zlist_remove (ctx->guest_watchers, gw);
Expand Down Expand Up @@ -727,13 +750,14 @@ static void main_namespace_lookup_continuation (flux_future_t *f, void *arg)
goto error;
}

if (gw->cancel) {
/* already sent ENODATA via send_cancel(), so just cleanup */
if (gw->eventlog_watch_canceled) {
/* already sent ENODATA via send_eventlog_watch_cancel(), so
* just cleanup */
goto cleanup;
}

input = s + gw->offset;
while (eventlog_parse_next (&input, &tok, &toklen)) {
while (get_next_eventlog_entry (&input, &tok, &toklen)) {
if (flux_respond_pack (ctx->h, gw->msg,
"{s:s#}",
"event", tok, toklen) < 0) {
Expand Down Expand Up @@ -797,7 +821,7 @@ static void guest_watch_cancel (struct info_ctx *ctx,
else
match = flux_disconnect_match (msg, gw->msg);
if (match)
send_cancel (gw, NULL);
send_eventlog_watch_cancel (gw, NULL, cancel);
}

void guest_watchers_cancel (struct info_ctx *ctx,
Expand All @@ -818,7 +842,7 @@ void guest_watch_cleanup (struct info_ctx *ctx)
struct guest_watch_ctx *gw;

while ((gw = zlist_pop (ctx->guest_watchers))) {
send_cancel (gw, NULL);
send_eventlog_watch_cancel (gw, NULL, false);

if (flux_respond_error (ctx->h, gw->msg, ENOSYS, NULL) < 0)
flux_log_error (ctx->h, "%s: flux_respond_error",
Expand Down
63 changes: 25 additions & 38 deletions src/modules/job-info/update.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,8 @@ static struct update_ctx *update_ctx_create (struct info_ctx *ctx,
uc->ctx = ctx;
uc->type = type;
uc->id = id;
if (!(uc->key = strdup (key))) {
errno = ENOMEM;
if (!(uc->key = strdup (key)))
goto error;
}
if (streq (key, "R"))
uc->update_name = "resource-update";
else {
Expand Down Expand Up @@ -291,9 +289,9 @@ static void lookup_continuation (flux_future_t *f, void *arg)
struct info_ctx *ctx = uc->ctx;
const char *key_str;
const char *eventlog_str;
const char *input;
const char *tok;
size_t toklen;
json_t *eventlog = NULL;
size_t index;
json_t *entry;
const char *errmsg = NULL;
bool job_ended = false;
bool submit_parsed = false;
Expand All @@ -317,17 +315,15 @@ static void lookup_continuation (flux_future_t *f, void *arg)
goto error;
}

input = eventlog_str;
while (eventlog_parse_next (&input, &tok, &toklen)) {
json_t *entry = NULL;
if (!(eventlog = eventlog_decode (eventlog_str))) {
errno = EINVAL;
errmsg = "lookup eventlog cannot be parsed";
goto error;
}
json_array_foreach (eventlog, index, entry) {
const char *name;
json_t *context = NULL;
if (eventlog_parse_entry_chunk (uc->ctx->h,
tok,
toklen,
&entry,
&name,
&context) < 0) {
if (eventlog_entry_parse (entry, NULL, &name, &context) < 0) {
errmsg = "error parsing eventlog";
goto error;
}
Expand All @@ -349,7 +345,6 @@ static void lookup_continuation (flux_future_t *f, void *arg)
}
else if (streq (name, "clean"))
job_ended = true;
json_decref (entry);
}

/* double check, generally speaking should be impossible */
Expand Down Expand Up @@ -403,6 +398,7 @@ static void lookup_continuation (flux_future_t *f, void *arg)
if (eventlog_watch (uc) < 0)
goto error;

json_decref (eventlog);
return;

error:
Expand All @@ -422,6 +418,7 @@ static void lookup_continuation (flux_future_t *f, void *arg)
}
else
zlist_remove (ctx->update_lookups, uc);
json_decref (eventlog);
}

static int update_lookup (struct info_ctx *ctx,
Expand Down Expand Up @@ -639,34 +636,24 @@ static void update_watch_cancel (struct update_ctx *uc,
const flux_msg_t *msg,
bool cancel)
{
const flux_msg_t *cmpmsg;

cmpmsg = flux_msglist_first (uc->msglist);
while (cmpmsg) {
bool match;

if (cancel)
match = flux_cancel_match (msg, cmpmsg);
else
match = flux_disconnect_match (msg, cmpmsg);

if (match) {
if (flux_respond_error (uc->ctx->h, cmpmsg, ENODATA, NULL) < 0)
flux_log_error (uc->ctx->h,
"%s: flux_respond_error",
__FUNCTION__);
/* deletes at cursor */
flux_msglist_delete (uc->msglist);
}

cmpmsg = flux_msglist_next (uc->msglist);
if (cancel) {
if (flux_msglist_cancel (uc->ctx->h, uc->msglist, msg) < 0)
flux_log_error (uc->ctx->h,
"error handling job-info.update-watch-cancel");
}
else {
if (flux_msglist_disconnect (uc->msglist, msg) < 0)
flux_log_error (uc->ctx->h,
"error handling job-info.update-watch disconnect");
}

if (flux_msglist_count (uc->msglist) == 0)
eventlog_watch_cancel (uc);
}

void update_watchers_cancel (struct info_ctx *ctx, const flux_msg_t *msg, bool cancel)
void update_watchers_cancel (struct info_ctx *ctx,
const flux_msg_t *msg,
bool cancel)
{
struct update_ctx *uc;

Expand Down
Loading