From 438cfa3e0e1b3094f7a402b21012693e678f2b0c Mon Sep 17 00:00:00 2001 From: Jay Qi Date: Wed, 29 Aug 2018 18:41:56 -0500 Subject: [PATCH 1/2] Intermediate output cache clearing --- R/execution.R | 12 +++++++++++- R/workflow_dag.R | 10 ++++++++++ R/workflow_interface.R | 5 +++++ tests/testthat/test_workflow_dag.R | 6 +++++- 4 files changed, 31 insertions(+), 2 deletions(-) diff --git a/R/execution.R b/R/execution.R index 41eda6e..3d33f6c 100644 --- a/R/execution.R +++ b/R/execution.R @@ -109,7 +109,17 @@ Execute <- function(workflow } } - #TODO: clear cached output in modules when not needed anymore + if (clearCache){ + # Clear cached output in any upstream modules where all of its downstream + # dependencies are complete + for (upstreamModule in upstreamModules) { + if (workflow$hasCompletedAllDownstreamModules(upstreamModule)) { + upstreamModule$clearOutputCache() + } + } + } + + } modulesToExecute <- modulesToExecute[as.logical(lapply(modulesToExecute, is.Module))] # clears out NaNs when modules moved from modulesToExecute to modulesToExecute } diff --git a/R/workflow_dag.R b/R/workflow_dag.R index 1f67928..d959206 100644 --- a/R/workflow_dag.R +++ b/R/workflow_dag.R @@ -356,6 +356,16 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" , "out")) } + , hasCompletedAllDownstreamModules = function(module) { + downstreamModules <- self$getDownstreamModules(module) + downstreamCompletedVec <- vapply( + downstreamModules + , function(m) {m$hasCompleted()} + , logical(1) + ) + return(all(downstreamCompletedVec)) + } + , getEndingModules = function() { endingModules <- list() diff --git a/R/workflow_interface.R b/R/workflow_interface.R index 818f6f9..64f8f0d 100644 --- a/R/workflow_interface.R +++ b/R/workflow_interface.R @@ -15,6 +15,7 @@ #' \item{\code{getStartingModules()}}{Gets a list of modules that are the starting modules of a workflow.} #' \item{\code{getDownstreamModules(module)}}{Gets modules downstream of \code{module} in a workflow.} #' \item{\code{getUpstreamModules(module)}}{Gets modules upstream of \code{module} in a workflow.} +#' \item{\code{hasCompletedAllDownstreamModules(module)}}{Indicates if all modules downstream of \code{module} in a workflow have completed.} #' \item{\code{initFromFile(filename)}}{Initializes this workflow from a save state stored in \code{filename}.} #' \item{\code{removeConnection(connection)}}{Removes \code{connection}, an implementation instance of \code{ConnectionInterface}, to this workflow.} #' \item{\code{removeModule(module)}}{Adds \code{module}, an implementation instance of \code{ModuleInterface}, to this workflow.} @@ -66,6 +67,10 @@ WorkflowInterface <- R6::R6Class( "WorkflowInterface" UpDraftSettings$errorLogger("getUpstreamModules not implemented in ", class(self)[1]) } + , hasCompletedAllDownstreamModules = function(module) { + UpDraftSettings$errorLogger("hasCompletedAllDownstreamModules not implemented in ", class(self)[1]) + } + , removeConnection = function(connection) { UpDraftSettings$errorLogger("removeConnection not implemented in ", class(self)[1]) } diff --git a/tests/testthat/test_workflow_dag.R b/tests/testthat/test_workflow_dag.R index b6016df..e3b7ac1 100644 --- a/tests/testthat/test_workflow_dag.R +++ b/tests/testthat/test_workflow_dag.R @@ -77,6 +77,10 @@ test_that("Testing DAGWorkflow Class constructor and obj methods run to completi workflow$getUpstreamModules('mod2') TRUE }) + expect_true({ + workflow$hasCompletedAllDownstreamModules('mod1') + TRUE + }) expect_true({ workflow$getConnections('mod1', 'mod2') TRUE @@ -122,7 +126,7 @@ test_that("Testing DAGWorkflow Class Static initFromFile Method", { }) }) -test_that("Testing DAGWorkflow obj methods error when appriopriate", { +test_that("Testing DAGWorkflow obj methods error when appropriate", { workflow <- DAGWorkflow$new() connection <- DirectedConnection$new("conn", "mod1", "mod2", '...') mod1 <- PackageFunctionModule$new("mod1", "paste") From 7161039afcd1b2810651e76b118de017c2002056 Mon Sep 17 00:00:00 2001 From: Jay Qi Date: Wed, 29 Aug 2018 18:49:33 -0500 Subject: [PATCH 2/2] Updated Roxygen --- DESCRIPTION | 2 +- R/workflow_dag.R | 159 ++++++++++++++++++++------------------- man/DAGWorkflow.Rd | 7 ++ man/LogStrackTrace.Rd | 4 +- man/WorkflowInterface.Rd | 1 + 5 files changed, 94 insertions(+), 79 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index e807f65..cdaf624 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -27,4 +27,4 @@ Suggests: VignetteBuilder: knitr License: file LICENSE LazyData: NA -RoxygenNote: 6.0.1 +RoxygenNote: 6.1.0 diff --git a/R/workflow_dag.R b/R/workflow_dag.R index d959206..9a4cdfa 100644 --- a/R/workflow_dag.R +++ b/R/workflow_dag.R @@ -65,6 +65,13 @@ #' \item{\bold{Returns}: list of downstream modules} #' } #' } +#' \item{\code{hasCompletedAllDownstreamModules(module)}}{ +#' \itemize{ +#' \item{Checks the completion of all downstream modules of \code{module}.} +#' \item{\bold{\code{module}}: valid implementation \code{ModuleInterface} obj or name of module that is present in a workflow.} +#' \item{\bold{Returns}: logical indicating all downstream modules are complete} +#' } +#' } #' \item{\code{getEndingModules()}}{ #' \itemize{ #' \item{Gets a list of ending modules in a workflow.} @@ -181,32 +188,32 @@ #' @export DAGWorkflow <- R6::R6Class("DAGWorkflow" , inherit = WorkflowInterface - + , public = list( initialize = function(name = "") { private$name <- name private$modules <- list() private$connections <- list() private$graph <- igraph::graph.empty() - + self$errorCheck() } - + , addConnections = function(connections) { if (is.DirectedConnection(connections)) { connections <- list(connections) } - + for (connection in connections) { if (!is.DirectedConnection(connection)) { UpDraftSettings$errorLogger("connection parameter is not a valid DirectedConnection obj") } - + revertState <- private$getRevertState() - + tryCatch({ private$connections[[connection$getName()]] <- connection - private$graph <- suppressWarnings(private$graph + private$graph <- suppressWarnings(private$graph + igraph::edge(connection$getHeadModuleName() , connection$getTailModuleName() , name = connection$getName())) @@ -215,24 +222,24 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" , error = function(err) { private$revert(revertState) UpDraftSettings$errorLogger(err) - }) + }) } - + return(invisible(NULL)) } - + , addModules = function(modules) { if (is.Module(modules)) { modules <- list(modules) } - + for (module in modules) { if (!is.Module(module)) { UpDraftSettings$errorLogger("modules parameter must contain valid implementation objects of ModuleInterface") } - + revertState <- private$getRevertState() - + tryCatch({ private$modules[[module$getName()]] <- module private$graph <- private$graph + igraph::vertex(module$getName()) @@ -241,12 +248,12 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" , error = function(err) { private$revert(revertState) UpDraftSettings$errorLogger(err) - }) + }) } - + return(invisible(NULL)) } - + , errorCheck = function(executionCheck = FALSE , ... ) { @@ -255,20 +262,20 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" } if(length(unique(names(private$connections))) != length(names(private$connections))) { - UpDraftSettings$errorLogger("There are non-unique connection names") + UpDraftSettings$errorLogger("There are non-unique connection names") } - + if(length(unique(names(private$modules))) != length(names(private$modules))) { - UpDraftSettings$errorLogger("There are non-unique module names") + UpDraftSettings$errorLogger("There are non-unique module names") } - - + + if(executionCheck) { if(!is.null(private$graph) && !igraph::is.dag(private$graph)) { UpDraftSettings$errorLogger("This workflow is not a Directed Acyclic Graph (DAG)") } - + # check that there is at most only one connection per module input argument # and all required arguments are met through connections and ... if(length(private$connections) > 0 && length(private$modules) > 0) { @@ -277,15 +284,15 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" modulesWithErrors = list() for(module in private$modules) { moduleArgs <- module$getInputs() - requiredArgNames <- names(moduleArgs[moduleArgs]) # moduleArgs value is True is required. The name of element corrsponds to the actual argument. - + requiredArgNames <- names(moduleArgs[moduleArgs]) # moduleArgs value is True is required. The name of element corrsponds to the actual argument. + inputConns <- private$connections[names(lapply(igraph::incident(private$graph, module$getName(), mode = 'in'), attr, 'name'))] connectionSuppliedArgs <- vapply(inputConns, function(conn){conn$getInputArgument()}, as.character(length(inputConns))) connectionSuppliedArgs <- connectionSuppliedArgs[connectionSuppliedArgs != ''] if(length(connectionSuppliedArgs) != length(unique(connectionSuppliedArgs))) { UpDraftSettings$errorLogger(module$getName(), " module has mutiple connections assigned in the same input arguments(s): ", unique(connectionSuppliedArgs[duplicated(connectionSuppliedArgs)])) } - + metArgs <- intersect(requiredArgNames, c(suppliedArgNames, connectionSuppliedArgs)) if(length(metArgs) != length(requiredArgNames)) { UpDraftSettings$errorLogger(module$getName(), " module has unmet required arguments: ", setdiff(requiredArgNames, metArgs)) @@ -296,7 +303,7 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" return(NULL) } - + , getWorkflowInputs = function() { allModuleInputs <- lapply(private$modules, self$getModuleInputs) @@ -337,25 +344,25 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" if (is.Module(module2)) { module2 <- module2$getName() } - + connections = list() - + graphEdges <- igraph::E(private$graph) connectingEdges <- graphEdges[module1 %->% module2] for (edgeIndex in connectingEdges) { selectedEdge <- graphEdges[edgeIndex] connections[[selectedEdge$name]] <- private$connections[[selectedEdge$name]] } - + return(connections) } - + , getDownstreamModules = function(module) { return(private$getNeighbors(private$graph , module , "out")) } - + , hasCompletedAllDownstreamModules = function(module) { downstreamModules <- self$getDownstreamModules(module) downstreamCompletedVec <- vapply( @@ -365,24 +372,24 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" ) return(all(downstreamCompletedVec)) } - + , getEndingModules = function() { endingModules <- list() - + graphVertices <- igraph::V(private$graph) degreeOut <- igraph::degree(private$graph, mode="out") for (vertexIndex in graphVertices[degreeOut == 0]) { vertexModule <- private$modules[[graphVertices[vertexIndex]$name]] endingModules[[vertexModule$getName()]] <- vertexModule } - + return(endingModules) } - + , getAllModules = function() { return(private$modules) } - + , getStartingModules = function() { startingModules <- list() @@ -391,9 +398,9 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" for (vertexIndex in graphVertices[degreeIn == 0]) { vertexModule <- private$modules[[graphVertices[vertexIndex]$name]] requiredInputs <- vertexModule$getInputs() - + # TODO: figure out a better way to not run unused vertices in a particular graph - # Does not work with outside argument injection! Ignore for now + # Does not work with outside argument injection! Ignore for now # if (!any(requiredInputs)) { # startingModules[[vertexModule$getName()]] <- vertexModule # } @@ -402,82 +409,82 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" return(startingModules) } - + , getName = function() { return(private$name) } - - + + , getUpstreamModules = function(module) { return(private$getNeighbors(private$graph , module , "in")) } - + , getSaveInfo = function() { UpDraftSettings$warningLogger("getSaveInfo method does not apply to workflows - run save method()") - return(list()) + return(list()) } - + , removeConnection = function(connection) { if(is.Connection(connection)) { connection <- connection$getName() } - + tryCatch({ private$graph <- suppressWarnings(private$graph - igraph::edge(connection)) } , error = function(err) { - UpDraftSettings$errorLogger("connection parameter does not relate to a current connection in this workflow") + UpDraftSettings$errorLogger("connection parameter does not relate to a current connection in this workflow") }) private$connections[connection] <- NULL - + return(NULL) } - + , removeModule = function(module) { if(is.Module(module)) { module <- module$getName() } - + tryCatch({ private$graph <- suppressWarnings(private$graph - igraph::vertex(module)) } , error = function(err) { - UpDraftSettings$errorLogger("module parameter does not relate to a current module in this workflow") + UpDraftSettings$errorLogger("module parameter does not relate to a current module in this workflow") }) - + private$modules[module] <- NULL - + connectionsLost <- setdiff(names(private$connections) , names(igraph::E(private$graph))) private$connections[connectionsLost] <- NULL - + return(NULL) } - + , save = function(filename) { workflowData <- list() - + workflowData[['class']] <- "DAGWorkflow" workflowData[['name']] <- private$name - + workflowData[['modules']] <- list() for (module in private$modules) { workflowData[['modules']][[module$getName()]] <- module$getSaveInfo() } - + workflowData[['connections']] <- list() for (connection in private$connections) { workflowData[['connections']][[connection$getName()]] <- connection$getSaveInfo() } - + write(jsonlite::toJSON(workflowData), filename) - + return(NULL) } - + , visualize = function() { # Node Graphics Setup nodes_df <- DiagrammeR::create_node_df(n = length(igraph::V(private$graph)) @@ -502,35 +509,35 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" } else { edgeDisplayColor <- c(edgeDisplayColor, UpDraftSettings$multipleConnectionsColor) } - + toolTip <- "" for (connection in connectionsBetween) { toolTip <- paste0(toolTip, names(connection$getInputArgument()), '->' , connection$getInputArgument() , '\n') } - edgeToolTips <- c(edgeToolTips, substr(toolTip,1,nchar(toolTip)-1)) # substr removes last '\n' character + edgeToolTips <- c(edgeToolTips, substr(toolTip,1,nchar(toolTip)-1)) # substr removes last '\n' character } edges_df <- DiagrammeR::create_edge_df(from = uniqueEdgeListNoNames[,1] , to = uniqueEdgeListNoNames[,2] , tooltip = edgeToolTips , color = edgeDisplayColor , penwidth = 3.0) - + # Render diagrammerGraph <- DiagrammeR::create_graph(nodes_df = nodes_df , edges_df = edges_df , graph_name = self$getName()) diagrammerGraph <- DiagrammeR::add_global_graph_attrs(diagrammerGraph, "layout", "dot", attr_type = "graph") return(DiagrammeR::grViz(DiagrammeR::generate_dot(diagrammerGraph))) - + } ) - + , private = list( connections = NULL , graph = NULL , modules = NULL , name = "" - + , getNeighbors = function(graph , module , direction @@ -538,7 +545,7 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" if(is.Module(module)) { module <- module$getName() } - + neighborModules <- list() graphVertices <- igraph::V(graph) @@ -547,27 +554,27 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" downstreamVertex <- graphVertices[[downstreamIndex]] neighborModules[[downstreamVertex$name]] <- private$modules[[downstreamVertex$name]] } - + return(neighborModules) } - + , getRevertState = function() { revertState <- list() - + revertState[["modules"]] <- private$modules revertState[["connections"]] <- private$connections revertState[["graph"]] <- private$graph revertState[["name"]] <- private$name - + return(revertState) } - + , revert = function(previousState) { private$modules <- previousState[['modules']] private$connections <- previousState[["connections"]] private$graph <- previousState[["graph"]] private$name <- previousState[["name"]] - + return(NULL) } ) @@ -583,9 +590,9 @@ DAGWorkflow <- R6::R6Class("DAGWorkflow" DAGWorkflow$initFromFile <- function(filename) { tryCatch({ workflowData <- jsonlite::fromJSON(filename) - + workflow <- DAGWorkflow$new(name = workflowData$name) - + modules <- list() for (moduleData in workflowData$modules) { modules[[length(modules) + 1]] <- utils::getFromNamespace(moduleData$class, "updraft")$initFromSaveData(moduleData) @@ -601,6 +608,6 @@ DAGWorkflow$initFromFile <- function(filename) { , error = function(err) { UpDraftSettings$errorLogger(filename," is corrupted - cannot load a workflow from it") }) - + return(workflow) } diff --git a/man/DAGWorkflow.Rd b/man/DAGWorkflow.Rd index b5e527a..1a72a90 100644 --- a/man/DAGWorkflow.Rd +++ b/man/DAGWorkflow.Rd @@ -75,6 +75,13 @@ Directed Acyclic Graph (DAG) workflow implementation \item{\bold{Returns}: list of downstream modules} } } + \item{\code{hasCompletedAllDownstreamModules(module)}}{ + \itemize{ + \item{Checks the completion of all downstream modules of \code{module}.} + \item{\bold{\code{module}}: valid implementation \code{ModuleInterface} obj or name of module that is present in a workflow.} + \item{\bold{Returns}: logical indicating all downstream modules are complete} + } + } \item{\code{getEndingModules()}}{ \itemize{ \item{Gets a list of ending modules in a workflow.} diff --git a/man/LogStrackTrace.Rd b/man/LogStrackTrace.Rd index 6ee2951..af7b8c3 100644 --- a/man/LogStrackTrace.Rd +++ b/man/LogStrackTrace.Rd @@ -5,8 +5,8 @@ \alias{LogStackTrace} \title{Log Stack Trace} \usage{ -LogStackTrace(moduleName, errorMessage, fileName = NULL, charLimit = 256, - objectLimit = 1e+05) +LogStackTrace(moduleName, errorMessage, fileName = NULL, + charLimit = 256, objectLimit = 1e+05) } \arguments{ \item{moduleName}{the name of the module that triggered the stack trace} diff --git a/man/WorkflowInterface.Rd b/man/WorkflowInterface.Rd index 60fc9da..cad8d43 100644 --- a/man/WorkflowInterface.Rd +++ b/man/WorkflowInterface.Rd @@ -25,6 +25,7 @@ Defines a contract of implementation for any workflow based class, i.e. a class \item{\code{getStartingModules()}}{Gets a list of modules that are the starting modules of a workflow.} \item{\code{getDownstreamModules(module)}}{Gets modules downstream of \code{module} in a workflow.} \item{\code{getUpstreamModules(module)}}{Gets modules upstream of \code{module} in a workflow.} + \item{\code{hasCompletedAllDownstreamModules(module)}}{Indicates if all modules downstream of \code{module} in a workflow have completed.} \item{\code{initFromFile(filename)}}{Initializes this workflow from a save state stored in \code{filename}.} \item{\code{removeConnection(connection)}}{Removes \code{connection}, an implementation instance of \code{ConnectionInterface}, to this workflow.} \item{\code{removeModule(module)}}{Adds \code{module}, an implementation instance of \code{ModuleInterface}, to this workflow.}