Skip to content

Commit

Permalink
feat(snapshot): rpc pool integration, first version
Browse files Browse the repository at this point in the history
  • Loading branch information
letFunny committed Jul 17, 2024
1 parent 45f88ca commit a1ab273
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 30 deletions.
1 change: 1 addition & 0 deletions src/raft/recv_install_snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ struct work {
};

struct sender {
// TODO embbed the uv req here.
sender_cb_op cb;
};

Expand Down
146 changes: 116 additions & 30 deletions test/raft/unit/test_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,17 @@ struct test_fixture {
};
/* true when union contains leader, false when it contains follower */
bool is_leader;

/* We only expect one message to be in-flight. */
struct raft_message last_msg_sent;
bool msg_valid;

pool_t pool;

work_op orig_cb;
/* TODO: should accomodate several background jobs in the future. Probably
* by scheduling a barrier in the pool after all the works that toggles this
* flag. */
work_op orig_work_cb;
bool work_done;
};

Expand All @@ -436,6 +444,25 @@ static struct test_fixture global_fixture;

static __thread int thread_identifier;

static void *pool_set_up(MUNIT_UNUSED const MunitParameter params[],
MUNIT_UNUSED void *user_data)
{
/* Prevent hangs. */
alarm(2);

pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR);
global_fixture.pool.flags |= POOL_FOR_UT;
thread_identifier = MAGIC_MAIN_THREAD;

struct fixture *f = munit_malloc(sizeof *f);
return f;
}

static void pool_tear_down(void *data)
{
free(data);
}

static void progress(void) {
for (unsigned i = 0; i < 20; i++) {
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
Expand All @@ -448,11 +475,17 @@ static void wait_work(void) {
}
}

static void wait_msg_sent(void) {
while (!global_fixture.msg_valid) {
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
}
}

/* Decorates the callback used when the pool work is done to set the test
* fixture flag to true, then calls the original callback.*/
static void test_fixture_work_cb(pool_work_t *w) {
global_fixture.work_done = true;
global_fixture.orig_cb(w);
global_fixture.orig_work_cb(w);
}

static void pool_to_start_op(struct timeout *to, unsigned delay, to_cb_op cb)
Expand All @@ -474,7 +507,7 @@ static void pool_to_init_op(struct timeout *to)
static void pool_work_queue_op(struct work *w, work_op work_cb, work_op after_cb)
{
w->pool_work = (pool_work_t) { 0 };
global_fixture.orig_cb = after_cb;
global_fixture.orig_work_cb = after_cb;
global_fixture.work_done = false;
pool_queue_work(&global_fixture.pool, &w->pool_work, 0/* */, WT_UNORD, work_cb, test_fixture_work_cb);
}
Expand Down Expand Up @@ -529,22 +562,64 @@ static void pool_read_sig_op(pool_work_t *w)
PRE(!follower->ops->is_main_thread());
}

TEST(snapshot_leader, pool_timeouts, set_up, tear_down, 0, NULL) {
struct uv_sender_send_data {
struct sender *s;
sender_cb_op cb;
};

void uv_sender_send_cb(uv_work_t *req) {
(void)req;
}

void uv_sender_send_after_cb(uv_work_t *req, int status) {
(void)req;
(void)status;
global_fixture.msg_valid = true;
struct uv_sender_send_data *data = req->data;
data->cb(data->s, 0);
}

int uv_sender_send_op(struct sender *s,
struct raft_message *payload,
sender_cb_op cb) {
/* We only expect one message to be in-flight. */
static uv_work_t req;
static struct uv_sender_send_data req_data;

global_fixture.last_msg_sent = *payload;
/* Flag is only toggled when the after_cb is called, emulating the message
* being sent. */
global_fixture.msg_valid = false;
s->cb = cb;
req_data = (struct uv_sender_send_data) {
.s = s,
.cb = cb,
};
req = (uv_work_t) {
.data = &req_data,
};
uv_queue_work(uv_default_loop(), &req, uv_sender_send_cb, uv_sender_send_after_cb);
return 0;
}

struct raft_message uv_get_msg_sent(void) {
munit_assert(global_fixture.msg_valid);
global_fixture.msg_valid = false;
return global_fixture.last_msg_sent;
}

TEST(snapshot_leader, pool_timeouts, pool_set_up, pool_tear_down, 0, NULL) {
struct leader_ops ops = {
.to_init = pool_to_init_op,
.to_stop = pool_to_stop_op,
.to_start = pool_to_start_op,
.ht_create = pool_ht_create_op,
.work_queue = pool_work_queue_op,
.sender_send = ut_sender_send_op,
.sender_send = uv_sender_send_op,
.is_main_thread = pool_is_main_thread_op,
};

pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR);
global_fixture.pool.flags |= POOL_FOR_UT;
global_fixture.is_leader = true;
thread_identifier = MAGIC_MAIN_THREAD;

struct leader *leader = &global_fixture.leader;
*leader = (struct leader) {
.ops = &ops,
Expand All @@ -563,67 +638,72 @@ TEST(snapshot_leader, pool_timeouts, set_up, tear_down, 0, NULL) {
wait_work();

PRE(sm_state(&leader->sm) == LS_F_NEEDS_SNAP);
ut_rpc_sent(&leader->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT);
pool_rpc_to_expired(&leader->rpc);

PRE(sm_state(&leader->sm) == LS_F_NEEDS_SNAP);
ut_rpc_sent(&leader->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT);
ut_leader_message_received(leader, ut_install_snapshot_result());

PRE(sm_state(&leader->sm) == LS_CHECK_F_HAS_SIGS);
ut_rpc_sent(&leader->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE);
ut_leader_message_received(leader, ut_sign_result());
pool_to_expired(leader);

PRE(sm_state(&leader->sm) == LS_CHECK_F_HAS_SIGS);
ut_rpc_sent(&leader->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE);
pool_rpc_to_expired(&leader->rpc);

PRE(sm_state(&leader->sm) == LS_CHECK_F_HAS_SIGS);
leader->sigs_calculated = true;
ut_rpc_sent(&leader->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE);
ut_leader_message_received(leader, ut_sign_result());

PRE(sm_state(&leader->sm) == LS_REQ_SIG_LOOP);
ut_rpc_sent(&leader->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE);
PRE(sm_state(&leader->sm) == LS_REQ_SIG_LOOP);
ut_leader_message_received(leader, ut_sign_result());

wait_work();
wait_work();

PRE(sm_state(&leader->sm) == LS_PAGE_READ);
ut_rpc_sent(&leader->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_CP);
pool_rpc_to_expired(&leader->rpc);

PRE(sm_state(&leader->sm) == LS_PAGE_READ);
ut_rpc_sent(&leader->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_CP);
ut_leader_message_received(leader, ut_page_result());

PRE(sm_state(&leader->sm) == LS_SNAP_DONE);
ut_rpc_sent(&leader->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT);
ut_leader_message_received(leader, ut_install_snapshot_result());

sm_fini(&leader->sm);
return MUNIT_OK;
}

TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) {
TEST(snapshot_follower, pool, pool_set_up, pool_tear_down, 0, NULL) {
struct follower_ops ops = {
.ht_create = pool_ht_create_op,
.work_queue = pool_work_queue_op,
.sender_send = ut_sender_send_op,
.sender_send = uv_sender_send_op,
.read_sig = pool_read_sig_op,
.write_chunk = pool_write_chunk_op,
.fill_ht = pool_fill_ht_op,
.is_main_thread = pool_is_main_thread_op,
};

pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR);
global_fixture.pool.flags |= POOL_FOR_UT;
global_fixture.is_leader = false;
thread_identifier = MAGIC_MAIN_THREAD;

struct follower *follower = &global_fixture.follower;

*follower = (struct follower) {
Expand All @@ -635,13 +715,15 @@ TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) {

PRE(sm_state(&follower->sm) == FS_NORMAL);
ut_follower_message_received(follower, ut_install_snapshot());
ut_rpc_sent(&follower->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_RESULT);

wait_work();

PRE(sm_state(&follower->sm) == FS_SIGS_CALC_LOOP);
ut_follower_message_received(follower, ut_sign());
ut_rpc_sent(&follower->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE_RESULT);

PRE(sm_state(&follower->sm) == FS_SIGS_CALC_LOOP);

Expand All @@ -650,7 +732,8 @@ TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) {

PRE(sm_state(&follower->sm) == FS_SIGS_CALC_LOOP);
ut_follower_message_received(follower, ut_sign());
ut_rpc_sent(&follower->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE_RESULT);

PRE(sm_state(&follower->sm) == FS_SIG_RECEIVING);
ut_follower_message_received(follower, ut_sign());
Expand All @@ -660,19 +743,22 @@ TEST(snapshot_follower, pool, set_up, tear_down, 0, NULL) {
wait_work();

PRE(sm_state(&follower->sm) == FS_SIG_READ);
ut_rpc_sent(&follower->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_SIGNATURE_RESULT);

PRE(sm_state(&follower->sm) == FS_CHUNCK_RECEIVING);
ut_follower_message_received(follower, ut_page());

wait_work();

PRE(sm_state(&follower->sm) == FS_CHUNCK_APPLIED);
ut_rpc_sent(&follower->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_CP_RESULT);

PRE(sm_state(&follower->sm) == FS_SNAP_DONE);
ut_follower_message_received(follower, ut_install_snapshot());
ut_rpc_sent(&follower->rpc);
wait_msg_sent();
munit_assert_int(uv_get_msg_sent().type, ==, RAFT_IO_INSTALL_SNAPSHOT_RESULT);

sm_fini(&follower->sm);
return MUNIT_OK;
Expand Down

0 comments on commit a1ab273

Please sign in to comment.