Skip to content

Commit

Permalink
Forward port ctr status on Thin Controller. (#5906)
Browse files Browse the repository at this point in the history
* Added ThinTaskExecution to store ctrStatus.
Update Controller and Service to populate the ctr status.

Fixes #5907

* Updated for comments.
  • Loading branch information
corneil authored Aug 27, 2024
1 parent c31a8d6 commit 65989eb
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.dataflow.core;

import java.time.LocalDateTime;
import java.util.List;

import org.springframework.cloud.task.repository.TaskExecution;

/**
* Overrides TaskExecution class to provide CTR status required.
* @author Corneil du Plessis
*/
public class ThinTaskExecution extends TaskExecution {
private String ctrTaskStatus;

public ThinTaskExecution() {
}
public ThinTaskExecution(TaskExecution taskExecution) {
super(taskExecution.getExecutionId(), taskExecution.getExitCode(), taskExecution.getTaskName(), taskExecution.getStartTime(), taskExecution.getEndTime(), taskExecution.getExitMessage(), taskExecution.getArguments(), taskExecution.getErrorMessage(), taskExecution.getExternalExecutionId(), taskExecution.getParentExecutionId());
}
public ThinTaskExecution(TaskExecution taskExecution, String ctrTaskStatus) {
super(taskExecution.getExecutionId(), taskExecution.getExitCode(), taskExecution.getTaskName(), taskExecution.getStartTime(), taskExecution.getEndTime(), taskExecution.getExitMessage(), taskExecution.getArguments(), taskExecution.getErrorMessage(), taskExecution.getExternalExecutionId(), taskExecution.getParentExecutionId());
this.ctrTaskStatus = ctrTaskStatus;
}
public ThinTaskExecution(long executionId, Integer exitCode, String taskName, LocalDateTime startTime, LocalDateTime endTime, String exitMessage, List<String> arguments, String errorMessage, String externalExecutionId, Long parentExecutionId) {
super(executionId, exitCode, taskName, startTime, endTime, exitMessage, arguments, errorMessage, externalExecutionId, parentExecutionId);
}

public ThinTaskExecution(long executionId, Integer exitCode, String taskName, LocalDateTime startTime, LocalDateTime endTime, String exitMessage, List<String> arguments, String errorMessage, String externalExecutionId) {
super(executionId, exitCode, taskName, startTime, endTime, exitMessage, arguments, errorMessage, externalExecutionId);
}

public String getCtrTaskStatus() {
return ctrTaskStatus;
}

public void setCtrTaskStatus(String ctrTaskStatus) {
this.ctrTaskStatus = ctrTaskStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import java.time.LocalDateTime;

import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.dataflow.core.ThinTaskExecution;
import org.springframework.hateoas.PagedModel;
import org.springframework.hateoas.RepresentationModel;

Expand Down Expand Up @@ -66,22 +66,22 @@ public class TaskExecutionThinResource extends RepresentationModel<TaskExecution

private String errorMessage;

private String composedTaskJobExecutionStatus;

public TaskExecutionThinResource() {
}

public TaskExecutionThinResource(TaskExecution taskExecution) {
public TaskExecutionThinResource(ThinTaskExecution taskExecution) {
this.executionId = taskExecution.getExecutionId();

this.taskName = taskExecution.getTaskName();

this.externalExecutionId = taskExecution.getExternalExecutionId();
this.parentExecutionId =taskExecution.getParentExecutionId();
this.startTime = taskExecution.getStartTime();
this.endTime = taskExecution.getEndTime();
this.exitCode = taskExecution.getExitCode();
this.exitMessage = taskExecution.getExitMessage();
this.errorMessage = taskExecution.getErrorMessage();
this.composedTaskJobExecutionStatus = taskExecution.getCtrTaskStatus();
}

public long getExecutionId() {
Expand Down Expand Up @@ -156,6 +156,30 @@ public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}

public String getComposedTaskJobExecutionStatus() {
return composedTaskJobExecutionStatus;
}

public void setComposedTaskJobExecutionStatus(String composedTaskJobExecutionStatus) {
this.composedTaskJobExecutionStatus = composedTaskJobExecutionStatus;
}

public TaskExecutionStatus getTaskExecutionStatus() {
if (this.startTime == null) {
return TaskExecutionStatus.UNKNOWN;
}
if (this.endTime == null) {
return TaskExecutionStatus.RUNNING;
}
if (this.composedTaskJobExecutionStatus != null) {
return (this.composedTaskJobExecutionStatus.equals("ABANDONED") ||
this.composedTaskJobExecutionStatus.equals("FAILED") ||
this.composedTaskJobExecutionStatus.equals("STOPPED")) ?
TaskExecutionStatus.ERROR : TaskExecutionStatus.COMPLETE;
}
return (this.exitCode == null) ? TaskExecutionStatus.RUNNING :
((this.exitCode == 0) ? TaskExecutionStatus.COMPLETE : TaskExecutionStatus.ERROR);
}
public static class Page extends PagedModel<TaskExecutionThinResource> {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
package org.springframework.cloud.dataflow.server.controller;


import org.springframework.cloud.dataflow.core.ThinTaskExecution;
import org.springframework.cloud.dataflow.rest.resource.TaskExecutionThinResource;
import org.springframework.cloud.dataflow.server.task.DataflowTaskExplorer;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.data.web.PagedResourcesAssembler;
import org.springframework.hateoas.PagedModel;
Expand Down Expand Up @@ -47,21 +50,24 @@ public class TaskExecutionThinController {

public TaskExecutionThinController(DataflowTaskExplorer explorer) {
this.explorer = explorer;
this.resourceAssembler = new TaskExecutionThinResourceAssembler();
this.resourceAssembler = new TaskExecutionThinResourceAssembler();
}

@GetMapping(produces = "application/json")
@ResponseStatus(HttpStatus.OK)
public PagedModel<TaskExecutionThinResource> listTasks(Pageable pageable, PagedResourcesAssembler<TaskExecution> pagedAssembler) {
return pagedAssembler.toModel(explorer.findAll(pageable), resourceAssembler);
public PagedModel<TaskExecutionThinResource> listTasks(Pageable pageable, PagedResourcesAssembler<ThinTaskExecution> pagedAssembler) {
Page<TaskExecution> page = explorer.findAll(pageable);
Page<ThinTaskExecution> thinTaskExecutions = new PageImpl<>(page.stream().map(ThinTaskExecution::new).toList(), pageable, page.getTotalElements());
explorer.populateCtrStatus(thinTaskExecutions.getContent());
return pagedAssembler.toModel(thinTaskExecutions, resourceAssembler);
}

static class TaskExecutionThinResourceAssembler extends RepresentationModelAssemblerSupport<TaskExecution, TaskExecutionThinResource> {
static class TaskExecutionThinResourceAssembler extends RepresentationModelAssemblerSupport<ThinTaskExecution, TaskExecutionThinResource> {
public TaskExecutionThinResourceAssembler() {
super(TaskExecutionThinController.class, TaskExecutionThinResource.class);
}
@Override
public TaskExecutionThinResource toModel(TaskExecution entity) {
public TaskExecutionThinResource toModel(ThinTaskExecution entity) {
TaskExecutionThinResource resource = new TaskExecutionThinResource(entity);
resource.add(linkTo(methodOn(TaskExecutionController.class).view(resource.getExecutionId())).withSelfRel());
resource.add(linkTo(methodOn(TaskDefinitionController.class).display(resource.getTaskName(), true)).withRel("tasks/definitions"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Date;
import java.util.List;

import org.springframework.cloud.dataflow.core.ThinTaskExecution;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.dao.TaskExecutionDao;
import org.springframework.data.domain.Page;
Expand Down Expand Up @@ -166,4 +167,5 @@ public interface DataflowTaskExecutionQueryDao {

TaskExecution geTaskExecutionByExecutionId(String executionId, String taskName);

void populateCtrStatus(Collection<ThinTaskExecution> thinTaskExecutions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.Set;

import org.springframework.cloud.dataflow.core.ThinTaskExecution;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
Expand Down Expand Up @@ -175,4 +176,10 @@ public interface DataflowTaskExplorer {
* @see #getLatestTaskExecutionsByTaskNames(String...)
*/
TaskExecution getLatestTaskExecutionForTaskName(String taskName);

/**
* Populate CTR status for all tasks
* @param thinTaskExecutions
*/
void populateCtrStatus(Collection<ThinTaskExecution> thinTaskExecutions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.database.Order;
import org.springframework.cloud.dataflow.core.ThinTaskExecution;
import org.springframework.cloud.dataflow.server.task.DataflowTaskExecutionQueryDao;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.database.PagingQueryProvider;
Expand Down Expand Up @@ -79,6 +82,14 @@ public class DefaultDataFlowTaskExecutionQueryDao implements DataflowTaskExecuti
private static final String FIND_TASK_ARGUMENTS = "SELECT TASK_EXECUTION_ID, "
+ "TASK_PARAM from TASK_EXECUTION_PARAMS where TASK_EXECUTION_ID = :taskExecutionId";

private static final String FIND_CTR_STATUS = "SELECT T.TASK_EXECUTION_ID as TASK_EXECUTION_ID, J.EXIT_CODE as CTR_STATUS" +
" from TASK_EXECUTION T" +
" JOIN TASK_TASK_BATCH TB ON TB.TASK_EXECUTION_ID = T.TASK_EXECUTION_ID" +
" JOIN BATCH_JOB_EXECUTION J ON J.JOB_EXECUTION_ID = TB.JOB_EXECUTION_ID" +
" WHERE T.TASK_EXECUTION_ID in (:taskExecutionIds) " +
" AND (select count(*) from TASK_EXECUTION CT" + // it is the parent of one or more tasks
" where CT.PARENT_EXECUTION_ID = T.TASK_EXECUTION_ID) > 0";

private static final String GET_EXECUTIONS = "SELECT " + SELECT_CLAUSE +
" from TASK_EXECUTION";

Expand Down Expand Up @@ -509,4 +520,23 @@ private List<String> getTaskArguments(long taskExecutionId) {
handler);
return params;
}

@Override
public void populateCtrStatus(Collection<ThinTaskExecution> thinTaskExecutions) {
Map<Long, ThinTaskExecution> taskExecutionMap = thinTaskExecutions.stream()
.collect(Collectors.toMap(ThinTaskExecution::getExecutionId, Function.identity()));
String ids = taskExecutionMap.keySet()
.stream()
.map(Object::toString)
.collect(Collectors.joining(","));
String sql = FIND_CTR_STATUS.replace(":taskExecutionIds", ids);
jdbcTemplate.query(sql, rs -> {
Long id = rs.getLong("TASK_EXECUTION_ID");
String ctrStatus = rs.getString("CTR_STATUS");
logger.debug("populateCtrStatus:{}={}", id, ctrStatus);
ThinTaskExecution execution = taskExecutionMap.get(id);
Assert.notNull(execution, "Expected TaskExecution for " + id + " from " + ids);
execution.setCtrTaskStatus(ctrStatus);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.cloud.dataflow.core.ThinTaskExecution;
import org.springframework.cloud.dataflow.server.task.DataflowTaskExplorer;
import org.springframework.cloud.dataflow.server.task.DataflowTaskExecutionQueryDao;
import org.springframework.cloud.dataflow.server.task.TaskDefinitionReader;
Expand Down Expand Up @@ -185,4 +186,8 @@ public TaskExecution getLatestTaskExecutionForTaskName(String taskName) {
return taskExplorer.getLatestTaskExecutionForTaskName(taskName);
}

@Override
public void populateCtrStatus(Collection<ThinTaskExecution> thinTaskExecutions) {
this.taskExecutionQueryDao.populateCtrStatus(thinTaskExecutions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ void getAllExecutions() throws Exception {
.andExpect(status().isOk()))
.andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].executionId", containsInAnyOrder(4, 3, 2, 1)))
.andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].parentExecutionId", containsInAnyOrder(null, null, null, 1)))
.andExpect(jsonPath("$._embedded.taskExecutionResourceList[*].taskExecutionStatus", containsInAnyOrder("RUNNING", "RUNNING","RUNNING","RUNNING")))
.andExpect(jsonPath("$._embedded.taskExecutionResourceList", hasSize(4)));
}

Expand All @@ -342,6 +343,7 @@ void getAllThinExecutions() throws Exception {
.andExpect(status().isOk())
.andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].executionId", containsInAnyOrder(4, 3, 2, 1)))
.andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].parentExecutionId", containsInAnyOrder(null, null, null, 1)))
.andExpect(jsonPath("$._embedded.taskExecutionThinResourceList[*].taskExecutionStatus", containsInAnyOrder("RUNNING", "RUNNING","RUNNING","RUNNING")))
.andExpect(jsonPath("$._embedded.taskExecutionThinResourceList", hasSize(4)));
}

Expand Down

0 comments on commit 65989eb

Please sign in to comment.