diff --git a/src/lib/threadpool.c b/src/lib/threadpool.c index c82e4e95c..f87577184 100644 --- a/src/lib/threadpool.c +++ b/src/lib/threadpool.c @@ -1,13 +1,15 @@ #include "threadpool.h" #include #include +#include #include #include +#include #include +#include #include "../../src/lib/queue.h" #include "../../src/lib/sm.h" #include "../../src/utils.h" -#include "../tracing.h" /** * Planner thread state machine. @@ -74,6 +76,9 @@ static const struct sm_conf planner_states[PS_NR] = { }, }; +static const uint64_t pool_thread_magic = 0xf344e2; +static uv_key_t thread_identifier_key; + enum { THREADPOOL_SIZE_MAX = 1024, }; @@ -125,6 +130,14 @@ struct pool_impl { }; /* clang-format on */ +/* Callback does not allow passing data, we use a static variable to report + * errors back. */ +static int thread_key_create_err = 0; +static void thread_key_create(void) { + PRE(thread_key_create_err == 0); + thread_key_create_err = uv_key_create(&thread_identifier_key); +} + static inline bool pool_is_inited(const pool_t *pool) { return pool->pi != NULL; @@ -325,6 +338,7 @@ static void worker(void *arg) pool_work_t *w; queue *q; + uv_key_set(&thread_identifier_key, (void*)pool_thread_magic); uv_sem_post(ta->sem); uv_mutex_lock(mutex); for (;;) { @@ -552,7 +566,17 @@ int pool_init(pool_t *pool, return rc; } + static uv_once_t once = UV_ONCE_INIT; + uv_once(&once, thread_key_create); + if (thread_key_create_err != 0) { + uv_close((uv_handle_t *)&pi->outq_async, NULL); + uv_mutex_destroy(&pi->outq_mutex); + free(pi); + return thread_key_create_err; + } + pool_threads_init(pool); + return 0; } @@ -581,6 +605,10 @@ void pool_close(pool_t *pool) uv_mutex_unlock(&pi->mutex); } +bool pool_is_pool_thread(void) { + return uv_key_get(&thread_identifier_key) == (void*)pool_thread_magic; +} + pool_t *pool_ut_fallback(void) { static pool_t pool; diff --git a/src/lib/threadpool.h b/src/lib/threadpool.h index 8858e9aac..7503a2ddd 100644 --- a/src/lib/threadpool.h +++ b/src/lib/threadpool.h @@ -2,6 +2,7 @@ #define __THREAD_POOL__ #include +#include #include "queue.h" /** @@ -111,6 +112,7 @@ void pool_queue_work(pool_t *pool, enum pool_work_type type, void (*work_cb)(pool_work_t *w), void (*after_work_cb)(pool_work_t *w)); +bool pool_is_pool_thread(void); pool_t *pool_ut_fallback(void); diff --git a/src/raft/recv_install_snapshot.c b/src/raft/recv_install_snapshot.c index e6763f84d..d372f2d5b 100644 --- a/src/raft/recv_install_snapshot.c +++ b/src/raft/recv_install_snapshot.c @@ -426,7 +426,7 @@ static void leader_work_done(pool_work_t *w) struct work *work = CONTAINER_OF(w, struct work, pool_work); struct leader *leader = CONTAINER_OF(work, struct leader, work); - PRE(leader->ops->is_main_thread()); + PRE(!leader->ops->is_pool_thread()); sm_move(&work->sm, WORK_DONE); leader_tick(leader, M_WORK_DONE); } @@ -436,7 +436,7 @@ static void follower_work_done(pool_work_t *w) struct work *work = CONTAINER_OF(w, struct work, pool_work); struct follower *follower = CONTAINER_OF(work, struct follower, work); - PRE(follower->ops->is_main_thread()); + PRE(!follower->ops->is_pool_thread()); sm_move(&work->sm, WORK_DONE); follower_tick(follower, M_WORK_DONE); } @@ -447,7 +447,7 @@ static void rpc_to_cb(uv_timer_t *handle) struct rpc *rpc = CONTAINER_OF(to, struct rpc, timeout); struct leader *leader = CONTAINER_OF(rpc, struct leader, rpc); - PRE(leader->ops->is_main_thread()); + PRE(!leader->ops->is_pool_thread()); if (sm_state(&to->sm) == TO_CANCELED) { return; } @@ -461,7 +461,7 @@ static void leader_to_cb(uv_timer_t *handle) struct timeout *to = CONTAINER_OF(handle, struct timeout, handle); struct leader *leader = CONTAINER_OF(to, struct leader, timeout); - PRE(leader->ops->is_main_thread()); + PRE(!leader->ops->is_pool_thread()); if (sm_state(&to->sm) == TO_CANCELED) { return; } @@ -492,7 +492,7 @@ static void leader_sent_cb(struct sender *s, int rc) struct rpc *rpc = CONTAINER_OF(s, struct rpc, sender); struct leader *leader = CONTAINER_OF(rpc, struct leader, rpc); - PRE(leader->ops->is_main_thread()); + PRE(!leader->ops->is_pool_thread()); if (UNLIKELY(rc != 0)) { sm_move(&rpc->sm, RPC_ERROR); return; @@ -505,7 +505,7 @@ static void follower_sent_cb(struct sender *s, int rc) struct rpc *rpc = CONTAINER_OF(s, struct rpc, sender); struct follower *follower = CONTAINER_OF(rpc, struct follower, rpc); - PRE(follower->ops->is_main_thread()); + PRE(!follower->ops->is_pool_thread()); if (UNLIKELY(rc != 0)) { sm_move(&rpc->sm, RPC_ERROR); return; @@ -864,7 +864,7 @@ __attribute__((unused)) void leader_tick(struct leader *leader, const struct raf struct sm *sm = &leader->sm; const struct leader_ops *ops = leader->ops; - PRE(ops->is_main_thread()); + PRE(!ops->is_pool_thread()); if (!is_a_trigger_leader(leader, incoming) || is_a_duplicate(leader, incoming)) @@ -941,7 +941,7 @@ __attribute__((unused)) void follower_tick(struct follower *follower, const stru is_a_duplicate(follower, incoming)) return; - PRE(ops->is_main_thread()); + PRE(!ops->is_pool_thread()); again: switch (sm_state(&follower->sm)) { diff --git a/src/raft/recv_install_snapshot.h b/src/raft/recv_install_snapshot.h index 614b7aec7..901b36258 100644 --- a/src/raft/recv_install_snapshot.h +++ b/src/raft/recv_install_snapshot.h @@ -58,7 +58,7 @@ struct leader_ops { void (*work_queue)(struct work *w, work_op work, work_op after_cb); - bool (*is_main_thread)(void); + bool (*is_pool_thread)(void); }; struct follower_ops { @@ -70,7 +70,7 @@ struct follower_ops { sender_send_op sender_send; void (*work_queue)(struct work *w, work_op work, work_op after_cb); - bool (*is_main_thread)(void); + bool (*is_pool_thread)(void); }; struct leader { diff --git a/test/raft/unit/test_snapshot.c b/test/raft/unit/test_snapshot.c index 368719c94..5b5a13981 100644 --- a/test/raft/unit/test_snapshot.c +++ b/test/raft/unit/test_snapshot.c @@ -188,8 +188,8 @@ int ut_sender_send_op(struct sender *s, return 0; } -static bool ut_is_main_thread_op(void) { - return true; +static bool ut_is_pool_thread_op(void) { + return false; } TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) { @@ -200,7 +200,7 @@ TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) { .read_sig = ut_read_sig_op, .write_chunk = ut_write_chunk_op, .fill_ht = ut_fill_ht_op, - .is_main_thread = ut_is_main_thread_op, + .is_pool_thread = ut_is_pool_thread_op, }; struct follower follower = { @@ -271,7 +271,7 @@ TEST(snapshot_leader, basic, set_up, tear_down, 0, NULL) { .ht_create = ut_ht_create_op, .work_queue = ut_work_queue_op, .sender_send = ut_sender_send_op, - .is_main_thread = ut_is_main_thread_op, + .is_pool_thread = ut_is_pool_thread_op, }; struct leader leader = { @@ -339,7 +339,7 @@ TEST(snapshot_leader, timeouts, set_up, tear_down, 0, NULL) { .ht_create = ut_ht_create_op, .work_queue = ut_work_queue_op, .sender_send = ut_sender_send_op, - .is_main_thread = ut_is_main_thread_op, + .is_pool_thread = ut_is_pool_thread_op, }; struct leader leader = { @@ -441,10 +441,6 @@ struct test_fixture { /* Not problematic because each test runs in a different process. */ static struct test_fixture global_fixture; -#define MAGIC_MAIN_THREAD 0xdef1 - -static __thread int thread_identifier; - static void *pool_set_up(MUNIT_UNUSED const MunitParameter params[], MUNIT_UNUSED void *user_data) { @@ -454,7 +450,6 @@ static void *pool_set_up(MUNIT_UNUSED const MunitParameter params[], global_fixture = (struct test_fixture) { 0 }; pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR); global_fixture.pool.flags |= POOL_FOR_UT; - thread_identifier = MAGIC_MAIN_THREAD; struct fixture *f = munit_malloc(sizeof *f); return f; @@ -473,17 +468,13 @@ static void progress(void) { } } -static bool pool_is_main_thread_op(void) { - return thread_identifier == MAGIC_MAIN_THREAD; -} - /* Advances libuv in the main thread until the in-flight background work is * finished. * * This function is designed with the constaint that there can only be one * request in-flight. It will hang until the work is finished. */ static void wait_work(void) { - PRE(pool_is_main_thread_op()); + PRE(!pool_is_pool_thread()); while (!global_fixture.work_done) { uv_run(uv_default_loop(), UV_RUN_NOWAIT); @@ -496,7 +487,7 @@ static void wait_work(void) { * This function is designed with the constaint that there can only be one * message in-flight. It will hang until the message is sent. */ static void wait_msg_sent(void) { - PRE(pool_is_main_thread_op()); + PRE(!pool_is_pool_thread()); while (!global_fixture.msg_valid) { uv_run(uv_default_loop(), UV_RUN_NOWAIT); @@ -549,9 +540,9 @@ static void pool_rpc_to_expired(struct rpc *rpc) static void pool_ht_create_op(pool_work_t *w) { if (global_fixture.is_leader) { - PRE(!global_fixture.leader.ops->is_main_thread()); + PRE(global_fixture.leader.ops->is_pool_thread()); } else { - PRE(!global_fixture.follower.ops->is_main_thread()); + PRE(global_fixture.follower.ops->is_pool_thread()); } (void)w; } @@ -559,9 +550,9 @@ static void pool_ht_create_op(pool_work_t *w) static void pool_fill_ht_op(pool_work_t *w) { if (global_fixture.is_leader) { - PRE(!global_fixture.leader.ops->is_main_thread()); + PRE(global_fixture.leader.ops->is_pool_thread()); } else { - PRE(!global_fixture.follower.ops->is_main_thread()); + PRE(global_fixture.follower.ops->is_pool_thread()); } (void)w; } @@ -570,14 +561,14 @@ static void pool_write_chunk_op(pool_work_t *w) { struct work *work = CONTAINER_OF(w, struct work, pool_work); struct follower *follower = CONTAINER_OF(work, struct follower, work); - PRE(!follower->ops->is_main_thread()); + PRE(follower->ops->is_pool_thread()); } static void pool_read_sig_op(pool_work_t *w) { struct work *work = CONTAINER_OF(w, struct work, pool_work); struct follower *follower = CONTAINER_OF(work, struct follower, work); - PRE(!follower->ops->is_main_thread()); + PRE(follower->ops->is_pool_thread()); } struct uv_sender_send_data { @@ -632,7 +623,7 @@ TEST(snapshot_leader, pool_timeouts, pool_set_up, pool_tear_down, 0, NULL) { .ht_create = pool_ht_create_op, .work_queue = pool_work_queue_op, .sender_send = uv_sender_send_op, - .is_main_thread = pool_is_main_thread_op, + .is_pool_thread = pool_is_pool_thread, }; global_fixture.is_leader = true; @@ -716,7 +707,7 @@ TEST(snapshot_follower, pool, pool_set_up, pool_tear_down, 0, NULL) { .read_sig = pool_read_sig_op, .write_chunk = pool_write_chunk_op, .fill_ht = pool_fill_ht_op, - .is_main_thread = pool_is_main_thread_op, + .is_pool_thread = pool_is_pool_thread, }; global_fixture.is_leader = false;