Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes #1313 support deferred nng_aio destruction #1371

Merged
merged 1 commit into from
Dec 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/man/libnng.3.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ The following functions are used in the asynchronous model:
|xref:nng_aio_get_input.3.adoc[nng_aio_get_input()]|return input parameter
|xref:nng_aio_get_msg.3.adoc[nng_aio_get_msg()]|get message from an asynchronous receive
|xref:nng_aio_get_output.3.adoc[nng_aio_get_output()]|return output result
|xref:nng_aio_free.3.adoc[nng_aio_reap()]|reap asynchronous I/O handle
|xref:nng_aio_result.3.adoc[nng_aio_result()]|return result of asynchronous operation
|xref:nng_aio_set_input.3.adoc[nng_aio_set_input()]|set input parameter
|xref:nng_aio_set_iov.3.adoc[nng_aio_set_iov()]|set scatter/gather vector
Expand Down
9 changes: 8 additions & 1 deletion docs/man/nng_aio_free.3.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
= nng_aio_free(3)
//
// Copyright 2018 Staysail Systems, Inc. <[email protected]>
// Copyright 2020 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This document is supplied under the terms of the MIT License, a
Expand All @@ -20,6 +20,7 @@ nng_aio_free - free asynchronous I/O handle
#include <nng/nng.h>

void nng_aio_free(nng_aio *aio);
void nng_aio_reap(nng_aio *aio);
----

== DESCRIPTION
Expand All @@ -30,6 +31,12 @@ caller is blocked until the operation is completely canceled, to ensure
that it is safe to deallocate the handle and any associated resources.
(This is done by implicitly calling xref:nng_aio_stop.3.adoc[`nng_aio_stop()`].)

The `nng_aio_reap()` function is the same as `nng_aio_free()`, but does
it's work in a background thread.
This can be useful to discard the _aio_ object from within the callback for the _aio_.

IMPORTANT: Once either of these functions are called, the _aio_ object is invalid and must not be used again.

== RETURN VALUES

None.
Expand Down
7 changes: 7 additions & 0 deletions include/nng/nng.h
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,13 @@ NNG_DECL int nng_aio_alloc(nng_aio **, void (*)(void *), void *);
// It *must not* be in use at the time it is freed.
NNG_DECL void nng_aio_free(nng_aio *);

// nng_aio_reap is like nng_aio_free, but calls it from a background
// reaper thread. This can be useful to free aio objects from aio
// callbacks (e.g. when the result of the callback is to discard
// the object in question.) The aio object must be in further use
// when this is called.
NNG_DECL void nng_aio_reap(nng_aio *);

// nng_aio_stop stops any outstanding operation, and waits for the
// AIO to be free, including for the callback to have completed
// execution. Therefore the caller must NOT hold any locks that
Expand Down
134 changes: 104 additions & 30 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@
static nni_mtx nni_aio_lk;
// These are used for expiration.
static nni_cv nni_aio_expire_cv;
static int nni_aio_expire_run;
static bool nni_aio_expire_exit;
static nni_thr nni_aio_expire_thr;
static nni_list nni_aio_expire_list;
static nni_aio *nni_aio_expire_aio;

// Reaping items.
static nni_thr nni_aio_reap_thr;
static nni_aio *nni_aio_reap_list;
static nni_mtx nni_aio_reap_lk;
static nni_cv nni_aio_reap_cv;
static bool nni_aio_reap_exit;

// Design notes.
//
// AIOs are only ever "completed" by the provider, which must call
Expand Down Expand Up @@ -76,7 +83,7 @@ void
nni_aio_fini(nni_aio *aio)
{
nni_aio_cancel_fn fn;
void * arg;
void * arg;

// TODO: This probably could just use nni_aio_stop.

Expand Down Expand Up @@ -134,6 +141,18 @@ nni_aio_free(nni_aio *aio)
}
}

void
nni_aio_reap(nni_aio *aio)
{
if (aio != NULL) {
nni_mtx_lock(&nni_aio_reap_lk);
aio->a_reap_next = nni_aio_reap_list;
nni_aio_reap_list = aio;
nni_cv_wake1(&nni_aio_reap_cv);
nni_mtx_unlock(&nni_aio_reap_lk);
}
}

int
nni_aio_set_iov(nni_aio *aio, unsigned nio, const nni_iov *iov)
{
Expand Down Expand Up @@ -164,7 +183,7 @@ nni_aio_stop(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_cancel_fn fn;
void * arg;
void * arg;

nni_mtx_lock(&nni_aio_lk);
fn = aio->a_cancel_fn;
Expand All @@ -187,7 +206,7 @@ nni_aio_close(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_cancel_fn fn;
void * arg;
void * arg;

nni_mtx_lock(&nni_aio_lk);
fn = aio->a_cancel_fn;
Expand Down Expand Up @@ -347,7 +366,7 @@ void
nni_aio_abort(nni_aio *aio, int rv)
{
nni_aio_cancel_fn fn;
void * arg;
void * arg;

nni_mtx_lock(&nni_aio_lk);
fn = aio->a_cancel_fn;
Expand Down Expand Up @@ -471,21 +490,21 @@ nni_aio_expire_loop(void *unused)

NNI_ARG_UNUSED(unused);

nni_thr_set_name(NULL, "nng:aio:expire");
nni_thr_set_name(NULL, "nng:aio:expire");

for (;;) {
nni_aio_cancel_fn fn;
nni_time now;
nni_aio * aio;
int rv;
nni_time now;
nni_aio * aio;
int rv;

now = nni_clock();

nni_mtx_lock(&nni_aio_lk);

if ((aio = nni_list_first(list)) == NULL) {

if (nni_aio_expire_run == 0) {
if (nni_aio_expire_exit) {
nni_mtx_unlock(&nni_aio_lk);
return;
}
Expand Down Expand Up @@ -530,6 +549,41 @@ nni_aio_expire_loop(void *unused)
}
}

static void
nni_aio_reap_loop(void *unused)
{
NNI_ARG_UNUSED(unused);

nni_thr_set_name(NULL, "nng:aio:reap");

nni_mtx_lock(&nni_aio_reap_lk);

for (;;) {
nni_aio *aio;

if ((aio = nni_aio_reap_list) == NULL) {
if (nni_aio_reap_exit) {
break;
}

nni_cv_wait(&nni_aio_reap_cv);
continue;
}
nni_aio_reap_list = NULL;
nni_mtx_unlock(&nni_aio_reap_lk);

while (aio != NULL) {
nni_aio *old = aio;
aio = aio->a_reap_next;
nni_aio_free(old);
}

nni_mtx_lock(&nni_aio_reap_lk);
}

nni_mtx_unlock(&nni_aio_reap_lk);
}

void *
nni_aio_get_prov_extra(nni_aio *aio, unsigned index)
{
Expand Down Expand Up @@ -645,40 +699,60 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
void
nni_aio_sys_fini(void)
{
nni_mtx *mtx = &nni_aio_lk;
nni_cv * cv = &nni_aio_expire_cv;
nni_thr *thr = &nni_aio_expire_thr;
nni_mtx *mtx1 = &nni_aio_lk;
nni_cv * cv1 = &nni_aio_expire_cv;
nni_thr *thr1 = &nni_aio_expire_thr;
nni_mtx *mtx2 = &nni_aio_reap_lk;
nni_cv * cv2 = &nni_aio_reap_cv;
nni_thr *thr2 = &nni_aio_reap_thr;

if (nni_aio_expire_run) {
nni_mtx_lock(mtx);
nni_aio_expire_run = 0;
nni_cv_wake(cv);
nni_mtx_unlock(mtx);
if (!nni_aio_expire_exit) {
nni_mtx_lock(mtx1);
nni_aio_expire_exit = true;
nni_cv_wake(cv1);
nni_mtx_unlock(mtx1);
}

nni_thr_fini(thr);
nni_cv_fini(cv);
nni_mtx_fini(mtx);
if (!nni_aio_reap_exit) {
nni_mtx_lock(mtx2);
nni_aio_reap_exit = true;
nni_cv_wake(cv2);
nni_mtx_unlock(mtx2);
}

nni_thr_fini(thr1);
nni_cv_fini(cv1);
nni_mtx_fini(mtx1);

nni_thr_fini(thr2);
nni_cv_fini(cv2);
nni_mtx_fini(mtx2);
}

int
nni_aio_sys_init(void)
{
int rv;
nni_mtx *mtx = &nni_aio_lk;
nni_cv * cv = &nni_aio_expire_cv;
nni_thr *thr = &nni_aio_expire_thr;
int rv, rv1, rv2;
nni_thr *thr1 = &nni_aio_expire_thr;
nni_thr *thr2 = &nni_aio_reap_thr;

NNI_LIST_INIT(&nni_aio_expire_list, nni_aio, a_expire_node);
nni_mtx_init(mtx);
nni_cv_init(cv, mtx);
nni_mtx_init(&nni_aio_lk);
nni_cv_init(&nni_aio_expire_cv, &nni_aio_lk);
nni_mtx_init(&nni_aio_reap_lk);
nni_cv_init(&nni_aio_reap_cv, &nni_aio_reap_lk);

nni_aio_expire_exit = false;
nni_aio_reap_exit = false;

if ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0) {
rv1 = nni_thr_init(thr1, nni_aio_expire_loop, NULL);
rv2 = nni_thr_init(thr2, nni_aio_reap_loop, NULL);
if (((rv = rv1) != 0) || ((rv = rv2) != 0)) {
nni_aio_sys_fini();
return (rv);
}

nni_aio_expire_run = 1;
nni_thr_run(thr);
nni_thr_run(thr1);
nni_thr_run(thr2);
return (0);
}
8 changes: 6 additions & 2 deletions src/core/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ extern void nni_aio_init(nni_aio *, nni_cb, void *arg);
// It waits for the callback to complete.
extern void nni_aio_fini(nni_aio *);

// nni_aio_reap is used to asynchronously reap the aio. It can
// be called even from the callback of the aio itself.
extern void nni_aio_reap(nni_aio *);

// nni_aio_alloc allocates an aio object and initializes it. The callback
// is called with the supplied argument when the operation is complete.
// If NULL is supplied for the callback, then nni_aio_wake is used in its
Expand Down Expand Up @@ -195,8 +199,8 @@ struct nng_aio {
nni_list_node a_prov_node; // Linkage on provider list.
void * a_prov_extra[2]; // Extra data used by provider

// Expire node.
nni_list_node a_expire_node;
nni_list_node a_expire_node; // Expiration node
struct nng_aio *a_reap_next;
};

#endif // CORE_AIO_H
21 changes: 19 additions & 2 deletions src/core/aio_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void
test_sleep(void)
{
nng_time start;
nng_time end = 0;
nng_time end = 0;
nng_aio *aio;

NUTS_PASS(nng_aio_alloc(&aio, sleep_done, &end));
Expand All @@ -55,7 +55,7 @@ void
test_sleep_timeout(void)
{
nng_time start;
nng_time end = 0;
nng_time end = 0;
nng_aio *aio;

NUTS_TRUE(nng_aio_alloc(&aio, sleep_done, &end) == 0);
Expand Down Expand Up @@ -226,6 +226,22 @@ test_zero_timeout(void)
NUTS_PASS(nng_close(s));
}

static void
aio_sleep_cb(void *arg)
{
nng_aio *aio = *(nng_aio **) arg;
nng_aio_reap(aio);
}

void
test_aio_reap(void)
{
nng_aio *a;
NUTS_PASS(nng_aio_alloc(&a, aio_sleep_cb, &a));
nng_sleep_aio(10, a);
nng_msleep(20);
}

NUTS_TESTS = {
{ "sleep", test_sleep },
{ "sleep timeout", test_sleep_timeout },
Expand All @@ -236,5 +252,6 @@ NUTS_TESTS = {
{ "explicit timeout", test_explicit_timeout },
{ "inherited timeout", test_inherited_timeout },
{ "zero timeout", test_zero_timeout },
{ "aio reap", test_aio_reap },
{ NULL, NULL },
};
6 changes: 6 additions & 0 deletions src/nng.c
Original file line number Diff line number Diff line change
Expand Up @@ -1736,6 +1736,12 @@ nng_aio_free(nng_aio *aio)
nni_aio_free(aio);
}

void
nng_aio_reap(nng_aio *aio)
{
nni_aio_reap(aio);
}

void
nng_sleep_aio(nng_duration ms, nng_aio *aio)
{
Expand Down
Loading