Skip to content

Commit

Permalink
Merge pull request flux-framework#6523 from chu11/kvs_stream_flag
Browse files Browse the repository at this point in the history
kvs-watch: support FLUX_KVS_STREAM flag
  • Loading branch information
mergify[bot] authored Dec 20, 2024
2 parents c9eb3a8 + 05aa903 commit e10d676
Show file tree
Hide file tree
Showing 9 changed files with 355 additions and 39 deletions.
14 changes: 14 additions & 0 deletions doc/man1/flux-kvs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ the value is zero length.
replaced. The :option:`--full` option ensures these changes are reported
as well, at greater overhead.

.. option:: -S, --stream

Return potentially large values in multiple responses. This may improve
response times of very large values in the KVS.

Will not work in combination with :option:`--watch`.

put
---

Expand Down Expand Up @@ -506,6 +513,13 @@ Display the contents of an RFC 18 KVS eventlog referred to by *key*.
'auto', 'never', or 'always'. The default value of *WHEN* if omitted is
'always'. The default is 'auto' if the option is unused.

.. option:: -S, --stream

Return potentially large eventlogs in multiple responses. This may improve
response times of very large eventlogs in the KVS.

Will not work in combination with :option:`--watch`.

eventlog append
---------------

Expand Down
4 changes: 4 additions & 0 deletions doc/man3/flux_kvs_lookup.rst
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ FLUX_KVS_WAITCREATE
fulfilled with an ENODATA error to ensure the cancel request has been
received and processed.

FLUX_KVS_STREAM
Return a potentially large value in multiple responses terminated
by an ENODATA error response.


RETURN VALUE
============
Expand Down
54 changes: 44 additions & 10 deletions src/cmd/flux-kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ static struct optparse_option get_opts[] = {
{ .name = "count", .key = 'c', .has_arg = 1, .arginfo = "COUNT",
.usage = "Display at most COUNT changes",
},
{ .name = "stream", .key = 'S', .has_arg = 0,
.usage = "Return potentially large values in multiple responses."
},
OPTPARSE_TABLE_END
};

Expand Down Expand Up @@ -287,7 +290,7 @@ static struct optparse_subcommand subcommands[] = {
NULL,
},
{ "get",
"[-N ns] [-r|-t] [-a treeobj] [-l] [-W] [-w] [-u] [-A] [-f] "
"[-N ns] [-r|-t] [-a treeobj] [-l] [-W] [-w] [-u] [-A] [-f] [-S] "
"[-c COUNT] key [key...]",
"Get value stored under key",
cmd_get,
Expand Down Expand Up @@ -686,6 +689,7 @@ struct lookup_ctx {
optparse_t *p;
int maxcount;
int count;
bool have_output;
const char *ns;
};

Expand All @@ -695,9 +699,13 @@ void lookup_continuation (flux_future_t *f, void *arg)
struct lookup_ctx *ctx = arg;
const char *key = flux_kvs_lookup_get_key (f);

if (optparse_hasopt (ctx->p, "watch")
if ((optparse_hasopt (ctx->p, "watch")
|| optparse_hasopt (ctx->p, "stream"))
&& flux_rpc_get (f, NULL) < 0
&& errno == ENODATA) {
if (optparse_hasopt (ctx->p, "stream")
&& ctx->have_output)
printf ("\n");
flux_future_destroy (f);
return; // EOF
}
Expand Down Expand Up @@ -726,8 +734,13 @@ void lookup_continuation (flux_future_t *f, void *arg)
log_err_exit ("%s", key);
if (optparse_hasopt (ctx->p, "label"))
printf ("%s=", key);
if (value)
printf ("%s\n", value);
if (value) {
if (optparse_hasopt (ctx->p, "stream"))
printf ("%s", value);
else
printf ("%s\n", value);
ctx->have_output = true;
}
}
fflush (stdout);
if (optparse_hasopt (ctx->p, "watch")) {
Expand All @@ -737,6 +750,9 @@ void lookup_continuation (flux_future_t *f, void *arg)
log_err_exit ("flux_kvs_lookup_cancel");
}
}
else if (optparse_hasopt (ctx->p, "stream")) {
flux_future_reset (f);
}
else
flux_future_destroy (f);
}
Expand All @@ -759,6 +775,8 @@ void cmd_get_one (flux_t *h, const char *key, struct lookup_ctx *ctx)
}
if (optparse_hasopt (ctx->p, "waitcreate"))
flags |= FLUX_KVS_WAITCREATE;
if (optparse_hasopt (ctx->p, "stream"))
flags |= FLUX_KVS_STREAM;
if (optparse_hasopt (ctx->p, "at")) {
const char *reference = optparse_get_str (ctx->p, "at", NULL);
if (!(f = flux_kvs_lookupat (h, flags, key, reference)))
Expand Down Expand Up @@ -790,6 +808,7 @@ int cmd_get (optparse_t *p, int argc, char **argv)
ctx.p = p;
ctx.count = 0;
ctx.maxcount = optparse_get_int (p, "count", 0);
ctx.have_output = false;
ctx.ns = optparse_get_str (p, "namespace", NULL);

if (!(h = flux_open (NULL, 0)))
Expand Down Expand Up @@ -1985,11 +2004,12 @@ void eventlog_get_continuation (flux_future_t *f, void *arg)
flux_error_t error;
bool limit_reached = false;

/* Handle canceled lookup (FLUX_KVS_WATCH flag only).
/* Handle canceled (FLUX_KVS_WATCH) or finished (FLUX_KVS_STREAM) lookup.
* Destroy the future and return (reactor will then terminate).
* Errors other than ENODATA are handled by the flux_kvs_lookup_get().
*/
if (optparse_hasopt (ctx->p, "watch")
if ((optparse_hasopt (ctx->p, "watch")
|| optparse_hasopt (ctx->p, "stream"))
&& flux_rpc_get (f, NULL) < 0
&& errno == ENODATA) {
flux_future_destroy (f);
Expand All @@ -2003,12 +2023,18 @@ void eventlog_get_continuation (flux_future_t *f, void *arg)
log_err_exit ("eventlog_decode");

json_array_foreach (a, index, value) {
if (ctx->maxcount == 0 || ctx->count < ctx->maxcount) {
if (optparse_hasopt (ctx->p, "watch")) {
if (ctx->maxcount == 0 || ctx->count < ctx->maxcount) {
if (eventlog_entry_dumpf (ctx->evf, stdout, &error, value) < 0)
log_msg ("failed to print eventlog entry: %s", error.text);
}
if (ctx->maxcount > 0 && ++ctx->count == ctx->maxcount)
limit_reached = true;
}
else {
if (eventlog_entry_dumpf (ctx->evf, stdout, &error, value) < 0)
log_msg ("failed to print eventlog entry: %s", error.text);
}
if (ctx->maxcount > 0 && ++ctx->count == ctx->maxcount)
limit_reached = true;
}

fflush (stdout);
Expand All @@ -2024,6 +2050,9 @@ void eventlog_get_continuation (flux_future_t *f, void *arg)
log_err_exit ("flux_kvs_lookup_cancel");
}
}
else if (optparse_hasopt (ctx->p, "stream")) {
flux_future_reset (f);
}
else
flux_future_destroy (f);

Expand Down Expand Up @@ -2053,6 +2082,8 @@ int cmd_eventlog_get (optparse_t *p, int argc, char **argv)
}
if (optparse_hasopt (p, "waitcreate"))
flags |= FLUX_KVS_WAITCREATE;
if (optparse_hasopt (p, "stream"))
flags |= FLUX_KVS_STREAM;

ctx.p = p;
ctx.count = 0;
Expand Down Expand Up @@ -2225,6 +2256,9 @@ static struct optparse_option eventlog_get_opts[] = {
.usage = "Colorize output when supported; WHEN can be 'always' "
"(default if omitted), 'never', or 'auto' (default)."
},
{ .name = "stream", .key = 'S', .has_arg = 0,
.usage = "Return potentially large values in multiple responses."
},
OPTPARSE_TABLE_END
};

Expand Down Expand Up @@ -2267,7 +2301,7 @@ static struct optparse_subcommand eventlog_subcommands[] = {
eventlog_append_opts,
},
{ "get",
"[-N ns] [-u] [-W] [-w] [-c COUNT] [-H] [-L auto|always|never] key",
"[-N ns] [-u] [-W] [-w] [-c COUNT] [-H] [-S] [-L auto|always|never] key",
"Get eventlog",
cmd_eventlog_get,
0,
Expand Down
3 changes: 2 additions & 1 deletion src/common/libkvs/kvs.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ enum kvs_op {
FLUX_KVS_APPEND = 32,
FLUX_KVS_WATCH_FULL = 64,
FLUX_KVS_WATCH_UNIQ = 128,
FLUX_KVS_WATCH_APPEND = 256
FLUX_KVS_WATCH_APPEND = 256,
FLUX_KVS_STREAM = 512
};

/* Namespace
Expand Down
21 changes: 14 additions & 7 deletions src/common/libkvs/kvs_lookup.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,21 @@ static int validate_lookup_flags (int flags, bool watch_ok)
if ((flags & FLUX_KVS_WATCH_FLAGS)
&& !(flags & FLUX_KVS_WATCH))
return -1;
/* FLUX_KVS_WAITCREATE does not require FLUX_KVS_WATCH to be set,
* but it requires that we be able to communicate with the
* kvs-watch module, so we use the watch_ok bool here.
/* FLUX_KVS_WAITCREATE and FLUX_KVS_STREAM do not require
* FLUX_KVS_WATCH to be set, but it requires that we be able to
* communicate with the kvs-watch module, so we use the watch_ok
* bool here.
*/
if ((flags & FLUX_KVS_WAITCREATE) && !watch_ok)
if (((flags & FLUX_KVS_WAITCREATE)
|| (flags & FLUX_KVS_STREAM))
&& !watch_ok)
return -1;

flags &= ~FLUX_KVS_WATCH;
flags &= ~(FLUX_KVS_WATCH_FLAGS);

flags &= ~FLUX_KVS_WAITCREATE;
flags &= ~FLUX_KVS_STREAM;

switch (flags) {
case 0:
Expand Down Expand Up @@ -129,9 +133,11 @@ flux_future_t *flux_kvs_lookup (flux_t *h,
if (!(ctx = alloc_ctx (h, flags, key)))
return NULL;
if ((flags & FLUX_KVS_WATCH)
|| (flags & FLUX_KVS_WAITCREATE))
|| (flags & FLUX_KVS_WAITCREATE)
|| (flags & FLUX_KVS_STREAM))
topic = "kvs-watch.lookup"; // redirect to kvs-watch module
if ((flags & FLUX_KVS_WATCH))
if ((flags & FLUX_KVS_WATCH)
|| (flags & FLUX_KVS_STREAM))
rpc_flags |= FLUX_RPC_STREAMING;
if (!(f = flux_rpc_pack (h,
topic,
Expand Down Expand Up @@ -420,7 +426,8 @@ int flux_kvs_lookup_cancel (flux_future_t *f)
if (!f
|| !(ctx = flux_future_aux_get (f, auxkey))
|| (!(ctx->flags & FLUX_KVS_WATCH)
&& !(ctx->flags & FLUX_KVS_WAITCREATE))) {
&& !(ctx->flags & FLUX_KVS_WAITCREATE)
&& !(ctx->flags & FLUX_KVS_STREAM))) {
errno = EINVAL;
return -1;
}
Expand Down
Loading

0 comments on commit e10d676

Please sign in to comment.