From 7a256792300b6a4862632d99e338c8fdb505f553 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Feb 2025 08:49:54 -0800 Subject: [PATCH 01/14] sdexec: fix comment grammar Problem: a comment has an extra "to" that makes the sentence incorrect. Drop the extra word. --- src/modules/sdexec/sdexec.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/modules/sdexec/sdexec.c b/src/modules/sdexec/sdexec.c index 38faa53b8988..69e154acffb6 100644 --- a/src/modules/sdexec/sdexec.c +++ b/src/modules/sdexec/sdexec.c @@ -1146,7 +1146,7 @@ static void stats_cb (flux_t *h, * units that were started by that UUID a SIGKILL to begin cleanup. Leave * the request in ctx->requests so the unit can be "reaped". Let normal * cleanup of the request (including generating a response which shouldn't - * hurt) to occur when that happens. + * hurt) occur when that happens. */ static void disconnect_cb (flux_t *h, flux_msg_handler_t *mh, From 09587c900eed63c7ce5be6529b72196316dcb89e Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Feb 2025 10:54:41 -0800 Subject: [PATCH 02/14] sdbus: clean up error handling Problem: the timer used by sdbus_connect() is hard to modify because of the embedded error handling. Extract a function for building the user bus path for the error log. Now the timer is a bit simpler. --- src/modules/sdbus/connect.c | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/src/modules/sdbus/connect.c b/src/modules/sdbus/connect.c index 6665551c1bf2..5fc5d12238fc 100644 --- a/src/modules/sdbus/connect.c +++ b/src/modules/sdbus/connect.c @@ -14,7 +14,10 @@ #if HAVE_CONFIG_H #include "config.h" #endif - +#include +#ifndef HAVE_STRLCPY +#include "src/common/libmissing/strlcpy.h" +#endif #include #include @@ -58,6 +61,18 @@ static void bus_destroy (sd_bus *bus) } } +static void make_user_bus_path (char *buf, size_t size) +{ + char *path; + + if ((path = getenv ("DBUS_SESSION_BUS_ADDRESS"))) + strlcpy (buf, path, size); + else if ((path = getenv ("XDG_RUNTIME_DIR"))) + snprintf (buf, size, "unix:path:%s/bus", path); + else + strlcpy (buf, "sd_bus_open_user", size); +} + /* The timer callback calls sd_bus_open_user(). If it succeeds, the future * is fulfilled. If it fails, the timer is re-armed for a calculated timeout. * Retries proceed forever. If they need to be capped, this can be done by @@ -80,16 +95,8 @@ static void timer_cb (flux_reactor_t *r, timeout = sdc->retry_max; if ((e = sd_bus_open_user (&bus)) < 0) { - char buf[1024]; - const char *path = getenv ("DBUS_SESSION_BUS_ADDRESS"); - if (!path) { - if ((path = getenv ("XDG_RUNTIME_DIR"))) { - snprintf (buf, sizeof (buf), "unix:path:%s/bus", path); - path = buf; - } - } - if (!path) - path = "sd_bus_open_user"; + char path[1024]; + make_user_bus_path (path, sizeof (path)); flux_log (sdc->h, LOG_INFO, "%s: %s (retrying in %.0fs)", From 9326af6e478f499704e2a06a9e0a385c5842f753 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Feb 2025 11:10:56 -0800 Subject: [PATCH 03/14] sdbus: add system module option Problem: the sdbus module is hardwired to connect to a systemd user instance, but Flux now has "work" running in the systemd system instance as well (prolog, epilog, housekeeping). Add a "system" module option which directs sdbus to connect to the systemd system instance instead. Future commits will allow a second instance of the sdbus module to be loaded with this option so access to both systemd instances can be handled concurrently. --- src/modules/sdbus/connect.c | 28 ++++++++++++++++++++++++---- src/modules/sdbus/connect.h | 3 ++- src/modules/sdbus/main.c | 2 +- src/modules/sdbus/sdbus.c | 36 +++++++++++++++++++++++++++++++++--- src/modules/sdbus/sdbus.h | 5 ++++- 5 files changed, 64 insertions(+), 10 deletions(-) diff --git a/src/modules/sdbus/connect.c b/src/modules/sdbus/connect.c index 5fc5d12238fc..c5aa69543e0c 100644 --- a/src/modules/sdbus/connect.c +++ b/src/modules/sdbus/connect.c @@ -29,6 +29,7 @@ struct sdconnect { double retry_min; double retry_max; bool first_time; + bool system_bus; }; static void sdconnect_destroy (struct sdconnect *sdc) @@ -61,6 +62,16 @@ static void bus_destroy (sd_bus *bus) } } +static void make_system_bus_path (char *buf, size_t size) +{ + char *path; + + if ((path = getenv ("DBUS_SYSTEM_BUS_ADDRESS"))) + strlcpy (buf, path, size); + else + strlcpy (buf, "sd_bus_open_system", size); +} + static void make_user_bus_path (char *buf, size_t size) { char *path; @@ -88,15 +99,22 @@ static void timer_cb (flux_reactor_t *r, sd_bus *bus; int e; double timeout; + char path[1024]; sdc->attempt++; timeout = sdc->retry_min * sdc->attempt; if (timeout > sdc->retry_max) timeout = sdc->retry_max; - if ((e = sd_bus_open_user (&bus)) < 0) { - char path[1024]; + if (sdc->system_bus) { + make_system_bus_path (path, sizeof (path)); + e = sd_bus_open_system (&bus); + } + else { make_user_bus_path (path, sizeof (path)); + e = sd_bus_open_user (&bus); + } + if (e < 0) { flux_log (sdc->h, LOG_INFO, "%s: %s (retrying in %.0fs)", @@ -105,7 +123,7 @@ static void timer_cb (flux_reactor_t *r, timeout); goto retry; } - flux_log (sdc->h, LOG_INFO, "connected"); + flux_log (sdc->h, LOG_INFO, "%s: connected", path); flux_future_fulfill (f, bus, (flux_free_f)bus_destroy); sdc->attempt = 0; return; @@ -148,7 +166,8 @@ static void initialize_cb (flux_future_t *f, void *arg) flux_future_t *sdbus_connect (flux_t *h, bool first_time, double retry_min, - double retry_max) + double retry_max, + bool system_bus) { flux_future_t *f; struct sdconnect *sdc = NULL; @@ -166,6 +185,7 @@ flux_future_t *sdbus_connect (flux_t *h, sdc->retry_min = retry_min; sdc->retry_max = retry_max; sdc->first_time = first_time; + sdc->system_bus = system_bus; flux_future_set_flux (f, h); return f; error: diff --git a/src/modules/sdbus/connect.h b/src/modules/sdbus/connect.h index faa9a00fcf60..a40999c973a9 100644 --- a/src/modules/sdbus/connect.h +++ b/src/modules/sdbus/connect.h @@ -27,7 +27,8 @@ flux_future_t *sdbus_connect (flux_t *h, bool first_time, double retry_min, - double retry_max); + double retry_max, + bool system_bus); #endif /* !_SDBUS_CONNECT_H */ diff --git a/src/modules/sdbus/main.c b/src/modules/sdbus/main.c index 9c51acb35286..1ec31b8e04dc 100644 --- a/src/modules/sdbus/main.c +++ b/src/modules/sdbus/main.c @@ -33,7 +33,7 @@ int mod_main (flux_t *h, int argc, char **argv) flux_error_t error; int rc = -1; - if (!(ctx = sdbus_ctx_create (h, &error))) { + if (!(ctx = sdbus_ctx_create (h, argc, argv, &error))) { flux_log (h, LOG_ERR, "%s", error.text); goto error; } diff --git a/src/modules/sdbus/sdbus.c b/src/modules/sdbus/sdbus.c index 6fd678c4718e..f01da102ea0b 100644 --- a/src/modules/sdbus/sdbus.c +++ b/src/modules/sdbus/sdbus.c @@ -31,6 +31,7 @@ #include "sdbus.h" struct sdbus_ctx { + bool system_bus; // connect to system bus instead of user bus flux_future_t *f_conn; // owns ctx->bus sd_bus *bus; flux_watcher_t *bus_w; @@ -722,13 +723,33 @@ static void sdbus_recover (struct sdbus_ctx *ctx, const char *reason) * libsystemd complaining about unexpected internal states(?) and the * occasional segfault. */ - if (!(ctx->f_conn = sdbus_connect (ctx->h, false, retry_min, retry_max)) + if (!(ctx->f_conn = sdbus_connect (ctx->h, + false, + retry_min, + retry_max, + ctx->system_bus)) || flux_future_then (ctx->f_conn, -1, connect_continuation, ctx) < 0) { flux_log_error (ctx->h, "error starting bus connect"); flux_reactor_stop_error (flux_get_reactor (ctx->h)); } } +static int parse_module_args (struct sdbus_ctx *ctx, + int argc, + char **argv, + flux_error_t *error) +{ + for (int i = 0; i < argc; i++) { + if (streq (argv[i], "system")) + ctx->system_bus = true; + else { + errprintf (error, "unknown module option: %s", argv[i]); + return -1; + } + } + return 0; +} + void sdbus_ctx_destroy (struct sdbus_ctx *ctx) { if (ctx) { @@ -755,15 +776,24 @@ void sdbus_ctx_destroy (struct sdbus_ctx *ctx) } } -struct sdbus_ctx *sdbus_ctx_create (flux_t *h, flux_error_t *error) +struct sdbus_ctx *sdbus_ctx_create (flux_t *h, + int argc, + char **argv, + flux_error_t *error) { struct sdbus_ctx *ctx; if (!(ctx = calloc (1, sizeof (*ctx)))) goto error_create; + if (parse_module_args (ctx, argc, argv, error) < 0) + goto error; if (sdbus_configure (ctx, flux_get_conf (h), error) < 0) goto error; - if (!(ctx->f_conn = sdbus_connect (h, true, retry_min, retry_max)) + if (!(ctx->f_conn = sdbus_connect (h, + true, + retry_min, + retry_max, + ctx->system_bus)) || flux_future_then (ctx->f_conn, -1, connect_continuation, ctx) < 0 || flux_msg_handler_addvec (h, htab, ctx, &ctx->handlers) < 0 || !(ctx->requests = flux_msglist_create ()) diff --git a/src/modules/sdbus/sdbus.h b/src/modules/sdbus/sdbus.h index 3252b4908006..b1763719d234 100644 --- a/src/modules/sdbus/sdbus.h +++ b/src/modules/sdbus/sdbus.h @@ -13,7 +13,10 @@ #include -struct sdbus_ctx *sdbus_ctx_create (flux_t *h, flux_error_t *error); +struct sdbus_ctx *sdbus_ctx_create (flux_t *h, + int argc, + char **argv, + flux_error_t *error); void sdbus_ctx_destroy (struct sdbus_ctx *ctx); #endif /* !_SDBUS_SDBUS_H */ From 88c93a8c570df6b95cc06e10f7b01e88d2017505 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Feb 2025 12:10:23 -0800 Subject: [PATCH 04/14] testsuite: cover sdbus system option Problem: the sdbus system option has no coverage. Amend the 2407-sdbus.t test with a simple test of "system mode". --- t/t2407-sdbus.t | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/t/t2407-sdbus.t b/t/t2407-sdbus.t index 06209850c1a0..9d3c6aa26bd3 100755 --- a/t/t2407-sdbus.t +++ b/t/t2407-sdbus.t @@ -20,7 +20,7 @@ fi test_under_flux 2 minimal -flux setattr log-stderr-level 1 +flux setattr log-stderr-level 3 # # N.B. ListUnitsByPatterns response payload is a 'params' array whose first @@ -127,6 +127,12 @@ test_expect_success 'sdbus reconfig fails with bad sdbus-debug value' ' EOT grep "Expected true or false" config.err ' +test_expect_success 'restore correct config in case we reload later' ' + flux config load <<-EOT + [systemd] + sdbus-debug = true + EOT +' test_expect_success 'sdbus list-units works' ' count=$(bus_count_units "*") && @@ -317,7 +323,15 @@ test_expect_success 'list from rank 1 is restricted' ' grep "not allowed" list1.err ' +test_expect_success 'reload sdbus module wtih system option' ' + flux module reload sdbus system +' +test_expect_success 'list system units works' ' + flux python ./list.py >/dev/null +' test_expect_success 'remove sdbus module' ' flux module remove sdbus ' + + test_done From 9e3c7419e54ea77f16577f1bf95a502b30737516 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Feb 2025 12:38:46 -0800 Subject: [PATCH 05/14] sdbus: allow module to be loaded under any name Problem: the sdbus module can only be loaded once because it uses an explicit service name. Drop the outdated MOD_NAME() symbol declaration. Register methods in a way that lets the default service name change. Update the self-contacting "subscribe" composite RPC to determine the topic string to contact programmatically. Now the module can be loaded as many times as we like using e.g. flux module load --name NEWNAME sdbus --- src/modules/sdbus/main.c | 2 -- src/modules/sdbus/sdbus.c | 15 ++++++++------- src/modules/sdbus/subscribe.c | 10 ++++++++-- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/modules/sdbus/main.c b/src/modules/sdbus/main.c index 1ec31b8e04dc..370108da7068 100644 --- a/src/modules/sdbus/main.c +++ b/src/modules/sdbus/main.c @@ -52,6 +52,4 @@ int mod_main (flux_t *h, int argc, char **argv) #endif } -MOD_NAME ("sdbus"); - // vi:ts=4 sw=4 expandtab diff --git a/src/modules/sdbus/sdbus.c b/src/modules/sdbus/sdbus.c index f01da102ea0b..6a4e532b1124 100644 --- a/src/modules/sdbus/sdbus.c +++ b/src/modules/sdbus/sdbus.c @@ -561,32 +561,32 @@ static void reload_cb (flux_t *h, static struct flux_msg_handler_spec htab[] = { { FLUX_MSGTYPE_REQUEST, - "sdbus.disconnect", + "disconnect", disconnect_cb, 0 }, { FLUX_MSGTYPE_REQUEST, - "sdbus.call", + "call", call_cb, 0 }, { FLUX_MSGTYPE_REQUEST, - "sdbus.subscribe", + "subscribe", subscribe_cb, 0 }, { FLUX_MSGTYPE_REQUEST, - "sdbus.subscribe-cancel", + "subscribe-cancel", subscribe_cancel_cb, 0 }, { FLUX_MSGTYPE_REQUEST, - "sdbus.reconnect", + "reconnect", reconnect_cb, 0 }, { FLUX_MSGTYPE_REQUEST, - "sdbus.config-reload", + "config-reload", reload_cb, 0 }, @@ -782,6 +782,7 @@ struct sdbus_ctx *sdbus_ctx_create (flux_t *h, flux_error_t *error) { struct sdbus_ctx *ctx; + const char *name = flux_aux_get (h, "flux::name"); if (!(ctx = calloc (1, sizeof (*ctx)))) goto error_create; @@ -795,7 +796,7 @@ struct sdbus_ctx *sdbus_ctx_create (flux_t *h, retry_max, ctx->system_bus)) || flux_future_then (ctx->f_conn, -1, connect_continuation, ctx) < 0 - || flux_msg_handler_addvec (h, htab, ctx, &ctx->handlers) < 0 + || flux_msg_handler_addvec_ex (h, name, htab, ctx, &ctx->handlers) < 0 || !(ctx->requests = flux_msglist_create ()) || !(ctx->subscribers = flux_msglist_create ()) || flux_get_rank (h, &ctx->rank) < 0) diff --git a/src/modules/sdbus/subscribe.c b/src/modules/sdbus/subscribe.c index f8e8698b4c5a..e3ada43e84f0 100644 --- a/src/modules/sdbus/subscribe.c +++ b/src/modules/sdbus/subscribe.c @@ -27,13 +27,16 @@ static void subscribe_continuation (flux_future_t *f1, void *arg) flux_t *h = flux_future_get_flux (f1); const char *errmsg = NULL; flux_future_t *f2; + const char *name = flux_aux_get (h, "flux::name"); + char topic[128]; if (flux_rpc_get (f1, NULL) < 0) { errmsg = future_strerror (f1, errno); goto error; } + snprintf (topic, sizeof (topic), "%s.call", name); if (!(f2 = flux_rpc_pack (h, - "sdbus.call", + topic, FLUX_NODEID_ANY, 0, "{s:s s:s s:s s:s s:[s]}", @@ -58,9 +61,12 @@ flux_future_t *sdbus_subscribe (flux_t *h) { flux_future_t *f1; flux_future_t *fc; + const char *name = flux_aux_get (h, "flux::name"); + char topic[128]; + snprintf (topic, sizeof (topic), "%s.call", name); if (!(f1 = flux_rpc_pack (h, - "sdbus.call", + topic, FLUX_NODEID_ANY, 0, "{s:s s:[]}", From 47a22f4261505f464fda4a639ba4bd26e1322ba5 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Feb 2025 12:53:49 -0800 Subject: [PATCH 06/14] testsuite: cover sdbus-sys Problem: there are no tests for loading sdbus under a different name Modify the system test to load sdbus under the name "sdbus-sys" in system mode instead of reloading the module. Show that it works for listing units in the system instance. --- t/t2407-sdbus.t | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/t/t2407-sdbus.t b/t/t2407-sdbus.t index 9d3c6aa26bd3..c75d3ee7e0e2 100755 --- a/t/t2407-sdbus.t +++ b/t/t2407-sdbus.t @@ -311,23 +311,27 @@ test_expect_success 'create list script' ' cat >list.py <<-EOT && import sys import flux - print(flux.Flux().rpc("sdbus.call",{"member":"ListUnitsByPatterns","params":[[],["*"]]}).get_str()) + print(flux.Flux().rpc(sys.argv[1] + ".call",{"member":"ListUnitsByPatterns","params":[[],["*"]]}).get_str()) EOT chmod +x list.py ' test_expect_success 'list from rank 0 is allowed' ' - flux python ./list.py >/dev/null + flux python ./list.py sdbus >/dev/null ' test_expect_success 'list from rank 1 is restricted' ' - test_must_fail flux exec -r 1 flux python ./list.py 2>list1.err && + test_must_fail flux exec -r 1 \ + flux python ./list.py sdbus 2>list1.err && grep "not allowed" list1.err ' -test_expect_success 'reload sdbus module wtih system option' ' - flux module reload sdbus system +test_expect_success 'load sdbus-sys module' ' + flux module load --name sdbus-sys sdbus system ' test_expect_success 'list system units works' ' - flux python ./list.py >/dev/null + flux python ./list.py sdbus-sys >/dev/null +' +test_expect_success 'remove sdbus-sys module' ' + flux module remove sdbus-sys ' test_expect_success 'remove sdbus module' ' flux module remove sdbus From 7e98ecda9367add46ffa2d1ba24276499d7378d0 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Feb 2025 13:32:19 -0800 Subject: [PATCH 07/14] rc: load sdbus-sys Problem: when the system is configured to use systemd, sdbus is only loaded for the systemd user instance. Load sdbus-sys as well. --- etc/rc1 | 1 + etc/rc3 | 1 + 2 files changed, 2 insertions(+) diff --git a/etc/rc1 b/etc/rc1 index 9f6e967f58fd..da2ec215215f 100755 --- a/etc/rc1 +++ b/etc/rc1 @@ -26,6 +26,7 @@ modload all content modload all barrier if test "$(flux config get --default=false systemd.enable)" = "true"; then modload all sdbus + modload all --name sdbus-sys sdbus system modload all sdexec fi diff --git a/etc/rc3 b/etc/rc3 index 8977b402daaf..6d9476c38697 100755 --- a/etc/rc3 +++ b/etc/rc3 @@ -38,6 +38,7 @@ modrm all job-ingest modrm 0 cron modrm all sdexec +modrm all sdbus-sys modrm all sdbus modrm all barrier From b8bf11209a8130f8e74889e9519c28cb6c0e3b7a Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Feb 2025 15:43:39 -0800 Subject: [PATCH 08/14] libsdexec: add service name to some RPC functions Problem: some libsdexec RPCs can now be directed to different services to reach the systemd system or user instance. Add a service parameter to the following functions: sdexec_list_units() sdexec_property_get() sdexec_property_get_all() sdexec_property_changed() Update sdexec. Update tests. --- src/common/libsdexec/list.c | 11 ++++++++--- src/common/libsdexec/list.h | 1 + src/common/libsdexec/property.c | 19 ++++++++++++++----- src/common/libsdexec/property.h | 3 +++ src/common/libsdexec/test/list.c | 7 +++++-- src/common/libsdexec/test/property.c | 26 ++++++++++++++++++++------ src/modules/sdexec/sdexec.c | 1 + 7 files changed, 52 insertions(+), 16 deletions(-) diff --git a/src/common/libsdexec/list.c b/src/common/libsdexec/list.c index 531a77835f82..7d6d45ebba78 100644 --- a/src/common/libsdexec/list.c +++ b/src/common/libsdexec/list.c @@ -66,14 +66,19 @@ bool sdexec_list_units_next (flux_future_t *f, struct unit_info *infop) /* N.B. Input params: states=[], patterns=[pattern]. */ -flux_future_t *sdexec_list_units (flux_t *h, uint32_t rank, const char *pattern) +flux_future_t *sdexec_list_units (flux_t *h, + const char *service, + uint32_t rank, + const char *pattern) { - if (!h || !pattern) { + if (!h || !pattern || !service) { errno = EINVAL; return NULL; } + char topic[256]; + snprintf (topic, sizeof (topic), "%s.call", service); return flux_rpc_pack (h, - "sdbus.call", + topic, rank, 0, "{s:s s:[[] [s]]}", diff --git a/src/common/libsdexec/list.h b/src/common/libsdexec/list.h index 115767ff5d51..dc53de395fcb 100644 --- a/src/common/libsdexec/list.h +++ b/src/common/libsdexec/list.h @@ -32,6 +32,7 @@ struct unit_info { * (E.g. use "*" for all). */ flux_future_t *sdexec_list_units (flux_t *h, + const char *service, uint32_t rank, const char *pattern); diff --git a/src/common/libsdexec/property.c b/src/common/libsdexec/property.c index 82ffdcac3b2a..cfa08e919b00 100644 --- a/src/common/libsdexec/property.c +++ b/src/common/libsdexec/property.c @@ -32,17 +32,20 @@ static const char *serv_interface = "org.freedesktop.systemd1.Service"; static const char *prop_interface = "org.freedesktop.DBus.Properties"; flux_future_t *sdexec_property_get_all (flux_t *h, + const char *service, uint32_t rank, const char *path) { flux_future_t *f; + char topic[256]; - if (!h || !path) { + if (!h || !service || !path) { errno = EINVAL; return NULL; } + snprintf (topic, sizeof (topic), "%s.call", service); if (!(f = flux_rpc_pack (h, - "sdbus.call", + topic, rank, 0, "{s:s s:s s:s s:[s]}", @@ -55,16 +58,19 @@ flux_future_t *sdexec_property_get_all (flux_t *h, } flux_future_t *sdexec_property_get (flux_t *h, + const char *service, uint32_t rank, const char *path, const char *name) { flux_future_t *f; + char topic[256]; - if (!h || !path || !name) { + if (!h || !service || !path || !name) { errno = EINVAL; return NULL; } + snprintf (topic, sizeof (topic), "%s.call", service); if (!(f = flux_rpc_pack (h, "sdbus.call", rank, @@ -79,13 +85,15 @@ flux_future_t *sdexec_property_get (flux_t *h, } flux_future_t *sdexec_property_changed (flux_t *h, + const char *service, uint32_t rank, const char *path) { flux_future_t *f; json_t *o; + char topic[256]; - if (!h) { + if (!h || !service) { errno = EINVAL; return NULL; } @@ -100,8 +108,9 @@ flux_future_t *sdexec_property_changed (flux_t *h, goto nomem; } } + snprintf (topic, sizeof (topic), "%s.subscribe", service); if (!(f = flux_rpc_pack (h, - "sdbus.subscribe", + topic, rank, FLUX_RPC_STREAMING, "O", o))) diff --git a/src/common/libsdexec/property.h b/src/common/libsdexec/property.h index 631859199ba4..46780c070521 100644 --- a/src/common/libsdexec/property.h +++ b/src/common/libsdexec/property.h @@ -18,6 +18,7 @@ j************************************************************/ * Parse the returned value with sdexec_property_get_unpack(). */ flux_future_t *sdexec_property_get (flux_t *h, + const char *service, uint32_t rank, const char *path, const char *name); @@ -28,6 +29,7 @@ int sdexec_property_get_unpack (flux_future_t *f, const char *fmt, ...); * which can be further parsed with sdexec_property_dict_unpack(). */ flux_future_t *sdexec_property_get_all (flux_t *h, + const char *service, uint32_t rank, const char *path); json_t *sdexec_property_get_all_dict (flux_future_t *f); @@ -40,6 +42,7 @@ json_t *sdexec_property_get_all_dict (flux_future_t *f); * sdexec_property_changed_path() to get the path for each response. */ flux_future_t *sdexec_property_changed (flux_t *h, + const char *service, uint32_t rank, const char *path); json_t *sdexec_property_changed_dict (flux_future_t *f); diff --git a/src/common/libsdexec/test/list.c b/src/common/libsdexec/test/list.c index 3653a8e39657..d6d4962b5cc4 100644 --- a/src/common/libsdexec/test/list.c +++ b/src/common/libsdexec/test/list.c @@ -31,10 +31,13 @@ void test_inval (void) BAIL_OUT ("could not create future for testing"); errno = 0; - ok (sdexec_list_units (NULL, 0, "*") == NULL && errno == EINVAL, + ok (sdexec_list_units (NULL, "sdexec", 0, "*") == NULL && errno == EINVAL, "sdexec_list_units h=NULL fails with EINVAL"); errno = 0; - ok (sdexec_list_units (h, 0, NULL) == NULL && errno == EINVAL, + ok (sdexec_list_units (h, NULL, 0, "*") == NULL && errno == EINVAL, + "sdexec_list_units service=NULL fails with EINVAL"); + errno = 0; + ok (sdexec_list_units (h, "sdexec", 0, NULL) == NULL && errno == EINVAL, "sdexec_list_units pattern=NULL fails with EINVAL"); ok (sdexec_list_units_next (NULL, &info) == false, diff --git a/src/common/libsdexec/test/property.c b/src/common/libsdexec/test/property.c index e928ccbc199d..4934b2925830 100644 --- a/src/common/libsdexec/test/property.c +++ b/src/common/libsdexec/test/property.c @@ -52,13 +52,20 @@ void test_inval (void) BAIL_OUT ("could not create property dict for testing"); errno = 0; - ok (sdexec_property_get (NULL, 0, "foo", "bar") == NULL && errno == EINVAL, + ok (sdexec_property_get (NULL, "sdexec", 0, "foo", "bar") == NULL + && errno == EINVAL, "sdexec_property_get h=NULL fails with EINVAL"); errno = 0; - ok (sdexec_property_get (h, 0, NULL, "bar") == NULL && errno == EINVAL, + ok (sdexec_property_get (h, NULL, 0, "foo", "bar") == NULL + && errno == EINVAL, + "sdexec_property_get service=NULL fails with EINVAL"); + errno = 0; + ok (sdexec_property_get (h, "sdexec", 0, NULL, "bar") == NULL + && errno == EINVAL, "sdexec_property_get path=NULL fails with EINVAL"); errno = 0; - ok (sdexec_property_get (h, 0, "foo", NULL) == NULL && errno == EINVAL, + ok (sdexec_property_get (h, "sdexec", 0, "foo", NULL) == NULL + && errno == EINVAL, "sdexec_property_get name=NULL fails with EINVAL"); errno = 0; @@ -69,10 +76,16 @@ void test_inval (void) "sdexec_property_get_unpack fmt=NULL fails with EINVAL"); errno = 0; - ok (sdexec_property_get_all (NULL, 0, "foo") == NULL && errno == EINVAL, + ok (sdexec_property_get_all (NULL, "sdexec", 0, "foo") == NULL + && errno == EINVAL, "sdexec_property_get_all h=NULL fails with EINVAL"); errno = 0; - ok (sdexec_property_get_all (h, 0, NULL) == NULL && errno == EINVAL, + ok (sdexec_property_get_all (h, NULL, 0, "foo") == NULL + && errno == EINVAL, + "sdexec_property_get_all service=NULL fails with EINVAL"); + errno = 0; + ok (sdexec_property_get_all (h, "sdexec", 0, NULL) == NULL + && errno == EINVAL, "sdexec_property_get_all path=NULL fails with EINVAL"); errno = 0; @@ -80,7 +93,8 @@ void test_inval (void) "sdexec_property_get_all_dict f=NULL fails with EINVAL"); errno = 0; - ok (sdexec_property_changed (NULL, 0, "foo") == NULL && errno == EINVAL, + ok (sdexec_property_changed (NULL, "sdexec", 0, "foo") == NULL + && errno == EINVAL, "sdexec_property_changed h=NULL fails with EINVAL"); errno = 0; diff --git a/src/modules/sdexec/sdexec.c b/src/modules/sdexec/sdexec.c index 69e154acffb6..092a6d2b5462 100644 --- a/src/modules/sdexec/sdexec.c +++ b/src/modules/sdexec/sdexec.c @@ -837,6 +837,7 @@ static void exec_cb (flux_t *h, proc->msg = msg; sdexec_log_debug (h, "watch %s", sdexec_unit_name (proc->unit)); if (!(proc->f_watch = sdexec_property_changed (h, + "sdbus", ctx->rank, sdexec_unit_path (proc->unit))) || flux_future_then (proc->f_watch, From a7ac6a8bf902bdf4fab830570dabb2fb2f0676c6 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Feb 2025 15:21:35 -0800 Subject: [PATCH 09/14] sdmon: add new systemd monitor module Problem: there is no mechanism to track systemd units across a broker restart. Add a broker module that creates and maintains a list of running flux systemd units. This monitors two instances of systemd: - the user one, running as user flux (where jobs are run) - the system one (where housekeeping, prolog, epilog run) A list of units matching flux unit globs is requested at initialization, and a subscription to property updates on those globs is obtained. After the initial list, monitoring is driven solely by property updates. Join the sdmon.online broker group once the node is demonstrably idle. This lets the resource module on rank 0 notice compute nodes that need cleanup at restart and withhold them from the scheduler. Once the group is joined, sdmon does not explicitly leave it. It implicitly leaves the group if sdmon is unloaded or the node goes offline/lost. If there are running units at startup, log this information at LOG_ERR level, and again when the units are cleaned up, e.g. flux-housekeeping@fAMpzjUPvsR.service needs cleanup - resources are offline cleanup complete - resources are online In the future, this module's role could be expanded to support tooling for listing running work and obtaining runtime information such as pids and cgroup resource parameters. It could also play a role in informing other flux components about work that should be re-attached after a full or partial restart, when support for that is added. --- src/modules/Makefile.am | 14 ++ src/modules/sdmon/sdmon.c | 432 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 446 insertions(+) create mode 100644 src/modules/sdmon/sdmon.c diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am index 14197ef31716..53ad5358642e 100644 --- a/src/modules/Makefile.am +++ b/src/modules/Makefile.am @@ -44,6 +44,7 @@ fluxmod_LTLIBRARIES = \ kvs-watch.la \ resource.la \ sched-simple.la \ + sdmon.la \ sdexec.la \ sdbus.la @@ -268,6 +269,19 @@ sched_simple_la_LIBADD = \ $(JANSSON_LIBS) sched_simple_la_LDFLAGS = $(fluxmod_ldflags) -module +sdmon_la_SOURCES = \ + sdmon/sdmon.c +sdmon_la_CPPFLAGS = \ + $(AM_CPPFLAGS) \ + $(LIBUUID_CFLAGS) +sdmon_la_LIBADD = \ + $(top_builddir)/src/common/libsdexec/libsdexec.la \ + $(top_builddir)/src/common/libflux-core.la \ + $(top_builddir)/src/common/libflux-internal.la \ + $(LIBUUID_LIBS) \ + $(JANSSON_LIBS) +sdmon_la_LDFLAGS = $(fluxmod_ldflags) -module + sdexec_la_SOURCES = \ sdexec/sdexec.c sdexec_la_CPPFLAGS = \ diff --git a/src/modules/sdmon/sdmon.c b/src/modules/sdmon/sdmon.c new file mode 100644 index 000000000000..cf38451af0e0 --- /dev/null +++ b/src/modules/sdmon/sdmon.c @@ -0,0 +1,432 @@ +/************************************************************\ + * Copyright 2025 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* sdmon.c - create and maintain a list of running flux systemd units + * + * This monitors two instances of systemd: + * - the user one, running as user flux (where jobs are run) + * - the system one (where housekeeping, prolog, epilog run) + * + * A list of units matching flux unit globs is requested at initialization, + * and a subscription to property updates on those globs is obtained. + * After the initial list, monitoring is driven solely by property updates. + * + * Join the sdmon.online broker group once the unit list responses have been + * received and there are no Flux units running on the node. This lets the + * resource module on rank 0 hold back nodes that require cleanup from the + * scheduler. + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include "ccan/str/str.h" +#include "src/common/libutil/errprintf.h" +#include "src/common/libutil/errno_safe.h" +#include "src/common/libutil/basename.h" +#include "src/common/libczmqcontainers/czmq_containers.h" +#include "src/common/libsdexec/list.h" +#include "src/common/libsdexec/property.h" +#include "src/common/libsdexec/unit.h" +#include "src/common/libsdexec/state.h" + +struct sdmon_bus { + flux_future_t *fp; // SERVICE.subscribe + flux_future_t *fl; // SERVICE.call ListUnitsByPattern + bool unmute_property_updates; // set true after list response is received + const char *service; // sdbus or sdbus-sys + const char *unit_glob; +}; + +struct sdmon_ctx { + flux_t *h; + uint32_t rank; + flux_msg_handler_t **handlers; + struct sdmon_bus sys; + struct sdmon_bus usr; + zhashx_t *units; // unit name => (struct unit *) + bool group_joined; + bool cleanup_needed; + flux_future_t *fg; +}; + +static const char *path_prefix = "/org/freedesktop/systemd1/unit"; + +static const char *sys_glob = "flux-*"; +static const char *usr_glob = "*shell-*"; // match with and without imp- prefix + +static const char *group_name = "sdmon.online"; + +/* Process a group response. This is very unlikely to fail but if it does, + * make sure we get a log message. + */ +static void sdmon_join_continuation (flux_future_t *f, void *arg) +{ + struct sdmon_ctx *ctx = arg; + if (flux_future_get (f, NULL) < 0) { + flux_log (ctx->h, + LOG_ERR, + "groups.join request failed: %s", + future_strerror (f, errno)); + } +} + +/* Send a broker groups.join request IFF: + * - we haven't joined yet + * - both busses have their initial list responses (prop updates unmuted) + * - the unit hash is empty + */ +static void sdmon_group_join_if_ready (struct sdmon_ctx *ctx) +{ + if (ctx->group_joined + || !ctx->sys.unmute_property_updates + || !ctx->usr.unmute_property_updates + || zhashx_size (ctx->units) > 0) + return; + + // unit(s) needing cleanup were logged, so indicate they are resolved now. + ctx->group_joined = true; + if (ctx->cleanup_needed) + flux_log (ctx->h, LOG_ERR, "cleanup complete - resources are online"); + + flux_future_destroy (ctx->fg); + if (!(ctx->fg = flux_rpc_pack (ctx->h, + "groups.join", + ctx->rank, + 0, + "{s:s}", + "name", group_name)) + || flux_future_then (ctx->fg, -1, sdmon_join_continuation, ctx) < 0) + flux_log_error (ctx->h, "error sending groups.join request"); +} + +/* List the units that sdmon thinks are running and their state.substate. + */ +static void sdmon_stats_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct sdmon_ctx *ctx = arg; + json_t *units; + struct unit *unit; + + if (!(units = json_array ())) + goto error; + unit = zhashx_first (ctx->units); + while (unit) { + json_t *o; + char state[64]; + + snprintf (state, + sizeof (state), + "%s.%s", + sdexec_statetostr (sdexec_unit_state (unit)), + sdexec_substatetostr (sdexec_unit_substate (unit))); + + if (!(o = json_pack ("{s:s s:s}", + "name", sdexec_unit_name (unit), + "state", state)) + || json_array_append_new (units, o) < 0) { + json_decref (o); + errno = ENOMEM; + goto error; + } + unit = zhashx_next (ctx->units); + } + if (flux_respond_pack (h, msg, "{s:O}", "units", units) < 0) + flux_log_error (h, "error responding to stats-get request"); + json_decref (units); + return; +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "error responding to stats-get request"); + json_decref (units); +} + +// zhashx_destructor_fn footprint +static void sdmon_unit_destructor (void **item) +{ + if (*item) { + sdexec_unit_destroy (*item); + *item = NULL; + } +} + +/* Determine if a unit is considered "running" for purposes of this module. + */ +static bool sdmon_unit_is_running (struct unit *unit) +{ + bool running = false; + + switch (sdexec_unit_state (unit)) { + case STATE_ACTIVATING: + case STATE_ACTIVE: + case STATE_DEACTIVATING: + running = true; + break; + case STATE_UNKNOWN: + case STATE_INACTIVE: + case STATE_FAILED: + break; + } + return running; +} + +/* A unit matching a subscribed-to glob (on either bus) has changed properties. + * If it's a new, running unit, add it to the units hash. + * If it's a known unit that is no longer running, remove it. + * Join the group if the unit hash transitions to empty. + */ +static void sdmon_property_continuation (flux_future_t *f, void *arg) +{ + struct sdmon_ctx *ctx = arg; + struct sdmon_bus *bus = f == ctx->usr.fp ? &ctx->usr : &ctx->sys; + const char *path; + const char *name; + json_t *dict; + struct unit *unit; + bool unit_is_new = false; + + + if (!(path = sdexec_property_changed_path (f)) + || (!(dict = sdexec_property_changed_dict (f)))) { + flux_log (ctx->h, + LOG_ERR, + "%s.subscribe: %s", + bus->service, + future_strerror (f, errno)); + goto fatal; + } + if (!bus->unmute_property_updates) + goto done; + name = basename_simple (path); + if (!(unit = zhashx_lookup (ctx->units, name))) { + if (!(unit = sdexec_unit_create (name))) { + flux_log_error (ctx->h, "error creating unit %s", name); + goto done; + } + unit_is_new = true; + } + if (!sdexec_unit_update (unit, dict) && !unit_is_new) + goto done; // nothing changed + + if (sdmon_unit_is_running (unit)) { + if (unit_is_new) { + if (zhashx_insert (ctx->units, name, unit) < 0) { + flux_log (ctx->h, LOG_ERR, "error tracking unit %s", name); + sdexec_unit_destroy (unit); + goto done; + } + } + } + else { + if (unit_is_new) + sdexec_unit_destroy (unit); + else + zhashx_delete (ctx->units, name); + } + sdmon_group_join_if_ready (ctx); +done: + flux_future_reset (f); + return; +fatal: + flux_reactor_stop_error (flux_get_reactor (ctx->h)); +} + +/* Process the initial list of units that match our glob (on either bus). + * Add any running units to the unit hash, then unmute property updates. + * Join the group if the unit hash is empty after that. + */ +static void sdmon_list_continuation (flux_future_t *f, void *arg) +{ + struct sdmon_ctx *ctx = arg; + struct sdmon_bus *bus = f == ctx->usr.fl ? &ctx->usr : &ctx->sys; + struct unit_info info; + + if (flux_future_get (f, NULL) < 0) { + flux_log (ctx->h, + LOG_ERR, + "%s.call: %s", + bus->service, + future_strerror (f, errno)); + goto fatal; + } + + while (sdexec_list_units_next (f, &info)) { + struct unit *unit; + + if (!(unit = sdexec_unit_create (info.name))) { + flux_log_error (ctx->h, "error creating unit %s", info.name); + continue; + } + (void)sdexec_unit_update_frominfo (unit, &info); + if (sdmon_unit_is_running (unit)) { + flux_log (ctx->h, + LOG_ERR, + "%s needs cleanup - resources are offline", + info.name); + ctx->cleanup_needed = true; + if (zhashx_insert (ctx->units, info.name, unit) < 0) { + flux_log_error (ctx->h, "error tracking unit %s", info.name); + sdexec_unit_destroy (unit); + continue; + } + } + } + bus->unmute_property_updates = true; + sdmon_group_join_if_ready (ctx); + return; +fatal: + flux_reactor_stop_error (flux_get_reactor (ctx->h)); +} + +/* Check if the sdbus module is loaded on the local rank by pinging its + * stats-get method. N.B. sdbus handles its D-bus connect asynchronously + * so stats-get should be responsive even if D-Bus is not. + */ +static int sdbus_is_loaded (flux_t *h, + const char *service, + uint32_t rank, + flux_error_t *error) +{ + flux_future_t *f; + char topic[256]; + + snprintf (topic, sizeof (topic), "%s.stats-get", service); + if (!(f = flux_rpc (h, topic, NULL, rank, 0)) + || flux_rpc_get (f, NULL) < 0) { + if (errno == ENOSYS) + errprintf (error, "%s module is not loaded", service); + else + errprintf (error, "%s: %s", service, future_strerror (f, errno)); + flux_future_destroy (f); + return -1; + } + flux_future_destroy (f); + return 0; +} + +static void sdmon_bus_finalize (struct sdmon_bus *bus) +{ + flux_future_destroy (bus->fp); + flux_future_destroy (bus->fl); +} + +/* Send sdbus.subscribe and sdbus.call (ListUnitsByPatterns). + * N.B. The subscribe request must be sent before the list request to avoid + * missing property updates that immediately follow the list response. + * Set 'bus->unmute_property_updates' after the list response is received. + * Any property updates received before that are ignored. +*/ +static int sdmon_bus_init (struct sdmon_bus *bus, + struct sdmon_ctx *ctx, + const char *service, + const char *pattern, + flux_error_t *error) +{ + flux_future_t *fp = NULL; + flux_future_t *fl = NULL; + char path[256]; + + if (sdbus_is_loaded (ctx->h, service, ctx->rank, error) < 0) + return -1; + snprintf (path, sizeof (path), "%s/%s", path_prefix, pattern); + if (!(fp = sdexec_property_changed (ctx->h, service, ctx->rank, path)) + || flux_future_then (fp, -1, sdmon_property_continuation, ctx) < 0) { + errprintf (error, "%s.subscribe: %s", service, strerror (errno)); + goto error; + } + if (!(fl = sdexec_list_units (ctx->h, service, ctx->rank, pattern)) + || flux_future_then (fl, -1, sdmon_list_continuation, ctx) < 0) { + errprintf (error, "%s.call: %s", service, strerror (errno)); + goto error; + } + bus->service = service; + bus->fp = fp; + bus->fl = fl; + return 0; +error: + flux_future_destroy (fp); + flux_future_destroy (fl); + return -1; +} + +static void sdmon_ctx_destroy (struct sdmon_ctx *ctx) +{ + if (ctx) { + int saved_errno = errno; + sdmon_bus_finalize (&ctx->sys); + sdmon_bus_finalize (&ctx->usr); + flux_future_destroy (ctx->fg); + flux_msg_handler_delvec (ctx->handlers); + zhashx_destroy (&ctx->units); + free (ctx); + errno = saved_errno; + } +} + +static struct sdmon_ctx *sdmon_ctx_create (flux_t *h) +{ + struct sdmon_ctx *ctx; + + if (!(ctx = calloc (1, sizeof (*ctx)))) + return NULL; + if (flux_get_rank (h, &ctx->rank) < 0) + goto error; + if (!(ctx->units = zhashx_new ())) { + errno = ENOMEM; + goto error; + } + zhashx_set_destructor (ctx->units, sdmon_unit_destructor); + ctx->h = h; + return ctx; +error: + sdmon_ctx_destroy (ctx); + return NULL; +} + +static struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, + "stats-get", + sdmon_stats_cb, + 0 + }, + FLUX_MSGHANDLER_TABLE_END +}; + +int mod_main (flux_t *h, int argc, char **argv) +{ + struct sdmon_ctx *ctx; + flux_error_t error; + const char *modname = flux_aux_get (h, "flux::name"); + int rc = -1; + + if (!(ctx = sdmon_ctx_create (h))) + goto error; + if (flux_msg_handler_addvec_ex (h, modname, htab, ctx, &ctx->handlers) < 0) + goto error; + if (sdmon_bus_init (&ctx->sys, ctx, "sdbus-sys", sys_glob, &error) < 0) + goto error; + if (sdmon_bus_init (&ctx->usr, ctx, "sdbus", usr_glob, &error) < 0) + goto error; + if (flux_reactor_run (flux_get_reactor (h), 0) < 0) { + flux_log_error (h, "reactor exited abnormally"); + goto error; + } + rc = 0; +error: + sdmon_ctx_destroy (ctx); + return rc; +} + +// vi:ts=4 sw=4 expandtab From 4c50d8e8bf6bac96c043f327e2c7d54dd12467de Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Feb 2025 15:22:46 -0800 Subject: [PATCH 10/14] rc: conditionally load sdmon module Problem: the sdmon module is not loaded by default. Load it if systemd.enable = true in the configuration. --- etc/rc1 | 1 + etc/rc3 | 1 + 2 files changed, 2 insertions(+) diff --git a/etc/rc1 b/etc/rc1 index da2ec215215f..0b20f08154d6 100755 --- a/etc/rc1 +++ b/etc/rc1 @@ -28,6 +28,7 @@ if test "$(flux config get --default=false systemd.enable)" = "true"; then modload all sdbus modload all --name sdbus-sys sdbus system modload all sdexec + modload all sdmon fi if test $RANK -eq 0; then diff --git a/etc/rc3 b/etc/rc3 index 6d9476c38697..72a8cc1a964c 100755 --- a/etc/rc3 +++ b/etc/rc3 @@ -37,6 +37,7 @@ modrm 0 job-manager modrm all job-ingest modrm 0 cron +modrm all sdmon modrm all sdexec modrm all sdbus-sys modrm all sdbus From df1741aee4f3033916586b2422fa4dc7a99f0afc Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 6 Feb 2025 07:45:37 -0800 Subject: [PATCH 11/14] resource: parse systemd.enable Problem: the monitor subsystem of the resource module needs to know whether the "sdmon.online" broker group will be populated. Parse the enable key from [systemd]. Pass the whole resource_config struct to the monitor subsystem instead of just monitor_force_up. --- src/modules/resource/monitor.c | 5 ++--- src/modules/resource/monitor.h | 2 +- src/modules/resource/resource.c | 12 +++++++++++- src/modules/resource/resource.h | 1 + 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/modules/resource/monitor.c b/src/modules/resource/monitor.c index 35459c7bde0a..762e262d981f 100644 --- a/src/modules/resource/monitor.c +++ b/src/modules/resource/monitor.c @@ -401,7 +401,7 @@ void monitor_destroy (struct monitor *monitor) struct monitor *monitor_create (struct resource_ctx *ctx, int inventory_size, - bool monitor_force_up) + struct resource_config *config) { struct monitor *monitor; @@ -439,7 +439,7 @@ struct monitor *monitor_create (struct resource_ctx *ctx, || !(monitor->torpid = idset_create (monitor->size, 0)) || !(monitor->lost = idset_create (monitor->size, 0))) goto error; - if (monitor_force_up) { + if (config->monitor_force_up) { if (idset_range_set (monitor->up, 0, monitor->size - 1) < 0) goto error; } @@ -466,7 +466,6 @@ struct monitor *monitor_create (struct resource_ctx *ctx, return NULL; } - /* * vi:tabstop=4 shiftwidth=4 expandtab */ diff --git a/src/modules/resource/monitor.h b/src/modules/resource/monitor.h index c107e009d697..aeecead39f20 100644 --- a/src/modules/resource/monitor.h +++ b/src/modules/resource/monitor.h @@ -13,7 +13,7 @@ struct monitor *monitor_create (struct resource_ctx *ctx, int inventory_size, - bool monitor_force_up); + struct resource_config *config); void monitor_destroy (struct monitor *monitor); const struct idset *monitor_get_down (struct monitor *monitor); diff --git a/src/modules/resource/resource.c b/src/modules/resource/resource.c index 06b64adeac88..10398f7a8ed4 100644 --- a/src/modules/resource/resource.c +++ b/src/modules/resource/resource.c @@ -151,6 +151,15 @@ static int parse_config (struct resource_ctx *ctx, return -1; } } + /* Check systemd.enable so we know whether sdmon.online will be populated. + * Configuration errors in [systemd] are handled elsewhere. + */ + int systemd_enable = 0; + (void)flux_conf_unpack (conf, + NULL, + "{s?{s?b}}", + "systemd", + "enable", &systemd_enable); if (rconfig) { rconfig->journal_max = journal_max; rconfig->exclude_idset = exclude; @@ -159,6 +168,7 @@ static int parse_config (struct resource_ctx *ctx, rconfig->no_update_watch = no_update_watch ? true : false; rconfig->rediscover = rediscover ? true : false; rconfig->R = o; + rconfig->systemd_enable = systemd_enable ? true : false; } else json_decref (o); @@ -402,7 +412,7 @@ int mod_main (flux_t *h, int argc, char **argv) goto error; if (!(ctx->monitor = monitor_create (ctx, inventory_get_size (ctx->inventory), - config.monitor_force_up))) + &config))) goto error; if (!(ctx->status = status_create (ctx))) goto error; diff --git a/src/modules/resource/resource.h b/src/modules/resource/resource.h index 4e1693ed6e84..9c3d02a1c254 100644 --- a/src/modules/resource/resource.h +++ b/src/modules/resource/resource.h @@ -20,6 +20,7 @@ struct resource_config { bool no_update_watch; bool monitor_force_up; int journal_max; + bool systemd_enable; // systemd.enable, not under [resource] }; struct resource_ctx { From c614eb3e9395f6a4020e5a0144627d33a2ab24e7 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 6 Feb 2025 13:56:00 -0800 Subject: [PATCH 12/14] resource: conditionally monitor sdmon.online Problem: nodes are not checked for untracked running work when a Flux instance starts up. This might happen, for example, if - job-exec deems job shell(s) unkillable - housekeeping/prolog/epilog gets stuck on a hung file system - the broker exits without proper shutdown When systemd is enabled, the new sdmon module joins the 'sdmon.online' broker group on startup. However, if there are any running flux units, this is delayed until those units are no longer running. Change the resource module so that it monitors sdmon.online instead of broker.online when systemd is enabled. This will withhold "busy" nodes from the scheduler until they become idle. Fixes #6590 --- src/modules/resource/monitor.c | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/modules/resource/monitor.c b/src/modules/resource/monitor.c index 762e262d981f..953ce5f124bf 100644 --- a/src/modules/resource/monitor.c +++ b/src/modules/resource/monitor.c @@ -17,6 +17,13 @@ * the initial response to the request to watch broker.online cannot * be processed until the reactor runs. * + * If systemd is enabled, watch sdmon.online instead of broker.online. This + * behaves exactly like broker.online, except that it isn't joined until + * sdmon has verified that the node has no running flux systemd units. + * This guards against scheduling new work on a node that hasn't been + * properly cleaned up. As with broker.online, nodes are automatically + * removed from the group when they are shut down or lost. + * * Some synchronization notes: * - rc1 completes on rank 0 before any other ranks can join broker.online, * therefore the scheduler must allow flux module load to complete with @@ -444,7 +451,10 @@ struct monitor *monitor_create (struct resource_ctx *ctx, goto error; } else if (!flux_attr_get (ctx->h, "broker.recovery-mode")) { - if (!(monitor->f_online = group_monitor (ctx->h, "broker.online")) + const char *online_group = "broker.online"; + if (config->systemd_enable) + online_group = "sdmon.online"; + if (!(monitor->f_online = group_monitor (ctx->h, online_group)) || flux_future_then (monitor->f_online, -1, broker_online_cb, From 27977acafb1b9ff94ab70da4d352f948d08fc410 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 7 Feb 2025 10:06:43 -0800 Subject: [PATCH 13/14] testsuite: add sdmon test script Problem: there is no test coverage for the sdmon module. Add a new sharness script. --- t/Makefile.am | 1 + t/t2412-sdmon.t | 125 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100755 t/t2412-sdmon.t diff --git a/t/Makefile.am b/t/Makefile.am index 442bf928018f..3b7a88671e83 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -198,6 +198,7 @@ TESTSCRIPTS = \ t2409-sdexec.t \ t2410-sdexec-memlimit.t \ t2411-sdexec-job.t \ + t2412-sdmon.t \ t2500-job-attach.t \ t2501-job-status.t \ t2600-job-shell-rcalc.t \ diff --git a/t/t2412-sdmon.t b/t/t2412-sdmon.t new file mode 100755 index 000000000000..8f02f96fcf90 --- /dev/null +++ b/t/t2412-sdmon.t @@ -0,0 +1,125 @@ +#!/bin/sh +# ci=system + +test_description='Test flux systemd monitoring' + +. $(dirname $0)/sharness.sh + +if ! flux version | grep systemd; then + skip_all="flux was not built with systemd" + test_done +fi +if ! systemctl --user show --property Version; then + skip_all="user systemd is not running" + test_done +fi +if ! busctl --user status >/dev/null; then + skip_all="user dbus is not running" + test_done +fi +if ! busctl status >/dev/null; then + skip_all="system dbus is not running" + test_done +fi + +test_under_flux 1 minimal + +flux setattr log-stderr-level 1 + +# Usage: start test unit NAME (without service suffix) +start_test_unit() { + local sleep=$(which sleep) + flux exec \ + --service sdexec \ + --setopt SDEXEC_NAME="$1.service" \ + $sleep 3600 & +} +# Usage: stop_test_unit NAME (without service suffix) +stop_test_unit() { + systemctl --user stop $1 +} +# Usage: wait_for_none MAXSEC +wait_for_none() { + local retry=$(($1*10)) + while ! flux module stats sdmon | jq -e ".units == []"; do + sleep 0.1 + retry=$(($retry-1)) + test $retry -gt 0 || exit 1 + done +} +# Usage: wait_for_some MAXSEC +wait_for_some() { + local retry=$(($1*10)) + while flux module stats sdmon | jq -e ".units == []"; do + sleep 0.1 + retry=$(($retry-1)) + test $retry -gt 0 || exit 1 + done +} + +groups="flux python ${SHARNESS_TEST_SRCDIR}/scripts/groups.py" + +test_expect_success 'load sdbus,sdexec modules' ' + flux module load --name sdbus-sys sdbus system && + flux module load sdbus && + flux module load sdexec +' +test_expect_success 'load sdmon module' ' + flux module load sdmon +' +test_expect_success 'make sure residual test units are not running' ' + stop_test_unit shell-t2412 || true && + stop_test_unit imp-shell-t2412 || true +' +test_expect_success 'wait for it to join the sdmon.online group' ' + run_timeout 30 $groups waitfor --count=1 sdmon.online +' +test_expect_success 'module stats units array is empty' ' + flux module stats sdmon | jq -e ".units == []" +' +test_expect_success 'run a systemd unit with imp-shell- prefix' ' + start_test_unit imp-shell-t2412 +' +test_expect_success 'wait for module stats to show test unit' ' + wait_for_some 30 +' +test_expect_success 'remove sdmon module' ' + flux module remove sdmon +' +# removing the module triggers a disconnect that causes a group leave +test_expect_success 'wait for it to leave the sdmon.online group' ' + run_timeout 30 $groups waitfor --count=0 sdmon.online +' +test_expect_success 'load sdmon module' ' + flux module load sdmon +' +test_expect_success 'wait for module stats to show test unit' ' + wait_for_some 30 +' +test_expect_success 'stop the unit' ' + stop_test_unit imp-shell-t2412 +' +test_expect_success 'wait for sdmon to join the sdmon.online group' ' + run_timeout 30 $groups waitfor --count=1 sdmon.online +' +test_expect_success 'run a systemd unit with shell- prefix' ' + start_test_unit shell-t2412 +' +test_expect_success 'wait for module stats to show test unit' ' + wait_for_some 30 +' +test_expect_success 'stop the unit' ' + stop_test_unit shell-t2412 +' +test_expect_success 'wait for module stats stop showing test unit' ' + wait_for_none 30 +' +test_expect_success 'remove sdmon module' ' + flux module remove sdmon +' +test_expect_success 'remove sdexec,sdbus modules' ' + flux module remove sdexec && + flux module remove sdbus && + flux module remove sdbus-sys +' +test_done From 6b86e5ff2f8b1d2925175382e583dafa5ace1e9a Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 7 Feb 2025 15:52:43 -0800 Subject: [PATCH 14/14] testsuite: cover resource with sdmon.online Problem: there is no test coverage for the resource module's behavior when systemd is configured and sdmon is providing sdmon.online. Add a sharness script for that. --- t/Makefile.am | 1 + t/t2413-sdmon-resource.t | 108 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) create mode 100755 t/t2413-sdmon-resource.t diff --git a/t/Makefile.am b/t/Makefile.am index 3b7a88671e83..7288fa860b98 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -199,6 +199,7 @@ TESTSCRIPTS = \ t2410-sdexec-memlimit.t \ t2411-sdexec-job.t \ t2412-sdmon.t \ + t2413-sdmon-resource.t \ t2500-job-attach.t \ t2501-job-status.t \ t2600-job-shell-rcalc.t \ diff --git a/t/t2413-sdmon-resource.t b/t/t2413-sdmon-resource.t new file mode 100755 index 000000000000..10ac59a9915e --- /dev/null +++ b/t/t2413-sdmon-resource.t @@ -0,0 +1,108 @@ +#!/bin/sh +# ci=system + +test_description='Test flux systemd monitoring' + +. $(dirname $0)/sharness.sh + +if ! flux version | grep systemd; then + skip_all="flux was not built with systemd" + test_done +fi +if ! systemctl --user show --property Version; then + skip_all="user systemd is not running" + test_done +fi +if ! busctl --user status >/dev/null; then + skip_all="user dbus is not running" + test_done +fi +if ! busctl status >/dev/null; then + skip_all="system dbus is not running" + test_done +fi + +mkdir -p config +cat >config/config.toml </dev/null || true +' +test_expect_success 'wait for sdmon.online group' ' + run_timeout 30 $groups waitfor --count=1 sdmon.online +' +test_expect_success 'wait for online resource event' ' + run_timeout 30 flux resource eventlog --wait=online +' +test_expect_success 'start a test unit that looks like a job shell' ' + start_test_unit shell-t2413 +' +test_expect_success 'wait for module stats to show test unit' ' + wait_for_some 30 +' +test_expect_success 'clear dmesg, then reload sdmin, resource, sched-simple' ' + flux dmesg -C && + flux module remove sched-simple && + flux module remove resource && + flux module reload sdmon && + flux module load resource && + flux module load sched-simple +' +test_expect_success 'stop test unit' ' + stop_test_unit shell-t2413 +' +test_expect_success 'wait for module stats to show nothing' ' + wait_for_none 30 +' +test_expect_success 'wait for online resource event' ' + run_timeout 30 flux resource eventlog --wait=online +' +test_expect_success 'unit cleanup was logged' ' + flux dmesg -H >dmesg.out && + grep "shell-t2413.service needs cleanup" dmesg.out && + grep "cleanup complete" dmesg.out +' + +test_done