diff --git a/etc/rc1 b/etc/rc1 index 9f6e967f58fd..0b20f08154d6 100755 --- a/etc/rc1 +++ b/etc/rc1 @@ -26,7 +26,9 @@ modload all content modload all barrier if test "$(flux config get --default=false systemd.enable)" = "true"; then modload all sdbus + modload all --name sdbus-sys sdbus system modload all sdexec + modload all sdmon fi if test $RANK -eq 0; then diff --git a/etc/rc3 b/etc/rc3 index 8977b402daaf..72a8cc1a964c 100755 --- a/etc/rc3 +++ b/etc/rc3 @@ -37,7 +37,9 @@ modrm 0 job-manager modrm all job-ingest modrm 0 cron +modrm all sdmon modrm all sdexec +modrm all sdbus-sys modrm all sdbus modrm all barrier diff --git a/src/common/libsdexec/list.c b/src/common/libsdexec/list.c index 531a77835f82..7d6d45ebba78 100644 --- a/src/common/libsdexec/list.c +++ b/src/common/libsdexec/list.c @@ -66,14 +66,19 @@ bool sdexec_list_units_next (flux_future_t *f, struct unit_info *infop) /* N.B. Input params: states=[], patterns=[pattern]. */ -flux_future_t *sdexec_list_units (flux_t *h, uint32_t rank, const char *pattern) +flux_future_t *sdexec_list_units (flux_t *h, + const char *service, + uint32_t rank, + const char *pattern) { - if (!h || !pattern) { + if (!h || !pattern || !service) { errno = EINVAL; return NULL; } + char topic[256]; + snprintf (topic, sizeof (topic), "%s.call", service); return flux_rpc_pack (h, - "sdbus.call", + topic, rank, 0, "{s:s s:[[] [s]]}", diff --git a/src/common/libsdexec/list.h b/src/common/libsdexec/list.h index 115767ff5d51..dc53de395fcb 100644 --- a/src/common/libsdexec/list.h +++ b/src/common/libsdexec/list.h @@ -32,6 +32,7 @@ struct unit_info { * (E.g. use "*" for all). 
*/ flux_future_t *sdexec_list_units (flux_t *h, + const char *service, uint32_t rank, const char *pattern); diff --git a/src/common/libsdexec/property.c b/src/common/libsdexec/property.c index 82ffdcac3b2a..cfa08e919b00 100644 --- a/src/common/libsdexec/property.c +++ b/src/common/libsdexec/property.c @@ -32,17 +32,20 @@ static const char *serv_interface = "org.freedesktop.systemd1.Service"; static const char *prop_interface = "org.freedesktop.DBus.Properties"; flux_future_t *sdexec_property_get_all (flux_t *h, + const char *service, uint32_t rank, const char *path) { flux_future_t *f; + char topic[256]; - if (!h || !path) { + if (!h || !service || !path) { errno = EINVAL; return NULL; } + snprintf (topic, sizeof (topic), "%s.call", service); if (!(f = flux_rpc_pack (h, - "sdbus.call", + topic, rank, 0, "{s:s s:s s:s s:[s]}", @@ -55,16 +58,19 @@ flux_future_t *sdexec_property_get_all (flux_t *h, } flux_future_t *sdexec_property_get (flux_t *h, + const char *service, uint32_t rank, const char *path, const char *name) { flux_future_t *f; + char topic[256]; - if (!h || !path || !name) { + if (!h || !service || !path || !name) { errno = EINVAL; return NULL; } + snprintf (topic, sizeof (topic), "%s.call", service); if (!(f = flux_rpc_pack (h, - "sdbus.call", + topic, rank, @@ -79,13 +85,15 @@ flux_future_t *sdexec_property_get (flux_t *h, } flux_future_t *sdexec_property_changed (flux_t *h, + const char *service, uint32_t rank, const char *path) { flux_future_t *f; json_t *o; + char topic[256]; - if (!h) { + if (!h || !service) { errno = EINVAL; return NULL; } @@ -100,8 +108,9 @@ flux_future_t *sdexec_property_changed (flux_t *h, goto nomem; } } + snprintf (topic, sizeof (topic), "%s.subscribe", service); if (!(f = flux_rpc_pack (h, - "sdbus.subscribe", + topic, rank, FLUX_RPC_STREAMING, "O", o))) diff --git a/src/common/libsdexec/property.h b/src/common/libsdexec/property.h index 631859199ba4..46780c070521 100644 --- a/src/common/libsdexec/property.h +++
b/src/common/libsdexec/property.h @@ -18,6 +18,7 @@ j************************************************************/ * Parse the returned value with sdexec_property_get_unpack(). */ flux_future_t *sdexec_property_get (flux_t *h, + const char *service, uint32_t rank, const char *path, const char *name); @@ -28,6 +29,7 @@ int sdexec_property_get_unpack (flux_future_t *f, const char *fmt, ...); * which can be further parsed with sdexec_property_dict_unpack(). */ flux_future_t *sdexec_property_get_all (flux_t *h, + const char *service, uint32_t rank, const char *path); json_t *sdexec_property_get_all_dict (flux_future_t *f); @@ -40,6 +42,7 @@ json_t *sdexec_property_get_all_dict (flux_future_t *f); * sdexec_property_changed_path() to get the path for each response. */ flux_future_t *sdexec_property_changed (flux_t *h, + const char *service, uint32_t rank, const char *path); json_t *sdexec_property_changed_dict (flux_future_t *f); diff --git a/src/common/libsdexec/test/list.c b/src/common/libsdexec/test/list.c index 3653a8e39657..d6d4962b5cc4 100644 --- a/src/common/libsdexec/test/list.c +++ b/src/common/libsdexec/test/list.c @@ -31,10 +31,13 @@ void test_inval (void) BAIL_OUT ("could not create future for testing"); errno = 0; - ok (sdexec_list_units (NULL, 0, "*") == NULL && errno == EINVAL, + ok (sdexec_list_units (NULL, "sdexec", 0, "*") == NULL && errno == EINVAL, "sdexec_list_units h=NULL fails with EINVAL"); errno = 0; - ok (sdexec_list_units (h, 0, NULL) == NULL && errno == EINVAL, + ok (sdexec_list_units (h, NULL, 0, "*") == NULL && errno == EINVAL, + "sdexec_list_units service=NULL fails with EINVAL"); + errno = 0; + ok (sdexec_list_units (h, "sdexec", 0, NULL) == NULL && errno == EINVAL, "sdexec_list_units pattern=NULL fails with EINVAL"); ok (sdexec_list_units_next (NULL, &info) == false, diff --git a/src/common/libsdexec/test/property.c b/src/common/libsdexec/test/property.c index e928ccbc199d..4934b2925830 100644 --- a/src/common/libsdexec/test/property.c 
+++ b/src/common/libsdexec/test/property.c @@ -52,13 +52,20 @@ void test_inval (void) BAIL_OUT ("could not create property dict for testing"); errno = 0; - ok (sdexec_property_get (NULL, 0, "foo", "bar") == NULL && errno == EINVAL, + ok (sdexec_property_get (NULL, "sdexec", 0, "foo", "bar") == NULL + && errno == EINVAL, "sdexec_property_get h=NULL fails with EINVAL"); errno = 0; - ok (sdexec_property_get (h, 0, NULL, "bar") == NULL && errno == EINVAL, + ok (sdexec_property_get (h, NULL, 0, "foo", "bar") == NULL + && errno == EINVAL, + "sdexec_property_get service=NULL fails with EINVAL"); + errno = 0; + ok (sdexec_property_get (h, "sdexec", 0, NULL, "bar") == NULL + && errno == EINVAL, "sdexec_property_get path=NULL fails with EINVAL"); errno = 0; - ok (sdexec_property_get (h, 0, "foo", NULL) == NULL && errno == EINVAL, + ok (sdexec_property_get (h, "sdexec", 0, "foo", NULL) == NULL + && errno == EINVAL, "sdexec_property_get name=NULL fails with EINVAL"); errno = 0; @@ -69,10 +76,16 @@ void test_inval (void) "sdexec_property_get_unpack fmt=NULL fails with EINVAL"); errno = 0; - ok (sdexec_property_get_all (NULL, 0, "foo") == NULL && errno == EINVAL, + ok (sdexec_property_get_all (NULL, "sdexec", 0, "foo") == NULL + && errno == EINVAL, "sdexec_property_get_all h=NULL fails with EINVAL"); errno = 0; - ok (sdexec_property_get_all (h, 0, NULL) == NULL && errno == EINVAL, + ok (sdexec_property_get_all (h, NULL, 0, "foo") == NULL + && errno == EINVAL, + "sdexec_property_get_all service=NULL fails with EINVAL"); + errno = 0; + ok (sdexec_property_get_all (h, "sdexec", 0, NULL) == NULL + && errno == EINVAL, "sdexec_property_get_all path=NULL fails with EINVAL"); errno = 0; @@ -80,7 +93,8 @@ void test_inval (void) "sdexec_property_get_all_dict f=NULL fails with EINVAL"); errno = 0; - ok (sdexec_property_changed (NULL, 0, "foo") == NULL && errno == EINVAL, + ok (sdexec_property_changed (NULL, "sdexec", 0, "foo") == NULL + && errno == EINVAL, "sdexec_property_changed h=NULL 
fails with EINVAL"); errno = 0; diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am index 14197ef31716..53ad5358642e 100644 --- a/src/modules/Makefile.am +++ b/src/modules/Makefile.am @@ -44,6 +44,7 @@ fluxmod_LTLIBRARIES = \ kvs-watch.la \ resource.la \ sched-simple.la \ + sdmon.la \ sdexec.la \ sdbus.la @@ -268,6 +269,19 @@ sched_simple_la_LIBADD = \ $(JANSSON_LIBS) sched_simple_la_LDFLAGS = $(fluxmod_ldflags) -module +sdmon_la_SOURCES = \ + sdmon/sdmon.c +sdmon_la_CPPFLAGS = \ + $(AM_CPPFLAGS) \ + $(LIBUUID_CFLAGS) +sdmon_la_LIBADD = \ + $(top_builddir)/src/common/libsdexec/libsdexec.la \ + $(top_builddir)/src/common/libflux-core.la \ + $(top_builddir)/src/common/libflux-internal.la \ + $(LIBUUID_LIBS) \ + $(JANSSON_LIBS) +sdmon_la_LDFLAGS = $(fluxmod_ldflags) -module + sdexec_la_SOURCES = \ sdexec/sdexec.c sdexec_la_CPPFLAGS = \ diff --git a/src/modules/resource/monitor.c b/src/modules/resource/monitor.c index 35459c7bde0a..953ce5f124bf 100644 --- a/src/modules/resource/monitor.c +++ b/src/modules/resource/monitor.c @@ -17,6 +17,13 @@ * the initial response to the request to watch broker.online cannot * be processed until the reactor runs. * + * If systemd is enabled, watch sdmon.online instead of broker.online. This + * behaves exactly like broker.online, except that it isn't joined until + * sdmon has verified that the node has no running flux systemd units. + * This guards against scheduling new work on a node that hasn't been + * properly cleaned up. As with broker.online, nodes are automatically + * removed from the group when they are shut down or lost. 
+ * * Some synchronization notes: * - rc1 completes on rank 0 before any other ranks can join broker.online, * therefore the scheduler must allow flux module load to complete with @@ -401,7 +408,7 @@ void monitor_destroy (struct monitor *monitor) struct monitor *monitor_create (struct resource_ctx *ctx, int inventory_size, - bool monitor_force_up) + struct resource_config *config) { struct monitor *monitor; @@ -439,12 +446,15 @@ struct monitor *monitor_create (struct resource_ctx *ctx, || !(monitor->torpid = idset_create (monitor->size, 0)) || !(monitor->lost = idset_create (monitor->size, 0))) goto error; - if (monitor_force_up) { + if (config->monitor_force_up) { if (idset_range_set (monitor->up, 0, monitor->size - 1) < 0) goto error; } else if (!flux_attr_get (ctx->h, "broker.recovery-mode")) { - if (!(monitor->f_online = group_monitor (ctx->h, "broker.online")) + const char *online_group = "broker.online"; + if (config->systemd_enable) + online_group = "sdmon.online"; + if (!(monitor->f_online = group_monitor (ctx->h, online_group)) || flux_future_then (monitor->f_online, -1, broker_online_cb, @@ -466,7 +476,6 @@ struct monitor *monitor_create (struct resource_ctx *ctx, return NULL; } - /* * vi:tabstop=4 shiftwidth=4 expandtab */ diff --git a/src/modules/resource/monitor.h b/src/modules/resource/monitor.h index c107e009d697..aeecead39f20 100644 --- a/src/modules/resource/monitor.h +++ b/src/modules/resource/monitor.h @@ -13,7 +13,7 @@ struct monitor *monitor_create (struct resource_ctx *ctx, int inventory_size, - bool monitor_force_up); + struct resource_config *config); void monitor_destroy (struct monitor *monitor); const struct idset *monitor_get_down (struct monitor *monitor); diff --git a/src/modules/resource/resource.c b/src/modules/resource/resource.c index 06b64adeac88..10398f7a8ed4 100644 --- a/src/modules/resource/resource.c +++ b/src/modules/resource/resource.c @@ -151,6 +151,15 @@ static int parse_config (struct resource_ctx *ctx, return -1; } } + 
/* Check systemd.enable so we know whether sdmon.online will be populated. + * Configuration errors in [systemd] are handled elsewhere. + */ + int systemd_enable = 0; + (void)flux_conf_unpack (conf, + NULL, + "{s?{s?b}}", + "systemd", + "enable", &systemd_enable); if (rconfig) { rconfig->journal_max = journal_max; rconfig->exclude_idset = exclude; @@ -159,6 +168,7 @@ static int parse_config (struct resource_ctx *ctx, rconfig->no_update_watch = no_update_watch ? true : false; rconfig->rediscover = rediscover ? true : false; rconfig->R = o; + rconfig->systemd_enable = systemd_enable ? true : false; } else json_decref (o); @@ -402,7 +412,7 @@ int mod_main (flux_t *h, int argc, char **argv) goto error; if (!(ctx->monitor = monitor_create (ctx, inventory_get_size (ctx->inventory), - config.monitor_force_up))) + &config))) goto error; if (!(ctx->status = status_create (ctx))) goto error; diff --git a/src/modules/resource/resource.h b/src/modules/resource/resource.h index 4e1693ed6e84..9c3d02a1c254 100644 --- a/src/modules/resource/resource.h +++ b/src/modules/resource/resource.h @@ -20,6 +20,7 @@ struct resource_config { bool no_update_watch; bool monitor_force_up; int journal_max; + bool systemd_enable; // systemd.enable, not under [resource] }; struct resource_ctx { diff --git a/src/modules/sdbus/connect.c b/src/modules/sdbus/connect.c index 6665551c1bf2..c5aa69543e0c 100644 --- a/src/modules/sdbus/connect.c +++ b/src/modules/sdbus/connect.c @@ -14,7 +14,10 @@ #if HAVE_CONFIG_H #include "config.h" #endif - +#include +#ifndef HAVE_STRLCPY +#include "src/common/libmissing/strlcpy.h" +#endif #include #include @@ -26,6 +29,7 @@ struct sdconnect { double retry_min; double retry_max; bool first_time; + bool system_bus; }; static void sdconnect_destroy (struct sdconnect *sdc) @@ -58,6 +62,28 @@ static void bus_destroy (sd_bus *bus) } } +static void make_system_bus_path (char *buf, size_t size) +{ + char *path; + + if ((path = getenv ("DBUS_SYSTEM_BUS_ADDRESS"))) + strlcpy 
(buf, path, size); + else + strlcpy (buf, "sd_bus_open_system", size); +} + +static void make_user_bus_path (char *buf, size_t size) +{ + char *path; + + if ((path = getenv ("DBUS_SESSION_BUS_ADDRESS"))) + strlcpy (buf, path, size); + else if ((path = getenv ("XDG_RUNTIME_DIR"))) + snprintf (buf, size, "unix:path:%s/bus", path); + else + strlcpy (buf, "sd_bus_open_user", size); +} + /* The timer callback calls sd_bus_open_user(). If it succeeds, the future * is fulfilled. If it fails, the timer is re-armed for a calculated timeout. * Retries proceed forever. If they need to be capped, this can be done by @@ -73,23 +99,22 @@ static void timer_cb (flux_reactor_t *r, sd_bus *bus; int e; double timeout; + char path[1024]; sdc->attempt++; timeout = sdc->retry_min * sdc->attempt; if (timeout > sdc->retry_max) timeout = sdc->retry_max; - if ((e = sd_bus_open_user (&bus)) < 0) { - char buf[1024]; - const char *path = getenv ("DBUS_SESSION_BUS_ADDRESS"); - if (!path) { - if ((path = getenv ("XDG_RUNTIME_DIR"))) { - snprintf (buf, sizeof (buf), "unix:path:%s/bus", path); - path = buf; - } - } - if (!path) - path = "sd_bus_open_user"; + if (sdc->system_bus) { + make_system_bus_path (path, sizeof (path)); + e = sd_bus_open_system (&bus); + } + else { + make_user_bus_path (path, sizeof (path)); + e = sd_bus_open_user (&bus); + } + if (e < 0) { flux_log (sdc->h, LOG_INFO, "%s: %s (retrying in %.0fs)", @@ -98,7 +123,7 @@ static void timer_cb (flux_reactor_t *r, timeout); goto retry; } - flux_log (sdc->h, LOG_INFO, "connected"); + flux_log (sdc->h, LOG_INFO, "%s: connected", path); flux_future_fulfill (f, bus, (flux_free_f)bus_destroy); sdc->attempt = 0; return; @@ -141,7 +166,8 @@ static void initialize_cb (flux_future_t *f, void *arg) flux_future_t *sdbus_connect (flux_t *h, bool first_time, double retry_min, - double retry_max) + double retry_max, + bool system_bus) { flux_future_t *f; struct sdconnect *sdc = NULL; @@ -159,6 +185,7 @@ flux_future_t *sdbus_connect (flux_t *h, 
sdc->retry_min = retry_min; sdc->retry_max = retry_max; sdc->first_time = first_time; + sdc->system_bus = system_bus; flux_future_set_flux (f, h); return f; error: diff --git a/src/modules/sdbus/connect.h b/src/modules/sdbus/connect.h index faa9a00fcf60..a40999c973a9 100644 --- a/src/modules/sdbus/connect.h +++ b/src/modules/sdbus/connect.h @@ -27,7 +27,8 @@ flux_future_t *sdbus_connect (flux_t *h, bool first_time, double retry_min, - double retry_max); + double retry_max, + bool system_bus); #endif /* !_SDBUS_CONNECT_H */ diff --git a/src/modules/sdbus/main.c b/src/modules/sdbus/main.c index 9c51acb35286..370108da7068 100644 --- a/src/modules/sdbus/main.c +++ b/src/modules/sdbus/main.c @@ -33,7 +33,7 @@ int mod_main (flux_t *h, int argc, char **argv) flux_error_t error; int rc = -1; - if (!(ctx = sdbus_ctx_create (h, &error))) { + if (!(ctx = sdbus_ctx_create (h, argc, argv, &error))) { flux_log (h, LOG_ERR, "%s", error.text); goto error; } @@ -52,6 +52,4 @@ int mod_main (flux_t *h, int argc, char **argv) #endif } -MOD_NAME ("sdbus"); - // vi:ts=4 sw=4 expandtab diff --git a/src/modules/sdbus/sdbus.c b/src/modules/sdbus/sdbus.c index 6fd678c4718e..6a4e532b1124 100644 --- a/src/modules/sdbus/sdbus.c +++ b/src/modules/sdbus/sdbus.c @@ -31,6 +31,7 @@ #include "sdbus.h" struct sdbus_ctx { + bool system_bus; // connect to system bus instead of user bus flux_future_t *f_conn; // owns ctx->bus sd_bus *bus; flux_watcher_t *bus_w; @@ -560,32 +561,32 @@ static void reload_cb (flux_t *h, static struct flux_msg_handler_spec htab[] = { { FLUX_MSGTYPE_REQUEST, - "sdbus.disconnect", + "disconnect", disconnect_cb, 0 }, { FLUX_MSGTYPE_REQUEST, - "sdbus.call", + "call", call_cb, 0 }, { FLUX_MSGTYPE_REQUEST, - "sdbus.subscribe", + "subscribe", subscribe_cb, 0 }, { FLUX_MSGTYPE_REQUEST, - "sdbus.subscribe-cancel", + "subscribe-cancel", subscribe_cancel_cb, 0 }, { FLUX_MSGTYPE_REQUEST, - "sdbus.reconnect", + "reconnect", reconnect_cb, 0 }, { FLUX_MSGTYPE_REQUEST, - 
"sdbus.config-reload", + "config-reload", reload_cb, 0 }, @@ -722,13 +723,33 @@ static void sdbus_recover (struct sdbus_ctx *ctx, const char *reason) * libsystemd complaining about unexpected internal states(?) and the * occasional segfault. */ - if (!(ctx->f_conn = sdbus_connect (ctx->h, false, retry_min, retry_max)) + if (!(ctx->f_conn = sdbus_connect (ctx->h, + false, + retry_min, + retry_max, + ctx->system_bus)) || flux_future_then (ctx->f_conn, -1, connect_continuation, ctx) < 0) { flux_log_error (ctx->h, "error starting bus connect"); flux_reactor_stop_error (flux_get_reactor (ctx->h)); } } +static int parse_module_args (struct sdbus_ctx *ctx, + int argc, + char **argv, + flux_error_t *error) +{ + for (int i = 0; i < argc; i++) { + if (streq (argv[i], "system")) + ctx->system_bus = true; + else { + errprintf (error, "unknown module option: %s", argv[i]); + return -1; + } + } + return 0; +} + void sdbus_ctx_destroy (struct sdbus_ctx *ctx) { if (ctx) { @@ -755,17 +776,27 @@ void sdbus_ctx_destroy (struct sdbus_ctx *ctx) } } -struct sdbus_ctx *sdbus_ctx_create (flux_t *h, flux_error_t *error) +struct sdbus_ctx *sdbus_ctx_create (flux_t *h, + int argc, + char **argv, + flux_error_t *error) { struct sdbus_ctx *ctx; + const char *name = flux_aux_get (h, "flux::name"); if (!(ctx = calloc (1, sizeof (*ctx)))) goto error_create; + if (parse_module_args (ctx, argc, argv, error) < 0) + goto error; if (sdbus_configure (ctx, flux_get_conf (h), error) < 0) goto error; - if (!(ctx->f_conn = sdbus_connect (h, true, retry_min, retry_max)) + if (!(ctx->f_conn = sdbus_connect (h, + true, + retry_min, + retry_max, + ctx->system_bus)) || flux_future_then (ctx->f_conn, -1, connect_continuation, ctx) < 0 - || flux_msg_handler_addvec (h, htab, ctx, &ctx->handlers) < 0 + || flux_msg_handler_addvec_ex (h, name, htab, ctx, &ctx->handlers) < 0 || !(ctx->requests = flux_msglist_create ()) || !(ctx->subscribers = flux_msglist_create ()) || flux_get_rank (h, &ctx->rank) < 0) diff --git 
a/src/modules/sdbus/sdbus.h b/src/modules/sdbus/sdbus.h index 3252b4908006..b1763719d234 100644 --- a/src/modules/sdbus/sdbus.h +++ b/src/modules/sdbus/sdbus.h @@ -13,7 +13,10 @@ #include -struct sdbus_ctx *sdbus_ctx_create (flux_t *h, flux_error_t *error); +struct sdbus_ctx *sdbus_ctx_create (flux_t *h, + int argc, + char **argv, + flux_error_t *error); void sdbus_ctx_destroy (struct sdbus_ctx *ctx); #endif /* !_SDBUS_SDBUS_H */ diff --git a/src/modules/sdbus/subscribe.c b/src/modules/sdbus/subscribe.c index f8e8698b4c5a..e3ada43e84f0 100644 --- a/src/modules/sdbus/subscribe.c +++ b/src/modules/sdbus/subscribe.c @@ -27,13 +27,16 @@ static void subscribe_continuation (flux_future_t *f1, void *arg) flux_t *h = flux_future_get_flux (f1); const char *errmsg = NULL; flux_future_t *f2; + const char *name = flux_aux_get (h, "flux::name"); + char topic[128]; if (flux_rpc_get (f1, NULL) < 0) { errmsg = future_strerror (f1, errno); goto error; } + snprintf (topic, sizeof (topic), "%s.call", name); if (!(f2 = flux_rpc_pack (h, - "sdbus.call", + topic, FLUX_NODEID_ANY, 0, "{s:s s:s s:s s:s s:[s]}", @@ -58,9 +61,12 @@ flux_future_t *sdbus_subscribe (flux_t *h) { flux_future_t *f1; flux_future_t *fc; + const char *name = flux_aux_get (h, "flux::name"); + char topic[128]; + snprintf (topic, sizeof (topic), "%s.call", name); if (!(f1 = flux_rpc_pack (h, - "sdbus.call", + topic, FLUX_NODEID_ANY, 0, "{s:s s:[]}", diff --git a/src/modules/sdexec/sdexec.c b/src/modules/sdexec/sdexec.c index 38faa53b8988..092a6d2b5462 100644 --- a/src/modules/sdexec/sdexec.c +++ b/src/modules/sdexec/sdexec.c @@ -837,6 +837,7 @@ static void exec_cb (flux_t *h, proc->msg = msg; sdexec_log_debug (h, "watch %s", sdexec_unit_name (proc->unit)); if (!(proc->f_watch = sdexec_property_changed (h, + "sdbus", ctx->rank, sdexec_unit_path (proc->unit))) || flux_future_then (proc->f_watch, @@ -1146,7 +1147,7 @@ static void stats_cb (flux_t *h, * units that were started by that UUID a SIGKILL to begin cleanup. 
Leave * the request in ctx->requests so the unit can be "reaped". Let normal * cleanup of the request (including generating a response which shouldn't - * hurt) to occur when that happens. + * hurt) occur when that happens. */ static void disconnect_cb (flux_t *h, flux_msg_handler_t *mh, diff --git a/src/modules/sdmon/sdmon.c b/src/modules/sdmon/sdmon.c new file mode 100644 index 000000000000..cf38451af0e0 --- /dev/null +++ b/src/modules/sdmon/sdmon.c @@ -0,0 +1,432 @@ +/************************************************************\ + * Copyright 2025 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* sdmon.c - create and maintain a list of running flux systemd units + * + * This monitors two instances of systemd: + * - the user one, running as user flux (where jobs are run) + * - the system one (where housekeeping, prolog, epilog run) + * + * A list of units matching flux unit globs is requested at initialization, + * and a subscription to property updates on those globs is obtained. + * After the initial list, monitoring is driven solely by property updates. + * + * Join the sdmon.online broker group once the unit list responses have been + * received and there are no Flux units running on the node. This lets the + * resource module on rank 0 hold back nodes that require cleanup from the + * scheduler. 
+ */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include "ccan/str/str.h" +#include "src/common/libutil/errprintf.h" +#include "src/common/libutil/errno_safe.h" +#include "src/common/libutil/basename.h" +#include "src/common/libczmqcontainers/czmq_containers.h" +#include "src/common/libsdexec/list.h" +#include "src/common/libsdexec/property.h" +#include "src/common/libsdexec/unit.h" +#include "src/common/libsdexec/state.h" + +struct sdmon_bus { + flux_future_t *fp; // SERVICE.subscribe + flux_future_t *fl; // SERVICE.call ListUnitsByPattern + bool unmute_property_updates; // set true after list response is received + const char *service; // sdbus or sdbus-sys + const char *unit_glob; +}; + +struct sdmon_ctx { + flux_t *h; + uint32_t rank; + flux_msg_handler_t **handlers; + struct sdmon_bus sys; + struct sdmon_bus usr; + zhashx_t *units; // unit name => (struct unit *) + bool group_joined; + bool cleanup_needed; + flux_future_t *fg; +}; + +static const char *path_prefix = "/org/freedesktop/systemd1/unit"; + +static const char *sys_glob = "flux-*"; +static const char *usr_glob = "*shell-*"; // match with and without imp- prefix + +static const char *group_name = "sdmon.online"; + +/* Process a group response. This is very unlikely to fail but if it does, + * make sure we get a log message. 
+ */ +static void sdmon_join_continuation (flux_future_t *f, void *arg) +{ + struct sdmon_ctx *ctx = arg; + if (flux_future_get (f, NULL) < 0) { + flux_log (ctx->h, + LOG_ERR, + "groups.join request failed: %s", + future_strerror (f, errno)); + } +} + +/* Send a broker groups.join request IFF: + * - we haven't joined yet + * - both busses have their initial list responses (prop updates unmuted) + * - the unit hash is empty + */ +static void sdmon_group_join_if_ready (struct sdmon_ctx *ctx) +{ + if (ctx->group_joined + || !ctx->sys.unmute_property_updates + || !ctx->usr.unmute_property_updates + || zhashx_size (ctx->units) > 0) + return; + + // unit(s) needing cleanup were logged, so indicate they are resolved now. + ctx->group_joined = true; + if (ctx->cleanup_needed) + flux_log (ctx->h, LOG_ERR, "cleanup complete - resources are online"); + + flux_future_destroy (ctx->fg); + if (!(ctx->fg = flux_rpc_pack (ctx->h, + "groups.join", + ctx->rank, + 0, + "{s:s}", + "name", group_name)) + || flux_future_then (ctx->fg, -1, sdmon_join_continuation, ctx) < 0) + flux_log_error (ctx->h, "error sending groups.join request"); +} + +/* List the units that sdmon thinks are running and their state.substate. 
+ */ +static void sdmon_stats_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct sdmon_ctx *ctx = arg; + json_t *units; + struct unit *unit; + + if (!(units = json_array ())) + goto error; + unit = zhashx_first (ctx->units); + while (unit) { + json_t *o; + char state[64]; + + snprintf (state, + sizeof (state), + "%s.%s", + sdexec_statetostr (sdexec_unit_state (unit)), + sdexec_substatetostr (sdexec_unit_substate (unit))); + + if (!(o = json_pack ("{s:s s:s}", + "name", sdexec_unit_name (unit), + "state", state)) + || json_array_append_new (units, o) < 0) { + json_decref (o); + errno = ENOMEM; + goto error; + } + unit = zhashx_next (ctx->units); + } + if (flux_respond_pack (h, msg, "{s:O}", "units", units) < 0) + flux_log_error (h, "error responding to stats-get request"); + json_decref (units); + return; +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "error responding to stats-get request"); + json_decref (units); +} + +// zhashx_destructor_fn footprint +static void sdmon_unit_destructor (void **item) +{ + if (*item) { + sdexec_unit_destroy (*item); + *item = NULL; + } +} + +/* Determine if a unit is considered "running" for purposes of this module. + */ +static bool sdmon_unit_is_running (struct unit *unit) +{ + bool running = false; + + switch (sdexec_unit_state (unit)) { + case STATE_ACTIVATING: + case STATE_ACTIVE: + case STATE_DEACTIVATING: + running = true; + break; + case STATE_UNKNOWN: + case STATE_INACTIVE: + case STATE_FAILED: + break; + } + return running; +} + +/* A unit matching a subscribed-to glob (on either bus) has changed properties. + * If it's a new, running unit, add it to the units hash. + * If it's a known unit that is no longer running, remove it. + * Join the group if the unit hash transitions to empty. + */ +static void sdmon_property_continuation (flux_future_t *f, void *arg) +{ + struct sdmon_ctx *ctx = arg; + struct sdmon_bus *bus = f == ctx->usr.fp ? 
&ctx->usr : &ctx->sys; + const char *path; + const char *name; + json_t *dict; + struct unit *unit; + bool unit_is_new = false; + + + if (!(path = sdexec_property_changed_path (f)) + || (!(dict = sdexec_property_changed_dict (f)))) { + flux_log (ctx->h, + LOG_ERR, + "%s.subscribe: %s", + bus->service, + future_strerror (f, errno)); + goto fatal; + } + if (!bus->unmute_property_updates) + goto done; + name = basename_simple (path); + if (!(unit = zhashx_lookup (ctx->units, name))) { + if (!(unit = sdexec_unit_create (name))) { + flux_log_error (ctx->h, "error creating unit %s", name); + goto done; + } + unit_is_new = true; + } + if (!sdexec_unit_update (unit, dict) && !unit_is_new) + goto done; // nothing changed + + if (sdmon_unit_is_running (unit)) { + if (unit_is_new) { + if (zhashx_insert (ctx->units, name, unit) < 0) { + flux_log (ctx->h, LOG_ERR, "error tracking unit %s", name); + sdexec_unit_destroy (unit); + goto done; + } + } + } + else { + if (unit_is_new) + sdexec_unit_destroy (unit); + else + zhashx_delete (ctx->units, name); + } + sdmon_group_join_if_ready (ctx); +done: + flux_future_reset (f); + return; +fatal: + flux_reactor_stop_error (flux_get_reactor (ctx->h)); +} + +/* Process the initial list of units that match our glob (on either bus). + * Add any running units to the unit hash, then unmute property updates. + * Join the group if the unit hash is empty after that. + */ +static void sdmon_list_continuation (flux_future_t *f, void *arg) +{ + struct sdmon_ctx *ctx = arg; + struct sdmon_bus *bus = f == ctx->usr.fl ?
&ctx->usr : &ctx->sys; + struct unit_info info; + + if (flux_future_get (f, NULL) < 0) { + flux_log (ctx->h, + LOG_ERR, + "%s.call: %s", + bus->service, + future_strerror (f, errno)); + goto fatal; + } + + while (sdexec_list_units_next (f, &info)) { + struct unit *unit; + + if (!(unit = sdexec_unit_create (info.name))) { + flux_log_error (ctx->h, "error creating unit %s", info.name); + continue; + } + (void)sdexec_unit_update_frominfo (unit, &info); + if (sdmon_unit_is_running (unit)) { + flux_log (ctx->h, LOG_ERR, + "%s needs cleanup - resources are offline", info.name); + ctx->cleanup_needed = true; + if (zhashx_insert (ctx->units, info.name, unit) < 0) { + flux_log_error (ctx->h, "error tracking unit %s", info.name); + sdexec_unit_destroy (unit); + continue; + } + } + else /* not running, so it is never added to the hash: free it here */ + sdexec_unit_destroy (unit); + } + bus->unmute_property_updates = true; + sdmon_group_join_if_ready (ctx); + return; +fatal: + flux_reactor_stop_error (flux_get_reactor (ctx->h)); +} + +/* Check if the sdbus module is loaded on the local rank by pinging its + * stats-get method. N.B. sdbus handles its D-bus connect asynchronously + * so stats-get should be responsive even if D-Bus is not. + */ +static int sdbus_is_loaded (flux_t *h, + const char *service, + uint32_t rank, + flux_error_t *error) +{ + flux_future_t *f; + char topic[256]; + + snprintf (topic, sizeof (topic), "%s.stats-get", service); + if (!(f = flux_rpc (h, topic, NULL, rank, 0)) + || flux_rpc_get (f, NULL) < 0) { + if (errno == ENOSYS) + errprintf (error, "%s module is not loaded", service); + else + errprintf (error, "%s: %s", service, future_strerror (f, errno)); + flux_future_destroy (f); + return -1; + } + flux_future_destroy (f); + return 0; +} + +static void sdmon_bus_finalize (struct sdmon_bus *bus) +{ + flux_future_destroy (bus->fp); + flux_future_destroy (bus->fl); +} + +/* Send sdbus.subscribe and sdbus.call (ListUnitsByPatterns). + * N.B.
The subscribe request must be sent before the list request to avoid + * missing property updates that immediately follow the list response. + * Set 'bus->unmute_property_updates' after the list response is received. + * Any property updates received before that are ignored. +*/ +static int sdmon_bus_init (struct sdmon_bus *bus, + struct sdmon_ctx *ctx, + const char *service, + const char *pattern, + flux_error_t *error) +{ + flux_future_t *fp = NULL; + flux_future_t *fl = NULL; + char path[256]; + + if (sdbus_is_loaded (ctx->h, service, ctx->rank, error) < 0) + return -1; + snprintf (path, sizeof (path), "%s/%s", path_prefix, pattern); + if (!(fp = sdexec_property_changed (ctx->h, service, ctx->rank, path)) + || flux_future_then (fp, -1, sdmon_property_continuation, ctx) < 0) { + errprintf (error, "%s.subscribe: %s", service, strerror (errno)); + goto error; + } + if (!(fl = sdexec_list_units (ctx->h, service, ctx->rank, pattern)) + || flux_future_then (fl, -1, sdmon_list_continuation, ctx) < 0) { + errprintf (error, "%s.call: %s", service, strerror (errno)); + goto error; + } + bus->service = service; + bus->fp = fp; + bus->fl = fl; + return 0; +error: + flux_future_destroy (fp); + flux_future_destroy (fl); + return -1; +} + +static void sdmon_ctx_destroy (struct sdmon_ctx *ctx) +{ + if (ctx) { + int saved_errno = errno; + sdmon_bus_finalize (&ctx->sys); + sdmon_bus_finalize (&ctx->usr); + flux_future_destroy (ctx->fg); + flux_msg_handler_delvec (ctx->handlers); + zhashx_destroy (&ctx->units); + free (ctx); + errno = saved_errno; + } +} + +static struct sdmon_ctx *sdmon_ctx_create (flux_t *h) +{ + struct sdmon_ctx *ctx; + + if (!(ctx = calloc (1, sizeof (*ctx)))) + return NULL; + if (flux_get_rank (h, &ctx->rank) < 0) + goto error; + if (!(ctx->units = zhashx_new ())) { + errno = ENOMEM; + goto error; + } + zhashx_set_destructor (ctx->units, sdmon_unit_destructor); + ctx->h = h; + return ctx; +error: + sdmon_ctx_destroy (ctx); + return NULL; +} + +static struct 
flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, + "stats-get", + sdmon_stats_cb, + 0 + }, + FLUX_MSGHANDLER_TABLE_END +}; + +int mod_main (flux_t *h, int argc, char **argv) +{ + struct sdmon_ctx *ctx; + flux_error_t error; + const char *modname = flux_aux_get (h, "flux::name"); + int rc = -1; + + if (!(ctx = sdmon_ctx_create (h))) + goto error; + if (flux_msg_handler_addvec_ex (h, modname, htab, ctx, &ctx->handlers) < 0) + goto error; + if (sdmon_bus_init (&ctx->sys, ctx, "sdbus-sys", sys_glob, &error) < 0) + goto error; + if (sdmon_bus_init (&ctx->usr, ctx, "sdbus", usr_glob, &error) < 0) + goto error; + if (flux_reactor_run (flux_get_reactor (h), 0) < 0) { + flux_log_error (h, "reactor exited abnormally"); + goto error; + } + rc = 0; +error: + sdmon_ctx_destroy (ctx); + return rc; +} + +// vi:ts=4 sw=4 expandtab diff --git a/t/Makefile.am b/t/Makefile.am index 442bf928018f..7288fa860b98 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -198,6 +198,8 @@ TESTSCRIPTS = \ t2409-sdexec.t \ t2410-sdexec-memlimit.t \ t2411-sdexec-job.t \ + t2412-sdmon.t \ + t2413-sdmon-resource.t \ t2500-job-attach.t \ t2501-job-status.t \ t2600-job-shell-rcalc.t \ diff --git a/t/t2407-sdbus.t b/t/t2407-sdbus.t index 06209850c1a0..c75d3ee7e0e2 100755 --- a/t/t2407-sdbus.t +++ b/t/t2407-sdbus.t @@ -20,7 +20,7 @@ fi test_under_flux 2 minimal -flux setattr log-stderr-level 1 +flux setattr log-stderr-level 3 # # N.B. 
ListUnitsByPatterns response payload is a 'params' array whose first @@ -127,6 +127,12 @@ test_expect_success 'sdbus reconfig fails with bad sdbus-debug value' ' EOT grep "Expected true or false" config.err ' +test_expect_success 'restore correct config in case we reload later' ' + flux config load <<-EOT + [systemd] + sdbus-debug = true + EOT +' test_expect_success 'sdbus list-units works' ' count=$(bus_count_units "*") && @@ -305,19 +311,31 @@ test_expect_success 'create list script' ' cat >list.py <<-EOT && import sys import flux - print(flux.Flux().rpc("sdbus.call",{"member":"ListUnitsByPatterns","params":[[],["*"]]}).get_str()) + print(flux.Flux().rpc(sys.argv[1] + ".call",{"member":"ListUnitsByPatterns","params":[[],["*"]]}).get_str()) EOT chmod +x list.py ' test_expect_success 'list from rank 0 is allowed' ' - flux python ./list.py >/dev/null + flux python ./list.py sdbus >/dev/null ' test_expect_success 'list from rank 1 is restricted' ' - test_must_fail flux exec -r 1 flux python ./list.py 2>list1.err && + test_must_fail flux exec -r 1 \ + flux python ./list.py sdbus 2>list1.err && grep "not allowed" list1.err ' +test_expect_success 'load sdbus-sys module' ' + flux module load --name sdbus-sys sdbus system +' +test_expect_success 'list system units works' ' + flux python ./list.py sdbus-sys >/dev/null +' +test_expect_success 'remove sdbus-sys module' ' + flux module remove sdbus-sys +' test_expect_success 'remove sdbus module' ' flux module remove sdbus ' + + test_done diff --git a/t/t2412-sdmon.t b/t/t2412-sdmon.t new file mode 100755 index 000000000000..8f02f96fcf90 --- /dev/null +++ b/t/t2412-sdmon.t @@ -0,0 +1,125 @@ +#!/bin/sh +# ci=system + +test_description='Test flux systemd monitoring' + +. $(dirname $0)/sharness.sh + +if ! flux version | grep systemd; then + skip_all="flux was not built with systemd" + test_done +fi +if ! systemctl --user show --property Version; then + skip_all="user systemd is not running" + test_done +fi +if ! 
busctl --user status >/dev/null; then
+    skip_all="user dbus is not running"
+    test_done
+fi
+if ! busctl status >/dev/null; then
+    skip_all="system dbus is not running"
+    test_done
+fi
+
+test_under_flux 1 minimal
+
+flux setattr log-stderr-level 1
+
+# Usage: start_test_unit NAME (without service suffix)
+# Run "sleep 3600" in the background through the sdexec service, with
+# SDEXEC_NAME set to NAME.service.
+start_test_unit() {
+    local sleep=$(which sleep)
+    flux exec \
+        --service sdexec \
+        --setopt SDEXEC_NAME="$1.service" \
+        $sleep 3600 &
+}
+# Usage: stop_test_unit NAME (without service suffix)
+stop_test_unit() {
+    systemctl --user stop $1
+}
+# Usage: wait_for_none MAXSEC
+# Poll the sdmon module stats every 0.1s until the units array is empty.
+# Returns 1 after about MAXSEC seconds without success.  N.B. return is
+# used rather than exit so that a timeout fails only the calling test
+# instead of terminating the whole test script.
+wait_for_none() {
+    local retry=$(($1*10))
+    while ! flux module stats sdmon | jq -e ".units == []"; do
+        sleep 0.1
+        retry=$(($retry-1))
+        test $retry -gt 0 || return 1
+    done
+}
+# Usage: wait_for_some MAXSEC
+# Poll the sdmon module stats every 0.1s until the units array is no
+# longer empty.  Returns 1 after about MAXSEC seconds without success
+# (return rather than exit, as in wait_for_none).
+wait_for_some() {
+    local retry=$(($1*10))
+    while flux module stats sdmon | jq -e ".units == []"; do
+        sleep 0.1
+        retry=$(($retry-1))
+        test $retry -gt 0 || return 1
+    done
+}
+
+groups="flux python ${SHARNESS_TEST_SRCDIR}/scripts/groups.py"
+
+test_expect_success 'load sdbus,sdexec modules' '
+    flux module load --name sdbus-sys sdbus system &&
+    flux module load sdbus &&
+    flux module load sdexec
+'
+test_expect_success 'load sdmon module' '
+    flux module load sdmon
+'
+test_expect_success 'make sure residual test units are not running' '
+    stop_test_unit shell-t2412 || true &&
+    stop_test_unit imp-shell-t2412 || true
+'
+test_expect_success 'wait for it to join the sdmon.online group' '
+    run_timeout 30 $groups waitfor --count=1 sdmon.online
+'
+test_expect_success 'module stats units array is empty' '
+    flux module stats sdmon | jq -e ".units == []"
+'
+test_expect_success 'run a systemd unit with imp-shell- prefix' '
+    start_test_unit imp-shell-t2412
+'
+test_expect_success 'wait for module stats to show test unit' '
+    wait_for_some 30
+'
+test_expect_success 'remove sdmon module' '
+    flux module remove sdmon
+'
+# removing the module triggers a disconnect that causes a group
leave +test_expect_success 'wait for it to leave the sdmon.online group' ' + run_timeout 30 $groups waitfor --count=0 sdmon.online +' +test_expect_success 'load sdmon module' ' + flux module load sdmon +' +test_expect_success 'wait for module stats to show test unit' ' + wait_for_some 30 +' +test_expect_success 'stop the unit' ' + stop_test_unit imp-shell-t2412 +' +test_expect_success 'wait for sdmon to join the sdmon.online group' ' + run_timeout 30 $groups waitfor --count=1 sdmon.online +' +test_expect_success 'run a systemd unit with shell- prefix' ' + start_test_unit shell-t2412 +' +test_expect_success 'wait for module stats to show test unit' ' + wait_for_some 30 +' +test_expect_success 'stop the unit' ' + stop_test_unit shell-t2412 +' +test_expect_success 'wait for module stats stop showing test unit' ' + wait_for_none 30 +' +test_expect_success 'remove sdmon module' ' + flux module remove sdmon +' +test_expect_success 'remove sdexec,sdbus modules' ' + flux module remove sdexec && + flux module remove sdbus && + flux module remove sdbus-sys +' +test_done diff --git a/t/t2413-sdmon-resource.t b/t/t2413-sdmon-resource.t new file mode 100755 index 000000000000..10ac59a9915e --- /dev/null +++ b/t/t2413-sdmon-resource.t @@ -0,0 +1,108 @@ +#!/bin/sh +# ci=system + +test_description='Test flux systemd monitoring' + +. $(dirname $0)/sharness.sh + +if ! flux version | grep systemd; then + skip_all="flux was not built with systemd" + test_done +fi +if ! systemctl --user show --property Version; then + skip_all="user systemd is not running" + test_done +fi +if ! busctl --user status >/dev/null; then + skip_all="user dbus is not running" + test_done +fi +if ! 
busctl status >/dev/null; then
+    skip_all="system dbus is not running"
+    test_done
+fi
+
+mkdir -p config
+cat >config/config.toml </dev/null || true
+'
+test_expect_success 'wait for sdmon.online group' '
+    run_timeout 30 $groups waitfor --count=1 sdmon.online
+'
+test_expect_success 'wait for online resource event' '
+    run_timeout 30 flux resource eventlog --wait=online
+'
+test_expect_success 'start a test unit that looks like a job shell' '
+    start_test_unit shell-t2413
+'
+test_expect_success 'wait for module stats to show test unit' '
+    wait_for_some 30
+'
+# Reload sdmon while the test unit is still running, so the unit is
+# already present when sdmon starts; the last test checks the resulting
+# "needs cleanup" and "cleanup complete" log messages.
+test_expect_success 'clear dmesg, then reload sdmon, resource, sched-simple' '
+    flux dmesg -C &&
+    flux module remove sched-simple &&
+    flux module remove resource &&
+    flux module reload sdmon &&
+    flux module load resource &&
+    flux module load sched-simple
+'
+test_expect_success 'stop test unit' '
+    stop_test_unit shell-t2413
+'
+test_expect_success 'wait for module stats to show nothing' '
+    wait_for_none 30
+'
+test_expect_success 'wait for online resource event' '
+    run_timeout 30 flux resource eventlog --wait=online
+'
+test_expect_success 'unit cleanup was logged' '
+    flux dmesg -H >dmesg.out &&
+    grep "shell-t2413.service needs cleanup" dmesg.out &&
+    grep "cleanup complete" dmesg.out
+'
+
+test_done