Skip to content

Commit

Permalink
Merge pull request #6661 from garlick/stop_timeout
Browse files Browse the repository at this point in the history
improve management of systemd stop timeout to avoid SIGKILL while making progress
  • Loading branch information
mergify[bot] authored Feb 25, 2025
2 parents 9762f63 + 7a8b91d commit 4b0a520
Show file tree
Hide file tree
Showing 18 changed files with 261 additions and 82 deletions.
1 change: 1 addition & 0 deletions debian/control
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Standards-Version: 4.1.2
Build-Depends:
debhelper (>= 10),
flux-security (>= 0.13.0),
libsystemd-dev (>= 0.23),
libarchive-dev,
uuid-dev,
libzmq3-dev,
Expand Down
5 changes: 5 additions & 0 deletions doc/man1/flux-dump.rst
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ OPTIONS
KVS key and treat it as a warning. Without this option, content load
failures are treated as immediate fatal errors.

.. option:: --sd-notify

Regularly inform the broker of progress so it can reset the systemd
stop timer in rc3 context and provide human-readable progress for
:program:`systemctl status flux`.

OTHER NOTES
===========
Expand Down
5 changes: 5 additions & 0 deletions doc/man1/flux-restore.rst
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ OPTIONS
suffix k or K=1024, M=1024\*1024, or G=1024\*1024\*1024 (up to
``INT_MAX``).

.. option:: --sd-notify

Regularly inform the broker of progress so it can provide human-readable
progress for :program:`systemctl status flux`.

RESOURCES
=========

Expand Down
11 changes: 11 additions & 0 deletions doc/man7/flux-broker-attributes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,17 @@ broker.exit-restart [Updates: C, R]
broker.starttime
Timestamp of broker startup from :man3:`flux_reactor_now`.

broker.sd-notify
A boolean indicating that the broker should use :linux:man3:`sd_notify`
to inform systemd of its status. This is set to 1 in the Flux systemd
unit file.

broker.sd-stop-timeout
A timeout value (in RFC 23 Flux Standard Duration format) used by the
broker to extend the systemd stop timeout while it is making progress
towards shutdown. This is set to the same value as ``TimeoutStopSec``
in the Flux systemd unit file.

conf.shell_initrc [Updates: C, R]
The path to the :man1:`flux-shell` initrc script. Default:
``${prefix}/etc/flux/shell/initrc.lua``.
Expand Down
1 change: 1 addition & 0 deletions doc/test/spell.en.pws
Original file line number Diff line number Diff line change
Expand Up @@ -947,3 +947,4 @@ SIGALRM
backoff
unkillable
Feitelson
TimeoutStopSec
3 changes: 2 additions & 1 deletion etc/flux.service.in
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Wants=munge.service

[Service]
Type=notify
NotifyAccess=all
NotifyAccess=main
TimeoutStopSec=90
KillMode=mixed
ExecStart=/bin/bash -c '\
Expand All @@ -24,6 +24,7 @@ ExecStart=/bin/bash -c '\
-Sbroker.cleanup-timeout=45 \
-Sbroker.exit-norestart=42 \
-Sbroker.sd-notify=1 \
-Sbroker.sd-stop-timeout=90 \
-Scontent.dump=auto \
-Scontent.restore=auto \
'
Expand Down
3 changes: 2 additions & 1 deletion etc/rc1
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ if test $RANK -eq 0; then
flux module load ${backingmod} truncate
fi
echo "restoring content from ${dumpfile}"
flux restore --quiet --checkpoint --size-limit=100M ${dumpfile}
flux restore --sd-notify --quiet --checkpoint --size-limit=100M \
${dumpfile}
if test -n "${dumplink}"; then
rm -f ${dumplink}
fi
Expand Down
3 changes: 2 additions & 1 deletion etc/rc3
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ if test $RANK -eq 0; then
dumplink="${statedir:-.}/dump/RESTORE"
fi
echo "dumping content to ${dumpfile}"
if flux dump --quiet --ignore-failed-read --checkpoint ${dumpfile}; then
if flux dump --sd-notify --quiet --ignore-failed-read \
--checkpoint ${dumpfile}; then
test -n "$dumplink" && ln -s $(basename ${dumpfile}) ${dumplink}
else
exit_rc=1
Expand Down
12 changes: 8 additions & 4 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,6 @@ int main (int argc, char *argv[])
|| init_critical_ranks_attr (ctx.overlay, ctx.attrs) < 0)
goto cleanup;

if (create_runat_phases (&ctx) < 0)
goto cleanup;

/* Wire up the overlay.
*/
if (ctx.rank > 0) {
Expand Down Expand Up @@ -474,6 +471,12 @@ int main (int argc, char *argv[])
log_err ("error creating broker state machine");
goto cleanup;
}
/* This registers a state machine callback so call after
* state_machine_create().
*/
if (create_runat_phases (&ctx) < 0)
goto cleanup;

state_machine_post (ctx.state_machine, "start");

/* Create shutdown mechanism
Expand Down Expand Up @@ -746,7 +749,8 @@ static int create_runat_phases (broker_ctx_t *ctx)
if (!(ctx->runat = runat_create (ctx->h,
local_uri,
jobid,
ctx->sd_notify))) {
(runat_notify_f)state_machine_sd_notify,
ctx->state_machine))) {
log_err ("runat_create");
return -1;
}
Expand Down
23 changes: 14 additions & 9 deletions src/broker/runat.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@
#include "src/common/libmissing/argz.h"
#endif
#include <jansson.h>
#if HAVE_LIBSYSTEMD
#include <systemd/sd-daemon.h>
#endif
#include <flux/core.h>

#include "src/common/libczmqcontainers/czmq_containers.h"
Expand Down Expand Up @@ -68,6 +65,8 @@ struct runat {
flux_msg_handler_t **handlers;
bool sd_notify;
struct termios saved_termios;
runat_notify_f notify_cb;
void *notify_handle;
};

static void runat_command_destroy (struct runat_command *cmd);
Expand All @@ -90,6 +89,7 @@ static const char *env_blocklist[] = {
"FLUX_PMI_LIBRARY_PATH",
"I_MPI_PMI_LIBRARY",
"SLURM_*", // flux-framework/flux-core#5206
"NOTIFY_SOCKET", // see systemd sd_notify(3)
NULL,
};

Expand Down Expand Up @@ -306,13 +306,16 @@ static void start_next_command (struct runat *r, struct runat_entry *entry)
}
else {
while (!started && (cmd = zlist_head (entry->commands))) {
#if HAVE_LIBSYSTEMD
if (r->sd_notify) {
if (r->notify_cb) {
char *s = get_cmdline (cmd->cmd);
sd_notifyf (0, "STATUS=Running %s", s ? s : "unknown command");
char buf[256];
snprintf (buf,
sizeof (buf),
"Running %s",
s ? s : "unknown command");
r->notify_cb (r->notify_handle, buf);
free (s);
}
#endif
if (!(cmd->p = start_command (r, entry, cmd))) {
log_command (r->h, entry, 1, 0, "error starting command");
if (entry->exit_code == 0)
Expand Down Expand Up @@ -708,7 +711,8 @@ static const struct flux_msg_handler_spec htab[] = {
struct runat *runat_create (flux_t *h,
const char *local_uri,
const char *jobid,
bool sdnotify)
runat_notify_f notify_cb,
void *notify_handle)
{
struct runat *r;

Expand All @@ -722,7 +726,8 @@ struct runat *runat_create (flux_t *h,
r->h = h;
r->jobid = jobid;
r->local_uri = local_uri;
r->sd_notify = sdnotify;
r->notify_cb = notify_cb;
r->notify_handle = notify_handle;
if (isatty (STDIN_FILENO)
&& tcgetattr (STDIN_FILENO, &r->saved_termios) < 0)
flux_log_error (r->h, "failed to save terminal attributes");
Expand Down
5 changes: 4 additions & 1 deletion src/broker/runat.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ typedef void (*runat_completion_f)(struct runat *r,
const char *name,
void *arg);

/* Status notification callback registered with runat_create().
 * It is invoked with the opaque 'handle' given at creation time and a
 * NUL-terminated status message 'msg' (e.g. "Running <command>") each time
 * runat is about to start a command.
 */
typedef void (*runat_notify_f)(void *handle, const char *msg);

struct runat *runat_create (flux_t *h,
const char *local_uri,
const char *jobid,
bool sdnotify);
runat_notify_f notify_cb,
void *notify_handle);

void runat_destroy (struct runat *r);

Expand Down
88 changes: 87 additions & 1 deletion src/broker/state_machine.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ struct monitor {
unsigned int parent_error:1;
};

/* State for managing the systemd stop timeout.
 */
struct systemd {
/* Set to true by the first call to sd_timeout_reset(); later calls then
 * extend the systemd timeout instead of merely marking it active.
 */
bool timeout_is_active;
/* Value of the broker.sd-stop-timeout attribute (default -1).  It is
 * multiplied by 1E6 to obtain microseconds, and only values > 0 cause
 * a timeout extension to be sent.
 */
double stop_timeout;
};

struct state_machine {
struct broker *ctx;
broker_state_t state;
Expand All @@ -85,6 +90,8 @@ struct state_machine {
struct cleanup cleanup;
struct shutdown shutdown;

struct systemd sd;

struct flux_msglist *wait_requests;

int exit_norestart;
Expand Down Expand Up @@ -167,6 +174,7 @@ static struct state_next nexttab[] = {
static const double default_quorum_warn = 60; // log slow joiners
static const double default_shutdown_warn = 60; // log slow shutdown
static const double default_cleanup_timeout = -1;
static const double default_sd_stop_timeout = -1;
static const double goodbye_timeout = 60;

static void state_action (struct state_machine *s, broker_state_t state)
Expand Down Expand Up @@ -202,6 +210,54 @@ static broker_state_t state_next (broker_state_t current, const char *event)
return current;
}

/* When systemd is tracking service stop, a SIGKILL will be sent to the broker
 * if TimeoutStopSec is exceeded.  Call this function when stop begins, and
 * each time the broker enters a new state thereafter, to extend the timeout.
 * That way, if a state consumes a lot of time but eventually completes,
 * the remaining states aren't running under diminishing time constraints.
 *
 * The first call only marks the timeout as active.  Each subsequent call
 * sends EXTEND_TIMEOUT_USEC to systemd, provided a positive stop timeout
 * was configured.  This is a no-op when built without libsystemd or when
 * sd_notify is disabled in the broker context.
 */
static void sd_timeout_reset (struct state_machine *s)
{
#if HAVE_LIBSYSTEMD
    if (!s->ctx->sd_notify)
        return;

    if (!s->sd.timeout_is_active) {
        /* Uncomment to tell systemd to transition the unit to
         * "deactivating" state and begin the stop timeout.
         * For now, we don't inform systemd so flux-shutdown(1)
         * escapes the timeout.
         */
        //sd_notify (0, "STOPPING=1");
        s->sd.timeout_is_active = true;
        return;
    }

    if (s->sd.stop_timeout > 0) {
        /* The value is sent in microseconds.  Use a 64-bit integer:
         * converting to int overflows (undefined behavior) once the
         * configured timeout exceeds roughly 2147 seconds.
         */
        unsigned long long timeout_usec;

        timeout_usec = (unsigned long long)(s->sd.stop_timeout * 1E6);
        sd_notifyf (0, "EXTEND_TIMEOUT_USEC=%llu", timeout_usec);
    }
#else
    (void)s;
#endif
}

/* Publish a status string to systemd and keep the stop timeout alive.
 *
 * Does nothing if 's' is NULL, if sd_notify is disabled in the broker
 * context, or if the broker was built without libsystemd.  Otherwise:
 * - if the stop timeout is already active, it is reset first;
 * - if 'status' is non-NULL, it is sent to systemd as "STATUS=<status>".
 *
 * N.B. this is registered as a runat_notify_f callback and is also called
 * from the state-machine.sd-notify RPC handler.
 */
void state_machine_sd_notify (struct state_machine *s, const char *status)
{
#if HAVE_LIBSYSTEMD
    if (!s || !s->ctx->sd_notify)
        return;

    if (s->sd.timeout_is_active)
        sd_timeout_reset (s);

    if (status)
        sd_notifyf (0, "STATUS=%s", status);
#else
    (void)s;
    (void)status;
#endif
}

static void action_init (struct state_machine *s)
{
s->ctx->online = true;
Expand Down Expand Up @@ -230,7 +286,8 @@ static void action_join (struct state_machine *s)
join_check_parent (s);
}
#if HAVE_LIBSYSTEMD
sd_notify (0, "READY=1");
if (s->ctx->sd_notify)
sd_notify (0, "READY=1");
#endif
}

Expand Down Expand Up @@ -392,6 +449,7 @@ static void cleanup_timer_cb (flux_reactor_t *r,

static void action_cleanup (struct state_machine *s)
{
sd_timeout_reset (s);
/* Prevent new downstream clients from saying hello, but
* let existing ones continue to communicate so they can
* shut down and disconnect.
Expand All @@ -418,6 +476,7 @@ static void action_cleanup (struct state_machine *s)

static void action_finalize (struct state_machine *s)
{
sd_timeout_reset (s);
/* Now that all clients have disconnected, finalize all
* downstream communication.
*/
Expand Down Expand Up @@ -461,6 +520,8 @@ static void shutdown_warn_timer_cb (flux_reactor_t *r,

static void action_shutdown (struct state_machine *s)
{
sd_timeout_reset (s);

if (overlay_get_child_peer_count (s->ctx->overlay) == 0) {
state_machine_post (s, "children-none");
return;
Expand Down Expand Up @@ -495,6 +556,7 @@ static void goodbye_continuation (flux_future_t *f, void *arg)

static void action_goodbye (struct state_machine *s)
{
sd_timeout_reset (s);
/* On rank 0, "goodbye" is posted by shutdown.c.
* On other ranks, send a goodbye message and wait for a response
* (with timeout) before continuing on.
Expand Down Expand Up @@ -1271,6 +1333,17 @@ static void disconnect_cb (flux_t *h,
flux_log_error (h, "error handling state-machine.disconnect");
}

/* Handle a state-machine.sd-notify request.
 *
 * The optional "status" string is extracted from the request payload and
 * forwarded to state_machine_sd_notify().  The result of decoding the
 * request is deliberately ignored, in which case the status stays NULL
 * unless decoding stored a value.  This handler does not send a response.
 */
static void state_machine_sd_notify_cb (flux_t *h,
                                        flux_msg_handler_t *mh,
                                        const flux_msg_t *msg,
                                        void *arg)
{
    const char *text = NULL;
    struct state_machine *sm = arg;

    /* "status" is optional (s?s); decode errors are ignored on purpose. */
    (void)flux_request_unpack (msg, NULL, "{s?s}", "status", &text);

    state_machine_sd_notify (sm, text);
}

static const struct flux_msg_handler_spec htab[] = {
{
FLUX_MSGTYPE_REQUEST,
Expand All @@ -1296,6 +1369,12 @@ static const struct flux_msg_handler_spec htab[] = {
state_machine_get_cb,
FLUX_ROLE_USER,
},
{
FLUX_MSGTYPE_REQUEST,
"state-machine.sd-notify",
state_machine_sd_notify_cb,
0,
},
FLUX_MSGHANDLER_TABLE_END,
};

Expand Down Expand Up @@ -1387,6 +1466,13 @@ struct state_machine *state_machine_create (struct broker *ctx)
log_err ("error configuring cleanup timeout attribute");
goto error;
}
if (timeout_configure (s,
"broker.sd-stop-timeout",
&s->sd.stop_timeout,
default_sd_stop_timeout) < 0) {
log_err ("error configuring systemd stop timeout attribute");
goto error;
}
if (timeout_configure (s,
"broker.shutdown-warn",
&s->shutdown.warn_period,
Expand Down
2 changes: 2 additions & 0 deletions src/broker/state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ void state_machine_kill (struct state_machine *s, int signum);

int state_machine_shutdown (struct state_machine *s, flux_error_t *error);

void state_machine_sd_notify (struct state_machine *s, const char *status);

#endif /* !_BROKER_STATE_MACHINE_H */

/*
Expand Down
Loading

0 comments on commit 4b0a520

Please sign in to comment.