Skip to content

Commit

Permalink
fixes #1523 rare SEGV in sub nni_list_remove
Browse files Browse the repository at this point in the history
Credit goes to Wu Xuan (@willwu1217) for diagnosing and proposing
a fix as part of #1695.  This approach takes a revised approach
to avoid adding extra memory, and it also is slightly faster as we
do not need to update both pointers in the linked list, by reusing
the reap node.

As part of this a new internal API, nni_aio_completions, is introduced.
In all likelihood we will be able to use this to solve some similar
crashes in other areas of the code.
  • Loading branch information
gdamore committed Nov 26, 2023
1 parent a54820f commit 9fda269
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 36 deletions.
33 changes: 33 additions & 0 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,39 @@ nni_aio_list_active(nni_aio *aio)
return (nni_list_node_active(&aio->a_prov_node));
}

// completions list.
// Implementation note: in order to avoid wasting space, we
// reuse the reap node -- which will be inactive here.
void
nni_aio_completions_init(nni_aio_completions *clp)
{
*clp = NULL;
}

void
nni_aio_completions_add(nni_aio_completions *clp, nni_aio *aio, int result, size_t count)
{
NNI_ASSERT(!nni_aio_list_active(aio));
aio->a_reap_node.rn_next = *clp;
aio->a_result = result;
aio->a_count = count;
*clp = aio;
}

void
nni_aio_completions_run(nni_aio_completions *clp)
{
nni_aio *aio;
nni_aio *cl = *clp;
*clp = NULL;

while ((aio = cl) != NULL) {
cl = (void *)aio->a_reap_node.rn_next;
aio->a_reap_node.rn_next = NULL;
nni_aio_finish_sync(aio, aio->a_result, aio->a_count);
}
}

static void
nni_aio_expire_add(nni_aio *aio)
{
Expand Down
26 changes: 25 additions & 1 deletion src/core/aio.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2022 Staysail Systems, Inc. <[email protected]>
// Copyright 2023 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -166,6 +166,30 @@ extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *);

extern void nni_sleep_aio(nni_duration, nni_aio *);

// nni_aio_completion_list is used after removing the aio from an
// active work queue, and keeping them so that the completions can
// be run in a deferred manner. These lists are simple, and intended
// to be used as local variables. It's important to initialize the
// list before using it. Also, any AIO added to a completion list must
// not be in active use anywhere.
typedef void *nni_aio_completions;

// nni_aio_completions_init just initializes a completions list.
// This just sets the pointed value to NULL.
extern void nni_aio_completions_init(nni_aio_completions *);

// nni_aio_completions_run runs nni_aio_finish_sync for all the aio objects
// that have been added to the completions. The result code and count used
// are those supplied in nni_aio_completions_add. Callers should not hold
// locks when calling this.
extern void nni_aio_completions_run(nni_aio_completions *);

// nni_aio_completions_add adds an aio (with the result code and length as
// appropriate) to the completion list. This should be done while the
// appropriate lock is held. The aio must not be scheduled.
extern void nni_aio_completions_add(nni_aio_completions *, nni_aio *,
int, size_t);

extern int nni_aio_sys_init(void);
extern void nni_aio_sys_fini(void);

Expand Down
67 changes: 32 additions & 35 deletions src/sp/protocol/pubsub0/sub.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2021 Staysail Systems, Inc. <[email protected]>
// Copyright 2023 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
// Copyright 2019 Nathan Kent <[email protected]>
//
Expand Down Expand Up @@ -44,14 +44,14 @@ static void sub0_pipe_fini(void *);
struct sub0_topic {
nni_list_node node;
size_t len;
void * buf;
void *buf;
};

// sub0_ctx is a context for a SUB socket. The advantage of contexts is
// that different contexts can maintain different subscriptions.
struct sub0_ctx {
nni_list_node node;
sub0_sock * sock;
sub0_sock *sock;
nni_list topics; // TODO: Consider patricia trie
nni_list recv_queue; // can have multiple pending receives
nni_lmq lmq;
Expand All @@ -71,15 +71,15 @@ struct sub0_sock {

// sub0_pipe is our per-pipe protocol private structure.
struct sub0_pipe {
nni_pipe * pipe;
nni_pipe *pipe;
sub0_sock *sub;
nni_aio aio_recv;
};

static void
sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
nni_mtx_lock(&sock->lk);
if (nni_list_active(&ctx->recv_queue, aio)) {
Expand All @@ -92,9 +92,9 @@ sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
static void
sub0_ctx_recv(void *arg, nni_aio *aio)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
nni_msg * msg;
nni_msg *msg;

if (nni_aio_begin(aio) != 0) {
return;
Expand Down Expand Up @@ -140,9 +140,9 @@ sub0_ctx_send(void *arg, nni_aio *aio)
static void
sub0_ctx_close(void *arg)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
nni_aio * aio;
nni_aio *aio;

nni_mtx_lock(&sock->lk);
while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) {
Expand All @@ -155,8 +155,8 @@ sub0_ctx_close(void *arg)
static void
sub0_ctx_fini(void *arg)
{
sub0_ctx * ctx = arg;
sub0_sock * sock = ctx->sock;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
sub0_topic *topic;

sub0_ctx_close(ctx);
Expand All @@ -179,7 +179,7 @@ static void
sub0_ctx_init(void *ctx_arg, void *sock_arg)
{
sub0_sock *sock = sock_arg;
sub0_ctx * ctx = ctx_arg;
sub0_ctx *ctx = ctx_arg;
size_t len;
bool prefer_new;

Expand Down Expand Up @@ -311,22 +311,22 @@ sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len)
static void
sub0_recv_cb(void *arg)
{
sub0_pipe *p = arg;
sub0_sock *sock = p->sub;
sub0_ctx * ctx;
nni_msg * msg;
size_t len;
uint8_t * body;
nni_list finish;
nng_aio * aio;
nni_msg * dup_msg;
sub0_pipe *p = arg;
sub0_sock *sock = p->sub;
sub0_ctx *ctx;
nni_msg *msg;
size_t len;
uint8_t *body;
nng_aio *aio;
nni_msg *dup_msg;
nni_aio_completions finish;

if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}

nni_aio_list_init(&finish);
nni_aio_completions_init(&finish);

msg = nni_aio_get_msg(&p->aio_recv);
nni_aio_set_msg(&p->aio_recv, NULL);
Expand Down Expand Up @@ -370,7 +370,7 @@ sub0_recv_cb(void *arg)
nni_aio_set_msg(aio, dup_msg);

// Save for synchronous completion
nni_list_append(&finish, aio);
nni_aio_completions_add(&finish, aio, 0, len);
} else if (nni_lmq_full(&ctx->lmq)) {
// Make space for the new message.
nni_msg *old;
Expand Down Expand Up @@ -401,18 +401,15 @@ sub0_recv_cb(void *arg)
nni_msg_free(msg);
}

while ((aio = nni_list_first(&finish)) != NULL) {
nni_list_remove(&finish, aio);
nni_aio_finish_sync(aio, 0, len);
}
nni_aio_completions_run(&finish);

nni_pipe_recv(p->pipe, &p->aio_recv);
}

static int
sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
int val;
nni_mtx_lock(&sock->lk);
Expand All @@ -425,7 +422,7 @@ sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
static int
sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
int val;
int rv;
Expand Down Expand Up @@ -456,8 +453,8 @@ sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_sock * sock = ctx->sock;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
sub0_topic *topic;
sub0_topic *new_topic;
NNI_ARG_UNUSED(t);
Expand Down Expand Up @@ -494,8 +491,8 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_sock * sock = ctx->sock;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
sub0_topic *topic;
size_t len;
NNI_ARG_UNUSED(t);
Expand Down Expand Up @@ -540,7 +537,7 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
bool val;

Expand All @@ -554,7 +551,7 @@ sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
static int
sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
bool val;
int rv;
Expand Down

0 comments on commit 9fda269

Please sign in to comment.