From a1ab2731f0154cd061ccb6e2e8049b42511c41af Mon Sep 17 00:00:00 2001 From: Alberto Carretero Date: Wed, 17 Jul 2024 10:59:50 +0200 Subject: [PATCH] feat(snapshot): rpc pool integration, first version --- src/raft/recv_install_snapshot.h | 1 + test/raft/unit/test_snapshot.c | 146 ++++++++++++++++++++++++------- 2 files changed, 117 insertions(+), 30 deletions(-) diff --git a/src/raft/recv_install_snapshot.h b/src/raft/recv_install_snapshot.h index 63f881e00..9c1c57567 100644 --- a/src/raft/recv_install_snapshot.h +++ b/src/raft/recv_install_snapshot.h @@ -25,6 +25,7 @@ struct work { }; struct sender { + // TODO embed the uv req here. sender_cb_op cb; }; diff --git a/test/raft/unit/test_snapshot.c b/test/raft/unit/test_snapshot.c index 47c681110..4213c037b 100644 --- a/test/raft/unit/test_snapshot.c +++ b/test/raft/unit/test_snapshot.c @@ -423,9 +423,17 @@ struct test_fixture { }; /* true when union contains leader, false when it contains follower */ bool is_leader; + + /* We only expect one message to be in-flight. */ + struct raft_message last_msg_sent; + bool msg_valid; + pool_t pool; - work_op orig_cb; + /* TODO: should accommodate several background jobs in the future. Probably + * by scheduling a barrier in the pool after all the works that toggles this + * flag. */ + work_op orig_work_cb; bool work_done; }; @@ -436,6 +444,25 @@ static struct test_fixture global_fixture; static __thread int thread_identifier; +static void *pool_set_up(MUNIT_UNUSED const MunitParameter params[], + MUNIT_UNUSED void *user_data) +{ + /* Prevent hangs. 
*/ + alarm(2); + + pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR); + global_fixture.pool.flags |= POOL_FOR_UT; + thread_identifier = MAGIC_MAIN_THREAD; + + struct fixture *f = munit_malloc(sizeof *f); + return f; +} + +static void pool_tear_down(void *data) +{ + free(data); +} + static void progress(void) { for (unsigned i = 0; i < 20; i++) { uv_run(uv_default_loop(), UV_RUN_NOWAIT); @@ -448,11 +475,17 @@ static void wait_work(void) { } } +static void wait_msg_sent(void) { + while (!global_fixture.msg_valid) { + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + } +} + /* Decorates the callback used when the pool work is done to set the test * fixture flag to true, then calls the original callback.*/ static void test_fixture_work_cb(pool_work_t *w) { global_fixture.work_done = true; - global_fixture.orig_cb(w); + global_fixture.orig_work_cb(w); } static void pool_to_start_op(struct timeout *to, unsigned delay, to_cb_op cb) @@ -474,7 +507,7 @@ static void pool_to_init_op(struct timeout *to) static void pool_work_queue_op(struct work *w, work_op work_cb, work_op after_cb) { w->pool_work = (pool_work_t) { 0 }; - global_fixture.orig_cb = after_cb; + global_fixture.orig_work_cb = after_cb; global_fixture.work_done = false; pool_queue_work(&global_fixture.pool, &w->pool_work, 0/* */, WT_UNORD, work_cb, test_fixture_work_cb); } @@ -529,22 +562,64 @@ static void pool_read_sig_op(pool_work_t *w) PRE(!follower->ops->is_main_thread()); } -TEST(snapshot_leader, pool_timeouts, set_up, tear_down, 0, NULL) { +struct uv_sender_send_data { + struct sender *s; + sender_cb_op cb; +}; + +void uv_sender_send_cb(uv_work_t *req) { + (void)req; +} + +void uv_sender_send_after_cb(uv_work_t *req, int status) { + (void)req; + (void)status; + global_fixture.msg_valid = true; + struct uv_sender_send_data *data = req->data; + data->cb(data->s, 0); +} + +int uv_sender_send_op(struct sender *s, + struct raft_message *payload, + sender_cb_op cb) { + /* We only expect one 
message to be in-flight. */ + static uv_work_t req; + static struct uv_sender_send_data req_data; + + global_fixture.last_msg_sent = *payload; + /* Flag is only toggled when the after_cb is called, emulating the message + * being sent. */ + global_fixture.msg_valid = false; + s->cb = cb; + req_data = (struct uv_sender_send_data) { + .s = s, + .cb = cb, + }; + req = (uv_work_t) { + .data = &req_data, + }; + uv_queue_work(uv_default_loop(), &req, uv_sender_send_cb, uv_sender_send_after_cb); + return 0; +} + +struct raft_message uv_get_msg_sent(void) { + munit_assert(global_fixture.msg_valid); + global_fixture.msg_valid = false; + return global_fixture.last_msg_sent; +} + +TEST(snapshot_leader, pool_timeouts, pool_set_up, pool_tear_down, 0, NULL) { struct leader_ops ops = { .to_init = pool_to_init_op, .to_stop = pool_to_stop_op, .to_start = pool_to_start_op, .ht_create = pool_ht_create_op, .work_queue = pool_work_queue_op, - .sender_send = ut_sender_send_op, + .sender_send = uv_sender_send_op, .is_main_thread = pool_is_main_thread_op, }; - pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR); - global_fixture.pool.flags |= POOL_FOR_UT; global_fixture.is_leader = true; - thread_identifier = MAGIC_MAIN_THREAD; - struct leader *leader = &global_fixture.leader; *leader = (struct leader) { .ops = &ops, @@ -563,29 +638,35 @@ TEST(snapshot_leader, pool_timeouts, set_up, tear_down, 0, NULL) { wait_work(); PRE(sm_state(&leader->sm) == LS_F_NEEDS_SNAP); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT); pool_rpc_to_expired(&leader->rpc); PRE(sm_state(&leader->sm) == LS_F_NEEDS_SNAP); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT); ut_leader_message_received(leader, ut_install_snapshot_result()); PRE(sm_state(&leader->sm) == LS_CHECK_F_HAS_SIGS); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + 
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); ut_leader_message_received(leader, ut_sign_result()); pool_to_expired(leader); PRE(sm_state(&leader->sm) == LS_CHECK_F_HAS_SIGS); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); pool_rpc_to_expired(&leader->rpc); PRE(sm_state(&leader->sm) == LS_CHECK_F_HAS_SIGS); leader->sigs_calculated = true; - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); ut_leader_message_received(leader, ut_sign_result()); PRE(sm_state(&leader->sm) == LS_REQ_SIG_LOOP); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE); PRE(sm_state(&leader->sm) == LS_REQ_SIG_LOOP); ut_leader_message_received(leader, ut_sign_result()); @@ -593,37 +674,36 @@ TEST(snapshot_leader, pool_timeouts, set_up, tear_down, 0, NULL) { wait_work(); PRE(sm_state(&leader->sm) == LS_PAGE_READ); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_CP); pool_rpc_to_expired(&leader->rpc); PRE(sm_state(&leader->sm) == LS_PAGE_READ); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_CP); ut_leader_message_received(leader, ut_page_result()); PRE(sm_state(&leader->sm) == LS_SNAP_DONE); - ut_rpc_sent(&leader->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT); ut_leader_message_received(leader, ut_install_snapshot_result()); sm_fini(&leader->sm); return MUNIT_OK; } -TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) { +TEST(snapshot_follower, pool, pool_set_up, pool_tear_down, 0, NULL) { struct follower_ops ops = { .ht_create = pool_ht_create_op, .work_queue = pool_work_queue_op, - .sender_send = ut_sender_send_op, + .sender_send = uv_sender_send_op, .read_sig = pool_read_sig_op, 
.write_chunk = pool_write_chunk_op, .fill_ht = pool_fill_ht_op, .is_main_thread = pool_is_main_thread_op, }; - pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR); - global_fixture.pool.flags |= POOL_FOR_UT; global_fixture.is_leader = false; - thread_identifier = MAGIC_MAIN_THREAD; - struct follower *follower = &global_fixture.follower; *follower = (struct follower) { @@ -635,13 +715,15 @@ TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) { PRE(sm_state(&follower->sm) == FS_NORMAL); ut_follower_message_received(follower, ut_install_snapshot()); - ut_rpc_sent(&follower->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_RESULT); wait_work(); PRE(sm_state(&follower->sm) == FS_SIGS_CALC_LOOP); ut_follower_message_received(follower, ut_sign()); - ut_rpc_sent(&follower->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE_RESULT); PRE(sm_state(&follower->sm) == FS_SIGS_CALC_LOOP); @@ -650,7 +732,8 @@ TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) { PRE(sm_state(&follower->sm) == FS_SIGS_CALC_LOOP); ut_follower_message_received(follower, ut_sign()); - ut_rpc_sent(&follower->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE_RESULT); PRE(sm_state(&follower->sm) == FS_SIG_RECEIVING); ut_follower_message_received(follower, ut_sign()); @@ -660,7 +743,8 @@ TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) { wait_work(); PRE(sm_state(&follower->sm) == FS_SIG_READ); - ut_rpc_sent(&follower->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE_RESULT); PRE(sm_state(&follower->sm) == FS_CHUNCK_RECEIVING); ut_follower_message_received(follower, ut_page()); @@ -668,11 +752,13 @@ TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) { wait_work(); PRE(sm_state(&follower->sm) == FS_CHUNCK_APPLIED); - ut_rpc_sent(&follower->rpc); + wait_msg_sent(); + 
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_CP_RESULT); PRE(sm_state(&follower->sm) == FS_SNAP_DONE); ut_follower_message_received(follower, ut_install_snapshot()); - ut_rpc_sent(&follower->rpc); + wait_msg_sent(); + munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_RESULT); sm_fini(&follower->sm); return MUNIT_OK;