Skip to content

Commit

Permalink
Shutdown client properly (#252)
Browse files Browse the repository at this point in the history
* Shutdown client properly

Signed-off-by: Hongxin Liang <[email protected]>
  • Loading branch information
honnix authored Sep 29, 2023
1 parent c656650 commit 8dff1f8
Showing 1 changed file with 41 additions and 38 deletions.
79 changes: 41 additions & 38 deletions jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,44 +203,47 @@ static DynamicJobSpec rewrite(
Map<TaskIdentifier, TaskTemplate> taskTemplates,
Map<WorkflowIdentifier, WorkflowTemplate> workflowTemplates) {

WorkflowNodeVisitor workflowNodeVisitor =
IdentifierRewrite.builder()
.domain(executionConfig.domain())
.project(executionConfig.project())
.version(executionConfig.version())
.adminClient(
FlyteAdminClient.create(config.platformUrl(), config.platformInsecure(), null))
.build()
.visitor();

List<Node> rewrittenNodes =
spec.nodes().stream().map(workflowNodeVisitor::visitNode).collect(toUnmodifiableList());

Map<WorkflowIdentifier, WorkflowTemplate> usedSubWorkflows =
ProjectClosure.collectSubWorkflows(rewrittenNodes, workflowTemplates);

Map<TaskIdentifier, TaskTemplate> usedTaskTemplates =
ProjectClosure.collectTasks(rewrittenNodes, taskTemplates);

// FIXME one sub-workflow can use more sub-workflows, we should recursively collect used tasks
// and workflows

Map<WorkflowIdentifier, WorkflowTemplate> rewrittenUsedSubWorkflows =
mapValues(usedSubWorkflows, workflowNodeVisitor::visitWorkflowTemplate);

return spec.toBuilder()
.nodes(rewrittenNodes)
.subWorkflows(
ImmutableMap.<WorkflowIdentifier, WorkflowTemplate>builder()
.putAll(spec.subWorkflows())
.putAll(rewrittenUsedSubWorkflows)
.build())
.tasks(
ImmutableMap.<TaskIdentifier, TaskTemplate>builder()
.putAll(spec.tasks())
.putAll(usedTaskTemplates)
.build())
.build();
try (FlyteAdminClient flyteAdminClient =
FlyteAdminClient.create(config.platformUrl(), config.platformInsecure(), null)) {

WorkflowNodeVisitor workflowNodeVisitor =
IdentifierRewrite.builder()
.domain(executionConfig.domain())
.project(executionConfig.project())
.version(executionConfig.version())
.adminClient(flyteAdminClient)
.build()
.visitor();

List<Node> rewrittenNodes =
spec.nodes().stream().map(workflowNodeVisitor::visitNode).collect(toUnmodifiableList());

Map<WorkflowIdentifier, WorkflowTemplate> usedSubWorkflows =
ProjectClosure.collectSubWorkflows(rewrittenNodes, workflowTemplates);

Map<TaskIdentifier, TaskTemplate> usedTaskTemplates =
ProjectClosure.collectTasks(rewrittenNodes, taskTemplates);

// FIXME one sub-workflow can use more sub-workflows, we should recursively collect used tasks
// and workflows

Map<WorkflowIdentifier, WorkflowTemplate> rewrittenUsedSubWorkflows =
mapValues(usedSubWorkflows, workflowNodeVisitor::visitWorkflowTemplate);

return spec.toBuilder()
.nodes(rewrittenNodes)
.subWorkflows(
ImmutableMap.<WorkflowIdentifier, WorkflowTemplate>builder()
.putAll(spec.subWorkflows())
.putAll(rewrittenUsedSubWorkflows)
.build())
.tasks(
ImmutableMap.<TaskIdentifier, TaskTemplate>builder()
.putAll(spec.tasks())
.putAll(usedTaskTemplates)
.build())
.build();
}
}

private static DynamicWorkflowTask getDynamicWorkflowTask(String name) {
Expand Down

0 comments on commit 8dff1f8

Please sign in to comment.