Skip to content

Commit

Permalink
feat: pool thread identification
Browse files Browse the repository at this point in the history
  • Loading branch information
letFunny committed Jul 31, 2024
1 parent 7de953b commit defc3f3
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 35 deletions.
30 changes: 29 additions & 1 deletion src/lib/threadpool.c
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#include "threadpool.h"
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <uv.h>
#include <uv/unix.h>
#include "../../src/lib/queue.h"
#include "../../src/lib/sm.h"
#include "../../src/utils.h"
#include "../tracing.h"

/**
* Planner thread state machine.
Expand Down Expand Up @@ -74,6 +76,9 @@ static const struct sm_conf planner_states[PS_NR] = {
},
};

static const uint64_t pool_thread_magic = 0xf344e2;
static uv_key_t thread_identifier_key;

enum {
THREADPOOL_SIZE_MAX = 1024,
};
Expand Down Expand Up @@ -125,6 +130,14 @@ struct pool_impl {
};
/* clang-format on */

/* Callback does not allow passing data, we use a static variable to report
* errors back. */
static int thread_key_create_err = 0;
static void thread_key_create(void) {
PRE(thread_key_create_err == 0);
thread_key_create_err = uv_key_create(&thread_identifier_key);
}

static inline bool pool_is_inited(const pool_t *pool)
{
return pool->pi != NULL;
Expand Down Expand Up @@ -325,6 +338,7 @@ static void worker(void *arg)
pool_work_t *w;
queue *q;

uv_key_set(&thread_identifier_key, (void*)pool_thread_magic);
uv_sem_post(ta->sem);
uv_mutex_lock(mutex);
for (;;) {
Expand Down Expand Up @@ -552,7 +566,17 @@ int pool_init(pool_t *pool,
return rc;
}

static uv_once_t once = UV_ONCE_INIT;
uv_once(&once, thread_key_create);
if (thread_key_create_err != 0) {
uv_close((uv_handle_t *)&pi->outq_async, NULL);
uv_mutex_destroy(&pi->outq_mutex);
free(pi);
return thread_key_create_err;

Check warning on line 575 in src/lib/threadpool.c

View check run for this annotation

Codecov / codecov/patch

src/lib/threadpool.c#L572-L575

Added lines #L572 - L575 were not covered by tests
}

pool_threads_init(pool);

return 0;
}

Expand Down Expand Up @@ -581,6 +605,10 @@ void pool_close(pool_t *pool)
uv_mutex_unlock(&pi->mutex);
}

bool pool_is_pool_thread(void) {
return uv_key_get(&thread_identifier_key) == (void*)pool_thread_magic;
}

pool_t *pool_ut_fallback(void)
{
static pool_t pool;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define __THREAD_POOL__

#include <uv.h>
#include <stdbool.h>
#include "queue.h"

/**
Expand Down Expand Up @@ -111,6 +112,7 @@ void pool_queue_work(pool_t *pool,
enum pool_work_type type,
void (*work_cb)(pool_work_t *w),
void (*after_work_cb)(pool_work_t *w));
bool pool_is_pool_thread(void);

pool_t *pool_ut_fallback(void);

Expand Down
16 changes: 8 additions & 8 deletions src/raft/recv_install_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ static void leader_work_done(pool_work_t *w)
struct work *work = CONTAINER_OF(w, struct work, pool_work);
struct leader *leader = CONTAINER_OF(work, struct leader, work);

PRE(leader->ops->is_main_thread());
PRE(!leader->ops->is_pool_thread());
sm_move(&work->sm, WORK_DONE);
leader_tick(leader, M_WORK_DONE);
}
Expand All @@ -436,7 +436,7 @@ static void follower_work_done(pool_work_t *w)
struct work *work = CONTAINER_OF(w, struct work, pool_work);
struct follower *follower = CONTAINER_OF(work, struct follower, work);

PRE(follower->ops->is_main_thread());
PRE(!follower->ops->is_pool_thread());
sm_move(&work->sm, WORK_DONE);
follower_tick(follower, M_WORK_DONE);
}
Expand All @@ -447,7 +447,7 @@ static void rpc_to_cb(uv_timer_t *handle)
struct rpc *rpc = CONTAINER_OF(to, struct rpc, timeout);
struct leader *leader = CONTAINER_OF(rpc, struct leader, rpc);

PRE(leader->ops->is_main_thread());
PRE(!leader->ops->is_pool_thread());
if (sm_state(&to->sm) == TO_CANCELED) {
return;

Check warning on line 452 in src/raft/recv_install_snapshot.c

View check run for this annotation

Codecov / codecov/patch

src/raft/recv_install_snapshot.c#L452

Added line #L452 was not covered by tests
}
Expand All @@ -461,7 +461,7 @@ static void leader_to_cb(uv_timer_t *handle)
struct timeout *to = CONTAINER_OF(handle, struct timeout, handle);
struct leader *leader = CONTAINER_OF(to, struct leader, timeout);

PRE(leader->ops->is_main_thread());
PRE(!leader->ops->is_pool_thread());
if (sm_state(&to->sm) == TO_CANCELED) {
return;

Check warning on line 466 in src/raft/recv_install_snapshot.c

View check run for this annotation

Codecov / codecov/patch

src/raft/recv_install_snapshot.c#L466

Added line #L466 was not covered by tests
}
Expand Down Expand Up @@ -492,7 +492,7 @@ static void leader_sent_cb(struct sender *s, int rc)
struct rpc *rpc = CONTAINER_OF(s, struct rpc, sender);
struct leader *leader = CONTAINER_OF(rpc, struct leader, rpc);

PRE(leader->ops->is_main_thread());
PRE(!leader->ops->is_pool_thread());
if (UNLIKELY(rc != 0)) {
sm_move(&rpc->sm, RPC_ERROR);
return;
Expand All @@ -505,7 +505,7 @@ static void follower_sent_cb(struct sender *s, int rc)
struct rpc *rpc = CONTAINER_OF(s, struct rpc, sender);
struct follower *follower = CONTAINER_OF(rpc, struct follower, rpc);

PRE(follower->ops->is_main_thread());
PRE(!follower->ops->is_pool_thread());
if (UNLIKELY(rc != 0)) {
sm_move(&rpc->sm, RPC_ERROR);
return;
Expand Down Expand Up @@ -864,7 +864,7 @@ __attribute__((unused)) void leader_tick(struct leader *leader, const struct raf
struct sm *sm = &leader->sm;
const struct leader_ops *ops = leader->ops;

PRE(ops->is_main_thread());
PRE(!ops->is_pool_thread());

if (!is_a_trigger_leader(leader, incoming) ||
is_a_duplicate(leader, incoming))
Expand Down Expand Up @@ -941,7 +941,7 @@ __attribute__((unused)) void follower_tick(struct follower *follower, const stru
is_a_duplicate(follower, incoming))
return;

PRE(ops->is_main_thread());
PRE(!ops->is_pool_thread());

again:
switch (sm_state(&follower->sm)) {
Expand Down
4 changes: 2 additions & 2 deletions src/raft/recv_install_snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct leader_ops {

void (*work_queue)(struct work *w,
work_op work, work_op after_cb);
bool (*is_main_thread)(void);
bool (*is_pool_thread)(void);
};

struct follower_ops {
Expand All @@ -70,7 +70,7 @@ struct follower_ops {
sender_send_op sender_send;
void (*work_queue)(struct work *w,
work_op work, work_op after_cb);
bool (*is_main_thread)(void);
bool (*is_pool_thread)(void);
};

struct leader {
Expand Down
39 changes: 15 additions & 24 deletions test/raft/unit/test_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ int ut_sender_send_op(struct sender *s,
return 0;
}

static bool ut_is_main_thread_op(void) {
return true;
static bool ut_is_pool_thread_op(void) {
return false;
}

TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) {
Expand All @@ -200,7 +200,7 @@ TEST(snapshot_follower, basic, set_up, tear_down, 0, NULL) {
.read_sig = ut_read_sig_op,
.write_chunk = ut_write_chunk_op,
.fill_ht = ut_fill_ht_op,
.is_main_thread = ut_is_main_thread_op,
.is_pool_thread = ut_is_pool_thread_op,
};

struct follower follower = {
Expand Down Expand Up @@ -271,7 +271,7 @@ TEST(snapshot_leader, basic, set_up, tear_down, 0, NULL) {
.ht_create = ut_ht_create_op,
.work_queue = ut_work_queue_op,
.sender_send = ut_sender_send_op,
.is_main_thread = ut_is_main_thread_op,
.is_pool_thread = ut_is_pool_thread_op,
};

struct leader leader = {
Expand Down Expand Up @@ -339,7 +339,7 @@ TEST(snapshot_leader, timeouts, set_up, tear_down, 0, NULL) {
.ht_create = ut_ht_create_op,
.work_queue = ut_work_queue_op,
.sender_send = ut_sender_send_op,
.is_main_thread = ut_is_main_thread_op,
.is_pool_thread = ut_is_pool_thread_op,
};

struct leader leader = {
Expand Down Expand Up @@ -441,10 +441,6 @@ struct test_fixture {
/* Not problematic because each test runs in a different process. */
static struct test_fixture global_fixture;

#define MAGIC_MAIN_THREAD 0xdef1

static __thread int thread_identifier;

static void *pool_set_up(MUNIT_UNUSED const MunitParameter params[],
MUNIT_UNUSED void *user_data)
{
Expand All @@ -454,7 +450,6 @@ static void *pool_set_up(MUNIT_UNUSED const MunitParameter params[],
global_fixture = (struct test_fixture) { 0 };
pool_init(&global_fixture.pool, uv_default_loop(), 4, POOL_QOS_PRIO_FAIR);
global_fixture.pool.flags |= POOL_FOR_UT;
thread_identifier = MAGIC_MAIN_THREAD;

struct fixture *f = munit_malloc(sizeof *f);
return f;
Expand All @@ -473,17 +468,13 @@ static void progress(void) {
}
}

static bool pool_is_main_thread_op(void) {
return thread_identifier == MAGIC_MAIN_THREAD;
}

/* Advances libuv in the main thread until the in-flight background work is
* finished.
*
* This function is designed with the constaint that there can only be one
* request in-flight. It will hang until the work is finished. */
static void wait_work(void) {
PRE(pool_is_main_thread_op());
PRE(!pool_is_pool_thread());

while (!global_fixture.work_done) {
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
Expand All @@ -496,7 +487,7 @@ static void wait_work(void) {
* This function is designed with the constaint that there can only be one
* message in-flight. It will hang until the message is sent. */
static void wait_msg_sent(void) {
PRE(pool_is_main_thread_op());
PRE(!pool_is_pool_thread());

while (!global_fixture.msg_valid) {
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
Expand Down Expand Up @@ -549,19 +540,19 @@ static void pool_rpc_to_expired(struct rpc *rpc)
static void pool_ht_create_op(pool_work_t *w)
{
if (global_fixture.is_leader) {
PRE(!global_fixture.leader.ops->is_main_thread());
PRE(global_fixture.leader.ops->is_pool_thread());
} else {
PRE(!global_fixture.follower.ops->is_main_thread());
PRE(global_fixture.follower.ops->is_pool_thread());
}
(void)w;
}

static void pool_fill_ht_op(pool_work_t *w)
{
if (global_fixture.is_leader) {
PRE(!global_fixture.leader.ops->is_main_thread());
PRE(global_fixture.leader.ops->is_pool_thread());

Check warning on line 553 in test/raft/unit/test_snapshot.c

View check run for this annotation

Codecov / codecov/patch

test/raft/unit/test_snapshot.c#L553

Added line #L553 was not covered by tests
} else {
PRE(!global_fixture.follower.ops->is_main_thread());
PRE(global_fixture.follower.ops->is_pool_thread());
}
(void)w;
}
Expand All @@ -570,14 +561,14 @@ static void pool_write_chunk_op(pool_work_t *w)
{
struct work *work = CONTAINER_OF(w, struct work, pool_work);
struct follower *follower = CONTAINER_OF(work, struct follower, work);
PRE(!follower->ops->is_main_thread());
PRE(follower->ops->is_pool_thread());
}

static void pool_read_sig_op(pool_work_t *w)
{
struct work *work = CONTAINER_OF(w, struct work, pool_work);
struct follower *follower = CONTAINER_OF(work, struct follower, work);
PRE(!follower->ops->is_main_thread());
PRE(follower->ops->is_pool_thread());
}

struct uv_sender_send_data {
Expand Down Expand Up @@ -632,7 +623,7 @@ TEST(snapshot_leader, pool_timeouts, pool_set_up, pool_tear_down, 0, NULL) {
.ht_create = pool_ht_create_op,
.work_queue = pool_work_queue_op,
.sender_send = uv_sender_send_op,
.is_main_thread = pool_is_main_thread_op,
.is_pool_thread = pool_is_pool_thread,
};

global_fixture.is_leader = true;
Expand Down Expand Up @@ -716,7 +707,7 @@ TEST(snapshot_follower, pool, pool_set_up, pool_tear_down, 0, NULL) {
.read_sig = pool_read_sig_op,
.write_chunk = pool_write_chunk_op,
.fill_ht = pool_fill_ht_op,
.is_main_thread = pool_is_main_thread_op,
.is_pool_thread = pool_is_pool_thread,
};

global_fixture.is_leader = false;
Expand Down

0 comments on commit defc3f3

Please sign in to comment.