fixes #1663 Request/Reply Protocol Throughput and Scalability
This eliminates the req protocol's use of nni_timer (and with it the
timer node kept per request).  That was problematic because it
devolves into O(n^2): every request inserts its own timer node, and
each insertion has to scan the timer list to find its place.

The solution is a single scan: stop worrying about insertion order and
instead use a coarse-granularity timer (defaulting to 1 second) for
retries, doing the O(n) scan just once per interval.
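
As a rough standalone illustration of the pattern (not the nng code itself;
the names retry_entry, sock_state, on_send, retry_tick, and now_ms below are
made up for the sketch), each request only records an absolute deadline, and
one shared timer sweeps all of them once per tick:

/*
 * Minimal sketch of the coarse-tick retry scheme, assuming illustrative
 * names; not the nng API.
 */
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>

typedef struct {
	uint64_t retry_time; /* absolute deadline in ms; resend after this */
	bool     pending;    /* request outstanding, no reply yet */
} retry_entry;

typedef struct {
	retry_entry *entries;
	size_t       count;
	bool         retry_active; /* shared timer already running? */
	uint64_t     tick_ms;      /* coarse scan interval */
} sock_state;

static uint64_t
now_ms(void)
{
	struct timespec ts;
	clock_gettime(CLOCK_MONOTONIC, &ts);
	return ((uint64_t) ts.tv_sec * 1000) + ((uint64_t) ts.tv_nsec / 1000000);
}

/* Send path: record the deadline, and arm the one shared timer lazily. */
static void
on_send(sock_state *s, retry_entry *e, uint64_t retry_ms)
{
	e->retry_time = now_ms() + retry_ms;
	e->pending    = true;
	if (!s->retry_active) {
		s->retry_active = true; /* the real code schedules a tick here */
	}
}

/* Timer path: one O(n) pass over all outstanding requests per tick,
 * instead of maintaining a sorted timer node per request. */
static void
retry_tick(sock_state *s)
{
	uint64_t now = now_ms();
	for (size_t i = 0; i < s->count; i++) {
		retry_entry *e = &s->entries[i];
		if (!e->pending || e->retry_time > now) {
			continue;
		}
		printf("resending request %zu\n", i);
		e->retry_time = now + 60000; /* next retry in a minute */
	}
}

int
main(void)
{
	retry_entry reqs[2] = { { 0 }, { 0 } };
	sock_state  s       = { reqs, 2, false, 1000 };

	on_send(&s, &reqs[0], 0);     /* due immediately, will be resent */
	on_send(&s, &reqs[1], 60000); /* not due yet, skipped this tick */
	retry_tick(&s);
	return 0;
}

The trade-off is that retry latency is quantized to the tick (here at most
one extra tick_ms), in exchange for O(1) work at send time and a single
periodic O(n) sweep.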
gdamore committed Dec 17, 2023
1 parent cc58517 commit 9aab700
Showing 1 changed file with 62 additions and 34 deletions.
96 changes: 62 additions & 34 deletions src/sp/protocol/reqrep0/req.c
@@ -21,10 +21,10 @@ typedef struct req0_ctx req0_ctx;

static void req0_run_send_queue(req0_sock *, nni_aio_completions *);
static void req0_ctx_reset(req0_ctx *);
static void req0_ctx_timeout(void *);
static void req0_pipe_fini(void *);
static void req0_ctx_fini(void *);
static void req0_ctx_init(void *, void *);
static void req0_retry_cb(void *);

// A req0_ctx is a "context" for the request. It uses most of the
// socket, but keeps track of its own outstanding replays, the request ID,
@@ -34,31 +34,36 @@ struct req0_ctx {
nni_list_node sock_node; // node on the socket context list
nni_list_node send_node; // node on the send_queue
nni_list_node pipe_node; // node on the pipe list
nni_list_node retry_node; // node on the socket retry list
uint32_t request_id; // request ID, without high bit set
nni_aio *recv_aio; // user aio waiting to recv - only one!
nni_aio *send_aio; // user aio waiting to send
nng_msg *req_msg; // request message (owned by protocol)
size_t req_len; // length of request message (for stats)
nng_msg *rep_msg; // reply message
nni_timer_node timer;
nni_duration retry;
nni_time retry_time; // retry after this expires
bool conn_reset; // sent message w/o retry, peer disconnect
};

// A req0_sock is our per-socket protocol private structure.
struct req0_sock {
nni_duration retry;
bool closed;
bool retry_active; // true if retry aio running
nni_atomic_int ttl;
req0_ctx master; // base socket master
nni_list ready_pipes;
nni_list busy_pipes;
nni_list stop_pipes;
nni_list contexts;
nni_list send_queue; // contexts waiting to send.
nni_list retry_queue;
nni_aio retry_aio; // retry timer
nni_id_map requests; // contexts by request ID
nni_pollable readable;
nni_pollable writable;
nni_duration retry_tick; // clock interval for retry timer
nni_mtx mtx;
};

@@ -95,16 +100,20 @@ req0_sock_init(void *arg, nni_sock *sock)
NNI_LIST_INIT(&s->busy_pipes, req0_pipe, node);
NNI_LIST_INIT(&s->stop_pipes, req0_pipe, node);
NNI_LIST_INIT(&s->send_queue, req0_ctx, send_node);
NNI_LIST_INIT(&s->retry_queue, req0_ctx, retry_node);
NNI_LIST_INIT(&s->contexts, req0_ctx, sock_node);

// this is "semi random" start for request IDs.
s->retry = NNI_SECOND * 60;
s->retry_tick = NNI_SECOND; // how often we check for retries

req0_ctx_init(&s->master, s);

nni_pollable_init(&s->writable);
nni_pollable_init(&s->readable);

nni_aio_init(&s->retry_aio, req0_retry_cb, s);

nni_atomic_init(&s->ttl);
nni_atomic_set(&s->ttl, 8);
}
@@ -130,6 +139,7 @@ req0_sock_fini(void *arg)
{
req0_sock *s = arg;

nni_aio_stop(&s->retry_aio);
nni_mtx_lock(&s->mtx);
NNI_ASSERT(nni_list_empty(&s->busy_pipes));
NNI_ASSERT(nni_list_empty(&s->stop_pipes));
@@ -140,6 +150,7 @@ req0_sock_fini(void *arg)
nni_pollable_fini(&s->readable);
nni_pollable_fini(&s->writable);
nni_id_map_fini(&s->requests);
nni_aio_fini(&s->retry_aio);
nni_mtx_fini(&s->mtx);
}

@@ -236,12 +247,9 @@ req0_pipe_close(void *arg)
ctx->conn_reset = true;
}
} else {
// Reset the timer on this so it expires immediately.
// This is actually easier than canceling the timer and
// running the send_queue separately. (In particular,
// it avoids a potential deadlock on cancelling the
// timer.)
nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO);
// Reset the retry time to make it expire immediately.
// The timer should already be running.
ctx->retry_time = nni_clock();
}
}
nni_mtx_unlock(&s->mtx);
@@ -363,16 +371,41 @@ req0_recv_cb(void *arg)
}

static void
req0_ctx_timeout(void *arg)
req0_retry_cb(void *arg)
{
req0_ctx *ctx = arg;
req0_sock *s = ctx->sock;

req0_sock *s = arg;
req0_ctx *ctx;
nni_time now;
bool reschedule = false;

// The design assumes that retries are infrequent, because
// we should normally be succeeding. We also want to avoid
// executing this linear scan of all requests too often; once
// per clock tick is all we want.
now = nni_clock();
nni_mtx_lock(&s->mtx);
if ((ctx->req_msg != NULL) && (!s->closed)) {
if (s->closed || (nni_aio_result(&s->retry_aio) != 0)) {
nni_mtx_unlock(&s->mtx);
return;
}

NNI_LIST_FOREACH (&s->retry_queue, ctx) {
if (ctx->retry_time > now || (ctx->req_msg == NULL)) {
continue;
}
if (!nni_list_node_active(&ctx->send_node)) {
nni_list_append(&s->send_queue, ctx);
}
reschedule = true;
}
if (!nni_list_empty(&s->retry_queue)) {
// if there are still contexts in the queue waiting to be
// retried, reschedule the timer so they get another pass.
nni_sleep_aio(s->retry_tick, &s->retry_aio);
} else {
s->retry_active = false;
}
if (reschedule) {
req0_run_send_queue(s, NULL);
}
nni_mtx_unlock(&s->mtx);
@@ -384,8 +417,6 @@ req0_ctx_init(void *arg, void *sock)
req0_sock *s = sock;
req0_ctx *ctx = arg;

nni_timer_init(&ctx->timer, req0_ctx_timeout, ctx);

nni_mtx_lock(&s->mtx);
ctx->sock = s;
ctx->recv_aio = NULL;
@@ -415,9 +446,6 @@ req0_ctx_fini(void *arg)
req0_ctx_reset(ctx);
nni_list_remove(&s->contexts, ctx);
nni_mtx_unlock(&s->mtx);

nni_timer_cancel(&ctx->timer);
nni_timer_fini(&ctx->timer);
}

static int
@@ -448,20 +476,20 @@ req0_run_send_queue(req0_sock *s, nni_aio_completions *sent_list)
return;
}

// We have a place to send it, so do the send.
// We have a place to send it, so send it.
// If a sending error occurs that causes the message to
// be dropped, we rely on the resend timer to pick it up.
// We also notify the completion callback if this is the
// first send attempt.
nni_list_remove(&s->send_queue, ctx);

// Schedule a resubmit timer. We only do this if we got
// Schedule a retry. We only do this if we got
// a pipe to send to. Otherwise, we should get handled
// the next time that the send_queue is run. We don't do this
// if the retry is "disabled" with NNG_DURATION_INFINITE.
if (ctx->retry > 0) {
nni_timer_schedule(
&ctx->timer, nni_clock() + ctx->retry);
nni_list_node_remove(&ctx->retry_node);
nni_list_append(&s->retry_queue, ctx);
}

// Put us on the pipe list of active contexts.
@@ -489,7 +517,7 @@ req0_run_send_queue(req0_sock *s, nni_aio_completions *sent_list)
}

// At this point, we will never give this message back to
// to the user, so we don't have to worry about making it
// the user, so we don't have to worry about making it
// unique. We can freely clone it.
nni_msg_clone(ctx->req_msg);
nni_aio_set_msg(&p->aio_send, ctx->req_msg);
@@ -503,16 +531,7 @@ req0_ctx_reset(req0_ctx *ctx)
req0_sock *s = ctx->sock;
// Call with sock lock held!

// We cannot safely "wait" using nni_timer_cancel, but this removes
// any scheduled timer activation. If the timeout is already running
// concurrently, it will still run. It should do nothing, because
// we toss the request. There is still a very narrow race if the
// timeout fires, but doesn't actually start running before we
// both finish this function, *and* manage to reschedule another
// request. The consequence of that occurring is that the request
// will be emitted on the wire twice. This is not actually tragic.
nni_timer_schedule(&ctx->timer, NNI_TIME_NEVER);

nni_list_node_remove(&ctx->retry_node);
nni_list_node_remove(&ctx->pipe_node);
nni_list_node_remove(&ctx->send_node);
if (ctx->request_id != 0) {
@@ -561,7 +580,7 @@ req0_ctx_cancel_recv(nni_aio *aio, void *arg, int rv)
// entire state machine. This allows us to preserve the
// semantic of exactly one receive operation per send
// operation, and should be the least surprising for users. The
// main consequence is that if a receive operation is completed
// main consequence is that if the operation is completed
// (in error or otherwise), the user must submit a new send
// operation to restart the state machine.
req0_ctx_reset(ctx);
@@ -713,6 +732,15 @@ req0_ctx_send(void *arg, nni_aio *aio)
ctx->send_aio = aio;
nni_aio_set_msg(aio, NULL);

if (ctx->retry > 0) {
ctx->retry_time = nni_clock() + ctx->retry;
nni_list_append(&s->retry_queue, ctx);
if (!s->retry_active) {
s->retry_active = true;
nni_sleep_aio(s->retry_tick, &s->retry_aio);
}
}

// Stick us on the send_queue list.
nni_list_append(&s->send_queue, ctx);

