From cd353414274e736dbf971ee56e70294e6fb494ba Mon Sep 17 00:00:00 2001 From: Alberto Carretero Date: Wed, 17 Jul 2024 10:45:57 +0200 Subject: [PATCH 1/4] feat(snapshot): integrate pool, first version --- Makefile.am | 4 + src/raft/recv_install_snapshot.c | 45 ++++-- src/raft/recv_install_snapshot.h | 14 +- test/raft/unit/test_snapshot.c | 261 ++++++++++++++++++++++++------- 4 files changed, 249 insertions(+), 75 deletions(-) diff --git a/Makefile.am b/Makefile.am index fb601c71b..bb780ae1c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -221,6 +221,7 @@ libraft_la_LDFLAGS = $(UV_LIBS) raft_core_unit_test_SOURCES = \ $(libraft_la_SOURCES) \ src/lib/sm.c \ + src/lib/threadpool.c \ src/tracing.c \ test/raft/unit/main_core.c \ test/raft/unit/test_byte.c \ @@ -238,6 +239,7 @@ raft_core_unit_test_LDADD = libtest.la raft_core_integration_test_SOURCES = \ src/tracing.c \ src/lib/sm.c \ + src/lib/threadpool.c \ test/raft/integration/main_core.c \ test/raft/integration/test_apply.c \ test/raft/integration/test_assign.c \ @@ -263,6 +265,7 @@ raft_core_integration_test_LDADD = libtest.la libraft.la raft_core_fuzzy_test_SOURCES = \ src/lib/sm.c \ + src/lib/threadpool.c \ src/tracing.c \ test/raft/fuzzy/main_core.c \ test/raft/fuzzy/test_election.c \ @@ -295,6 +298,7 @@ raft_uv_integration_test_SOURCES = \ $(libraft_la_SOURCES) \ src/tracing.c \ src/lib/sm.c \ + src/lib/threadpool.c \ test/raft/integration/main_uv.c \ test/raft/integration/test_uv_init.c \ test/raft/integration/test_uv_append.c \ diff --git a/src/raft/recv_install_snapshot.c b/src/raft/recv_install_snapshot.c index 8ee79ebe7..e13847240 100644 --- a/src/raft/recv_install_snapshot.c +++ b/src/raft/recv_install_snapshot.c @@ -14,6 +14,7 @@ #include "../raft.h" #include "../raft/recv_install_snapshot.h" #include "../utils.h" +#include "../lib/threadpool.h" /** * =Overview @@ -301,11 +302,11 @@ static const struct sm_conf rpc_sm_conf[RPC_NR] = { }, [RPC_ERROR] = { .name = "error", - .allowed = BITS(RPC_INIT), .flags = SM_FINAL, }, [RPC_END] = { .name = "end", + .allowed = BITS(RPC_END), .flags = SM_FINAL, }, }; @@ -369,12 +370,6 @@ static const struct sm_conf to_sm_conf[TO_NR] = { #define M_TIMEOUT ((const struct raft_message *) 2) #define M_WORK_DONE ((const struct raft_message *) 1) -static bool is_main_thread(void) -{ - // TODO: thread local storage. - return true; -} - static bool work_sm_invariant(const struct sm *sm, int prev_state) { (void)sm; @@ -410,17 +405,23 @@ static bool to_sm_invariant(const struct sm *sm, int prev_state) return true; } -static void leader_work_done(struct work *w) +static void leader_work_done(pool_work_t *w) { - struct leader *leader = CONTAINER_OF(w, struct leader, work); - sm_move(&w->sm, WORK_DONE); + struct work *work = CONTAINER_OF(w, struct work, pool_work); + struct leader *leader = CONTAINER_OF(work, struct leader, work); + + PRE(leader->ops->is_main_thread()); + sm_move(&work->sm, WORK_DONE); leader_tick(leader, M_WORK_DONE); } -static void follower_work_done(struct work *w) +static void follower_work_done(pool_work_t *w) { - struct follower *follower = CONTAINER_OF(w, struct follower, work); - sm_move(&w->sm, WORK_DONE); + struct work *work = CONTAINER_OF(w, struct work, pool_work); + struct follower *follower = CONTAINER_OF(work, struct follower, work); + + PRE(follower->ops->is_main_thread()); + sm_move(&work->sm, WORK_DONE); follower_tick(follower, M_WORK_DONE); } @@ -429,6 +430,11 @@ static void rpc_to_cb(uv_timer_t *handle) struct timeout *to = CONTAINER_OF(handle, struct timeout, handle); struct rpc *rpc = CONTAINER_OF(to, struct rpc, timeout); struct leader *leader = CONTAINER_OF(rpc, struct leader, rpc); + + PRE(leader->ops->is_main_thread()); + if (sm_state(&to->sm) == TO_CANCELED) { + return; + } sm_move(&to->sm, TO_EXPIRED); sm_move(&rpc->sm, RPC_TIMEDOUT); leader_tick(leader, M_TIMEOUT); @@ -438,6 +444,11 @@ static void leader_to_cb(uv_timer_t *handle) { struct timeout *to = CONTAINER_OF(handle, struct timeout, handle); struct leader *leader = CONTAINER_OF(to, struct leader, timeout); + + PRE(leader->ops->is_main_thread()); + if (sm_state(&to->sm) == TO_CANCELED) { + return; + } sm_move(&to->sm, TO_EXPIRED); leader_tick(leader, M_TIMEOUT); } @@ -465,11 +476,11 @@ static void leader_sent_cb(struct sender *s, int rc) struct rpc *rpc = CONTAINER_OF(s, struct rpc, sender); struct leader *leader = CONTAINER_OF(rpc, struct leader, rpc); + PRE(leader->ops->is_main_thread()); if (UNLIKELY(rc != 0)) { sm_move(&rpc->sm, RPC_ERROR); return; } - leader_tick(leader, M_MSG_SENT); } @@ -478,11 +489,11 @@ static void follower_sent_cb(struct sender *s, int rc) struct rpc *rpc = CONTAINER_OF(s, struct rpc, sender); struct follower *follower = CONTAINER_OF(rpc, struct follower, rpc); + PRE(follower->ops->is_main_thread()); if (UNLIKELY(rc != 0)) { sm_move(&rpc->sm, RPC_ERROR); return; } - follower_tick(follower, M_MSG_SENT); } @@ -701,7 +712,7 @@ __attribute__((unused)) void leader_tick(struct leader *leader, const struct raf struct sm *sm = &leader->sm; const struct leader_ops *ops = leader->ops; - PRE(is_main_thread()); + PRE(ops->is_main_thread()); if (!is_a_trigger_leader(leader, incoming) || is_a_duplicate(leader, incoming)) @@ -778,7 +789,7 @@ __attribute__((unused)) void follower_tick(struct follower *follower, const stru is_a_duplicate(follower, incoming)) return; - PRE(is_main_thread()); + PRE(ops->is_main_thread()); again: switch (sm_state(&follower->sm)) { diff --git a/src/raft/recv_install_snapshot.h b/src/raft/recv_install_snapshot.h index 4cb1072b2..75dc5d91b 100644 --- a/src/raft/recv_install_snapshot.h +++ b/src/raft/recv_install_snapshot.h @@ -1,5 +1,4 @@ /* InstallSnapshot RPC handlers. */ - #ifndef RECV_INSTALL_SNAPSHOT_H_ #define RECV_INSTALL_SNAPSHOT_H_ @@ -7,19 +6,22 @@ #include #include "../raft.h" +#include "../lib/threadpool.h" struct work; struct sender; struct timeout; typedef void (*to_cb_op)(uv_timer_t *handle); -typedef void (*work_op)(struct work *w); +typedef void (*work_op)(pool_work_t *w); typedef void (*sender_cb_op)(struct sender *s, int rc); struct work { + struct sm sm; work_op work_cb; work_op after_cb; - struct sm sm; + + pool_work_t pool_work; }; struct sender { @@ -27,8 +29,8 @@ struct sender { }; struct timeout { - to_cb_op cb; struct sm sm; + to_cb_op cb; uv_timer_t handle; }; @@ -55,6 +57,7 @@ struct leader_ops { void (*work_queue)(struct work *w, work_op work, work_op after_cb); + bool (*is_main_thread)(void); }; struct follower_ops { @@ -66,6 +69,7 @@ struct follower_ops { sender_send_op sender_send; void (*work_queue)(struct work *w, work_op work, work_op after_cb); + bool (*is_main_thread)(void); }; struct leader { @@ -128,7 +132,7 @@ static const struct sm_conf leader_sm_conf[LS_NR] = { .flags = SM_INITIAL | SM_FINAL, .name = "online", .allowed = BITS(LS_HT_WAIT) - | BITS(LS_F_ONLINE), + | BITS(LS_F_ONLINE), }, [LS_HT_WAIT] = { .name = "ht-wait", diff --git a/test/raft/unit/test_snapshot.c b/test/raft/unit/test_snapshot.c index cd19f3fea..93fbb6988 100644 --- a/test/raft/unit/test_snapshot.c +++ b/test/raft/unit/test_snapshot.c @@ -8,6 +8,7 @@ #include "../../../src/raft.h" #include "../../../src/raft/recv_install_snapshot.h" #include "../../../src/utils.h" +#include "../../../src/lib/threadpool.h" struct fixture { }; @@ -39,34 +40,34 @@ static void ut_follower_message_received(struct follower *follower, follower_tick(follower, incoming); } -static void ut_ht_create_op(struct work *w) +static void ut_ht_create_op(pool_work_t *w) { (void)w; } -static void ut_fill_ht_op(struct work *w) +static void ut_fill_ht_op(pool_work_t *w) { (void)w; } -static void ut_write_chunk_op(struct work *w) +static void ut_write_chunk_op(pool_work_t *w) { (void)w; } -static void ut_read_sig_op(struct work *w) +static void ut_read_sig_op(pool_work_t *w) { (void)w; } static void ut_disk_io(struct work *work) { - work->work_cb(work); + work->work_cb(&work->pool_work); } static void ut_disk_io_done(struct work *work) { - work->after_cb(work); + work->after_cb(&work->pool_work); } static void ut_to_expired(struct leader *leader) @@ -177,6 +178,10 @@ int ut_sender_send_op(struct sender *s, return 0; } +static bool ut_is_main_thread_op(void) { + return true; +} + TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) { struct follower_ops ops = { .ht_create = ut_ht_create_op, @@ -185,6 +190,7 @@ TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) { .read_sig = ut_read_sig_op, .write_chunk = ut_write_chunk_op, .fill_ht = ut_fill_ht_op, + .is_main_thread = ut_is_main_thread_op, }; struct follower follower = { @@ -245,6 +251,7 @@ TEST(snapshot_leader, basic, set_up, tear_down, 0, NULL) { .ht_create = ut_ht_create_op, .work_queue = ut_work_queue_op, .sender_send = ut_sender_send_op, + .is_main_thread = ut_is_main_thread_op, }; struct leader leader = { @@ -306,6 +313,7 @@ TEST(snapshot_leader, timeouts, set_up, tear_down, 0, NULL) { .ht_create = ut_ht_create_op, .work_queue = ut_work_queue_op, .sender_send = ut_sender_send_op, + .is_main_thread = ut_is_main_thread_op, }; struct leader leader = { @@ -373,12 +381,45 @@ TEST(snapshot_leader, timeouts, set_up, tear_down, 0, NULL) { return MUNIT_OK; } +struct test_fixture { + union { + struct leader leader; + struct follower follower; + }; + /* true when union contains leader, false when it contains follower */ + bool is_leader; + pool_t pool; + + work_op orig_cb; + bool work_done; +}; + +/* Not problematic because each test runs in a different process. */ +static struct test_fixture global_fixture; + +#define MAGIC_MAIN_THREAD 0xdef1 + +static __thread int thread_identifier; + static void progress(void) { - for (unsigned i = 0; i < 100; i++) { + for (unsigned i = 0; i < 20; i++) { uv_run(uv_default_loop(), UV_RUN_NOWAIT); } } +static void wait_work(void) { + while (!global_fixture.work_done) { + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + } +} + +/* Decorates the callback used when the pool work is done to set the test + * fixture flag to true, then calls the original callback.*/ +static void test_fixture_work_cb(pool_work_t *w) { + global_fixture.work_done = true; + global_fixture.orig_cb(w); +} + static void pool_to_start_op(struct timeout *to, unsigned delay, to_cb_op cb) { uv_timer_start(&to->handle, cb, delay, 0); @@ -395,6 +436,14 @@ static void pool_to_init_op(struct timeout *to) uv_timer_init(uv_default_loop(), &to->handle); } +static void pool_work_queue_op(struct work *w, work_op work_cb, work_op after_cb) +{ + w->pool_work = (pool_work_t) { 0 }; + global_fixture.orig_cb = after_cb; + global_fixture.work_done = false; + pool_queue_work(&global_fixture.pool, &w->pool_work, 0/* */, WT_UNORD, work_cb, test_fixture_work_cb); +} + static void pool_to_expired(struct leader *leader) { uv_timer_start(&leader->timeout.handle, leader->timeout.cb, 0, 0); @@ -407,19 +456,62 @@ static void pool_rpc_to_expired(struct rpc *rpc) progress(); } -/* TODO(alberto): combine them with tests above once the rest is in place. - * Dispatch to one of two implementations ut or pool in general functions. */ +static bool pool_is_main_thread_op(void) { + return thread_identifier == MAGIC_MAIN_THREAD; +} + +static void pool_ht_create_op(pool_work_t *w) +{ + if (global_fixture.is_leader) { + PRE(!global_fixture.leader.ops->is_main_thread()); + } else { + PRE(!global_fixture.follower.ops->is_main_thread()); + } + (void)w; +} + +static void pool_fill_ht_op(pool_work_t *w) +{ + if (global_fixture.is_leader) { + PRE(!global_fixture.leader.ops->is_main_thread()); + } else { + PRE(!global_fixture.follower.ops->is_main_thread()); + } + (void)w; +} + +static void pool_write_chunk_op(pool_work_t *w) +{ + struct work *work = CONTAINER_OF(w, struct work, pool_work); + struct follower *follower = CONTAINER_OF(work, struct follower, work); + PRE(!follower->ops->is_main_thread()); +} + +static void pool_read_sig_op(pool_work_t *w) +{ + struct work *work = CONTAINER_OF(w, struct work, pool_work); + struct follower *follower = CONTAINER_OF(work, struct follower, work); + PRE(!follower->ops->is_main_thread()); +} + TEST(snapshot_leader, pool_timeouts, set_up, tear_down, 0, NULL) { struct leader_ops ops = { .to_init = pool_to_init_op, .to_stop = pool_to_stop_op, .to_start = pool_to_start_op, - .ht_create = ut_ht_create_op, - .work_queue = ut_work_queue_op, + .ht_create = pool_ht_create_op, + .work_queue = pool_work_queue_op, .sender_send = ut_sender_send_op, + .is_main_thread = pool_is_main_thread_op, }; - struct leader leader = { + pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR); + global_fixture.pool.flags |= POOL_FOR_UT; + global_fixture.is_leader = true; + thread_identifier = MAGIC_MAIN_THREAD; + + struct leader *leader = &global_fixture.leader; + *leader = (struct leader) { .ops = &ops, .sigs_more = false, @@ -427,59 +519,122 @@ TEST(snapshot_leader, pool_timeouts, set_up, tear_down, 0, NULL) { .sigs_calculated = false, }; - sm_init(&leader.sm, leader_sm_invariant, + sm_init(&leader->sm, leader_sm_invariant, NULL, leader_sm_conf, "leader", LS_F_ONLINE); - PRE(sm_state(&leader.sm) == LS_F_ONLINE); - ut_leader_message_received(&leader, append_entries()); + PRE(sm_state(&leader->sm) == LS_F_ONLINE); + ut_leader_message_received(leader, append_entries()); - PRE(sm_state(&leader.sm) == LS_HT_WAIT); - ut_disk_io(&leader.work); - ut_disk_io_done(&leader.work); + wait_work(); - PRE(sm_state(&leader.sm) == LS_F_NEEDS_SNAP); - ut_rpc_sent(&leader.rpc); - pool_rpc_to_expired(&leader.rpc); + PRE(sm_state(&leader->sm) == LS_F_NEEDS_SNAP); + ut_rpc_sent(&leader->rpc); + pool_rpc_to_expired(&leader->rpc); - PRE(sm_state(&leader.sm) == LS_F_NEEDS_SNAP); - ut_rpc_sent(&leader.rpc); - ut_leader_message_received(&leader, ut_install_snapshot_result()); + PRE(sm_state(&leader->sm) == LS_F_NEEDS_SNAP); + ut_rpc_sent(&leader->rpc); + ut_leader_message_received(leader, ut_install_snapshot_result()); - PRE(sm_state(&leader.sm) == LS_CHECK_F_HAS_SIGS); - ut_rpc_sent(&leader.rpc); - ut_leader_message_received(&leader, ut_sign_result()); - pool_to_expired(&leader); + PRE(sm_state(&leader->sm) == LS_CHECK_F_HAS_SIGS); + ut_rpc_sent(&leader->rpc); + ut_leader_message_received(leader, ut_sign_result()); + pool_to_expired(leader); - PRE(sm_state(&leader.sm) == LS_CHECK_F_HAS_SIGS); - ut_rpc_sent(&leader.rpc); - pool_rpc_to_expired(&leader.rpc); + PRE(sm_state(&leader->sm) == LS_CHECK_F_HAS_SIGS); + ut_rpc_sent(&leader->rpc); + pool_rpc_to_expired(&leader->rpc); - PRE(sm_state(&leader.sm) == LS_CHECK_F_HAS_SIGS); - leader.sigs_calculated = true; - ut_rpc_sent(&leader.rpc); - ut_leader_message_received(&leader, ut_sign_result()); + PRE(sm_state(&leader->sm) == LS_CHECK_F_HAS_SIGS); + leader->sigs_calculated = true; + ut_rpc_sent(&leader->rpc); + ut_leader_message_received(leader, ut_sign_result()); - PRE(sm_state(&leader.sm) == LS_REQ_SIG_LOOP); - ut_rpc_sent(&leader.rpc); - PRE(sm_state(&leader.sm) == LS_REQ_SIG_LOOP); - ut_leader_message_received(&leader, ut_sign_result()); - ut_disk_io(&leader.work); - ut_disk_io_done(&leader.work); - ut_disk_io(&leader.work); - ut_disk_io_done(&leader.work); + PRE(sm_state(&leader->sm) == LS_REQ_SIG_LOOP); + ut_rpc_sent(&leader->rpc); + PRE(sm_state(&leader->sm) == LS_REQ_SIG_LOOP); + ut_leader_message_received(leader, ut_sign_result()); - PRE(sm_state(&leader.sm) == LS_PAGE_READ); - ut_rpc_sent(&leader.rpc); - pool_rpc_to_expired(&leader.rpc); + wait_work(); + wait_work(); - PRE(sm_state(&leader.sm) == LS_PAGE_READ); - ut_rpc_sent(&leader.rpc); - ut_leader_message_received(&leader, ut_page_result()); + PRE(sm_state(&leader->sm) == LS_PAGE_READ); + ut_rpc_sent(&leader->rpc); + pool_rpc_to_expired(&leader->rpc); - PRE(sm_state(&leader.sm) == LS_SNAP_DONE); - ut_rpc_sent(&leader.rpc); - ut_leader_message_received(&leader, ut_install_snapshot_result()); + PRE(sm_state(&leader->sm) == LS_PAGE_READ); + ut_rpc_sent(&leader->rpc); + ut_leader_message_received(leader, ut_page_result()); - sm_fini(&leader.sm); + PRE(sm_state(&leader->sm) == LS_SNAP_DONE); + ut_rpc_sent(&leader->rpc); + ut_leader_message_received(leader, ut_install_snapshot_result()); + + sm_fini(&leader->sm); + return MUNIT_OK; +} + +TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) { + struct follower_ops ops = { + .ht_create = pool_ht_create_op, + .work_queue = pool_work_queue_op, + .sender_send = ut_sender_send_op, + .read_sig = pool_read_sig_op, + .write_chunk = pool_write_chunk_op, + .fill_ht = pool_fill_ht_op, + .is_main_thread = pool_is_main_thread_op, + }; + + pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR); + global_fixture.pool.flags |= POOL_FOR_UT; + global_fixture.is_leader = false; + thread_identifier = MAGIC_MAIN_THREAD; + + struct follower *follower = &global_fixture.follower; + + *follower = (struct follower) { + .ops = &ops, + }; + + sm_init(&follower->sm, follower_sm_invariant, + NULL, follower_sm_conf, "follower", FS_NORMAL); + + PRE(sm_state(&follower->sm) == FS_NORMAL); + ut_follower_message_received(follower, ut_install_snapshot()); + ut_rpc_sent(&follower->rpc); + + wait_work(); + + PRE(sm_state(&follower->sm) == FS_SIGS_CALC_LOOP); + ut_follower_message_received(follower, ut_sign()); + ut_rpc_sent(&follower->rpc); + + PRE(sm_state(&follower->sm) == FS_SIGS_CALC_LOOP); + + follower->sigs_calculated = true; + wait_work(); + + PRE(sm_state(&follower->sm) == FS_SIGS_CALC_LOOP); + ut_follower_message_received(follower, ut_sign()); + ut_rpc_sent(&follower->rpc); + + PRE(sm_state(&follower->sm) == FS_SIG_RECEIVING); + ut_follower_message_received(follower, ut_sign()); + + PRE(sm_state(&follower->sm) == FS_SIG_PROCESSED); + + wait_work(); + + PRE(sm_state(&follower->sm) == FS_SIG_READ); + ut_rpc_sent(&follower->rpc); + + PRE(sm_state(&follower->sm) == FS_CHUNCK_RECEIVING); + ut_follower_message_received(follower, ut_page()); + + wait_work(); + + PRE(sm_state(&follower->sm) == FS_CHUNCK_APPLIED); + ut_rpc_sent(&follower->rpc); + + sm_fini(&follower->sm); return MUNIT_OK; } From 834489e98a1100472eef813aef342a264f8741d2 Mon Sep 17 00:00:00 2001 From: Alberto Carretero Date: Wed, 17 Jul 2024 10:52:20 +0200 Subject: [PATCH 2/4] bugfix(snapshot): add missing state to follower sm --- src/raft/recv_install_snapshot.c | 15 +++++++++------ src/raft/recv_install_snapshot.h | 8 +++++++- test/raft/unit/test_snapshot.c | 8 ++++++++ 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/raft/recv_install_snapshot.c b/src/raft/recv_install_snapshot.c index e13847240..b1ebf1bb2 100644 --- a/src/raft/recv_install_snapshot.c +++ b/src/raft/recv_install_snapshot.c @@ -94,10 +94,9 @@ * job. Once the background job is finished, the Follower replies with * CPResult() or MVResult(). * - * 8. When the iteration has finished the Leader sends - * InstallShapshot(..., done=true) message to the Follower. It moves the - * Follower back to NORMAL state and the state machine corresponding to the - * Follower on the Leader is moved to SNAPSHOT_DONE state. + * 8. When the iteration has finished the Leader moves into SNAP_DONE state and + * sends InstallShapshot(..., done=true) message to the Follower. When Follower + * replies, both state machines move to their FINAL states. * * 9. The Leader sends AppendEntries() RPC to the Follower and restarts the * algorithm from (1). The Leader's state machine is being moved to @@ -242,8 +241,11 @@ * | +------- CHUNK_APPLIED | * | | V Chunk has been written to disk. | * | +------- CHUNK_REPLIED ---------------------------+ - * | (@ || %) | CP()/MV() had done=true. - * | V CPResult()/MVResult() sent. + * | | | CP()/MV() had done=true. + * | | V CPResult()/MVResult() sent. + * | +--------- SNAP_DONE + * | (@ || %) | InstallSnapshot(done=true) received, + * | V and reply sent. * | FINAL * | | * +-----------------------+ @@ -797,6 +799,7 @@ __attribute__((unused)) void follower_tick(struct follower *follower, const stru case FS_SIGS_CALC_LOOP: case FS_SIG_READ: case FS_CHUNCK_APPLIED: + case FS_SNAP_DONE: follower_rpc_tick(&follower->rpc); if (sm_state(&follower->rpc.sm) == RPC_SENT) { rpc_fini(&follower->rpc); diff --git a/src/raft/recv_install_snapshot.h b/src/raft/recv_install_snapshot.h index 75dc5d91b..a975b18c4 100644 --- a/src/raft/recv_install_snapshot.h +++ b/src/raft/recv_install_snapshot.h @@ -222,6 +222,7 @@ enum follower_states { FS_CHUNCK_APPLIED, FS_CHUNCK_REPLIED, + FS_SNAP_DONE, FS_FINAL, FS_NR, @@ -305,7 +306,12 @@ static const struct sm_conf follower_sm_conf[FS_NR] = { [FS_CHUNCK_REPLIED] = { .name = "chunk_replied", .allowed = BITS(FS_CHUNCK_PROCESSED) - | BITS(FS_FINAL) + | BITS(FS_SNAP_DONE) + | BITS(FS_NORMAL), + }, + [FS_SNAP_DONE] = { + .name = "snap_done", + .allowed = BITS(FS_FINAL) | BITS(FS_NORMAL), }, [FS_FINAL] = { diff --git a/test/raft/unit/test_snapshot.c b/test/raft/unit/test_snapshot.c index 93fbb6988..9781ebb3b 100644 --- a/test/raft/unit/test_snapshot.c +++ b/test/raft/unit/test_snapshot.c @@ -239,6 +239,10 @@ TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) { PRE(sm_state(&follower.sm) == FS_CHUNCK_APPLIED); ut_rpc_sent(&follower.rpc); + PRE(sm_state(&follower.sm) == FS_SNAP_DONE); + ut_follower_message_received(&follower, ut_install_snapshot()); + ut_rpc_sent(&follower.rpc); + sm_fini(&follower.sm); return MUNIT_OK; } @@ -635,6 +639,10 @@ TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) { PRE(sm_state(&follower->sm) == FS_CHUNCK_APPLIED); ut_rpc_sent(&follower->rpc); + PRE(sm_state(&follower->sm) == FS_SNAP_DONE); + ut_follower_message_received(follower, ut_install_snapshot()); + ut_rpc_sent(&follower->rpc); + sm_fini(&follower->sm); return MUNIT_OK; } From 7de953b70a80e5a80506e05aac3183f9e4b11616 Mon Sep 17 00:00:00 2001 From: Alberto Carretero Date: Wed, 17 Jul 2024 10:58:26 +0200 Subject: [PATCH 3/4] feat(snapshot): rpc fill and rpc trigger, first version --- src/raft.h | 2 + src/raft/recv_install_snapshot.c | 184 ++++++++++++++++++++++--- src/raft/recv_install_snapshot.h | 1 + test/raft/unit/test_snapshot.c | 225 ++++++++++++++++++++++++------- 4 files changed, 349 insertions(+), 63 deletions(-) diff --git a/src/raft.h b/src/raft.h index b9f4d2a45..fce3f5518 100644 --- a/src/raft.h +++ b/src/raft.h @@ -379,6 +379,7 @@ struct raft_signature { struct page_from_to page_from_to; pageno_t cs_page_no; enum raft_result result; + bool ask_calculated; }; #define RAFT_SIGNATURE_VERSION 0 @@ -390,6 +391,7 @@ struct raft_signature_result { unsigned int cs_nr; pageno_t cs_page_no; enum raft_result result; + bool calculated; }; #define RAFT_SIGNATURE_RESULT_VERSION 0 diff --git a/src/raft/recv_install_snapshot.c b/src/raft/recv_install_snapshot.c index b1ebf1bb2..e6763f84d 100644 --- a/src/raft/recv_install_snapshot.c +++ b/src/raft/recv_install_snapshot.c @@ -16,6 +16,20 @@ #include "../utils.h" #include "../lib/threadpool.h" +#define IN_1(E, X) E == X +#define IN_2(E, X, ...) E == X || IN_1(E,__VA_ARGS__) +#define IN_3(E, X, ...) E == X || IN_2(E,__VA_ARGS__) +#define IN_4(E, X, ...) E == X || IN_3(E,__VA_ARGS__) +#define IN_5(E, X, ...) E == X || IN_4(E,__VA_ARGS__) +#define IN_6(E, X, ...) E == X || IN_5(E,__VA_ARGS__) +#define IN_7(E, X, ...) E == X || IN_6(E,__VA_ARGS__) +#define IN_8(E, X, ...) E == X || IN_7(E,__VA_ARGS__) +#define IN_9(E, X, ...) E == X || IN_8(E,__VA_ARGS__) + +#define GET_IN_MACRO(_1,_2,_3,_4,_5,_6,_7,_8,_9,NAME,...) NAME +#define IN(E, ...) \ + (GET_IN_MACRO(__VA_ARGS__,IN_9,IN_8,IN_7,IN_6,IN_5,IN_4,IN_3,IN_2,IN_1)(E,__VA_ARGS__)) + /** * =Overview * @@ -501,22 +515,91 @@ static void follower_sent_cb(struct sender *s, int rc) static bool is_a_trigger_leader(const struct leader *leader, const struct raft_message *incoming) { - (void)leader; - (void)incoming; - return true; + int state = sm_state(&leader->sm); + + /* Special cases: */ + if (incoming == M_WORK_DONE) { + return IN(state, LS_HT_WAIT, LS_PAGE_SENT, LS_PERSISTED_SIG_PART, + LS_PAGE_READ); + } else if (incoming == M_MSG_SENT) { + if (sm_state(&leader->rpc.sm) != RPC_FILLED) { + return false; + } + return IN(state, LS_PAGE_READ, LS_SNAP_DONE, LS_F_NEEDS_SNAP, + LS_REQ_SIG_LOOP, LS_CHECK_F_HAS_SIGS); + } else if (incoming == M_TIMEOUT) { + return IN(state, LS_PAGE_READ, LS_SNAP_DONE, LS_F_NEEDS_SNAP, + LS_REQ_SIG_LOOP, LS_CHECK_F_HAS_SIGS, LS_WAIT_SIGS); + } + + /* From now on, the message pointer is a valid pointer. */ + PRE(incoming != M_MSG_SENT && incoming != M_TIMEOUT && + incoming != M_WORK_DONE); + + /* Leader has not send the AppendEntries message but reacts on its + * reply. */ + if (state == LS_F_ONLINE) { + // TODO check if raft entry is present, else it is not a trigger. + return incoming->type == RAFT_IO_APPEND_ENTRIES_RESULT; + } + + /* Leader is waiting for follower reply. */ + if (sm_state(&leader->rpc.sm) != RPC_SENT) { + /* We are not expecting a reply. */ + return false; + }; + switch (state) { + case LS_PAGE_READ: + return IN(incoming->type, RAFT_IO_INSTALL_SNAPSHOT_CP_RESULT, + RAFT_IO_INSTALL_SNAPSHOT_MV_RESULT); + case LS_SNAP_DONE: + case LS_F_NEEDS_SNAP: + return incoming->type == RAFT_IO_INSTALL_SNAPSHOT_RESULT; + case LS_REQ_SIG_LOOP: + case LS_CHECK_F_HAS_SIGS: + return incoming->type == RAFT_IO_SIGNATURE_RESULT; + } + return false; } static bool is_a_trigger_follower(const struct follower *follower, const struct raft_message *incoming) { - switch (sm_state(&follower->sm)) { + int state = sm_state(&follower->sm); + + /* Special cases: */ + if (incoming == M_WORK_DONE) { + return IN(state, FS_SIG_PROCESSED, FS_CHUNCK_PROCESSED, + FS_SIG_PROCESSED, FS_CHUNCK_PROCESSED, FS_CHUNCK_REPLIED, + FS_HT_WAIT); + } else if (incoming == M_MSG_SENT) { + if (sm_state(&follower->rpc.sm) != RPC_FILLED) { + return false; + } + return IN(state, FS_NORMAL, FS_SIGS_CALC_LOOP, FS_SIG_READ, + FS_CHUNCK_APPLIED, FS_SNAP_DONE); + } else if (incoming == M_TIMEOUT) { + /* No timeouts in follower. */ + return false; + } + + /* From now on, the message pointer is a valid pointer. */ + PRE(incoming != M_MSG_SENT && incoming != M_TIMEOUT && + incoming != M_WORK_DONE); + + switch (state) { + case FS_NORMAL: + case FS_SNAP_DONE: + return incoming->type == RAFT_IO_INSTALL_SNAPSHOT; case FS_SIGS_CALC_LOOP: - return incoming != M_WORK_DONE; - case FS_SIG_PROCESSED: - case FS_CHUNCK_PROCESSED: - return incoming == M_WORK_DONE; + return incoming->type == RAFT_IO_SIGNATURE; + case FS_SIG_RECEIVING: + return incoming->type == RAFT_IO_SIGNATURE; + case FS_CHUNCK_RECEIVING: + return IN(incoming->type, RAFT_IO_INSTALL_SNAPSHOT_CP, + RAFT_IO_INSTALL_SNAPSHOT_MV); } - return true; + return false; } static bool is_a_duplicate(const void *state, @@ -572,16 +655,83 @@ static void work_fill_follower(struct follower *follower) static void rpc_fill_leader(struct leader *leader) { - rpc_init(&leader->rpc); - sm_relate(&leader->sm, &leader->rpc.sm); - sm_move(&leader->rpc.sm, RPC_FILLED); + struct rpc *rpc = &leader->rpc; + + rpc_init(rpc); + sm_relate(&leader->sm, &rpc->sm); + + switch (sm_state(&leader->sm)) { + case LS_PAGE_READ: + rpc->message = (struct raft_message) { + .type = RAFT_IO_INSTALL_SNAPSHOT_CP, // TODO CP OR MV. + }; + break; + case LS_SNAP_DONE: + rpc->message = (struct raft_message) { + .type = RAFT_IO_INSTALL_SNAPSHOT, + .install_snapshot = (struct raft_install_snapshot) { + .result = RAFT_RESULT_DONE, + }, + }; + break; + case LS_F_NEEDS_SNAP: + rpc->message = (struct raft_message) { + .type = RAFT_IO_INSTALL_SNAPSHOT, + }; + break; + case LS_REQ_SIG_LOOP: + rpc->message = (struct raft_message) { + .type = RAFT_IO_SIGNATURE, + }; + break; + case LS_CHECK_F_HAS_SIGS: + rpc->message = (struct raft_message) { + .type = RAFT_IO_SIGNATURE, + .signature = (struct raft_signature) { + .ask_calculated = true, + }, + }; + break; + } + + sm_move(&rpc->sm, RPC_FILLED); } static void rpc_fill_follower(struct follower *follower) { - rpc_init(&follower->rpc); - sm_relate(&follower->sm, &follower->rpc.sm); - sm_move(&follower->rpc.sm, RPC_FILLED); + struct rpc *rpc = &follower->rpc; + + rpc_init(rpc); + sm_relate(&follower->sm, &rpc->sm); + + switch (sm_state(&follower->sm)) { + case FS_SIGS_CALC_LOOP: + rpc->message = (struct raft_message) { + .type = RAFT_IO_SIGNATURE_RESULT, + .signature_result = (struct raft_signature_result) { + .calculated = follower->sigs_calculated, + }, + }; + break; + case FS_SIG_READ: + rpc->message = (struct raft_message) { + .type = RAFT_IO_SIGNATURE_RESULT, + }; + break; + case FS_CHUNCK_APPLIED: + rpc->message = (struct raft_message) { + .type = RAFT_IO_INSTALL_SNAPSHOT_CP_RESULT, + }; + break; + case FS_NORMAL: + case FS_SNAP_DONE: + rpc->message = (struct raft_message) { + .type = RAFT_IO_INSTALL_SNAPSHOT_RESULT, + }; + break; + } + + sm_move(&rpc->sm, RPC_FILLED); } static int rpc_send(struct rpc *rpc, sender_send_op op, sender_cb_op sent_cb) @@ -644,7 +794,7 @@ static bool is_an_unexpected_trigger(const struct leader *leader, enum raft_result res = RAFT_RESULT_UNEXPECTED; switch (msg->type) { - case RAFT_IO_APPEND_ENTRIES: + case RAFT_IO_APPEND_ENTRIES_RESULT: res = RAFT_RESULT_OK; break; case RAFT_IO_INSTALL_SNAPSHOT: @@ -814,7 +964,6 @@ __attribute__((unused)) void follower_tick(struct follower *follower, const stru break; case FS_SIG_PROCESSED: case FS_CHUNCK_PROCESSED: - case FS_CHUNCK_REPLIED: case FS_HT_WAIT: sm_move(sm, follower_next_state(sm)); goto again; @@ -829,6 +978,7 @@ __attribute__((unused)) void follower_tick(struct follower *follower, const stru case FS_SIG_REPLIED: case FS_SIGS_CALC_DONE: case FS_SIGS_CALC_MSG_RECEIVED: + case FS_CHUNCK_REPLIED: case FS_FINAL: sm_move(sm, follower_next_state(sm)); break; diff --git a/src/raft/recv_install_snapshot.h b/src/raft/recv_install_snapshot.h index a975b18c4..614b7aec7 100644 --- a/src/raft/recv_install_snapshot.h +++ b/src/raft/recv_install_snapshot.h @@ -25,6 +25,7 @@ struct work { }; struct sender { + // TODO embbed the uv req here. sender_cb_op cb; }; diff --git a/test/raft/unit/test_snapshot.c b/test/raft/unit/test_snapshot.c index 9781ebb3b..368719c94 100644 --- a/test/raft/unit/test_snapshot.c +++ b/test/raft/unit/test_snapshot.c @@ -16,13 +16,13 @@ struct fixture { static void *set_up(MUNIT_UNUSED const MunitParameter params[], MUNIT_UNUSED void *user_data) { - struct fixture *f = munit_malloc(sizeof *f); - return f; + struct fixture *f = munit_malloc(sizeof *f); + return f; } static void tear_down(void *data) { - free(data); + free(data); } SUITE(snapshot_leader) @@ -85,10 +85,10 @@ static void ut_rpc_to_expired(struct rpc *rpc) rpc->timeout.cb(&rpc->timeout.handle); } -static const struct raft_message *append_entries(void) +static const struct raft_message *append_entries_result(void) { static struct raft_message append_entries = { - .type = RAFT_IO_APPEND_ENTRIES, + .type = RAFT_IO_APPEND_ENTRIES_RESULT, }; return &append_entries; @@ -170,10 +170,20 @@ static void ut_to_stop_op(struct timeout *to) (void)to; } +static bool ut_msg_consumed = false; +static struct raft_message ut_last_msg_sent; + +struct raft_message ut_get_msg_sent(void) { + munit_assert(!ut_msg_consumed); + ut_msg_consumed = true; + return ut_last_msg_sent; +} + int ut_sender_send_op(struct sender *s, struct raft_message *payload, sender_cb_op cb) { - (void)payload; + ut_last_msg_sent = *payload; + ut_msg_consumed = false; s->cb = cb; return 0; } @@ -203,6 +213,7 @@ TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) { PRE(sm_state(&follower.sm) == FS_NORMAL); ut_follower_message_received(&follower, ut_install_snapshot()); ut_rpc_sent(&follower.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_RESULT); ut_disk_io(&follower.work); PRE(sm_state(&follower.sm) == FS_HT_WAIT); @@ -211,6 +222,7 @@ TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) { PRE(sm_state(&follower.sm) == FS_SIGS_CALC_LOOP); ut_follower_message_received(&follower, ut_sign()); ut_rpc_sent(&follower.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_SIGNATURE_RESULT); PRE(sm_state(&follower.sm) == FS_SIGS_CALC_LOOP); ut_disk_io(&follower.work); @@ -220,6 +232,7 @@ TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) { PRE(sm_state(&follower.sm) == FS_SIGS_CALC_LOOP); ut_follower_message_received(&follower, ut_sign()); ut_rpc_sent(&follower.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_SIGNATURE_RESULT); PRE(sm_state(&follower.sm) == FS_SIG_RECEIVING); ut_follower_message_received(&follower, ut_sign()); @@ -230,6 +243,7 @@ TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) { PRE(sm_state(&follower.sm) == FS_SIG_READ); ut_rpc_sent(&follower.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_SIGNATURE_RESULT); PRE(sm_state(&follower.sm) == FS_CHUNCK_RECEIVING); ut_follower_message_received(&follower, ut_page()); @@ -238,10 +252,12 @@ TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) { PRE(sm_state(&follower.sm) == FS_CHUNCK_APPLIED); ut_rpc_sent(&follower.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_CP_RESULT); PRE(sm_state(&follower.sm) == FS_SNAP_DONE); ut_follower_message_received(&follower, ut_install_snapshot()); ut_rpc_sent(&follower.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_RESULT); sm_fini(&follower.sm); return MUNIT_OK; @@ -270,7 +286,7 @@ TEST(snapshot_leader, basic, set_up, tear_down, 0, NULL) { NULL, leader_sm_conf, "leader", LS_F_ONLINE); PRE(sm_state(&leader.sm) == LS_F_ONLINE); - ut_leader_message_received(&leader, append_entries()); + ut_leader_message_received(&leader, append_entries_result()); PRE(sm_state(&leader.sm) == LS_HT_WAIT); ut_disk_io(&leader.work); @@ -278,17 +294,21 @@ TEST(snapshot_leader, basic, set_up, tear_down, 0, NULL) { PRE(sm_state(&leader.sm) == LS_F_NEEDS_SNAP); ut_rpc_sent(&leader.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT); ut_leader_message_received(&leader, ut_install_snapshot_result()); PRE(sm_state(&leader.sm) == LS_CHECK_F_HAS_SIGS); ut_rpc_sent(&leader.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); ut_leader_message_received(&leader, ut_sign_result()); ut_to_expired(&leader); leader.sigs_calculated = true; ut_rpc_sent(&leader.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); ut_leader_message_received(&leader, ut_sign_result()); PRE(sm_state(&leader.sm) == LS_REQ_SIG_LOOP); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); ut_rpc_sent(&leader.rpc); PRE(sm_state(&leader.sm) == LS_REQ_SIG_LOOP); ut_leader_message_received(&leader, ut_sign_result()); @@ -298,10 +318,12 @@ TEST(snapshot_leader, basic, set_up, tear_down, 0, NULL) { ut_disk_io_done(&leader.work); PRE(sm_state(&leader.sm) == LS_PAGE_READ); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_CP); ut_rpc_sent(&leader.rpc); ut_leader_message_received(&leader, ut_page_result()); PRE(sm_state(&leader.sm) == LS_SNAP_DONE); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT); ut_rpc_sent(&leader.rpc); ut_leader_message_received(&leader, ut_install_snapshot_result()); @@ -332,7 +354,7 @@ TEST(snapshot_leader, timeouts, set_up, tear_down, 0, NULL) { NULL, leader_sm_conf, "leader", LS_F_ONLINE); PRE(sm_state(&leader.sm) == LS_F_ONLINE); - ut_leader_message_received(&leader, append_entries()); + ut_leader_message_received(&leader, append_entries_result()); PRE(sm_state(&leader.sm) == LS_HT_WAIT); ut_disk_io(&leader.work); @@ -340,28 +362,34 @@ TEST(snapshot_leader, timeouts, set_up, tear_down, 0, NULL) { PRE(sm_state(&leader.sm) == LS_F_NEEDS_SNAP); ut_rpc_sent(&leader.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT); ut_rpc_to_expired(&leader.rpc); PRE(sm_state(&leader.sm) == LS_F_NEEDS_SNAP); ut_rpc_sent(&leader.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT); ut_leader_message_received(&leader, ut_install_snapshot_result()); PRE(sm_state(&leader.sm) == LS_CHECK_F_HAS_SIGS); ut_rpc_sent(&leader.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); ut_leader_message_received(&leader, ut_sign_result()); ut_to_expired(&leader); PRE(sm_state(&leader.sm) == LS_CHECK_F_HAS_SIGS); ut_rpc_sent(&leader.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); ut_rpc_to_expired(&leader.rpc); PRE(sm_state(&leader.sm) == LS_CHECK_F_HAS_SIGS); leader.sigs_calculated = true; ut_rpc_sent(&leader.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); ut_leader_message_received(&leader, ut_sign_result()); PRE(sm_state(&leader.sm) == LS_REQ_SIG_LOOP); ut_rpc_sent(&leader.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); PRE(sm_state(&leader.sm) == LS_REQ_SIG_LOOP); ut_leader_message_received(&leader, ut_sign_result()); ut_disk_io(&leader.work); @@ -371,14 +399,17 @@ TEST(snapshot_leader, timeouts, set_up, tear_down, 0, NULL) { PRE(sm_state(&leader.sm) == LS_PAGE_READ); ut_rpc_sent(&leader.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_CP); ut_rpc_to_expired(&leader.rpc); PRE(sm_state(&leader.sm) == LS_PAGE_READ); ut_rpc_sent(&leader.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_CP); ut_leader_message_received(&leader, ut_page_result()); PRE(sm_state(&leader.sm) == LS_SNAP_DONE); ut_rpc_sent(&leader.rpc); + munit_assert_int(ut_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT); ut_leader_message_received(&leader, ut_install_snapshot_result()); sm_fini(&leader.sm); @@ -386,15 +417,24 @@ TEST(snapshot_leader, timeouts, set_up, tear_down, 0, NULL) { } struct test_fixture { + pool_t pool; + union { struct leader leader; struct follower follower; }; - /* true when union contains leader, false when it contains follower */ + /* true when union contains leader, false when it contains follower. */ bool is_leader; - pool_t pool; - work_op orig_cb; + /* We only expect one message to be in-flight. */ + struct raft_message last_msg_sent; + /* Message was sent and has not been consumed, see uv_get_msg_sent(). */ + bool msg_valid; + + /* TODO: should accomodate several background jobs in the future. Probably + * by scheduling a barrier in the pool after all the works that toggles this + * flag. */ + work_op orig_work_cb; bool work_done; }; @@ -405,23 +445,69 @@ static struct test_fixture global_fixture; static __thread int thread_identifier; +static void *pool_set_up(MUNIT_UNUSED const MunitParameter params[], + MUNIT_UNUSED void *user_data) +{ + /* Prevent hangs. */ + alarm(2); + + global_fixture = (struct test_fixture) { 0 }; + pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR); + global_fixture.pool.flags |= POOL_FOR_UT; + thread_identifier = MAGIC_MAIN_THREAD; + + struct fixture *f = munit_malloc(sizeof *f); + return f; +} + +static void pool_tear_down(void *data) +{ + pool_close(&global_fixture.pool); + pool_fini(&global_fixture.pool); + free(data); +} + static void progress(void) { for (unsigned i = 0; i < 20; i++) { uv_run(uv_default_loop(), UV_RUN_NOWAIT); } } +static bool pool_is_main_thread_op(void) { + return thread_identifier == MAGIC_MAIN_THREAD; +} + +/* Advances libuv in the main thread until the in-flight background work is + * finished. + * + * This function is designed with the constaint that there can only be one + * request in-flight. It will hang until the work is finished. */ static void wait_work(void) { + PRE(pool_is_main_thread_op()); + while (!global_fixture.work_done) { uv_run(uv_default_loop(), UV_RUN_NOWAIT); } } +/* Advances libuv in the main thread until the in-flight message that was queued + * is sent. + * + * This function is designed with the constaint that there can only be one + * message in-flight. It will hang until the message is sent. */ +static void wait_msg_sent(void) { + PRE(pool_is_main_thread_op()); + + while (!global_fixture.msg_valid) { + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + } +} + /* Decorates the callback used when the pool work is done to set the test * fixture flag to true, then calls the original callback.*/ static void test_fixture_work_cb(pool_work_t *w) { global_fixture.work_done = true; - global_fixture.orig_cb(w); + global_fixture.orig_work_cb(w); } static void pool_to_start_op(struct timeout *to, unsigned delay, to_cb_op cb) @@ -443,9 +529,9 @@ static void pool_to_init_op(struct timeout *to) static void pool_work_queue_op(struct work *w, work_op work_cb, work_op after_cb) { w->pool_work = (pool_work_t) { 0 }; - global_fixture.orig_cb = after_cb; + global_fixture.orig_work_cb = after_cb; global_fixture.work_done = false; - pool_queue_work(&global_fixture.pool, &w->pool_work, 0/* */, WT_UNORD, work_cb, test_fixture_work_cb); + pool_queue_work(&global_fixture.pool, &w->pool_work, 0, WT_UNORD, work_cb, test_fixture_work_cb); } static void pool_to_expired(struct leader *leader) @@ -460,10 +546,6 @@ static void pool_rpc_to_expired(struct rpc *rpc) progress(); } -static bool pool_is_main_thread_op(void) { - return thread_identifier == MAGIC_MAIN_THREAD; -} - static void pool_ht_create_op(pool_work_t *w) { if (global_fixture.is_leader) { @@ -498,22 +580,62 @@ static void pool_read_sig_op(pool_work_t *w) PRE(!follower->ops->is_main_thread()); } -TEST(snapshot_leader, pool_timeouts, set_up, tear_down, 0, NULL) { +struct uv_sender_send_data { + struct sender *s; + sender_cb_op cb; +}; + +static void uv_sender_send_cb(uv_work_t *req) { + (void)req; +} + +static void uv_sender_send_after_cb(uv_work_t *req, int status) { + global_fixture.msg_valid = true; + struct uv_sender_send_data *data = req->data; + data->cb(data->s, status); +} + +static int uv_sender_send_op(struct sender *s, + struct raft_message *payload, + sender_cb_op cb) { + /* We only expect one message to be in-flight. */ + static uv_work_t req; + static struct uv_sender_send_data req_data; + + global_fixture.last_msg_sent = *payload; + /* Flag is only toggled when the after_cb is called, emulating the message + * being sent. */ + global_fixture.msg_valid = false; + s->cb = cb; + req_data = (struct uv_sender_send_data) { + .s = s, + .cb = cb, + }; + req = (uv_work_t) { + .data = &req_data, + }; + uv_queue_work(uv_default_loop(), &req, uv_sender_send_cb, uv_sender_send_after_cb); + return 0; +} + +struct raft_message uv_get_msg_sent(void) { + munit_assert(global_fixture.msg_valid); + global_fixture.msg_valid = false; + return global_fixture.last_msg_sent; +} + +TEST(snapshot_leader, pool_timeouts, pool_set_up, pool_tear_down, 0, NULL) { struct leader_ops ops = { .to_init = pool_to_init_op, .to_stop = pool_to_stop_op, .to_start = pool_to_start_op, .ht_create = pool_ht_create_op, .work_queue = pool_work_queue_op, - .sender_send = ut_sender_send_op, + .sender_send = uv_sender_send_op, .is_main_thread = pool_is_main_thread_op, }; - pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR); - global_fixture.pool.flags |= POOL_FOR_UT; global_fixture.is_leader = true; - thread_identifier = MAGIC_MAIN_THREAD; - struct leader *leader = &global_fixture.leader; *leader = (struct leader) { .ops = &ops, @@ -527,34 +649,40 @@ TEST(snapshot_leader, pool_timeouts, set_up, tear_down, 0, NULL) { NULL, leader_sm_conf, "leader", LS_F_ONLINE); PRE(sm_state(&leader->sm) == LS_F_ONLINE); - ut_leader_message_received(leader, append_entries()); + ut_leader_message_received(leader, append_entries_result()); wait_work(); PRE(sm_state(&leader->sm) == LS_F_NEEDS_SNAP); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT); pool_rpc_to_expired(&leader->rpc); PRE(sm_state(&leader->sm) == LS_F_NEEDS_SNAP); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT); ut_leader_message_received(leader, ut_install_snapshot_result()); PRE(sm_state(&leader->sm) == LS_CHECK_F_HAS_SIGS); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); ut_leader_message_received(leader, ut_sign_result()); pool_to_expired(leader); PRE(sm_state(&leader->sm) == LS_CHECK_F_HAS_SIGS); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); pool_rpc_to_expired(&leader->rpc); PRE(sm_state(&leader->sm) == LS_CHECK_F_HAS_SIGS); leader->sigs_calculated = true; - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); ut_leader_message_received(leader, ut_sign_result()); PRE(sm_state(&leader->sm) == LS_REQ_SIG_LOOP); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); PRE(sm_state(&leader->sm) == LS_REQ_SIG_LOOP); ut_leader_message_received(leader, ut_sign_result()); @@ -562,37 +690,36 @@ TEST(snapshot_leader, pool_timeouts, set_up, tear_down, 0, NULL) { wait_work(); PRE(sm_state(&leader->sm) == LS_PAGE_READ); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_CP); pool_rpc_to_expired(&leader->rpc); PRE(sm_state(&leader->sm) == LS_PAGE_READ); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_CP); ut_leader_message_received(leader, ut_page_result()); PRE(sm_state(&leader->sm) == LS_SNAP_DONE); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT); ut_leader_message_received(leader, ut_install_snapshot_result()); sm_fini(&leader->sm); return MUNIT_OK; } -TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) { +TEST(snapshot_follower, pool, pool_set_up, pool_tear_down, 0, NULL) { struct follower_ops ops = { .ht_create = pool_ht_create_op, .work_queue = pool_work_queue_op, - .sender_send = ut_sender_send_op, + .sender_send = uv_sender_send_op, .read_sig = pool_read_sig_op, .write_chunk = pool_write_chunk_op, .fill_ht = pool_fill_ht_op, .is_main_thread = pool_is_main_thread_op, }; - pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR); - global_fixture.pool.flags |= POOL_FOR_UT; global_fixture.is_leader = false; - thread_identifier = MAGIC_MAIN_THREAD; - struct follower *follower = &global_fixture.follower; *follower = (struct follower) { @@ -604,13 +731,15 @@ TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) { PRE(sm_state(&follower->sm) == FS_NORMAL); ut_follower_message_received(follower, ut_install_snapshot()); - ut_rpc_sent(&follower->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_RESULT); wait_work(); PRE(sm_state(&follower->sm) == FS_SIGS_CALC_LOOP); ut_follower_message_received(follower, ut_sign()); - ut_rpc_sent(&follower->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE_RESULT); PRE(sm_state(&follower->sm) == FS_SIGS_CALC_LOOP); @@ -619,7 +748,8 @@ TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) { PRE(sm_state(&follower->sm) == FS_SIGS_CALC_LOOP); ut_follower_message_received(follower, ut_sign()); - ut_rpc_sent(&follower->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE_RESULT); PRE(sm_state(&follower->sm) == FS_SIG_RECEIVING); ut_follower_message_received(follower, ut_sign()); @@ -629,7 +759,8 @@ TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) { wait_work(); PRE(sm_state(&follower->sm) == FS_SIG_READ); - ut_rpc_sent(&follower->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE_RESULT); PRE(sm_state(&follower->sm) == FS_CHUNCK_RECEIVING); ut_follower_message_received(follower, ut_page()); @@ -637,11 +768,13 @@ TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) { wait_work(); PRE(sm_state(&follower->sm) == FS_CHUNCK_APPLIED); - ut_rpc_sent(&follower->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_CP_RESULT); PRE(sm_state(&follower->sm) == FS_SNAP_DONE); ut_follower_message_received(follower, ut_install_snapshot()); - ut_rpc_sent(&follower->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_RESULT); sm_fini(&follower->sm); return MUNIT_OK; From defc3f3f5aa4aed4565ca16633f02ef466668c5a Mon Sep 17 00:00:00 2001 From: Alberto Carretero Date: Tue, 23 Jul 2024 09:56:38 +0200 Subject: [PATCH 4/4] feat: pool thread identification --- src/lib/threadpool.c | 30 +++++++++++++++++++++++- src/lib/threadpool.h | 2 ++ src/raft/recv_install_snapshot.c | 16 ++++++------- src/raft/recv_install_snapshot.h | 4 ++-- test/raft/unit/test_snapshot.c | 39 ++++++++++++-------------------- 5 files changed, 56 insertions(+), 35 deletions(-) diff --git a/src/lib/threadpool.c b/src/lib/threadpool.c index c82e4e95c..f87577184 100644 --- a/src/lib/threadpool.c +++ b/src/lib/threadpool.c @@ -1,13 +1,15 @@ #include "threadpool.h" #include #include +#include #include #include +#include #include +#include #include "../../src/lib/queue.h" #include "../../src/lib/sm.h" #include "../../src/utils.h" -#include "../tracing.h" /** * Planner thread state machine. @@ -74,6 +76,9 @@ static const struct sm_conf planner_states[PS_NR] = { }, }; +static const uint64_t pool_thread_magic = 0xf344e2; +static uv_key_t thread_identifier_key; + enum { THREADPOOL_SIZE_MAX = 1024, }; @@ -125,6 +130,14 @@ struct pool_impl { }; /* clang-format on */ +/* Callback does not allow passing data, we use a static variable to report + * errors back. */ +static int thread_key_create_err = 0; +static void thread_key_create(void) { + PRE(thread_key_create_err == 0); + thread_key_create_err = uv_key_create(&thread_identifier_key); +} + static inline bool pool_is_inited(const pool_t *pool) { return pool->pi != NULL; @@ -325,6 +338,7 @@ static void worker(void *arg) pool_work_t *w; queue *q; + uv_key_set(&thread_identifier_key, (void*)pool_thread_magic); uv_sem_post(ta->sem); uv_mutex_lock(mutex); for (;;) { @@ -552,7 +566,17 @@ int pool_init(pool_t *pool, return rc; } + static uv_once_t once = UV_ONCE_INIT; + uv_once(&once, thread_key_create); + if (thread_key_create_err != 0) { + uv_close((uv_handle_t *)&pi->outq_async, NULL); + uv_mutex_destroy(&pi->outq_mutex); + free(pi); + return thread_key_create_err; + } + pool_threads_init(pool); + return 0; } @@ -581,6 +605,10 @@ void pool_close(pool_t *pool) uv_mutex_unlock(&pi->mutex); } +bool pool_is_pool_thread(void) { + return uv_key_get(&thread_identifier_key) == (void*)pool_thread_magic; +} + pool_t *pool_ut_fallback(void) { static pool_t pool; diff --git a/src/lib/threadpool.h b/src/lib/threadpool.h index 8858e9aac..7503a2ddd 100644 --- a/src/lib/threadpool.h +++ b/src/lib/threadpool.h @@ -2,6 +2,7 @@ #define __THREAD_POOL__ #include +#include #include "queue.h" /** @@ -111,6 +112,7 @@ void pool_queue_work(pool_t *pool, enum pool_work_type type, void (*work_cb)(pool_work_t *w), void (*after_work_cb)(pool_work_t *w)); +bool pool_is_pool_thread(void); pool_t *pool_ut_fallback(void); diff --git a/src/raft/recv_install_snapshot.c b/src/raft/recv_install_snapshot.c index e6763f84d..d372f2d5b 100644 --- a/src/raft/recv_install_snapshot.c +++ b/src/raft/recv_install_snapshot.c @@ -426,7 +426,7 @@ static void leader_work_done(pool_work_t *w) struct work *work = CONTAINER_OF(w, struct work, pool_work); struct leader *leader = CONTAINER_OF(work, struct leader, work); - PRE(leader->ops->is_main_thread()); + PRE(!leader->ops->is_pool_thread()); sm_move(&work->sm, WORK_DONE); leader_tick(leader, M_WORK_DONE); } @@ -436,7 +436,7 @@ static void follower_work_done(pool_work_t *w) struct work *work = CONTAINER_OF(w, struct work, pool_work); struct follower *follower = CONTAINER_OF(work, struct follower, work); - PRE(follower->ops->is_main_thread()); + PRE(!follower->ops->is_pool_thread()); sm_move(&work->sm, WORK_DONE); follower_tick(follower, M_WORK_DONE); } @@ -447,7 +447,7 @@ static void rpc_to_cb(uv_timer_t *handle) struct rpc *rpc = CONTAINER_OF(to, struct rpc, timeout); struct leader *leader = CONTAINER_OF(rpc, struct leader, rpc); - PRE(leader->ops->is_main_thread()); + PRE(!leader->ops->is_pool_thread()); if (sm_state(&to->sm) == TO_CANCELED) { return; } @@ -461,7 +461,7 @@ static void leader_to_cb(uv_timer_t *handle) struct timeout *to = CONTAINER_OF(handle, struct timeout, handle); struct leader *leader = CONTAINER_OF(to, struct leader, timeout); - PRE(leader->ops->is_main_thread()); + PRE(!leader->ops->is_pool_thread()); if (sm_state(&to->sm) == TO_CANCELED) { return; } @@ -492,7 +492,7 @@ static void leader_sent_cb(struct sender *s, int rc) struct rpc *rpc = CONTAINER_OF(s, struct rpc, sender); struct leader *leader = CONTAINER_OF(rpc, struct leader, rpc); - PRE(leader->ops->is_main_thread()); + PRE(!leader->ops->is_pool_thread()); if (UNLIKELY(rc != 0)) { sm_move(&rpc->sm, RPC_ERROR); return; @@ -505,7 +505,7 @@ static void follower_sent_cb(struct sender *s, int rc) struct rpc *rpc = CONTAINER_OF(s, struct rpc, sender); struct follower *follower = CONTAINER_OF(rpc, struct follower, rpc); - PRE(follower->ops->is_main_thread()); + PRE(!follower->ops->is_pool_thread()); if (UNLIKELY(rc != 0)) { sm_move(&rpc->sm, RPC_ERROR); return; @@ -864,7 +864,7 @@ __attribute__((unused)) void leader_tick(struct leader *leader, const struct raf struct sm *sm = &leader->sm; const struct leader_ops *ops = leader->ops; - PRE(ops->is_main_thread()); + PRE(!ops->is_pool_thread()); if (!is_a_trigger_leader(leader, incoming) || is_a_duplicate(leader, incoming)) @@ -941,7 +941,7 @@ __attribute__((unused)) void follower_tick(struct follower *follower, const stru is_a_duplicate(follower, incoming)) return; - PRE(ops->is_main_thread()); + PRE(!ops->is_pool_thread()); again: switch (sm_state(&follower->sm)) { diff --git a/src/raft/recv_install_snapshot.h b/src/raft/recv_install_snapshot.h index 614b7aec7..901b36258 100644 --- a/src/raft/recv_install_snapshot.h +++ b/src/raft/recv_install_snapshot.h @@ -58,7 +58,7 @@ struct leader_ops { void (*work_queue)(struct work *w, work_op work, work_op after_cb); - bool (*is_main_thread)(void); + bool (*is_pool_thread)(void); }; struct follower_ops { @@ -70,7 +70,7 @@ struct follower_ops { sender_send_op sender_send; void (*work_queue)(struct work *w, work_op work, work_op after_cb); - bool (*is_main_thread)(void); + bool (*is_pool_thread)(void); }; struct leader { diff --git a/test/raft/unit/test_snapshot.c b/test/raft/unit/test_snapshot.c index 368719c94..5b5a13981 100644 --- a/test/raft/unit/test_snapshot.c +++ b/test/raft/unit/test_snapshot.c @@ -188,8 +188,8 @@ int ut_sender_send_op(struct sender *s, return 0; } -static bool ut_is_main_thread_op(void) { - return true; +static bool ut_is_pool_thread_op(void) { + return false; } TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) { @@ -200,7 +200,7 @@ TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) { .read_sig = ut_read_sig_op, .write_chunk = ut_write_chunk_op, .fill_ht = ut_fill_ht_op, - .is_main_thread = ut_is_main_thread_op, + .is_pool_thread = ut_is_pool_thread_op, }; struct follower follower = { @@ -271,7 +271,7 @@ TEST(snapshot_leader, basic, set_up, tear_down, 0, NULL) { .ht_create = ut_ht_create_op, .work_queue = ut_work_queue_op, .sender_send = ut_sender_send_op, - .is_main_thread = ut_is_main_thread_op, + .is_pool_thread = ut_is_pool_thread_op, }; struct leader leader = { @@ -339,7 +339,7 @@ TEST(snapshot_leader, timeouts, set_up, tear_down, 0, NULL) { .ht_create = ut_ht_create_op, .work_queue = ut_work_queue_op, .sender_send = ut_sender_send_op, - .is_main_thread = ut_is_main_thread_op, + .is_pool_thread = ut_is_pool_thread_op, }; struct leader leader = { @@ -441,10 +441,6 @@ struct test_fixture { /* Not problematic because each test runs in a different process. */ static struct test_fixture global_fixture; -#define MAGIC_MAIN_THREAD 0xdef1 - -static __thread int thread_identifier; - static void *pool_set_up(MUNIT_UNUSED const MunitParameter params[], MUNIT_UNUSED void *user_data) { @@ -454,7 +450,6 @@ static void *pool_set_up(MUNIT_UNUSED const MunitParameter params[], global_fixture = (struct test_fixture) { 0 }; pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR); global_fixture.pool.flags |= POOL_FOR_UT; - thread_identifier = MAGIC_MAIN_THREAD; struct fixture *f = munit_malloc(sizeof *f); return f; @@ -473,17 +468,13 @@ static void progress(void) { } } -static bool pool_is_main_thread_op(void) { - return thread_identifier == MAGIC_MAIN_THREAD; -} - /* Advances libuv in the main thread until the in-flight background work is * finished. * * This function is designed with the constaint that there can only be one * request in-flight. It will hang until the work is finished. */ static void wait_work(void) { - PRE(pool_is_main_thread_op()); + PRE(!pool_is_pool_thread()); while (!global_fixture.work_done) { uv_run(uv_default_loop(), UV_RUN_NOWAIT); @@ -496,7 +487,7 @@ static void wait_work(void) { * This function is designed with the constaint that there can only be one * message in-flight. It will hang until the message is sent. */ static void wait_msg_sent(void) { - PRE(pool_is_main_thread_op()); + PRE(!pool_is_pool_thread()); while (!global_fixture.msg_valid) { uv_run(uv_default_loop(), UV_RUN_NOWAIT); @@ -549,9 +540,9 @@ static void pool_rpc_to_expired(struct rpc *rpc) static void pool_ht_create_op(pool_work_t *w) { if (global_fixture.is_leader) { - PRE(!global_fixture.leader.ops->is_main_thread()); + PRE(global_fixture.leader.ops->is_pool_thread()); } else { - PRE(!global_fixture.follower.ops->is_main_thread()); + PRE(global_fixture.follower.ops->is_pool_thread()); } (void)w; } @@ -559,9 +550,9 @@ static void pool_ht_create_op(pool_work_t *w) static void pool_fill_ht_op(pool_work_t *w) { if (global_fixture.is_leader) { - PRE(!global_fixture.leader.ops->is_main_thread()); + PRE(global_fixture.leader.ops->is_pool_thread()); } else { - PRE(!global_fixture.follower.ops->is_main_thread()); + PRE(global_fixture.follower.ops->is_pool_thread()); } (void)w; } @@ -570,14 +561,14 @@ static void pool_write_chunk_op(pool_work_t *w) { struct work *work = CONTAINER_OF(w, struct work, pool_work); struct follower *follower = CONTAINER_OF(work, struct follower, work); - PRE(!follower->ops->is_main_thread()); + PRE(follower->ops->is_pool_thread()); } static void pool_read_sig_op(pool_work_t *w) { struct work *work = CONTAINER_OF(w, struct work, pool_work); struct follower *follower = CONTAINER_OF(work, struct follower, work); - PRE(!follower->ops->is_main_thread()); + PRE(follower->ops->is_pool_thread()); } struct uv_sender_send_data { @@ -632,7 +623,7 @@ TEST(snapshot_leader, pool_timeouts, pool_set_up, pool_tear_down, 0, NULL) { .ht_create = pool_ht_create_op, .work_queue = pool_work_queue_op, .sender_send = uv_sender_send_op, - .is_main_thread = pool_is_main_thread_op, + .is_pool_thread = pool_is_pool_thread, }; global_fixture.is_leader = true; @@ -716,7 +707,7 @@ TEST(snapshot_follower, pool, pool_set_up, pool_tear_down, 0, NULL) { .read_sig = pool_read_sig_op, .write_chunk = pool_write_chunk_op, .fill_ht = pool_fill_ht_op, - .is_main_thread = pool_is_main_thread_op, + .is_pool_thread = pool_is_pool_thread, }; global_fixture.is_leader = false;