Skip to content

Commit 2033988

Browse files
committedDec 12, 2020
fixes nanomsg#1313 support deferred nng_aio destruction
1 parent b45f876 commit 2033988

File tree

8 files changed

+161
-46
lines changed

8 files changed

+161
-46
lines changed
 

‎docs/man/libnng.3.adoc

+1
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ The following functions are used in the asynchronous model:
170170
|xref:nng_aio_get_input.3.adoc[nng_aio_get_input()]|return input parameter
171171
|xref:nng_aio_get_msg.3.adoc[nng_aio_get_msg()]|get message from an asynchronous receive
172172
|xref:nng_aio_get_output.3.adoc[nng_aio_get_output()]|return output result
173+
|xref:nng_aio_free.3.adoc[nng_aio_reap()]|reap asynchronous I/O handle
173174
|xref:nng_aio_result.3.adoc[nng_aio_result()]|return result of asynchronous operation
174175
|xref:nng_aio_set_input.3.adoc[nng_aio_set_input()]|set input parameter
175176
|xref:nng_aio_set_iov.3.adoc[nng_aio_set_iov()]|set scatter/gather vector

‎docs/man/nng_aio_free.3.adoc

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
= nng_aio_free(3)
22
//
3-
// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
3+
// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
44
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
55
//
66
// This document is supplied under the terms of the MIT License, a
@@ -20,6 +20,7 @@ nng_aio_free - free asynchronous I/O handle
2020
#include <nng/nng.h>
2121
2222
void nng_aio_free(nng_aio *aio);
23+
void nng_aio_reap(nng_aio *aio);
2324
----
2425

2526
== DESCRIPTION
@@ -30,6 +31,12 @@ caller is blocked until the operation is completely canceled, to ensure
3031
that it is safe to deallocate the handle and any associated resources.
3132
(This is done by implicitly calling xref:nng_aio_stop.3.adoc[`nng_aio_stop()`].)
3233

34+
The `nng_aio_reap()` function is the same as `nng_aio_free()`, but does
35+
it's work in a background thread.
36+
This can be useful to discard the _aio_ object from within the callback for the _aio_.
37+
38+
IMPORTANT: Once either of these functions are called, the _aio_ object is invalid and must not be used again.
39+
3340
== RETURN VALUES
3441

3542
None.

‎include/nng/nng.h

+7
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,13 @@ NNG_DECL int nng_aio_alloc(nng_aio **, void (*)(void *), void *);
488488
// It *must not* be in use at the time it is freed.
489489
NNG_DECL void nng_aio_free(nng_aio *);
490490

491+
// nng_aio_reap is like nng_aio_free, but calls it from a background
492+
// reaper thread. This can be useful to free aio objects from aio
493+
// callbacks (e.g. when the result of the callback is to discard
494+
// the object in question.) The aio object must be in further use
495+
// when this is called.
496+
NNG_DECL void nng_aio_reap(nng_aio *);
497+
491498
// nng_aio_stop stops any outstanding operation, and waits for the
492499
// AIO to be free, including for the callback to have completed
493500
// execution. Therefore the caller must NOT hold any locks that

‎src/core/aio.c

+104-30
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,18 @@
1414
static nni_mtx nni_aio_lk;
1515
// These are used for expiration.
1616
static nni_cv nni_aio_expire_cv;
17-
static int nni_aio_expire_run;
17+
static bool nni_aio_expire_exit;
1818
static nni_thr nni_aio_expire_thr;
1919
static nni_list nni_aio_expire_list;
2020
static nni_aio *nni_aio_expire_aio;
2121

22+
// Reaping items.
23+
static nni_thr nni_aio_reap_thr;
24+
static nni_aio *nni_aio_reap_list;
25+
static nni_mtx nni_aio_reap_lk;
26+
static nni_cv nni_aio_reap_cv;
27+
static bool nni_aio_reap_exit;
28+
2229
// Design notes.
2330
//
2431
// AIOs are only ever "completed" by the provider, which must call
@@ -76,7 +83,7 @@ void
7683
nni_aio_fini(nni_aio *aio)
7784
{
7885
nni_aio_cancel_fn fn;
79-
void * arg;
86+
void * arg;
8087

8188
// TODO: This probably could just use nni_aio_stop.
8289

@@ -134,6 +141,18 @@ nni_aio_free(nni_aio *aio)
134141
}
135142
}
136143

144+
void
145+
nni_aio_reap(nni_aio *aio)
146+
{
147+
if (aio != NULL) {
148+
nni_mtx_lock(&nni_aio_reap_lk);
149+
aio->a_reap_next = nni_aio_reap_list;
150+
nni_aio_reap_list = aio;
151+
nni_cv_wake1(&nni_aio_reap_cv);
152+
nni_mtx_unlock(&nni_aio_reap_lk);
153+
}
154+
}
155+
137156
int
138157
nni_aio_set_iov(nni_aio *aio, unsigned nio, const nni_iov *iov)
139158
{
@@ -164,7 +183,7 @@ nni_aio_stop(nni_aio *aio)
164183
{
165184
if (aio != NULL) {
166185
nni_aio_cancel_fn fn;
167-
void * arg;
186+
void * arg;
168187

169188
nni_mtx_lock(&nni_aio_lk);
170189
fn = aio->a_cancel_fn;
@@ -187,7 +206,7 @@ nni_aio_close(nni_aio *aio)
187206
{
188207
if (aio != NULL) {
189208
nni_aio_cancel_fn fn;
190-
void * arg;
209+
void * arg;
191210

192211
nni_mtx_lock(&nni_aio_lk);
193212
fn = aio->a_cancel_fn;
@@ -347,7 +366,7 @@ void
347366
nni_aio_abort(nni_aio *aio, int rv)
348367
{
349368
nni_aio_cancel_fn fn;
350-
void * arg;
369+
void * arg;
351370

352371
nni_mtx_lock(&nni_aio_lk);
353372
fn = aio->a_cancel_fn;
@@ -471,21 +490,21 @@ nni_aio_expire_loop(void *unused)
471490

472491
NNI_ARG_UNUSED(unused);
473492

474-
nni_thr_set_name(NULL, "nng:aio:expire");
493+
nni_thr_set_name(NULL, "nng:aio:expire");
475494

476495
for (;;) {
477496
nni_aio_cancel_fn fn;
478-
nni_time now;
479-
nni_aio * aio;
480-
int rv;
497+
nni_time now;
498+
nni_aio * aio;
499+
int rv;
481500

482501
now = nni_clock();
483502

484503
nni_mtx_lock(&nni_aio_lk);
485504

486505
if ((aio = nni_list_first(list)) == NULL) {
487506

488-
if (nni_aio_expire_run == 0) {
507+
if (nni_aio_expire_exit) {
489508
nni_mtx_unlock(&nni_aio_lk);
490509
return;
491510
}
@@ -530,6 +549,41 @@ nni_aio_expire_loop(void *unused)
530549
}
531550
}
532551

552+
static void
553+
nni_aio_reap_loop(void *unused)
554+
{
555+
NNI_ARG_UNUSED(unused);
556+
557+
nni_thr_set_name(NULL, "nng:aio:reap");
558+
559+
nni_mtx_lock(&nni_aio_reap_lk);
560+
561+
for (;;) {
562+
nni_aio *aio;
563+
564+
if ((aio = nni_aio_reap_list) == NULL) {
565+
if (nni_aio_reap_exit) {
566+
break;
567+
}
568+
569+
nni_cv_wait(&nni_aio_reap_cv);
570+
continue;
571+
}
572+
nni_aio_reap_list = NULL;
573+
nni_mtx_unlock(&nni_aio_reap_lk);
574+
575+
while (aio != NULL) {
576+
nni_aio *old = aio;
577+
aio = aio->a_reap_next;
578+
nni_aio_free(old);
579+
}
580+
581+
nni_mtx_lock(&nni_aio_reap_lk);
582+
}
583+
584+
nni_mtx_unlock(&nni_aio_reap_lk);
585+
}
586+
533587
void *
534588
nni_aio_get_prov_extra(nni_aio *aio, unsigned index)
535589
{
@@ -645,40 +699,60 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
645699
void
646700
nni_aio_sys_fini(void)
647701
{
648-
nni_mtx *mtx = &nni_aio_lk;
649-
nni_cv * cv = &nni_aio_expire_cv;
650-
nni_thr *thr = &nni_aio_expire_thr;
702+
nni_mtx *mtx1 = &nni_aio_lk;
703+
nni_cv * cv1 = &nni_aio_expire_cv;
704+
nni_thr *thr1 = &nni_aio_expire_thr;
705+
nni_mtx *mtx2 = &nni_aio_reap_lk;
706+
nni_cv * cv2 = &nni_aio_reap_cv;
707+
nni_thr *thr2 = &nni_aio_reap_thr;
651708

652-
if (nni_aio_expire_run) {
653-
nni_mtx_lock(mtx);
654-
nni_aio_expire_run = 0;
655-
nni_cv_wake(cv);
656-
nni_mtx_unlock(mtx);
709+
if (!nni_aio_expire_exit) {
710+
nni_mtx_lock(mtx1);
711+
nni_aio_expire_exit = true;
712+
nni_cv_wake(cv1);
713+
nni_mtx_unlock(mtx1);
657714
}
658715

659-
nni_thr_fini(thr);
660-
nni_cv_fini(cv);
661-
nni_mtx_fini(mtx);
716+
if (!nni_aio_reap_exit) {
717+
nni_mtx_lock(mtx2);
718+
nni_aio_reap_exit = true;
719+
nni_cv_wake(cv2);
720+
nni_mtx_unlock(mtx2);
721+
}
722+
723+
nni_thr_fini(thr1);
724+
nni_cv_fini(cv1);
725+
nni_mtx_fini(mtx1);
726+
727+
nni_thr_fini(thr2);
728+
nni_cv_fini(cv2);
729+
nni_mtx_fini(mtx2);
662730
}
663731

664732
int
665733
nni_aio_sys_init(void)
666734
{
667-
int rv;
668-
nni_mtx *mtx = &nni_aio_lk;
669-
nni_cv * cv = &nni_aio_expire_cv;
670-
nni_thr *thr = &nni_aio_expire_thr;
735+
int rv, rv1, rv2;
736+
nni_thr *thr1 = &nni_aio_expire_thr;
737+
nni_thr *thr2 = &nni_aio_reap_thr;
671738

672739
NNI_LIST_INIT(&nni_aio_expire_list, nni_aio, a_expire_node);
673-
nni_mtx_init(mtx);
674-
nni_cv_init(cv, mtx);
740+
nni_mtx_init(&nni_aio_lk);
741+
nni_cv_init(&nni_aio_expire_cv, &nni_aio_lk);
742+
nni_mtx_init(&nni_aio_reap_lk);
743+
nni_cv_init(&nni_aio_reap_cv, &nni_aio_reap_lk);
744+
745+
nni_aio_expire_exit = false;
746+
nni_aio_reap_exit = false;
675747

676-
if ((rv = nni_thr_init(thr, nni_aio_expire_loop, NULL)) != 0) {
748+
rv1 = nni_thr_init(thr1, nni_aio_expire_loop, NULL);
749+
rv2 = nni_thr_init(thr2, nni_aio_reap_loop, NULL);
750+
if (((rv = rv1) != 0) || ((rv = rv2) != 0)) {
677751
nni_aio_sys_fini();
678752
return (rv);
679753
}
680754

681-
nni_aio_expire_run = 1;
682-
nni_thr_run(thr);
755+
nni_thr_run(thr1);
756+
nni_thr_run(thr2);
683757
return (0);
684758
}

‎src/core/aio.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ extern void nni_aio_init(nni_aio *, nni_cb, void *arg);
2828
// It waits for the callback to complete.
2929
extern void nni_aio_fini(nni_aio *);
3030

31+
// nni_aio_reap is used to asynchronously reap the aio. It can
32+
// be called even from the callback of the aio itself.
33+
extern void nni_aio_reap(nni_aio *);
34+
3135
// nni_aio_alloc allocates an aio object and initializes it. The callback
3236
// is called with the supplied argument when the operation is complete.
3337
// If NULL is supplied for the callback, then nni_aio_wake is used in its
@@ -195,8 +199,8 @@ struct nng_aio {
195199
nni_list_node a_prov_node; // Linkage on provider list.
196200
void * a_prov_extra[2]; // Extra data used by provider
197201

198-
// Expire node.
199-
nni_list_node a_expire_node;
202+
nni_list_node a_expire_node; // Expiration node
203+
struct nng_aio *a_reap_next;
200204
};
201205

202206
#endif // CORE_AIO_H

‎src/core/aio_test.c

+19-2
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ void
3535
test_sleep(void)
3636
{
3737
nng_time start;
38-
nng_time end = 0;
38+
nng_time end = 0;
3939
nng_aio *aio;
4040

4141
NUTS_PASS(nng_aio_alloc(&aio, sleep_done, &end));
@@ -55,7 +55,7 @@ void
5555
test_sleep_timeout(void)
5656
{
5757
nng_time start;
58-
nng_time end = 0;
58+
nng_time end = 0;
5959
nng_aio *aio;
6060

6161
NUTS_TRUE(nng_aio_alloc(&aio, sleep_done, &end) == 0);
@@ -226,6 +226,22 @@ test_zero_timeout(void)
226226
NUTS_PASS(nng_close(s));
227227
}
228228

229+
static void
230+
aio_sleep_cb(void *arg)
231+
{
232+
nng_aio *aio = *(nng_aio **) arg;
233+
nng_aio_reap(aio);
234+
}
235+
236+
void
237+
test_aio_reap(void)
238+
{
239+
nng_aio *a;
240+
NUTS_PASS(nng_aio_alloc(&a, aio_sleep_cb, &a));
241+
nng_sleep_aio(10, a);
242+
nng_msleep(20);
243+
}
244+
229245
NUTS_TESTS = {
230246
{ "sleep", test_sleep },
231247
{ "sleep timeout", test_sleep_timeout },
@@ -236,5 +252,6 @@ NUTS_TESTS = {
236252
{ "explicit timeout", test_explicit_timeout },
237253
{ "inherited timeout", test_inherited_timeout },
238254
{ "zero timeout", test_zero_timeout },
255+
{ "aio reap", test_aio_reap },
239256
{ NULL, NULL },
240257
};

‎src/nng.c

+6
Original file line numberDiff line numberDiff line change
@@ -1736,6 +1736,12 @@ nng_aio_free(nng_aio *aio)
17361736
nni_aio_free(aio);
17371737
}
17381738

1739+
void
1740+
nng_aio_reap(nng_aio *aio)
1741+
{
1742+
nni_aio_reap(aio);
1743+
}
1744+
17391745
void
17401746
nng_sleep_aio(nng_duration ms, nng_aio *aio)
17411747
{

‎src/supplemental/http/http_client.c

+10-11
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,10 @@ typedef struct http_txn {
224224
nni_http_res * res;
225225
nni_http_chunks *chunks;
226226
http_txn_state state;
227-
nni_reap_item reap;
228227
} http_txn;
229228

230229
static void
231-
http_txn_reap(void *arg)
230+
http_txn_fini(void *arg)
232231
{
233232
http_txn *txn = arg;
234233
if (txn->client != NULL) {
@@ -239,7 +238,7 @@ http_txn_reap(void *arg)
239238
}
240239
}
241240
nni_http_chunks_free(txn->chunks);
242-
nni_aio_free(txn->aio);
241+
nni_aio_reap(txn->aio);
243242
NNI_FREE_STRUCT(txn);
244243
}
245244

@@ -270,7 +269,7 @@ http_txn_cb(void *arg)
270269
if ((rv = nni_aio_result(txn->aio)) != 0) {
271270
http_txn_finish_aios(txn, rv);
272271
nni_mtx_unlock(&http_txn_lk);
273-
nni_reap(&txn->reap, http_txn_reap, txn);
272+
http_txn_fini(txn);
274273
return;
275274
}
276275
switch (txn->state) {
@@ -314,7 +313,7 @@ http_txn_cb(void *arg)
314313
// never transfers data), then we are done.
315314
http_txn_finish_aios(txn, 0);
316315
nni_mtx_unlock(&http_txn_lk);
317-
nni_reap(&txn->reap, http_txn_reap, txn);
316+
http_txn_fini(txn);
318317
return;
319318
}
320319

@@ -333,7 +332,7 @@ http_txn_cb(void *arg)
333332
// All done!
334333
http_txn_finish_aios(txn, 0);
335334
nni_mtx_unlock(&http_txn_lk);
336-
nni_reap(&txn->reap, http_txn_reap, txn);
335+
http_txn_fini(txn);
337336
return;
338337

339338
case HTTP_RECVING_CHUNKS:
@@ -352,15 +351,15 @@ http_txn_cb(void *arg)
352351
}
353352
http_txn_finish_aios(txn, 0);
354353
nni_mtx_unlock(&http_txn_lk);
355-
nni_reap(&txn->reap, http_txn_reap, txn);
354+
http_txn_fini(txn);
356355
return;
357356
}
358357

359358
error:
360359
http_txn_finish_aios(txn, rv);
361360
nni_http_conn_close(txn->conn);
362361
nni_mtx_unlock(&http_txn_lk);
363-
nni_reap(&txn->reap, http_txn_reap, txn);
362+
http_txn_fini(txn);
364363
}
365364

366365
static void
@@ -411,7 +410,7 @@ nni_http_transact_conn(
411410
if ((rv = nni_aio_schedule(aio, http_txn_cancel, txn)) != 0) {
412411
nni_mtx_unlock(&http_txn_lk);
413412
nni_aio_finish_error(aio, rv);
414-
nni_reap(&txn->reap, http_txn_reap, txn);
413+
http_txn_fini(txn);
415414
return;
416415
}
417416
nni_http_res_reset(txn->res);
@@ -448,7 +447,7 @@ nni_http_transact(nni_http_client *client, nni_http_req *req,
448447

449448
if ((rv = nni_http_req_set_header(req, "Connection", "close")) != 0) {
450449
nni_aio_finish_error(aio, rv);
451-
nni_reap(&txn->reap, http_txn_reap, txn);
450+
http_txn_fini(txn);
452451
return;
453452
}
454453

@@ -463,7 +462,7 @@ nni_http_transact(nni_http_client *client, nni_http_req *req,
463462
if ((rv = nni_aio_schedule(aio, http_txn_cancel, txn)) != 0) {
464463
nni_mtx_unlock(&http_txn_lk);
465464
nni_aio_finish_error(aio, rv);
466-
nni_reap(&txn->reap, http_txn_reap, txn);
465+
http_txn_fini(txn);
467466
return;
468467
}
469468
nni_http_res_reset(txn->res);

0 commit comments

Comments
 (0)
Please sign in to comment.