Skip to content

Commit

Permalink
dss-server:fix code
Browse files Browse the repository at this point in the history
  • Loading branch information
xuanzhou11 committed Oct 18, 2024
1 parent eaa421e commit 861ebef
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,10 @@ object FlowExecutionEntranceConfiguration {

val CONFIGURATION = "configuration"

val COMMAND = "command"

val USER_TO_PROXY = "user.to.proxy"

val FLOW_VAR_MAP = "flow_var_map"

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,15 @@ class DefaultFlowExecution extends FlowExecution with Logging {
info(s"${flowEntranceJob.getId} Submit nodes(${runningNodes.size}) to running")
runningNodes.foreach { node =>
node.getNode.getDSSNode.getParams.get(FlowExecutionEntranceConfiguration.PROPS_MAP).asInstanceOf[java.util.Map[String, Any]].putAll(flowEntranceJob.getParams)
node.getNode.getDSSNode.getParams.get(FlowExecutionEntranceConfiguration.CONFIGURATION).asInstanceOf[java.util.Map[String, Any]].get(FlowExecutionEntranceConfiguration.START_UP).asInstanceOf[java.util.Map[String, Any]].put(FlowExecutionEntranceConfiguration.YARN_QUEUE, null)
if (node.getNode.getDSSNode.getParams.get(FlowExecutionEntranceConfiguration.CONFIGURATION) != null && node.getNode.getDSSNode.getParams.get(FlowExecutionEntranceConfiguration.CONFIGURATION).asInstanceOf[java.util.Map[String, Any]].get(FlowExecutionEntranceConfiguration.START_UP) != null) {
node.getNode.getDSSNode.getParams.get(FlowExecutionEntranceConfiguration.CONFIGURATION).asInstanceOf[java.util.Map[String, Any]].get(FlowExecutionEntranceConfiguration.START_UP).asInstanceOf[java.util.Map[String, Any]].put(FlowExecutionEntranceConfiguration.YARN_QUEUE, null)
}
if (node.getNode.getDSSNode.getParams.get(FlowExecutionEntranceConfiguration.PROPS_MAP) != null && node.getNode.getDSSNode.getParams.get(FlowExecutionEntranceConfiguration.PROPS_MAP).asInstanceOf[java.util.Map[String, Any]].get(FlowExecutionEntranceConfiguration.COMMAND) != null) {
node.getNode.getDSSNode.getParams.get(FlowExecutionEntranceConfiguration.PROPS_MAP).asInstanceOf[java.util.Map[String, Any]].get(FlowExecutionEntranceConfiguration.COMMAND).asInstanceOf[java.util.Map[String, Any]].put(FlowExecutionEntranceConfiguration.USER_TO_PROXY, null)
}
if (node.getNode.getDSSNode.getParams.get(FlowExecutionEntranceConfiguration.FLOW_VAR_MAP) != null) {
node.getNode.getDSSNode.getParams.get(FlowExecutionEntranceConfiguration.FLOW_VAR_MAP).asInstanceOf[java.util.Map[String, Any]].put(FlowExecutionEntranceConfiguration.USER_TO_PROXY, null)
}
info(s"nodeParams:${node.getNode.getDSSNode.getParams} ")
info(s"JobContent:${node.getNode.getDSSNode.getJobContent} ")
node.run()
Expand Down

0 comments on commit 861ebef

Please sign in to comment.