Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid scheduling jobs on compute nodes that are not cleaned up #6616

Merged
merged 14 commits into from
Mar 1, 2025
Merged
2 changes: 2 additions & 0 deletions etc/rc1
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ modload all content
modload all barrier
if test "$(flux config get --default=false systemd.enable)" = "true"; then
modload all sdbus
modload all --name sdbus-sys sdbus system
modload all sdexec
modload all sdmon
fi

if test $RANK -eq 0; then
Expand Down
2 changes: 2 additions & 0 deletions etc/rc3
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ modrm 0 job-manager
modrm all job-ingest

modrm 0 cron
modrm all sdmon
modrm all sdexec
modrm all sdbus-sys
modrm all sdbus
modrm all barrier

Expand Down
11 changes: 8 additions & 3 deletions src/common/libsdexec/list.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,19 @@ bool sdexec_list_units_next (flux_future_t *f, struct unit_info *infop)

/* N.B. Input params: states=[], patterns=[pattern].
*/
flux_future_t *sdexec_list_units (flux_t *h, uint32_t rank, const char *pattern)
flux_future_t *sdexec_list_units (flux_t *h,
const char *service,
uint32_t rank,
const char *pattern)
{
if (!h || !pattern) {
if (!h || !pattern || !service) {
errno = EINVAL;
return NULL;
}
char topic[256];
snprintf (topic, sizeof (topic), "%s.call", service);
return flux_rpc_pack (h,
"sdbus.call",
topic,
rank,
0,
"{s:s s:[[] [s]]}",
Expand Down
1 change: 1 addition & 0 deletions src/common/libsdexec/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct unit_info {
* (E.g. use "*" for all).
*/
flux_future_t *sdexec_list_units (flux_t *h,
const char *service,
uint32_t rank,
const char *pattern);

Expand Down
19 changes: 14 additions & 5 deletions src/common/libsdexec/property.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,20 @@ static const char *serv_interface = "org.freedesktop.systemd1.Service";
static const char *prop_interface = "org.freedesktop.DBus.Properties";

flux_future_t *sdexec_property_get_all (flux_t *h,
const char *service,
uint32_t rank,
const char *path)
{
flux_future_t *f;
char topic[256];

if (!h || !path) {
if (!h || !service || !path) {
errno = EINVAL;
return NULL;
}
snprintf (topic, sizeof (topic), "%s.call", service);
if (!(f = flux_rpc_pack (h,
"sdbus.call",
topic,
rank,
0,
"{s:s s:s s:s s:[s]}",
Expand All @@ -55,16 +58,19 @@ flux_future_t *sdexec_property_get_all (flux_t *h,
}

flux_future_t *sdexec_property_get (flux_t *h,
const char *service,
uint32_t rank,
const char *path,
const char *name)
{
flux_future_t *f;
char topic[256];

if (!h || !path || !name) {
if (!h || !service || !path || !name) {
errno = EINVAL;
return NULL;
}
snprintf (topic, sizeof (topic), "%s.call", service);
if (!(f = flux_rpc_pack (h,
"sdbus.call",
rank,
Expand All @@ -79,13 +85,15 @@ flux_future_t *sdexec_property_get (flux_t *h,
}

flux_future_t *sdexec_property_changed (flux_t *h,
const char *service,
uint32_t rank,
const char *path)
{
flux_future_t *f;
json_t *o;
char topic[256];

if (!h) {
if (!h || !service) {
errno = EINVAL;
return NULL;
}
Expand All @@ -100,8 +108,9 @@ flux_future_t *sdexec_property_changed (flux_t *h,
goto nomem;
}
}
snprintf (topic, sizeof (topic), "%s.subscribe", service);
if (!(f = flux_rpc_pack (h,
"sdbus.subscribe",
topic,
rank,
FLUX_RPC_STREAMING,
"O", o)))
Expand Down
3 changes: 3 additions & 0 deletions src/common/libsdexec/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ j************************************************************/
* Parse the returned value with sdexec_property_get_unpack().
*/
flux_future_t *sdexec_property_get (flux_t *h,
const char *service,
uint32_t rank,
const char *path,
const char *name);
Expand All @@ -28,6 +29,7 @@ int sdexec_property_get_unpack (flux_future_t *f, const char *fmt, ...);
* which can be further parsed with sdexec_property_dict_unpack().
*/
flux_future_t *sdexec_property_get_all (flux_t *h,
const char *service,
uint32_t rank,
const char *path);
json_t *sdexec_property_get_all_dict (flux_future_t *f);
Expand All @@ -40,6 +42,7 @@ json_t *sdexec_property_get_all_dict (flux_future_t *f);
* sdexec_property_changed_path() to get the path for each response.
*/
flux_future_t *sdexec_property_changed (flux_t *h,
const char *service,
uint32_t rank,
const char *path);
json_t *sdexec_property_changed_dict (flux_future_t *f);
Expand Down
7 changes: 5 additions & 2 deletions src/common/libsdexec/test/list.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ void test_inval (void)
BAIL_OUT ("could not create future for testing");

errno = 0;
ok (sdexec_list_units (NULL, 0, "*") == NULL && errno == EINVAL,
ok (sdexec_list_units (NULL, "sdexec", 0, "*") == NULL && errno == EINVAL,
"sdexec_list_units h=NULL fails with EINVAL");
errno = 0;
ok (sdexec_list_units (h, 0, NULL) == NULL && errno == EINVAL,
ok (sdexec_list_units (h, NULL, 0, "*") == NULL && errno == EINVAL,
"sdexec_list_units service=NULL fails with EINVAL");
errno = 0;
ok (sdexec_list_units (h, "sdexec", 0, NULL) == NULL && errno == EINVAL,
"sdexec_list_units pattern=NULL fails with EINVAL");

ok (sdexec_list_units_next (NULL, &info) == false,
Expand Down
26 changes: 20 additions & 6 deletions src/common/libsdexec/test/property.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,20 @@ void test_inval (void)
BAIL_OUT ("could not create property dict for testing");

errno = 0;
ok (sdexec_property_get (NULL, 0, "foo", "bar") == NULL && errno == EINVAL,
ok (sdexec_property_get (NULL, "sdexec", 0, "foo", "bar") == NULL
&& errno == EINVAL,
"sdexec_property_get h=NULL fails with EINVAL");
errno = 0;
ok (sdexec_property_get (h, 0, NULL, "bar") == NULL && errno == EINVAL,
ok (sdexec_property_get (h, NULL, 0, "foo", "bar") == NULL
&& errno == EINVAL,
"sdexec_property_get service=NULL fails with EINVAL");
errno = 0;
ok (sdexec_property_get (h, "sdexec", 0, NULL, "bar") == NULL
&& errno == EINVAL,
"sdexec_property_get path=NULL fails with EINVAL");
errno = 0;
ok (sdexec_property_get (h, 0, "foo", NULL) == NULL && errno == EINVAL,
ok (sdexec_property_get (h, "sdexec", 0, "foo", NULL) == NULL
&& errno == EINVAL,
"sdexec_property_get name=NULL fails with EINVAL");

errno = 0;
Expand All @@ -69,18 +76,25 @@ void test_inval (void)
"sdexec_property_get_unpack fmt=NULL fails with EINVAL");

errno = 0;
ok (sdexec_property_get_all (NULL, 0, "foo") == NULL && errno == EINVAL,
ok (sdexec_property_get_all (NULL, "sdexec", 0, "foo") == NULL
&& errno == EINVAL,
"sdexec_property_get_all h=NULL fails with EINVAL");
errno = 0;
ok (sdexec_property_get_all (h, 0, NULL) == NULL && errno == EINVAL,
ok (sdexec_property_get_all (h, NULL, 0, "foo") == NULL
&& errno == EINVAL,
"sdexec_property_get_all service=NULL fails with EINVAL");
errno = 0;
ok (sdexec_property_get_all (h, "sdexec", 0, NULL) == NULL
&& errno == EINVAL,
"sdexec_property_get_all path=NULL fails with EINVAL");

errno = 0;
ok (sdexec_property_get_all_dict (NULL) == NULL && errno == EINVAL,
"sdexec_property_get_all_dict f=NULL fails with EINVAL");

errno = 0;
ok (sdexec_property_changed (NULL, 0, "foo") == NULL && errno == EINVAL,
ok (sdexec_property_changed (NULL, "sdexec", 0, "foo") == NULL
&& errno == EINVAL,
"sdexec_property_changed h=NULL fails with EINVAL");

errno = 0;
Expand Down
14 changes: 14 additions & 0 deletions src/modules/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ fluxmod_LTLIBRARIES = \
kvs-watch.la \
resource.la \
sched-simple.la \
sdmon.la \
sdexec.la \
sdbus.la

Expand Down Expand Up @@ -268,6 +269,19 @@ sched_simple_la_LIBADD = \
$(JANSSON_LIBS)
sched_simple_la_LDFLAGS = $(fluxmod_ldflags) -module

sdmon_la_SOURCES = \
sdmon/sdmon.c
sdmon_la_CPPFLAGS = \
$(AM_CPPFLAGS) \
$(LIBUUID_CFLAGS)
sdmon_la_LIBADD = \
$(top_builddir)/src/common/libsdexec/libsdexec.la \
$(top_builddir)/src/common/libflux-core.la \
$(top_builddir)/src/common/libflux-internal.la \
$(LIBUUID_LIBS) \
$(JANSSON_LIBS)
sdmon_la_LDFLAGS = $(fluxmod_ldflags) -module

sdexec_la_SOURCES = \
sdexec/sdexec.c
sdexec_la_CPPFLAGS = \
Expand Down
17 changes: 13 additions & 4 deletions src/modules/resource/monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
* the initial response to the request to watch broker.online cannot
* be processed until the reactor runs.
*
* If systemd is enabled, watch sdmon.online instead of broker.online. This
* behaves exactly like broker.online, except that it isn't joined until
* sdmon has verified that the node has no running flux systemd units.
* This guards against scheduling new work on a node that hasn't been
* properly cleaned up. As with broker.online, nodes are automatically
* removed from the group when they are shut down or lost.
*
* Some synchronization notes:
* - rc1 completes on rank 0 before any other ranks can join broker.online,
* therefore the scheduler must allow flux module load to complete with
Expand Down Expand Up @@ -401,7 +408,7 @@ void monitor_destroy (struct monitor *monitor)

struct monitor *monitor_create (struct resource_ctx *ctx,
int inventory_size,
bool monitor_force_up)
struct resource_config *config)
{
struct monitor *monitor;

Expand Down Expand Up @@ -439,12 +446,15 @@ struct monitor *monitor_create (struct resource_ctx *ctx,
|| !(monitor->torpid = idset_create (monitor->size, 0))
|| !(monitor->lost = idset_create (monitor->size, 0)))
goto error;
if (monitor_force_up) {
if (config->monitor_force_up) {
if (idset_range_set (monitor->up, 0, monitor->size - 1) < 0)
goto error;
}
else if (!flux_attr_get (ctx->h, "broker.recovery-mode")) {
if (!(monitor->f_online = group_monitor (ctx->h, "broker.online"))
const char *online_group = "broker.online";
if (config->systemd_enable)
online_group = "sdmon.online";
if (!(monitor->f_online = group_monitor (ctx->h, online_group))
|| flux_future_then (monitor->f_online,
-1,
broker_online_cb,
Expand All @@ -466,7 +476,6 @@ struct monitor *monitor_create (struct resource_ctx *ctx,
return NULL;
}


/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
2 changes: 1 addition & 1 deletion src/modules/resource/monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

struct monitor *monitor_create (struct resource_ctx *ctx,
int inventory_size,
bool monitor_force_up);
struct resource_config *config);
void monitor_destroy (struct monitor *monitor);

const struct idset *monitor_get_down (struct monitor *monitor);
Expand Down
12 changes: 11 additions & 1 deletion src/modules/resource/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ static int parse_config (struct resource_ctx *ctx,
return -1;
}
}
/* Check systemd.enable so we know whether sdmon.online will be populated.
* Configuration errors in [systemd] are handled elsewhere.
*/
int systemd_enable = 0;
(void)flux_conf_unpack (conf,
NULL,
"{s?{s?b}}",
"systemd",
"enable", &systemd_enable);
if (rconfig) {
rconfig->journal_max = journal_max;
rconfig->exclude_idset = exclude;
Expand All @@ -159,6 +168,7 @@ static int parse_config (struct resource_ctx *ctx,
rconfig->no_update_watch = no_update_watch ? true : false;
rconfig->rediscover = rediscover ? true : false;
rconfig->R = o;
rconfig->systemd_enable = systemd_enable ? true : false;
}
else
json_decref (o);
Expand Down Expand Up @@ -402,7 +412,7 @@ int mod_main (flux_t *h, int argc, char **argv)
goto error;
if (!(ctx->monitor = monitor_create (ctx,
inventory_get_size (ctx->inventory),
config.monitor_force_up)))
&config)))
goto error;
if (!(ctx->status = status_create (ctx)))
goto error;
Expand Down
1 change: 1 addition & 0 deletions src/modules/resource/resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ struct resource_config {
bool no_update_watch;
bool monitor_force_up;
int journal_max;
bool systemd_enable; // systemd.enable, not under [resource]
};

struct resource_ctx {
Expand Down
Loading
Loading