Skip to content

Commit

Permalink
add DeveloperInterface.R / .Rd for extending BPPARAM
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmorgan committed Jan 5, 2019
1 parent e475962 commit 60f4c68
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 79 deletions.
8 changes: 4 additions & 4 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ Suggests: BiocGenerics, tools, foreach, BatchJobs, BBmisc, doParallel,
TxDb.Hsapiens.UCSC.hg19.knownGene, VariantAnnotation, Rsamtools,
GenomicAlignments, ShortRead, codetools, RUnit, BiocStyle, knitr,
batchtools, data.table
Collate: AllGenerics.R prototype.R
Collate: AllGenerics.R DeveloperInterface.R prototype.R
bploop.R ErrorHandling.R log.R
bpbackend-methods.R bpisup-methods.R bplapply-methods.R
bpiterate-methods.R BiocParallelParam-class.R
bpmapply-methods.R bpschedule-methods.R
bpstart-methods.R bpstop-methods.R bpvec-methods.R
bpiterate-methods.R bpstart-methods.R bpstop-methods.R
BiocParallelParam-class.R
bpmapply-methods.R bpschedule-methods.R bpvec-methods.R
bpvectorize-methods.R bpworkers-methods.R bpaggregate-methods.R
bpvalidate.R SnowParam-class.R MulticoreParam-class.R register.R
SerialParam-class.R DoparParam-class.R SnowParam-utils.R
Expand Down
11 changes: 11 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,14 @@ S3method(bploop, SOCKnode)
S3method(bploop, SOCK0node)
S3method(bploop, lapply)
S3method(bploop, iterate)

### - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
### Export 'developer' API for extending BiocParallelParam
###

export(
.BiocParallelParam_prototype, .prototype_update, .prettyPath,
.send_to, .recv_any, .send, .recv, .close, .send_all, .recv_all,
.bpstart_impl, .bpstop_impl, .bplapply_impl, .bpiterate_impl,
.error_worker_comm
)
4 changes: 4 additions & 0 deletions R/BiocParallelParam-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ setReplaceMethod("bpRNGseed", c("BiocParallelParam", "numeric"),
### Methods - evaluation
###

setMethod("bpstart", "BiocParallelParam", .bpstart_impl)

setMethod("bpstop", "BiocParallelParam", .bpstop_impl)

setMethod("bplapply", c("ANY", "BiocParallelParam"), .bplapply_impl)

setMethod("bpiterate", c("ANY", "ANY", "BiocParallelParam"), .bpiterate_impl)
Expand Down
111 changes: 111 additions & 0 deletions R/DeveloperInterface.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
## .BiocParallelParam_prototype, .prototype_update, .prettyPath,
## .bpstart_impl, .bpstop_impl, .bplapply_impl, .bpiterate_impl,
## .error_worker_comm

## server
setGeneric(
".send_to",
function(cluster, node, value) standardGeneric(".send_to"),
signature = "cluster"
)

setGeneric(
".recv_any",
function(cluster) standardGeneric(".recv_any"),
signature = "cluster"
)

setGeneric(
".send_all",
function(cluster, value) standardGeneric(".send_all"),
signature = "cluster"
)

setGeneric(
".recv_all",
function(cluster) standardGeneric(".recv_all"),
signature = "cluster"
)

## client

setGeneric(
".send",
function(cluster, value) standardGeneric(".send"),
signature = "cluster"
)

setGeneric(
".recv",
function(cluster) standardGeneric(".recv"),
signature = "cluster"
)

setGeneric(
".close",
function(cluster) standardGeneric(".close"),
signature = "cluster"
)

## default implementation -- SNOW cluster

setMethod(
".send_all", "ANY",
function(cluster, value)
{
for (node in seq_along(cluster))
.send_to(cluster, node, value)
})

setMethod(
".recv_all", "ANY",
function(cluster)
{
replicate(length(cluster), .recv_any(cluster), simplify=FALSE)
})

setMethod(
".send_to", "ANY",
function(cluster, node, value)
{
parallel:::sendData(cluster[[node]], value)
TRUE
})

setMethod(
".recv_any", "ANY",
function(cluster)
{
tryCatch({
parallel:::recvOneData(cluster)
}, error = function(e) {
## indicate error, but do not stop
.error_worker_comm(e, "'.recv_any()' data failed")
})
})

setMethod(
".send", "ANY",
function(cluster, value)
{
parallel:::sendData(cluster, value)
})

setMethod(
".recv", "ANY",
function(cluster)
{
tryCatch({
parallel:::recvData(cluster)
}, error = function(e) {
## indicate error, but do not stop
.error_worker_comm(e, "'.recv()' data failed")
})
})

setMethod(
".close", "ANY",
function(cluster)
{
parallel:::closeNode(cluster)
})
1 change: 1 addition & 0 deletions R/SnowParam-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ setMethod("bpstop", "SnowParam",
stop(paste(strwrap(txt, exdent=2), collapse="\n"), call.=FALSE)
})
bpbackend(x) <- .NULLcluster()

.bpstop_impl(x)
})

Expand Down
79 changes: 4 additions & 75 deletions R/bploop.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,77 +6,6 @@
### Derived from snow version 0.3-13 by Luke Tierney
### Derived from parallel version 2.16.0 by R Core Team

## server

.send_to <- function(cluster, node, ...)
UseMethod(".send_to")

.recv_any <- function(cluster, ...)
UseMethod(".recv_any")

.send_all <-
function(cluster, value)
{
for (node in seq_along(cluster))
.send_to(cluster, node, value)
}

.recv_all <-
function(cluster)
{
replicate(length(cluster), .recv_any(cluster), simplify=FALSE)
}

## client

.send <- function(cluster, ...)
UseMethod(".send")

.recv <- function(cluster, ...)
UseMethod(".recv")

.close <- function(cluster, node)
UseMethod(".close")

## default implementation -- SNOW cluster
.send_to.default <-
function(cluster, node, data)
{
parallel:::sendData(cluster[[node]], data)
TRUE
}

.recv_any.default <-
function(cluster, id)
{
tryCatch({
parallel:::recvOneData(cluster)
}, error = function(e) {
## indicate error, but do not stop
.error_worker_comm(e, sprintf("'%s' receive data failed", id))
})
}

.send.default <-
function(cluster, data)
{
parallel:::sendData(cluster, data)
}

.recv.default <-
function(cluster, id)
{
tryCatch({
parallel:::recvData(cluster)
}, error = function(e) {
## indicate error, but do not stop
.error_worker_comm(e, sprintf("'%s' receive data failed", id))
})
}

.close.default <- function(cluster)
parallel:::closeNode(cluster)

.EXEC <-
function(tag, fun, args)
{
Expand All @@ -101,7 +30,7 @@ bploop <- function(manager, ...)
{
repeat {
tryCatch({
msg <- .recv(manager, "worker")
msg <- .recv(manager)
if (inherits(msg, "error"))
## FIXME: try to return error to manager
break # lost socket connection?
Expand Down Expand Up @@ -161,7 +90,7 @@ bploop.SOCK0node <- .bploop.worker
setTimeLimit(30, 30, TRUE)
on.exit(setTimeLimit(Inf, Inf, FALSE))
while (any(running)) {
d <- .recv_any(cl, "clear_cluster")
d <- .recv_any(cl)
if (!is.null(result))
result[[d$value$tag]] <- d$value$value
running[d$node] <- FALSE
Expand Down Expand Up @@ -277,7 +206,7 @@ bploop.lapply <-

for (i in seq_len(n)) {
## collect
d <- .recv_any(cl, "bplapply")
d <- .recv_any(cl)

value <- d$value$value
njob <- d$value$tag
Expand Down Expand Up @@ -350,7 +279,7 @@ bploop.iterate <-
break

## collect
d <- .recv_any(cl, "bpiterate")
d <- .recv_any(cl)
progress$step()

value <- d$value$value
Expand Down
2 changes: 2 additions & 0 deletions man/BiocParallelParam-class.Rd
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
\alias{bpstart}
\alias{bpstart,ANY-method}
\alias{bpstart,missing-method}
\alias{bpstart,BiocParallelParam-method}
\alias{bpstop}
\alias{bpstop,ANY-method}
\alias{bpstop,missing-method}
\alias{bpstop,BiocParallelParam-method}
\alias{bpnworkers}
\alias{bpworkers}
\alias{bpworkers<-}
Expand Down
Loading

0 comments on commit 60f4c68

Please sign in to comment.