Skip to content

Commit

Permalink
update developer interface
Browse files Browse the repository at this point in the history
- export worker loop implementation, .bpworker_impl
- rename args to try to more clearly convey backend / worker as main arg
- .bpstop_impl stops workers (sends .DONE() semaphore)
  • Loading branch information
mtmorgan committed Jan 22, 2019
1 parent 60f4c68 commit 1eb8b90
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 109 deletions.
6 changes: 2 additions & 4 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ exportMethods(
S3method(print, remote_error)
S3method(print, bplist_error)

S3method(bploop, MPInode)
S3method(bploop, SOCKnode)
S3method(bploop, SOCK0node)
S3method(bploop, lapply)
S3method(bploop, iterate)

Expand All @@ -118,6 +115,7 @@ S3method(bploop, iterate)
export(
.BiocParallelParam_prototype, .prototype_update, .prettyPath,
.send_to, .recv_any, .send, .recv, .close, .send_all, .recv_all,
.bpstart_impl, .bpstop_impl, .bplapply_impl, .bpiterate_impl,
.bpstart_impl, .bpstop_impl, .bpworker_impl,
.bplapply_impl, .bpiterate_impl,
.error_worker_comm
)
67 changes: 34 additions & 33 deletions R/DeveloperInterface.R
Original file line number Diff line number Diff line change
@@ -1,83 +1,84 @@
## .BiocParallelParam_prototype, .prototype_update, .prettyPath,
## .bpstart_impl, .bpstop_impl, .bplapply_impl, .bpiterate_impl,
## .error_worker_comm
##
## see NAMESPACE section for definitive exports
##

## server

setGeneric(
".send_to",
function(cluster, node, value) standardGeneric(".send_to"),
signature = "cluster"
function(backend, node, value) standardGeneric(".send_to"),
signature = "backend"
)

setGeneric(
".recv_any",
function(cluster) standardGeneric(".recv_any"),
signature = "cluster"
function(backend) standardGeneric(".recv_any"),
signature = "backend"
)

setGeneric(
".send_all",
function(cluster, value) standardGeneric(".send_all"),
signature = "cluster"
function(backend, value) standardGeneric(".send_all"),
signature = "backend"
)

setGeneric(
".recv_all",
function(cluster) standardGeneric(".recv_all"),
signature = "cluster"
function(backend) standardGeneric(".recv_all"),
signature = "backend"
)

## client

setGeneric(
".send",
function(cluster, value) standardGeneric(".send"),
signature = "cluster"
function(worker, value) standardGeneric(".send"),
signature = "worker"
)

setGeneric(
".recv",
function(cluster) standardGeneric(".recv"),
signature = "cluster"
function(worker) standardGeneric(".recv"),
signature = "worker"
)

setGeneric(
".close",
function(cluster) standardGeneric(".close"),
signature = "cluster"
function(worker) standardGeneric(".close"),
signature = "worker"
)

## default implementation -- SNOW cluster
## default implementation -- SNOW backend

setMethod(
".send_all", "ANY",
function(cluster, value)
function(backend, value)
{
for (node in seq_along(cluster))
.send_to(cluster, node, value)
for (node in seq_along(backend))
.send_to(backend, node, value)
})

setMethod(
".recv_all", "ANY",
function(cluster)
function(backend)
{
replicate(length(cluster), .recv_any(cluster), simplify=FALSE)
replicate(length(backend), .recv_any(backend), simplify=FALSE)
})

setMethod(
".send_to", "ANY",
function(cluster, node, value)
function(backend, node, value)
{
parallel:::sendData(cluster[[node]], value)
parallel:::sendData(backend[[node]], value)
TRUE
})

setMethod(
".recv_any", "ANY",
function(cluster)
function(backend)
{
tryCatch({
parallel:::recvOneData(cluster)
parallel:::recvOneData(backend)
}, error = function(e) {
## indicate error, but do not stop
.error_worker_comm(e, "'.recv_any()' data failed")
Expand All @@ -86,17 +87,17 @@ setMethod(

setMethod(
".send", "ANY",
function(cluster, value)
function(worker, value)
{
parallel:::sendData(cluster, value)
parallel:::sendData(worker, value)
})

setMethod(
".recv", "ANY",
function(cluster)
function(worker)
{
tryCatch({
parallel:::recvData(cluster)
parallel:::recvData(worker)
}, error = function(e) {
## indicate error, but do not stop
.error_worker_comm(e, "'.recv()' data failed")
Expand All @@ -105,7 +106,7 @@ setMethod(

setMethod(
".close", "ANY",
function(cluster)
function(worker)
{
parallel:::closeNode(cluster)
parallel:::closeNode(worker)
})
14 changes: 5 additions & 9 deletions R/SnowParam-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -261,17 +261,13 @@ setMethod("bpstop", "SnowParam",
if (!bpisup(x))
return(invisible(x))

tryCatch({
res <- capture.output(parallel::stopCluster(bpbackend(x)))
}, error=function(err) {
txt <- sprintf("failed to stop %s cluster: %s",
sQuote(class(bpbackend(x))[[1]]),
conditionMessage(err))
stop(paste(strwrap(txt, exdent=2), collapse="\n"), call.=FALSE)
})
x <- .bpstop_impl(x)
cluster <- bpbackend(x)
for (i in seq_along(cluster))
.close(cluster[[i]])
bpbackend(x) <- .NULLcluster()

.bpstop_impl(x)
x
})

setMethod("bpisup", "SnowParam",
Expand Down
4 changes: 2 additions & 2 deletions R/SnowParam-utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ bprunMPIslave <- function() {
Rmpi::mpi.comm.set.errhandler(comm)
Rmpi::mpi.comm.disconnect(intercomm)

bploop(snow::makeMPImaster(comm))
.bpworker_impl(snow::makeMPImaster(comm))

Rmpi::mpi.comm.disconnect(comm)
Rmpi::mpi.quit()
Expand Down Expand Up @@ -57,7 +57,7 @@ bprunMPIslave <- function() {
}
})
node <- structure(list(con = con), class = "SOCK0node")
bploop(node)
.bpworker_impl(node)
}, detached=TRUE)
}

Expand Down
24 changes: 13 additions & 11 deletions R/bploop.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,27 @@
time = time, log = log, sout = sout)
}

.DONE <-
function()
{
list(type = "DONE")
}

### - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
### Worker loop used by SOCK, MPI and FORK. Error handling is done in
### .composeTry.

bploop <- function(manager, ...)
UseMethod("bploop")

.bploop.worker <- function(manager, ...)
.bpworker_impl <- function(worker)
{
repeat {
tryCatch({
msg <- .recv(manager)
msg <- .recv(worker)
if (inherits(msg, "error"))
## FIXME: try to return error to manager
break # lost socket connection?

if (msg$type == "DONE") {
.close(manager)
.close(worker)
break
} else if (msg$type == "EXEC") {
## need local handler for worker read/send errors
Expand All @@ -63,18 +66,14 @@ bploop <- function(manager, ...)
value <- .VALUE(
msg$data$tag, value, success, t2 - t1, log, sout
)
.send(manager, value)
.send(worker, value)
}
}, interrupt = function(e) {
NULL
})
}
}

bploop.MPInode <- .bploop.worker
bploop.SOCKnode <- .bploop.worker
bploop.SOCK0node <- .bploop.worker

### - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
### Manager loop used by SOCK, MPI and FORK

Expand Down Expand Up @@ -183,6 +182,9 @@ bploop.SOCK0node <- .bploop.worker
## bploop.lapply(): derived from snow::dynamicClusterApply.
##

bploop <- function(manager, ...)
UseMethod("bploop")

bploop.lapply <-
function(manager, X, FUN, ARGFUN, BPPARAM, ...)
{
Expand Down
4 changes: 2 additions & 2 deletions R/bpstart-methods.R
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ setMethod("bpstart", "missing",
}

.bpstart_impl <-
function(x, ...)
function(x)
{
## common actions once bpisup(x)
## common actions once bpisup(backend)

## logging
if (bplog(x))
Expand Down
11 changes: 11 additions & 0 deletions R/bpstop-methods.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,20 @@ setMethod("bpstop", "missing",
## .bpstop_impl: common functionality after bpisup() is no longer TRUE
##

.bpstop_nodes <-
function(x)
{
cluster <- bpbackend(x)
for (i in seq_along(cluster))
.send_to(cluster, i, .DONE())

TRUE
}

.bpstop_impl <-
function(x)
{
bpisup(x) && .bpstop_nodes(x)
.ClusterManager$drop(x$.uid)
invisible(x)
}
2 changes: 1 addition & 1 deletion inst/RSOCKnode.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ ${RPROG:-R} --vanilla <<EOF > ${OUT:-/dev/null} 2>&1 &
loadNamespace("snow")
options(timeout=getClusterOption("timeout"))
BiocParallel::bploop(snow::makeSOCKmaster())
BiocParallel::.bpworker_impl(snow::makeSOCKmaster())
EOF
2 changes: 1 addition & 1 deletion inst/snow/RSOCKnode.R
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ local({

if (port == "") port <- getClusterOption("port")

BiocParallel::bploop(snow::makeSOCKmaster(master, port))
BiocParallel::.bpworker_impl(snow::makeSOCKmaster(master, port))
})
Loading

0 comments on commit 1eb8b90

Please sign in to comment.