From fdebf3c3f9316b7c1828c932bcb0472456689e08 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Mar 2025 06:12:11 -0800 Subject: [PATCH 01/11] heartbeat: support loading on all ranks Problem: the heartbeat initialization logic assumes the module is only loaded on rank 0, but a future commit adds functionality that requires the module to be loaded on all ranks. Fetch the rank and only generate the heartbeat on rank 0. --- src/modules/heartbeat/heartbeat.c | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/modules/heartbeat/heartbeat.c b/src/modules/heartbeat/heartbeat.c index cd9b964b0a28..3112189c7848 100644 --- a/src/modules/heartbeat/heartbeat.c +++ b/src/modules/heartbeat/heartbeat.c @@ -24,6 +24,7 @@ static const double default_period = 2.0; struct heartbeat { flux_t *h; + uint32_t rank; double period; flux_watcher_t *timer; flux_msg_handler_t **handlers; @@ -133,7 +134,8 @@ static struct heartbeat *heartbeat_create (flux_t *h) return NULL; hb->h = h; hb->period = default_period; - if (flux_msg_handler_addvec (hb->h, htab, hb, &hb->handlers) < 0) + if (flux_get_rank (h, &hb->rank) < 0 + || flux_msg_handler_addvec (hb->h, htab, hb, &hb->handlers) < 0) goto error; return hb; error: @@ -150,13 +152,15 @@ int mod_main (flux_t *h, int argc, char **argv) return -1; if (parse_args (argc, argv, hb) < 0) goto error; - if (!(hb->timer = flux_timer_watcher_create (r, - 0., - hb->period, - timer_cb, - hb))) - goto error; - flux_watcher_start (hb->timer); + if (hb->rank == 0) { + if (!(hb->timer = flux_timer_watcher_create (r, + 0., + hb->period, + timer_cb, + hb))) + goto error; + flux_watcher_start (hb->timer); + } if (flux_reactor_run (r, 0) < 0) { flux_log_error (h, "flux_reactor_run"); goto error; From d08260df4acef6591a69a405d2ec8b4680f96f8a Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Mar 2025 06:15:36 -0800 Subject: [PATCH 02/11] rc: load heartbeat on all ranks Problem: the heartbeat module is only loaded on 
rank 0, but a future commit adds functionality that requires it to be loaded on all ranks. Load on all ranks. At this point, only the leader does anything - follower ranks sit idle. --- etc/rc1 | 2 +- etc/rc3 | 2 +- t/rc/rc1-job | 2 +- t/rc/rc1-kvs | 2 +- t/rc/rc3-job | 2 +- t/rc/rc3-kvs | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/etc/rc1 b/etc/rc1 index 0b20f08154d6..db8d1d6eb9aa 100755 --- a/etc/rc1 +++ b/etc/rc1 @@ -86,7 +86,7 @@ fi modload all job-ingest modload 0 job-exec -modload 0 heartbeat +modload all heartbeat core_dir=$(cd ${0%/*} && pwd -P) all_dirs=$core_dir${FLUX_RC_EXTRA:+":$FLUX_RC_EXTRA"} diff --git a/etc/rc3 b/etc/rc3 index 72a8cc1a964c..21050fcf4cf1 100755 --- a/etc/rc3 +++ b/etc/rc3 @@ -27,7 +27,7 @@ for rcdir in $all_dirs; do done done -modrm 0 heartbeat +modrm all heartbeat modrm 0 sched-simple modrm all resource modrm 0 job-exec diff --git a/t/rc/rc1-job b/t/rc/rc1-job index 1a02897c874c..135ec90a6426 100755 --- a/t/rc/rc1-job +++ b/t/rc/rc1-job @@ -39,7 +39,7 @@ modload all job-ingest modload all job-info modload 0 job-list modload all barrier -modload 0 heartbeat +modload all heartbeat if test $RANK -eq 0; then # Set fake resources for testing diff --git a/t/rc/rc1-kvs b/t/rc/rc1-kvs index 33cf0d90b8f1..fab4cb172147 100755 --- a/t/rc/rc1-kvs +++ b/t/rc/rc1-kvs @@ -14,4 +14,4 @@ modload all content blob-size-limit=1048576 modload 0 content-sqlite modload all kvs modload all kvs-watch -modload 0 heartbeat +modload all heartbeat diff --git a/t/rc/rc3-job b/t/rc/rc3-job index 533489858cb1..e6097a760166 100755 --- a/t/rc/rc3-job +++ b/t/rc/rc3-job @@ -14,7 +14,7 @@ if [ "${TEST_UNDER_FLUX_NO_EXEC}" != "y" ] then modrm 0 job-exec fi -modrm 0 heartbeat +modrm all heartbeat modrm 0 sched-simple modrm all resource modrm 0 job-list diff --git a/t/rc/rc3-kvs b/t/rc/rc3-kvs index afb19ac48687..eff0907384d7 100755 --- a/t/rc/rc3-kvs +++ b/t/rc/rc3-kvs @@ -11,7 +11,7 @@ modrm() { } -modrm 0 heartbeat +modrm all heartbeat modrm 
all kvs-watch modrm all kvs From 0a74143884e6a050535e2d15abdfae2b3829588f Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Mar 2025 06:29:28 -0800 Subject: [PATCH 03/11] heartbeat: implement stats-get method handler Problem: heartbeat implements a custom RPC for testing, but nowadays we use the stats-get method for that. Switch to stats-get. Update heartbeat test. --- src/modules/heartbeat/heartbeat.c | 28 +++++++++++----------------- t/t1107-heartbeat.t | 9 ++------- 2 files changed, 13 insertions(+), 24 deletions(-) diff --git a/src/modules/heartbeat/heartbeat.c b/src/modules/heartbeat/heartbeat.c index 3112189c7848..dba5f9647b34 100644 --- a/src/modules/heartbeat/heartbeat.c +++ b/src/modules/heartbeat/heartbeat.c @@ -31,25 +31,18 @@ struct heartbeat { flux_future_t *f; }; -static void heartbeat_get_cb (flux_t *h, - flux_msg_handler_t *mh, - const flux_msg_t *msg, - void *arg) +static void heartbeat_stats_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) { struct heartbeat *hb = arg; - if (flux_request_decode (msg, NULL, NULL) < 0) - goto error; if (flux_respond_pack (h, msg, "{s:f}", - "period", - hb->period) < 0) - flux_log_error (h, "error responding to heartbeat.get request"); - return; -error: - if (flux_respond_error (h, msg, errno, NULL) < 0) - flux_log_error (h, "error responding to heartbeat.get request"); + "period", hb->period) < 0) + flux_log_error (h, "error responding to stats-get request"); } static void publish_continuation (flux_future_t *f, void *arg) @@ -106,10 +99,11 @@ static int parse_args (int argc, char **argv, struct heartbeat *hb) } static const struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_REQUEST, - "heartbeat.get", - heartbeat_get_cb, - FLUX_ROLE_USER + { + FLUX_MSGTYPE_REQUEST, + "heartbeat.stats-get", + heartbeat_stats_cb, + 0, }, FLUX_MSGHANDLER_TABLE_END, }; diff --git a/t/t1107-heartbeat.t b/t/t1107-heartbeat.t index 7a943e50bd51..dfd0c2e65fed 100755 --- a/t/t1107-heartbeat.t +++ 
b/t/t1107-heartbeat.t @@ -8,19 +8,14 @@ SIZE=1 test_under_flux ${SIZE} minimal -get_heartbeat() { - flux python -c \ - "import flux; print(flux.Flux().rpc(\"heartbeat.get\").get_str())" -} - test_expect_success 'load heartbeat' ' flux module load heartbeat ' test_expect_success 'reload heartbeat with period=10s and verify' ' - period1=$(get_heartbeat | jq -r -e .period) && + period1=$(flux module stats heartbeat | jq -r -e .period) && flux module reload heartbeat period=10s && - period2=$(get_heartbeat | jq -r -e .period) && + period2=$(flux module stats heartbeat | jq -r -e .period) && echo period changed from $period1 to $period2 && test "$period1" != "$period2" ' From 7a563b8d7b6bd711a4d7ae06ba671847c7e2fc6e Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Mar 2025 07:01:52 -0800 Subject: [PATCH 04/11] heartbeat: support [heartbeat] config Problem: the heartbeat module can only be configured on the command line. Support a [heartbeat] TOML table containing one FSD value: period. 
--- src/modules/heartbeat/heartbeat.c | 94 +++++++++++++++++++++++++++++-- 1 file changed, 89 insertions(+), 5 deletions(-) diff --git a/src/modules/heartbeat/heartbeat.c b/src/modules/heartbeat/heartbeat.c index dba5f9647b34..bacdbf5f978e 100644 --- a/src/modules/heartbeat/heartbeat.c +++ b/src/modules/heartbeat/heartbeat.c @@ -17,7 +17,8 @@ #include -#include +#include "src/common/libutil/fsd.h" +#include "src/common/libutil/errprintf.h" #include "ccan/str/str.h" static const double default_period = 2.0; @@ -76,19 +77,24 @@ static void timer_cb (flux_reactor_t *r, } } -static int parse_args (int argc, char **argv, struct heartbeat *hb) +static int heartbeat_parse_args (struct heartbeat *hb, + int argc, + char **argv, + flux_error_t *error) { int i; for (i = 0; i < argc; i++) { if (strstarts (argv[i], "period=")) { if (fsd_parse_duration (argv[i] + 7, &hb->period) < 0) { - flux_log_error (hb->h, "error parsing period value"); + errprintf (error, + "period: error parsing FSD: %s", + strerror (errno)); return -1; } } else { - flux_log (hb->h, LOG_ERR, "unknown option: %s", argv[i]); + errprintf (error, "%s: unknown option", argv[i]); goto inval; } } @@ -98,6 +104,74 @@ static int parse_args (int argc, char **argv, struct heartbeat *hb) return -1; } +static int heartbeat_parse_config (struct heartbeat *hb, + const flux_conf_t *conf, + flux_error_t *error) +{ + flux_error_t conf_error; + const char *period_fsd = NULL; + double new_period = default_period; + + if (flux_conf_unpack (conf, + &conf_error, + "{s?{s?s !}}", + "heartbeat", + "period", &period_fsd) < 0) { + errprintf (error, + "error reading [heartbeat] config table: %s", + conf_error.text); + return -1; + } + if (period_fsd) { + if (fsd_parse_duration (period_fsd, &new_period) < 0) { + errprintf (error, "error parsing heartbeat.period FSD value"); + return -1; + } + if (new_period <= 0.) 
{ + errprintf (error, "heartbeat.period must be a positive FSD value"); + errno = EINVAL; + return -1; + } + } + if (new_period != hb->period) { + hb->period = new_period; + if (hb->timer) { + flux_timer_watcher_reset (hb->timer, 0., hb->period); + flux_timer_watcher_again (hb->timer); + } + } + return 0; +} + +static void heartbeat_config_reload_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct heartbeat *hb = arg; + const flux_conf_t *conf; + flux_error_t error; + const char *errstr = NULL; + + if (flux_conf_reload_decode (msg, &conf) < 0) + goto error; + if (heartbeat_parse_config (hb, conf, &error) < 0) { + errstr = error.text; + goto error; + } + if (flux_set_conf (h, flux_conf_incref (conf)) < 0) { + errstr = "error updating cached configuration"; + flux_conf_decref (conf); + goto error; + } + if (flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "error responding to config-reload request"); + return; +error: + if (flux_respond_error (h, msg, errno, errstr) < 0) + flux_log_error (h, "error responding to config-reload request"); +} + static const struct flux_msg_handler_spec htab[] = { { FLUX_MSGTYPE_REQUEST, @@ -105,6 +179,12 @@ static const struct flux_msg_handler_spec htab[] = { heartbeat_stats_cb, 0, }, + { + FLUX_MSGTYPE_REQUEST, + "heartbeat.config-reload", + heartbeat_config_reload_cb, + 0 + }, FLUX_MSGHANDLER_TABLE_END, }; @@ -141,11 +221,15 @@ int mod_main (flux_t *h, int argc, char **argv) { struct heartbeat *hb; flux_reactor_t *r = flux_get_reactor (h); + flux_error_t error; if (!(hb = heartbeat_create (h))) return -1; - if (parse_args (argc, argv, hb) < 0) + if (heartbeat_parse_config (hb, flux_get_conf (h), &error) < 0 + || heartbeat_parse_args (hb, argc, argv, &error) < 0) { + flux_log (h, LOG_ERR, "%s", error.text); goto error; + } if (hb->rank == 0) { if (!(hb->timer = flux_timer_watcher_create (r, 0., From ae0ac9561c36135833c82694b502ceb801bc9540 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 
3 Mar 2025 07:56:13 -0800 Subject: [PATCH 05/11] testsuite: cover heartbeat config Problem: the [heartbeat] config has no test coverage Add some tests. --- t/t1107-heartbeat.t | 78 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 68 insertions(+), 10 deletions(-) diff --git a/t/t1107-heartbeat.t b/t/t1107-heartbeat.t index dfd0c2e65fed..f55dee9a0d60 100755 --- a/t/t1107-heartbeat.t +++ b/t/t1107-heartbeat.t @@ -8,22 +8,71 @@ SIZE=1 test_under_flux ${SIZE} minimal -test_expect_success 'load heartbeat' ' - flux module load heartbeat +test_expect_success 'load heartbeat, period is 2s' ' + flux module load heartbeat && + flux module stats heartbeat | jq -r -e ".period == 2" ' - -test_expect_success 'reload heartbeat with period=10s and verify' ' - period1=$(flux module stats heartbeat | jq -r -e .period) && +test_expect_success 'reload heartbeat with period=10s' ' flux module reload heartbeat period=10s && - period2=$(flux module stats heartbeat | jq -r -e .period) && - echo period changed from $period1 to $period2 && - test "$period1" != "$period2" + flux module stats heartbeat | jq -r -e ".period == 10" +' +test_expect_success 'reconfig with period=5s' ' + flux config load <<-EOT && + [heartbeat] + period = "5s" + EOT + flux module stats heartbeat | jq -r -e ".period == 5" +' +test_expect_success 'reconfig with wrong period type, period is still 5s' ' + test_must_fail flux config load <<-EOT && + [heartbeat] + period = 4 + EOT + flux module stats heartbeat | jq -r -e ".period == 5" +' +test_expect_success 'reconfig with bad period FSD, period is still 5s' ' + test_must_fail flux config load <<-EOT && + [heartbeat] + period = "zzz" + EOT + flux module stats heartbeat | jq -r -e ".period == 5" +' +test_expect_success 'reconfig with bad key' ' + test_must_fail flux config load <<-EOT + [heartbeat] + z = 42 + EOT +' +test_expect_success 'reconfig with empty table, period is 2s' ' + flux config load <<-EOT && + [heartbeat] + EOT + flux module stats heartbeat 
| jq -r -e ".period == 2" +' +test_expect_success 'unload heartbeat' ' + flux module unload heartbeat +' +test_expect_success 'reconfig period of 0 (unchecked)' ' + flux config load <<-EOT + [heartbeat] + period = "0" + EOT +' +test_expect_success 'loading heartbeat fails with bad config' ' + test_must_fail flux module load heartbeat +' +test_expect_success 'reconfig with empty [heartbeat] table' ' + flux config load <<-EOT + [heartbeat] + EOT +' +test_expect_success 'load heartbeat, period is 2s' ' + flux module load heartbeat && + flux module stats heartbeat | jq -r -e ".period == 2" ' - test_expect_success 'unload heartbeat' ' flux module unload heartbeat ' - test_expect_success 'reload heartbeat with period=bad fsd' ' test_must_fail flux module load heartbeat period=1x ' @@ -31,5 +80,14 @@ test_expect_success 'reload heartbeat with period=bad fsd' ' test_expect_success 'reload heartbeat with bad option' ' test_must_fail flux module load heartbeat foo=42 ' +test_expect_success 'load heartbeat with period=1s' ' + flux module load heartbeat period=1s +' +test_expect_success 'period is 1s' ' + flux module stats heartbeat | jq -r -e ".period == 1" +' +test_expect_success 'unload heartbeat' ' + flux module unload heartbeat +' test_done From a5db5f90e98f8c980e92eabddc6b5987a8147912 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Mar 2025 08:02:53 -0800 Subject: [PATCH 06/11] flux-config-heartbeat(5): new man page Problem: [heartbeat] configuration is undocumented. Add a man page. 
--- doc/Makefile.am | 3 +- doc/man5/flux-config-heartbeat.rst | 48 ++++++++++++++++++++++++++++++ doc/manpages.py | 1 + 3 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 doc/man5/flux-config-heartbeat.rst diff --git a/doc/Makefile.am b/doc/Makefile.am index 8139b4b6a7ee..877312f60076 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -363,7 +363,8 @@ MAN5_FILES_PRIMARY = \ man5/flux-config-ingest.5 \ man5/flux-config-kvs.5 \ man5/flux-config-policy.5 \ - man5/flux-config-queues.5 + man5/flux-config-queues.5 \ + man5/flux-config-heartbeat.5 MAN7_FILES = $(MAN7_FILES_PRIMARY) diff --git a/doc/man5/flux-config-heartbeat.rst b/doc/man5/flux-config-heartbeat.rst new file mode 100644 index 000000000000..4252ca400a16 --- /dev/null +++ b/doc/man5/flux-config-heartbeat.rst @@ -0,0 +1,48 @@ +======================== +flux-config-heartbeat(5) +======================== + + +DESCRIPTION +=========== + +The ``heartbeat`` table may be used to tune the configuration of the +Flux heartbeat module, which publishes periodic ``heartbeat.pulse`` messages +for synchronization. + +It may contain the following keys: + + +KEYS +==== + +period + (optional) The interval (in RFC 23 Flux Standard Duration format) between + the publication of heartbeat messages. Default: ``"2s"``. + + +EXAMPLE +======= + +:: + + [heartbeat] + period = "5s" + + +RESOURCES +========= + +.. 
include:: common/resources.rst + + +FLUX RFC +======== + +:doc:`rfc:spec_23` + + +SEE ALSO +======== + +:man5:`flux-config` diff --git a/doc/manpages.py b/doc/manpages.py index 40832827a285..34ef8157a255 100644 --- a/doc/manpages.py +++ b/doc/manpages.py @@ -358,6 +358,7 @@ ('man5/flux-config-queues', 'flux-config-queues', 'configure Flux job queues', [author], 5), ('man5/flux-config-job-manager', 'flux-config-job-manager', 'configure Flux job manager service', [author], 5), ('man5/flux-config-kvs', 'flux-config-kvs', 'configure Flux kvs service', [author], 5), + ('man5/flux-config-heartbeat', 'flux-config-heartbeat', 'configure Flux heartbeat service', [author], 5), ('man7/flux-broker-attributes', 'flux-broker-attributes', 'overview Flux broker attributes', [author], 7), ('man7/flux-jobtap-plugins', 'flux-jobtap-plugins', 'overview Flux jobtap plugin API', [author], 7), ('man7/flux-environment', 'flux-environment', 'Flux environment overview', [author], 7), From 8c3776b5f2f28b94a842b2442d061b3f12684b28 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Mar 2025 16:10:38 -0800 Subject: [PATCH 07/11] broker: add overlay.disconnect-parent RPC Problem: there is no way to force the broker to disconnect from its overlay parent, other than from the parent itself. Add overlay.disconnect-parent RPC, which allows any part of the system to force an overlay disconnect. This forces all pending and future RPCs to the parent to fail fast. Because the broker monitors the parent broker state machine using a streaming RPC, the net effect is to force the subtree to shut down. 
--- src/broker/overlay.c | 40 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/src/broker/overlay.c b/src/broker/overlay.c index 46c6e70a1169..88ca0177477c 100644 --- a/src/broker/overlay.c +++ b/src/broker/overlay.c @@ -1059,6 +1059,16 @@ static void fail_parent_rpc (const flux_msg_t *msg, void *arg) log_tracker_error (ov->h, msg, errno); } +static void parent_disconnect (struct overlay *ov) +{ + if (ov->parent.zsock) { + (void)zmq_disconnect (ov->parent.zsock, ov->parent.uri); + ov->parent.offline = true; + rpc_track_purge (ov->parent.tracker, fail_parent_rpc, ov); + overlay_monitor_notify (ov, FLUX_NODEID_ANY); + } +} + static void parent_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, @@ -1115,10 +1125,7 @@ static void parent_cb (flux_reactor_t *r, "%s (rank %lu) sent disconnect control message", flux_get_hostbyrank (ov->h, ov->parent.rank), (unsigned long)ov->parent.rank); - (void)zmq_disconnect (ov->parent.zsock, ov->parent.uri); - ov->parent.offline = true; - rpc_track_purge (ov->parent.tracker, fail_parent_rpc, ov); - overlay_monitor_notify (ov, FLUX_NODEID_ANY); + parent_disconnect (ov); } else logdrop (ov, OVERLAY_UPSTREAM, msg, "unknown control type"); @@ -1940,6 +1947,25 @@ static void overlay_disconnect_subtree_cb (flux_t *h, flux_log_error (h, "error responding to overlay.disconnect-subtree"); } +/* Log a message then force the parent to disconnect. 
+ */ +static void overlay_disconnect_parent_cb (flux_t *h, + flux_msg_handler_t *mh, + const flux_msg_t *msg, + void *arg) +{ + struct overlay *ov = arg; + const char *reason; + + if (flux_request_unpack (msg, NULL, "{s:s}", "reason", &reason) < 0) + goto error; + flux_log (h, LOG_CRIT, "disconnecting: %s", reason); + parent_disconnect (ov); + return; +error: + flux_log_error (h, "overlay.disconnect-parent error"); +} + static void overlay_trace_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, @@ -2457,6 +2483,12 @@ static const struct flux_msg_handler_spec htab[] = { overlay_disconnect_subtree_cb, 0 }, + { + FLUX_MSGTYPE_REQUEST, + "overlay.disconnect-parent", + overlay_disconnect_parent_cb, + 0 + }, { FLUX_MSGTYPE_REQUEST, "overlay.goodbye", From a6882eada450e2f0ce0ab585c62438657a9d1d4b Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Mar 2025 12:24:54 -0800 Subject: [PATCH 08/11] heartbeat: add timeout Problem: we have observed situations where a node is lost to the rank 0 broker even though it remains connected. Since we periodically publish heartbeat messages to all ranks, add a timeout mechanism so that a broker can force a parent disconnect upon not receiving heartbeats for a configurable period. In addition, log a warning when heartbeats are delayed by more than 3 heartbeat periods. 
--- src/modules/heartbeat/heartbeat.c | 180 ++++++++++++++++++++++++++---- 1 file changed, 158 insertions(+), 22 deletions(-) diff --git a/src/modules/heartbeat/heartbeat.c b/src/modules/heartbeat/heartbeat.c index bacdbf5f978e..2ffa66695192 100644 --- a/src/modules/heartbeat/heartbeat.c +++ b/src/modules/heartbeat/heartbeat.c @@ -9,27 +9,44 @@ \************************************************************/ /* heartbeat.c - publish regular heartbeat messages + * + * Heartbeats are published on rank 0 (leader) + * Heartbeats are subscribed to on rank > 0 (followers) + * + * By default, if a follower broker does not receive heartbeats within + * a timeout window (5m), it forces an overlay parent disconnect. */ #if HAVE_CONFIG_H #include "config.h" #endif - +#include #include +#include +#include #include "src/common/libutil/fsd.h" #include "src/common/libutil/errprintf.h" #include "ccan/str/str.h" static const double default_period = 2.0; +static const double default_timeout = 300.; + +static const int default_warn_thresh = 3; // number of heartbeat periods struct heartbeat { flux_t *h; uint32_t rank; double period; + double timeout; flux_watcher_t *timer; flux_msg_handler_t **handlers; flux_future_t *f; + flux_future_t *sync; + json_int_t count; + double t_stamp; + int warn_thresh; + bool over_warn_thresh; }; static void heartbeat_stats_cb (flux_t *h, @@ -41,11 +58,47 @@ static void heartbeat_stats_cb (flux_t *h, if (flux_respond_pack (h, msg, - "{s:f}", - "period", hb->period) < 0) + "{s:f s:f s:I s:i}", + "period", hb->period, + "timeout", hb->timeout, + "count", hb->count, + "warn_thresh", hb->warn_thresh) < 0) flux_log_error (h, "error responding to stats-get request"); } +static void sync_cb (flux_future_t *f, void *arg) +{ + struct heartbeat *hb = arg; + double now = flux_reactor_now (flux_get_reactor (hb->h)); + + if (flux_future_get (f, NULL) < 0) { + if (errno != ETIMEDOUT) { + flux_log_error (hb->h, "unexpected sync error"); + goto done; + } + char buf[64] = 
"unknown period"; + char msg[128]; + (void)fsd_format_duration_ex (buf, sizeof (buf), now - hb->t_stamp, 2); + snprintf (msg, sizeof (msg), "no heartbeat for %s", buf); + + flux_future_t *f2; + if (!(f2 = flux_rpc_pack (hb->h, + "overlay.disconnect-parent", + FLUX_NODEID_ANY, + FLUX_RPC_NORESPONSE, + "{s:s}", + "reason", msg))) + flux_log_error (hb->h, "overlay.disconnect-parent"); + flux_future_destroy (f2); + } + else { + hb->count++; + hb->t_stamp = now; + } +done: + flux_future_reset (f); +} + static void publish_continuation (flux_future_t *f, void *arg) { struct heartbeat *hb = arg; @@ -57,15 +110,9 @@ static void publish_continuation (flux_future_t *f, void *arg) hb->f = NULL; } -static void timer_cb (flux_reactor_t *r, - flux_watcher_t *w, - int revents, - void *arg) +static void heartbeat_publish (struct heartbeat *hb) { - struct heartbeat *hb = arg; - flux_future_destroy (hb->f); - if (!(hb->f = flux_event_publish (hb->h, "heartbeat.pulse", 0, NULL))) { flux_log_error (hb->h, "error sending publish request"); return; @@ -75,6 +122,58 @@ static void timer_cb (flux_reactor_t *r, flux_future_destroy (hb->f); hb->f = NULL; } + hb->count++; +} + +static void heartbeat_warn (struct heartbeat *hb) +{ + double now = flux_reactor_now (flux_get_reactor (hb->h)); + bool over_thresh = false; + + if (now - hb->t_stamp > hb->period * hb->warn_thresh) + over_thresh = true; + + if (over_thresh && !hb->over_warn_thresh) { + flux_log (hb->h, LOG_WARNING, "heartbeat overdue"); + hb->over_warn_thresh = true; + } + else if (!over_thresh && hb->over_warn_thresh) { + flux_log (hb->h, LOG_WARNING, "heartbeat received"); + hb->over_warn_thresh = false; + } +} + +static void timer_cb (flux_reactor_t *r, + flux_watcher_t *w, + int revents, + void *arg) +{ + struct heartbeat *hb = arg; + + if (hb->rank == 0) + heartbeat_publish (hb); + else + heartbeat_warn (hb); +} + +static void heartbeat_period_adjust (struct heartbeat *hb, double period) +{ + if (hb->timer) { + 
flux_timer_watcher_reset (hb->timer, 0., period); + flux_timer_watcher_again (hb->timer); + } +} + +static int heartbeat_timeout_adjust (struct heartbeat *hb, double timeout) +{ + if (hb->sync) { + flux_future_t *f; + if (!(f = flux_sync_create (hb->h, 0.)) + || flux_future_then (f, timeout, sync_cb, hb) < 0) + return -1; + hb->sync = f; + } + return 0; } static int heartbeat_parse_args (struct heartbeat *hb, @@ -110,13 +209,18 @@ static int heartbeat_parse_config (struct heartbeat *hb, { flux_error_t conf_error; const char *period_fsd = NULL; + const char *timeout_fsd = NULL; double new_period = default_period; + double new_timeout = default_timeout; + int new_warn_thresh = default_warn_thresh; if (flux_conf_unpack (conf, &conf_error, - "{s?{s?s !}}", + "{s?{s?s s?s s?i !}}", "heartbeat", - "period", &period_fsd) < 0) { + "period", &period_fsd, + "timeout", &timeout_fsd, + "warn_thresh", &new_warn_thresh) < 0) { errprintf (error, "error reading [heartbeat] config table: %s", conf_error.text); @@ -133,13 +237,38 @@ static int heartbeat_parse_config (struct heartbeat *hb, return -1; } } + if (timeout_fsd) { + if (fsd_parse_duration (timeout_fsd, &new_timeout) < 0) { + errprintf (error, "error parsing heartbeat.timeout FSD value"); + return -1; + } + if (new_timeout == 0 || new_timeout == INFINITY) + new_timeout = -1; + } + if (new_timeout < new_period * 2 && new_timeout != -1) { + errprintf (error, + "heartbeat.timeout must be >= 2*heartbeat.period," + " infinity, or 0"); + errno = EINVAL; + return -1; + } if (new_period != hb->period) { + heartbeat_period_adjust (hb, new_period); hb->period = new_period; - if (hb->timer) { - flux_timer_watcher_reset (hb->timer, 0., hb->period); - flux_timer_watcher_again (hb->timer); + } + if (new_timeout != hb->timeout) { + if (heartbeat_timeout_adjust (hb, new_timeout) < 0) { + errprintf (error, "error adjusting timeout: %s", strerror (errno)); + return -1; } + hb->timeout = new_timeout; } + if (new_warn_thresh <= 0) { + 
errprintf (error, "heartbeat.warn_thresh must be positive"); + errno = EINVAL; + return -1; + } + hb->warn_thresh = new_warn_thresh; return 0; } @@ -192,6 +321,7 @@ static void heartbeat_destroy (struct heartbeat *hb) { if (hb) { int saved_errno = errno; + flux_future_destroy (hb->sync); flux_future_destroy (hb->f); flux_msg_handler_delvec (hb->handlers); flux_watcher_destroy (hb->timer); @@ -208,6 +338,9 @@ static struct heartbeat *heartbeat_create (flux_t *h) return NULL; hb->h = h; hb->period = default_period; + hb->timeout = default_timeout; + hb->t_stamp = flux_reactor_now (flux_get_reactor (h)); + hb->warn_thresh = default_warn_thresh; if (flux_get_rank (h, &hb->rank) < 0 || flux_msg_handler_addvec (hb->h, htab, hb, &hb->handlers) < 0) goto error; @@ -230,14 +363,17 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log (h, LOG_ERR, "%s", error.text); goto error; } - if (hb->rank == 0) { - if (!(hb->timer = flux_timer_watcher_create (r, - 0., - hb->period, - timer_cb, - hb))) + if (!(hb->timer = flux_timer_watcher_create (r, + 0., + hb->period, + timer_cb, + hb))) + goto error; + flux_watcher_start (hb->timer); + if (hb->rank > 0) { + if (!(hb->sync = flux_sync_create (h, 0)) + || flux_future_then (hb->sync, hb->timeout, sync_cb, hb) < 0) goto error; - flux_watcher_start (hb->timer); } if (flux_reactor_run (r, 0) < 0) { flux_log_error (h, "flux_reactor_run"); From dec1bb61ac88928d5a3932a865d72e9a5efdb35f Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Mar 2025 12:25:07 -0800 Subject: [PATCH 09/11] testsuite: cover heartbeat timeout Problem: heartbeat timeout has no test coverage. Add some tests. --- t/t1107-heartbeat.t | 91 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 86 insertions(+), 5 deletions(-) diff --git a/t/t1107-heartbeat.t b/t/t1107-heartbeat.t index f55dee9a0d60..49318f51f5bf 100755 --- a/t/t1107-heartbeat.t +++ b/t/t1107-heartbeat.t @@ -4,12 +4,11 @@ test_description='Test heartbeat module' . 
`dirname $0`/sharness.sh -SIZE=1 +SIZE=2 test_under_flux ${SIZE} minimal - test_expect_success 'load heartbeat, period is 2s' ' - flux module load heartbeat && + flux exec flux module load heartbeat && flux module stats heartbeat | jq -r -e ".period == 2" ' test_expect_success 'reload heartbeat with period=10s' ' @@ -86,8 +85,90 @@ test_expect_success 'load heartbeat with period=1s' ' test_expect_success 'period is 1s' ' flux module stats heartbeat | jq -r -e ".period == 1" ' + +# Cover heartbeat.timeout + +test_expect_success 'reconfig with timeout=infinity works' ' + flux config load <<-EOT && + [heartbeat] + timeout = "infinity" + EOT + flux module stats heartbeat | jq -r -e ".timeout == -1" +' +test_expect_success 'reconfig with timeout=0 works' ' + flux config load <<-EOT && + [heartbeat] + timeout = "0" + EOT + flux module stats heartbeat | jq -r -e ".timeout == -1" +' +test_expect_success 'reconfig with timeout=1m works' ' + flux config load <<-EOT && + [heartbeat] + period = "2s" + timeout = "1m" + EOT + flux module stats heartbeat | jq -r -e ".timeout == 60" +' +test_expect_success 'reconfig with wrong timeout type fails' ' + test_must_fail flux config load <<-EOT && + [heartbeat] + period = "2s" + timeout = 42 + EOT + flux module stats heartbeat | jq -r -e ".timeout == 60" +' +test_expect_success 'reconfig with bad timeout FSD fails' ' + test_must_fail flux config load <<-EOT && + [heartbeat] + period = "2s" + timeout = "42z" + EOT + flux module stats heartbeat | jq -r -e ".timeout == 60" +' +test_expect_success 'reconfig with timeout < period fails' ' + test_must_fail flux config load <<-EOT && + [heartbeat] + period = "2s" + timeout = "1s" + EOT + flux module stats heartbeat | jq -r -e ".timeout == 60" +' +test_expect_success 'reconfig with warn_thresh=-1 fails' ' + test_must_fail flux config load <<-EOT + [heartbeat] + warn_thresh = -1 + EOT +' +test_expect_success 'reconfig with warn_thresh=10 works' ' + flux config load <<-EOT && + [heartbeat] + 
warn_thresh = 10 + EOT + flux module stats heartbeat | jq -r -e ".warn_thresh == 10" +' +test_expect_success 'reload with period=0.1s timeout=infinity warn_thresh=3' ' + flux exec flux setattr log-stderr-level 4 && + flux exec -r 1 flux dmesg -C && + flux exec -r 1 flux config load <<-EOT && + [heartbeat] + period = "0.1s" + timeout = "infinity" + warn_thresh = 3 + EOT + flux exec -r 1 flux module reload heartbeat && + flux exec -r 1 flux module stats heartbeat +' +test_expect_success 'stop leader broker and get follower log messages' ' + flux module remove heartbeat && + sleep 2 && + flux module load heartbeat && + flux exec -r 1 flux dmesg -H >dmesg.log +' +test_expect_success 'heartbeat overdue was logged' ' + grep -q "heartbeat overdue" dmesg.log +' test_expect_success 'unload heartbeat' ' - flux module unload heartbeat + flux exec flux module unload heartbeat ' - test_done From 0eef5ac6449a9396d2a1b1835f7ca4ac7c619480 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Mar 2025 20:17:10 -0800 Subject: [PATCH 10/11] testsuite: cover system-level heartbeat timeout Problem: there is no test coverage to verify that a heartbeat timeout can cause a broker to disconnect from its parent. Add a test for that. 
--- t/Makefile.am | 1 + t/t3310-system-heartbeat.t | 39 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) create mode 100755 t/t3310-system-heartbeat.t diff --git a/t/Makefile.am b/t/Makefile.am index 7288fa860b98..9bcb8048ab13 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -261,6 +261,7 @@ TESTSCRIPTS = \ t3307-system-leafcrash.t \ t3308-system-torpid.t \ t3309-system-reconnect.t \ + t3310-system-heartbeat.t \ t3400-overlay-trace.t \ t3401-module-trace.t \ lua/t0001-send-recv.t \ diff --git a/t/t3310-system-heartbeat.t b/t/t3310-system-heartbeat.t new file mode 100755 index 000000000000..702939bd8980 --- /dev/null +++ b/t/t3310-system-heartbeat.t @@ -0,0 +1,39 @@ +#!/bin/sh +# + +test_description='Deprive a leaf node of heartbeats and make it sad' + +. `dirname $0`/sharness.sh + +startctl="flux python ${SHARNESS_TEST_SRCDIR}/scripts/startctl.py" + +test_under_flux 2 system + +test_expect_success 'ensure child is online' ' + flux overlay status --timeout=0 --wait full +' + +test_expect_success 'tell brokers to log to stderr' ' + flux exec flux setattr log-stderr-mode local +' +test_expect_success 'configure heartbeat' ' + flux exec flux config load <<-EOT + [heartbeat] + period = "0.1s" + timeout = "0.5s" + EOT +' +test_expect_success 'stop heartbeat' ' + flux module remove heartbeat +' +test_expect_success 'wait for rank 1 to exit' ' + test_expect_code 1 $startctl wait 1 +' +test_expect_success 'start heartbeat' ' + flux module load heartbeat +' +test_expect_success 'wait for degraded status' ' + flux overlay status --timeout=0 --wait=degraded +' + +test_done From 9b9266215f3f405c279130f36eeb5725440b35b1 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 3 Mar 2025 12:33:19 -0800 Subject: [PATCH 11/11] flux-config-heartbeat(5): describe timeout Problem: the heartbeat timeout configuration is not documented. Update the man page. Add a USE CASES section to provide background on the various config settings. 
--- doc/man5/flux-config-heartbeat.rst | 58 ++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 6 deletions(-) diff --git a/doc/man5/flux-config-heartbeat.rst b/doc/man5/flux-config-heartbeat.rst index 4252ca400a16..2dba11701e7d 100644 --- a/doc/man5/flux-config-heartbeat.rst +++ b/doc/man5/flux-config-heartbeat.rst @@ -6,20 +6,30 @@ flux-config-heartbeat(5) DESCRIPTION =========== -The ``heartbeat`` table may be used to tune the configuration of the -Flux heartbeat module, which publishes periodic ``heartbeat.pulse`` messages -for synchronization. - -It may contain the following keys: +The Flux heartbeat service publishes periodic ``heartbeat.pulse`` messages +from the leader broker for synchronization. Follower brokers subscribe +to these messages and may optionally force a disconnect from their overlay +network parent when they are missed for a configurable period. +The ``heartbeat`` table may be used to tune the heartbeat service. It may +contain the following keys: KEYS ==== period (optional) The interval (in RFC 23 Flux Standard Duration format) between - the publication of heartbeat messages. Default: ``"2s"``. + the publication of heartbeat messages. Default: *2s*. + +timeout + (optional) The period (in RFC 23 Flux Standard Duration format) after + which a follower broker will forcibly disconnect from its overlay network + parent if it hasn't received a heartbeat message. Set to *0* or *infinity* + to disable. Default: *5m*. +warn_thresh + (optional) The number of missed heartbeat periods after which a warning + message will be logged. Default: 3. EXAMPLE ======= @@ -28,7 +38,43 @@ EXAMPLE [heartbeat] period = "5s" + timeout = "1m" + warn_thresh = 3 + +USE CASES +========= +Heartbeats may be used to synchronize Flux activities across brokers to +reduce the operating system jitter that affects some sensitive bulk-synchronous +applications. :man3:`flux_sync_create` provides a way to invoke work that +is synchronized with the heartbeat. + +.. 
note:: + The efficacy of heartbeats to mitigate noise is limited by the propagation + delay of published messages through the tree-based overlay network; however, + this may be reduced in the future with a side channel transport such as + IP multicast, hardware collectives, or quantum entanglement. + +The heartbeat timeout may be used to work around a peculiarity of ZeroMQ, +the software layer underpinning the overlay network. When a Flux broker +loses the TCP connection to its overlay parent without a shutdown (for example, +if the parent crashes or there is a network partition and TCP times out), +ZeroMQ tries indefinitely to re-establish the connection without informing +the broker. The child broker remains in RUN state with any upstream RPCs +blocked until the parent returns to service, after which it is forced to +disconnect and shut down, which causes the RPCs to fail. A heartbeat timeout +forces the broker to "fail fast", with the same net effect, but arriving +at a steady state sooner. + +The effect of a follower broker shutdown depends on its role. If it is +not a leaf node, the effect applies to its entire subtree. In a system +instance, systemd restarts brokers that shut down this way. Upon restart, +the brokers remain in JOIN state until the parent returns to service. +The heartbeat service is not loaded until after the parent connection is +established, so heartbeat timeouts do not apply in this phase. In a user +allocation where brokers are not restarted, the outcome depends on whether +or not the broker is one of the *critical ranks* described in +:man7:`flux-broker-attributes`. RESOURCES =========