diff --git a/src/core/aio.c b/src/core/aio.c index 564e91a30..e849b33dc 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -508,6 +508,39 @@ nni_aio_list_active(nni_aio *aio) return (nni_list_node_active(&aio->a_prov_node)); } +// completions list. +// Implementation note: in order to avoid wasting space, we +// reuse the reap node -- which will be inactive here. +void +nni_aio_completions_init(nni_aio_completions *clp) +{ + *clp = NULL; +} + +void +nni_aio_completions_add(nni_aio_completions *clp, nni_aio *aio, int result, size_t count) +{ + NNI_ASSERT(!nni_aio_list_active(aio)); + aio->a_reap_node.rn_next = *clp; + aio->a_result = result; + aio->a_count = count; + *clp = aio; +} + +void +nni_aio_completions_run(nni_aio_completions *clp) +{ + nni_aio *aio; + nni_aio *cl = *clp; + *clp = NULL; + + while ((aio = cl) != NULL) { + cl = (void *)aio->a_reap_node.rn_next; + aio->a_reap_node.rn_next = NULL; + nni_aio_finish_sync(aio, aio->a_result, aio->a_count); + } +} + static void nni_aio_expire_add(nni_aio *aio) { diff --git a/src/core/aio.h b/src/core/aio.h index 6315e90c9..a2ebf70a9 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -1,5 +1,5 @@ // -// Copyright 2022 Staysail Systems, Inc. +// Copyright 2023 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -166,6 +166,30 @@ extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *); extern void nni_sleep_aio(nni_duration, nni_aio *); +// nni_aio_completion_list is used after removing the aio from an +// active work queue, and keeping them so that the completions can +// be run in a deferred manner. These lists are simple, and intended +// to be used as local variables. It's important to initialize the +// list before using it. Also, any AIO added to a completion list must +// not be in active use anywhere. +typedef void *nni_aio_completions; + +// nni_aio_completions_init just initializes a completions list. +// This just sets the pointed value to NULL. +extern void nni_aio_completions_init(nni_aio_completions *); + +// nni_aio_completions_run runs nni_aio_finish_sync for all the aio objects +// that have been added to the completions. The result code and count used +// are those supplied in nni_aio_completions_add. Callers should not hold +// locks when calling this. +extern void nni_aio_completions_run(nni_aio_completions *); + +// nni_aio_completions_add adds an aio (with the result code and length as +// appropriate) to the completion list. This should be done while the +// appropriate lock is held. The aio must not be scheduled. +extern void nni_aio_completions_add(nni_aio_completions *, nni_aio *, + int, size_t); + extern int nni_aio_sys_init(void); extern void nni_aio_sys_fini(void); diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c index 10f42724d..e7540deea 100644 --- a/src/sp/protocol/pubsub0/sub.c +++ b/src/sp/protocol/pubsub0/sub.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2023 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // Copyright 2019 Nathan Kent // @@ -44,14 +44,14 @@ static void sub0_pipe_fini(void *); struct sub0_topic { nni_list_node node; size_t len; - void * buf; + void *buf; }; // sub0_ctx is a context for a SUB socket. The advantage of contexts is // that different contexts can maintain different subscriptions. struct sub0_ctx { nni_list_node node; - sub0_sock * sock; + sub0_sock *sock; nni_list topics; // TODO: Consider patricia trie nni_list recv_queue; // can have multiple pending receives nni_lmq lmq; @@ -71,7 +71,7 @@ struct sub0_sock { // sub0_pipe is our per-pipe protocol private structure. struct sub0_pipe { - nni_pipe * pipe; + nni_pipe *pipe; sub0_sock *sub; nni_aio aio_recv; }; @@ -79,7 +79,7 @@ struct sub0_pipe { static void sub0_ctx_cancel(nng_aio *aio, void *arg, int rv) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; nni_mtx_lock(&sock->lk); if (nni_list_active(&ctx->recv_queue, aio)) { @@ -92,9 +92,9 @@ sub0_ctx_cancel(nng_aio *aio, void *arg, int rv) static void sub0_ctx_recv(void *arg, nni_aio *aio) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; - nni_msg * msg; + nni_msg *msg; if (nni_aio_begin(aio) != 0) { return; @@ -140,9 +140,9 @@ sub0_ctx_send(void *arg, nni_aio *aio) static void sub0_ctx_close(void *arg) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; - nni_aio * aio; + nni_aio *aio; nni_mtx_lock(&sock->lk); while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { @@ -155,8 +155,8 @@ sub0_ctx_close(void *arg) static void sub0_ctx_fini(void *arg) { - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; + sub0_ctx *ctx = arg; + sub0_sock *sock = ctx->sock; sub0_topic *topic; sub0_ctx_close(ctx); @@ -179,7 +179,7 @@ static void sub0_ctx_init(void *ctx_arg, void *sock_arg) { sub0_sock *sock = sock_arg; - sub0_ctx * ctx = ctx_arg; + sub0_ctx *ctx = ctx_arg; size_t len; bool prefer_new; @@ -311,22 +311,22 @@ sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len) static void sub0_recv_cb(void *arg) { - sub0_pipe *p = arg; - sub0_sock *sock = p->sub; - sub0_ctx * ctx; - nni_msg * msg; - size_t len; - uint8_t * body; - nni_list finish; - nng_aio * aio; - nni_msg * dup_msg; + sub0_pipe *p = arg; + sub0_sock *sock = p->sub; + sub0_ctx *ctx; + nni_msg *msg; + size_t len; + uint8_t *body; + nng_aio *aio; + nni_msg *dup_msg; + nni_aio_completions finish; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } - nni_aio_list_init(&finish); + nni_aio_completions_init(&finish); msg = nni_aio_get_msg(&p->aio_recv); nni_aio_set_msg(&p->aio_recv, NULL); @@ -370,7 +370,7 @@ sub0_recv_cb(void *arg) nni_aio_set_msg(aio, dup_msg); // Save for synchronous completion - nni_list_append(&finish, aio); + nni_aio_completions_add(&finish, aio, 0, len); } else if (nni_lmq_full(&ctx->lmq)) { // Make space for the new message. nni_msg *old; @@ -401,10 +401,7 @@ sub0_recv_cb(void *arg) nni_msg_free(msg); } - while ((aio = nni_list_first(&finish)) != NULL) { - nni_list_remove(&finish, aio); - nni_aio_finish_sync(aio, 0, len); - } + nni_aio_completions_run(&finish); nni_pipe_recv(p->pipe, &p->aio_recv); } @@ -412,7 +409,7 @@ sub0_recv_cb(void *arg) static int sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; int val; nni_mtx_lock(&sock->lk); @@ -425,7 +422,7 @@ sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) static int sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; int val; int rv; @@ -456,8 +453,8 @@ sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) static int sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) { - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; + sub0_ctx *ctx = arg; + sub0_sock *sock = ctx->sock; sub0_topic *topic; sub0_topic *new_topic; NNI_ARG_UNUSED(t); @@ -494,8 +491,8 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) static int sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) { - sub0_ctx * ctx = arg; - sub0_sock * sock = ctx->sock; + sub0_ctx *ctx = arg; + sub0_sock *sock = ctx->sock; sub0_topic *topic; size_t len; NNI_ARG_UNUSED(t); @@ -540,7 +537,7 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) static int sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; bool val; @@ -554,7 +551,7 @@ sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) static int sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) { - sub0_ctx * ctx = arg; + sub0_ctx *ctx = arg; sub0_sock *sock = ctx->sock; bool val; int rv;