From 57df449dddcdff8c57b1a20a522da9ac95646942 Mon Sep 17 00:00:00 2001 From: Charlie Gao <53399081+shikokuchuo@users.noreply.github.com> Date: Mon, 30 Dec 2024 16:26:47 +0000 Subject: [PATCH] cleanup internal functions (#69) --- NAMESPACE | 2 - R/sync.R | 32 ---- man/dot-dispatcher.Rd | 25 --- man/dot-online.Rd | 19 --- src/init.c | 2 - src/nanonext.h | 16 -- src/thread.c | 356 ------------------------------------------ tests/tests.R | 37 +---- 8 files changed, 5 insertions(+), 484 deletions(-) delete mode 100644 man/dot-dispatcher.Rd delete mode 100644 man/dot-online.Rd diff --git a/NAMESPACE b/NAMESPACE index 595ba039e..896dcbb18 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -46,11 +46,9 @@ export("%~>%") export("opt<-") export(.advance) export(.context) -export(.dispatcher) export(.interrupt) export(.keep) export(.mark) -export(.online) export(.unresolved) export(call_aio) export(call_aio_) diff --git a/R/sync.R b/R/sync.R index 3f331a834..cc92429a3 100644 --- a/R/sync.R +++ b/R/sync.R @@ -303,35 +303,3 @@ unlock <- function(socket) invisible(.Call(rnng_socket_unlock, socket)) #' @export #' `%~>%` <- function(cv, cv2) invisible(.Call(rnng_signal_thread_create, cv, cv2)) - -#' Dispatcher Socket -#' -#' Creates a Dispatcher socket, which is a special type of \sQuote{req} socket, -#' with FIFO scheduling using a threaded implementation. Internal package -#' function. -#' -#' @param host \sQuote{inproc://} url connecting the host to the thread. -#' @param url the URLs at which to listen for rep nodes. -#' @inheritParams listen -#' -#' @return A \sQuote{req} Socket. The thread is attached as an attribute. -#' -#' @keywords internal -#' @export -#' -.dispatcher <- function(host, url, tls = NULL) - .Call(rnng_dispatcher_socket, host, url, tls) - -#' Read Online Status -#' -#' Reads the online status of threaded dispatcher sockets. Internal package -#' function. -#' -#' @param sock a dispatcher Socket. -#' -#' @return An vector of integer values. -#' -#' @keywords internal -#' @export -#' -.online <- function(sock) .Call(rnng_read_online, sock) diff --git a/man/dot-dispatcher.Rd b/man/dot-dispatcher.Rd deleted file mode 100644 index 1d1da73fe..000000000 --- a/man/dot-dispatcher.Rd +++ /dev/null @@ -1,25 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/sync.R -\name{.dispatcher} -\alias{.dispatcher} -\title{Dispatcher Socket} -\usage{ -.dispatcher(host, url, tls = NULL) -} -\arguments{ -\item{host}{\sQuote{inproc://} url connecting the host to the thread.} - -\item{url}{the URLs at which to listen for rep nodes.} - -\item{tls}{[default NULL] for secure tls+tcp:// or wss:// connections only, -provide a TLS configuration object created by \code{\link{tls_config}}.} -} -\value{ -A \sQuote{req} Socket. The thread is attached as an attribute. -} -\description{ -Creates a Dispatcher socket, which is a special type of \sQuote{req} socket, -with FIFO scheduling using a threaded implementation. Internal package -function. -} -\keyword{internal} diff --git a/man/dot-online.Rd b/man/dot-online.Rd deleted file mode 100644 index 17970ca38..000000000 --- a/man/dot-online.Rd +++ /dev/null @@ -1,19 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/sync.R -\name{.online} -\alias{.online} -\title{Read Online Status} -\usage{ -.online(sock) -} -\arguments{ -\item{sock}{a dispatcher Socket.} -} -\value{ -An vector of integer values. -} -\description{ -Reads the online status of threaded dispatcher sockets. Internal package -function. -} -\keyword{internal} diff --git a/src/init.c b/src/init.c index 3205e650e..3895bef43 100644 --- a/src/init.c +++ b/src/init.c @@ -149,7 +149,6 @@ static const R_CallMethodDef callMethods[] = { {"rnng_dial", (DL_FUNC) &rnng_dial, 5}, {"rnng_dialer_close", (DL_FUNC) &rnng_dialer_close, 1}, {"rnng_dialer_start", (DL_FUNC) &rnng_dialer_start, 2}, - {"rnng_dispatcher_socket", (DL_FUNC) &rnng_dispatcher_socket, 3}, {"rnng_eval_safe", (DL_FUNC) &rnng_eval_safe, 1}, {"rnng_fini", (DL_FUNC) &rnng_fini, 0}, {"rnng_get_opt", (DL_FUNC) &rnng_get_opt, 2}, @@ -170,7 +169,6 @@ static const R_CallMethodDef callMethods[] = { {"rnng_pipe_notify", (DL_FUNC) &rnng_pipe_notify, 6}, {"rnng_protocol_open", (DL_FUNC) &rnng_protocol_open, 6}, {"rnng_random", (DL_FUNC) &rnng_random, 2}, - {"rnng_read_online", (DL_FUNC) &rnng_read_online, 1}, {"rnng_reap", (DL_FUNC) &rnng_reap, 1}, {"rnng_recv", (DL_FUNC) &rnng_recv, 4}, {"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 6}, diff --git a/src/nanonext.h b/src/nanonext.h index e394a6f5b..5cdcf9bf3 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -212,20 +212,6 @@ typedef struct nano_thread_duo_s { nano_cv *cv2; } nano_thread_duo; -typedef struct nano_thread_disp_s { - nng_thread *thr; - nano_cv *cv; - nng_tls_config *tls; - nano_saio **saio; - nano_aio **raio; - nano_aio **haio; - nng_url *up; - const char *host; - char **url; - int *online; - R_xlen_t n; -} nano_thread_disp; - typedef struct nano_signal_s { nano_cv *cv; int *online; @@ -333,7 +319,6 @@ SEXP rnng_cv_wait_safe(SEXP); SEXP rnng_dial(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_dialer_close(SEXP); SEXP rnng_dialer_start(SEXP, SEXP); -SEXP rnng_dispatcher_socket(SEXP, SEXP, SEXP); SEXP rnng_eval_safe(SEXP); SEXP rnng_fini(void); SEXP rnng_get_opt(SEXP, SEXP); @@ -355,7 +340,6 @@ SEXP rnng_ncurl_transact(SEXP); SEXP rnng_pipe_notify(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_protocol_open(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_random(SEXP, SEXP); -SEXP rnng_read_online(SEXP); SEXP rnng_reap(SEXP); SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP); SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); diff --git a/src/thread.c b/src/thread.c index c1a6d6ad8..fca4b1795 100644 --- a/src/thread.c +++ b/src/thread.c @@ -306,42 +306,6 @@ static void thread_duo_finalizer(SEXP xptr) { } -static void thread_disp_finalizer(SEXP xptr) { - - if (NANO_PTR(xptr) == NULL) return; - nano_thread_disp *xp = (nano_thread_disp *) NANO_PTR(xptr); - nano_cv *ncv = xp->cv; - nng_mtx *mtx = ncv->mtx; - nng_cv *cv = ncv->cv; - nng_mtx_lock(mtx); - ncv->condition = -1; - nng_cv_wake(cv); - nng_mtx_unlock(mtx); - if (xp->tls != NULL) - nng_tls_config_free(xp->tls); - nng_thread_destroy(xp->thr); - nng_url_free(xp->up); - for (int i = 0; i < xp->n; i++) { - nng_aio_free(xp->saio[i]->aio); - nng_aio_free(xp->raio[i]->aio); - nng_aio_free(xp->haio[i]->aio); - R_Free(xp->saio[i]); - R_Free(xp->raio[i]); - R_Free(xp->haio[i]); - R_Free(xp->url[i]); - } - R_Free(xp->saio); - R_Free(xp->raio); - R_Free(xp->haio); - R_Free(xp->url); - R_Free(xp->online); - nng_cv_free(ncv->cv); - nng_mtx_free(ncv->mtx); - R_Free(xp->cv); - R_Free(xp); - -} - static void rnng_wait_thread(void *args) { while (1) { @@ -482,21 +446,6 @@ SEXP rnng_thread_shutdown(void) { return R_NilValue; } -static void nano_record_pipe(nng_pipe p, nng_pipe_ev ev, void *arg) { - - nano_signal *signal = (nano_signal *) arg; - const int incr = ev == NNG_PIPE_EV_ADD_POST; - nano_cv *ncv = signal->cv; - nng_mtx *mtx = ncv->mtx; - nng_cv *cv = ncv->cv; - nng_mtx_lock(mtx); - incr ? (*signal->online)++ : (*signal->online)--; - ncv->condition++; - nng_cv_wake(cv); - nng_mtx_unlock(mtx); - -} - static void rnng_signal_thread(void *args) { nano_thread_duo *duo = (nano_thread_duo *) args; @@ -583,308 +532,3 @@ SEXP rnng_signal_thread_create(SEXP cv, SEXP cv2) { return cv2; } - -static void rnng_dispatch_thread(void *args) { - - nano_thread_disp *disp = (nano_thread_disp *) args; - - nano_cv *ncv = disp->cv; - nng_mtx *mtx = ncv->mtx; - nng_cv *cv = ncv->cv; - const R_xlen_t n = disp->n; - int *online = disp->online; - char **url = disp->url; - - int xc, end = 0; - nng_socket hsock; - nng_dialer hdial; - nng_socket sock[n]; - nng_ctx ctx[n]; - nng_ctx rctx[n]; - nano_saio **saio = disp->saio; - nano_aio **raio = disp->raio; - nano_aio **haio = disp->haio; - nng_listener list[n]; - int store[n]; - memset(store, 1, sizeof(store)); - int busy[n]; - memset(busy, 0, sizeof(busy)); - nano_signal signal[n]; - int active[n]; - - nng_msg *msg; - unsigned char *buf; - - unsigned char errnt[] = { - 0x42, 0x0a, 0x03, 0x00, 0x00, 0x00, 0x01, 0x04, 0x04, 0x00, 0x00, 0x05, - 0x03, 0x00, 0x05, 0x00, 0x00, 0x00, 0x55, 0x54, 0x46, 0x2d, 0x38, 0x0d, - 0x03, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x13, 0x00, 0x00, 0x00, 0x02, - 0x04, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x09, 0x00, 0x04, 0x00, 0x05, - 0x00, 0x00, 0x00, 0x63, 0x6c, 0x61, 0x73, 0x73, 0x10, 0x00, 0x00, 0x00, - 0x02, 0x00, 0x00, 0x00, 0x09, 0x00, 0x04, 0x00, 0x0a, 0x00, 0x00, 0x00, - 0x65, 0x72, 0x72, 0x6f, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x09, 0x00, - 0x04, 0x00, 0x09, 0x00, 0x00, 0x00, 0x74, 0x72, 0x79, 0x2d, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0xfe, 0x00, 0x00, 0x00 - }; - - for (R_xlen_t i = 0; i < n; i++) { - - signal[i].cv = ncv; - signal[i].online = &online[i]; - if (nng_req0_open(&sock[i]) || - nng_socket_set_ms(sock[i], "req:resend-time", 0) || - nng_pipe_notify(sock[i], NNG_PIPE_EV_ADD_POST, nano_record_pipe, &signal[i]) || - nng_pipe_notify(sock[i], NNG_PIPE_EV_REM_POST, nano_record_pipe, &signal[i])) - goto exitlevel1; - - if (disp->tls != NULL) { - if (nng_listener_create(&list[i], sock[i], url[i]) || - nng_tls_config_server_name(disp->tls, disp->up->u_hostname) || - nng_listener_set_ptr(list[i], NNG_OPT_TLS_CONFIG, disp->tls) || - nng_listener_start(list[i], 0)) - goto exitlevel1; - } else { - if (nng_listen(sock[i], url[i], &list[i], 0)) - goto exitlevel1; - } - - raio[i]->next = ncv; - raio[i]->result = 0; - haio[i]->next = ncv; - haio[i]->result = 0; - if (nng_aio_alloc(&saio[i]->aio, sendaio_complete, saio[i]) || - nng_aio_alloc(&raio[i]->aio, raio_complete_signal, raio[i]) || - nng_aio_alloc(&haio[i]->aio, raio_complete_signal, haio[i])) - goto exitlevel1; - } - - if (nng_rep0_open(&hsock)) - goto exitlevel1; - - if (nng_dial(hsock, disp->host, &hdial, 0)) - goto exitlevel2; - - for (R_xlen_t i = 0; i < n; i++) { - nng_mtx_lock(mtx); - while (ncv->condition == 0) - nng_cv_wait(cv); - if (ncv->condition < 0) { - nng_mtx_unlock(mtx); - goto exitlevel2; - } - ncv->condition--; - nng_mtx_unlock(mtx); - } - - for (R_xlen_t i = 0; i < n; i++) { - if (nng_ctx_open(&rctx[i], hsock)) - goto exitlevel2; - nng_ctx_recv(rctx[i], haio[i]->aio); - } - - while (1) { - - nng_mtx_lock(mtx); - while (ncv->condition == 0) - nng_cv_wait(cv); - if (ncv->condition < 0) { - nng_mtx_unlock(mtx); - goto exitlevel2; - } - ncv->condition--; - memcpy(active, online, n * sizeof(int)); - nng_mtx_unlock(mtx); - - for (R_xlen_t i = 0; i < n; i++) { - if (active[i] > store[i]) { - if (nng_ctx_open(&rctx[i], hsock)) - goto exitlevel2; - nng_ctx_recv(rctx[i], haio[i]->aio); - } - } - memcpy(store, active, n * sizeof(int)); - - for (R_xlen_t i = 0; i < n; i++) { - - if (busy[i]) { - nng_mtx_lock(mtx); - xc = raio[i]->result; - nng_mtx_unlock(mtx); - if (xc) { - raio[i]->result = 0; - if (xc < 0) { - buf = nng_msg_body((nng_msg *) raio[i]->data); - if (buf[3] == 0x1) { - if (nng_msg_alloc(&msg, 0)) - goto exitlevel2; - if (nng_ctx_sendmsg(ctx[i], msg, 0)) - nng_msg_free(msg); - end = 1; - } - if (nng_ctx_sendmsg(rctx[i], (nng_msg *) raio[i]->data, 0)) - nng_msg_free((nng_msg *) raio[i]->data); - } else { - if (nng_msg_alloc(&msg, 0) || - (xc == 19 ? nng_msg_append(msg, errnt, sizeof(errnt)) : - nng_msg_append(msg, &xc, sizeof(int)))) - goto exitlevel2; - if (nng_ctx_sendmsg(rctx[i], msg, 0)) - nng_msg_free(msg); - end = 1; - } - nng_ctx_close(ctx[i]); - nng_ctx_close(rctx[i]); - busy[i] = 0; - if (end) { - end = 0; - } else { - if (nng_ctx_open(&rctx[i], hsock)) - goto exitlevel2; - nng_ctx_recv(rctx[i], haio[i]->aio); - } - break; - } - } - - if (active[i] && !busy[i]) { - nng_mtx_lock(mtx); - xc = haio[i]->result; - nng_mtx_unlock(mtx); - if (xc) { - haio[i]->result = 0; - if (xc < 0) { - busy[i] = 1; - if (nng_ctx_open(&ctx[i], sock[i])) - goto exitlevel2; - nng_aio_set_msg(saio[i]->aio, (nng_msg *) haio[i]->data); - nng_ctx_send(ctx[i], saio[i]->aio); - nng_ctx_recv(ctx[i], raio[i]->aio); - } else { - // exit if reaches here - goto exitlevel2; - } - break; - } - } - - } - - } - - exitlevel2: - nng_close(hsock); - exitlevel1: - for (R_xlen_t i = 0; i < n; i++) - nng_close(sock[i]); - -} - -SEXP rnng_dispatcher_socket(SEXP host, SEXP url, SEXP tls) { - - const int sec = tls != R_NilValue; - const R_xlen_t nd = XLENGTH(url); - - if (sec && NANO_TAG(tls) != nano_TlsSymbol) - Rf_error("'tls' is not a valid TLS Configuration"); - - int xc; - SEXP xptr, sock, list; - - nano_cv *ncv = R_Calloc(1, nano_cv); - if ((xc = nng_mtx_alloc(&ncv->mtx))) - goto exitlevel1; - - if ((xc = nng_cv_alloc(&ncv->cv, ncv->mtx))) - goto exitlevel2; - - nano_thread_disp *disp = R_Calloc(1, nano_thread_disp); - disp->cv = ncv; - disp->n = nd; - disp->tls = sec ? (nng_tls_config *) NANO_PTR(tls) : NULL; - if (sec) nng_tls_config_hold(disp->tls); - disp->saio = R_Calloc(nd, nano_saio *); - disp->raio = R_Calloc(nd, nano_aio *); - disp->haio = R_Calloc(nd, nano_aio *); - disp->host = CHAR(STRING_ELT(host, 0)); - disp->online = R_Calloc(nd, int); - disp->url = R_Calloc(nd, char *); - for (R_xlen_t i = 0; i < nd; i++) { - disp->saio[i] = R_Calloc(1, nano_saio); - disp->raio[i] = R_Calloc(1, nano_aio); - disp->haio[i] = R_Calloc(1, nano_aio); - const char *up = CHAR(STRING_ELT(url, i)); - size_t slen = strlen(up); - disp->url[i] = R_Calloc(slen + 1, char); - memcpy(disp->url[i], up, slen); - } - nng_socket *hsock = R_Calloc(1, nng_socket); - nng_listener *hl = R_Calloc(1, nng_listener); - - if (nng_url_parse(&disp->up, disp->url[0])) - goto exitlevel3; - - if ((xc = nng_req0_open(hsock))) - goto exitlevel4; - - if ((xc = nng_socket_set_ms(*hsock, "req:resend-time", 0)) || - (xc = nng_listen(*hsock, disp->host, hl, 0)) || - (xc = nng_thread_create(&disp->thr, rnng_dispatch_thread, disp))) - goto exitlevel5; - - PROTECT(sock = R_MakeExternalPtr(hsock, nano_SocketSymbol, R_NilValue)); - R_RegisterCFinalizerEx(sock, socket_finalizer, TRUE); - - xptr = R_MakeExternalPtr(disp, nano_SocketSymbol, R_NilValue); - Rf_setAttrib(sock, R_MissingArg, xptr); - R_RegisterCFinalizerEx(xptr, thread_disp_finalizer, TRUE); - - list = R_MakeExternalPtr(hl, nano_ListenerSymbol, R_NilValue); - Rf_setAttrib(sock, nano_ListenerSymbol, list); - R_RegisterCFinalizerEx(list, listener_finalizer, TRUE); - - UNPROTECT(1); - return sock; - - exitlevel5: - nng_close(*hsock); - exitlevel4: - nng_url_free(disp->up); - exitlevel3: - R_Free(hl); - R_Free(hsock); - for (R_xlen_t i = 0; i < nd; i++) { - R_Free(disp->haio[i]); - R_Free(disp->raio[i]); - R_Free(disp->saio[i]); - R_Free(disp->url[i]); - } - R_Free(disp->haio); - R_Free(disp->raio); - R_Free(disp->saio); - R_Free(disp->url); - R_Free(disp->online); - if (sec) nng_tls_config_free(disp->tls); - R_Free(disp); - nng_cv_free(ncv->cv); - exitlevel2: - nng_mtx_free(ncv->mtx); - R_Free(ncv); - exitlevel1: - ERROR_OUT(xc); - -} - -SEXP rnng_read_online(SEXP sock) { - - SEXP xptr = Rf_getAttrib(sock, R_MissingArg); - if (NANO_TAG(xptr) != nano_SocketSymbol) - return R_NilValue; - - nano_thread_disp *disp = (nano_thread_disp *) NANO_PTR(xptr); - const int n = disp->n; - SEXP out = Rf_allocVector(INTSXP, n); - memcpy(NANO_DATAPTR(out), disp->online, n * sizeof(int)); - - return out; - -} diff --git a/tests/tests.R b/tests/tests.R index a80cafaf0..60998ab97 100644 --- a/tests/tests.R +++ b/tests/tests.R @@ -208,7 +208,9 @@ test_error(req$opt("false", list()), "type") test_class("nanoContext", ctx <- context(rep)) test_print(ctx) +test_true(.mark()) test_class("sendAio", csaio <- req$send_aio(data.frame(), mode = "seria", timeout = 500)) +test_true(!.mark(FALSE)) test_zero(call_aio_(csaio)$result) test_class("recvAio", craio <- recv_aio(ctx, timeout = 500)) test_type("list", collect_aio(craio)) @@ -530,38 +532,6 @@ test_zero(close(s1)) test_zero(close(s2)) test_zero(close(s3)) -test_type("externalptr", cv <- cv()) -test_error(.dispatcher(host = "inproc://hostdisp", url = "inproc://disp/1", tls = ""), "not a valid TLS Configuration") -test_type("externalptr", disp <- .dispatcher(host = "inproc://hostdisp", url = "inproc://disp/1", tls = NULL)) -test_zero(.online(disp)) -test_null(.online("a")) -test_class("nanoSocket", s <- socket(protocol = "rep")) -test_zero(pipe_notify(s, cv, add = TRUE)) -test_zero(dial(s, url = "inproc://disp/1")) -test_true(wait(cv)) -test_zero(send(disp, NULL, block = 500L)) -test_null(recv(s, block = 500L)) -test_zero(send(s, TRUE, block = 500L)) -test_true(recv(disp, block = 500L)) -test_zero(send(disp, NULL, block = 500L)) -test_null(recv(s, block = 500L)) -test_zero(reap(s)) -Sys.sleep(0.1) -test_class("nanoSocket", s <- socket(protocol = "rep")) -test_zero(pipe_notify(s, cv, add = TRUE)) -test_zero(dial(s, url = "inproc://disp/1")) -test_true(wait(cv)) -test_zero(send(disp, TRUE, block = 500L)) -test_true(recv(s, block = 500L)) -test_true(.mark()) -test_zero(send(s, NULL, block = 500L)) -test_true(!.mark(FALSE)) -test_null(recv(disp, block = 500L)) -test_zero(reap(s)) -rm(disp) -test_true(.interrupt()) -test_true(!.interrupt(FALSE)) - test_equal(nanonext:::.DollarNames.ncurlAio(NULL, "sta"), "status") test_equal(nanonext:::.DollarNames.recvAio(NULL, "dat"), "data") test_equal(nanonext:::.DollarNames.sendAio(NULL, "r"), "result") @@ -667,6 +637,9 @@ if (promises) later::run_now() if (promises) test_zero(close(s1)) if (promises) test_zero(close(s)) if (promises) later::run_now() + +test_true(.interrupt()) +test_true(!.interrupt(FALSE)) test_true(!identical(get0(".Random.seed"), {.advance(); .Random.seed})) test_type("integer", .Call(nanonext:::rnng_traverse_precious)) if (Sys.info()[["sysname"]] == "Linux") {