Skip to content

Commit

Permalink
Thin task execution controller (#5745)
Browse files Browse the repository at this point in the history
* Adding Thin Task Executions Controller to improve task list / paging performance.

Fixes #5718

* Added test for thinexecutions to TaskExecutionControllerTests.

* Fix TaskTemplate.
  • Loading branch information
corneil committed Mar 22, 2024
1 parent 37a8ff3 commit 27d8a64
Show file tree
Hide file tree
Showing 11 changed files with 407 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import java.util.stream.Collectors;
import javax.sql.DataSource;

import org.slf4j.Logger;
Expand Down Expand Up @@ -89,6 +90,9 @@ public class AggregateDataFlowTaskExecutionQueryDao implements DataflowTaskExecu
private static final String FIND_TASK_ARGUMENTS = "SELECT TASK_EXECUTION_ID, "
+ "TASK_PARAM from AGGREGATE_TASK_EXECUTION_PARAMS where TASK_EXECUTION_ID = :taskExecutionId and SCHEMA_TARGET = :schemaTarget";

private static final String FIND_TASKS_ARGUMENTS = "SELECT TASK_EXECUTION_ID, "
+ "TASK_PARAM from AGGREGATE_TASK_EXECUTION_PARAMS where TASK_EXECUTION_ID IN (:taskExecutionIds) and SCHEMA_TARGET = :schemaTarget";

private static final String GET_EXECUTIONS = "SELECT " + SELECT_CLAUSE +
" from AGGREGATE_TASK_EXECUTION";

Expand Down Expand Up @@ -217,7 +221,7 @@ public AggregateTaskExecution geTaskExecutionByExecutionId(String externalExecut
return this.jdbcTemplate.queryForObject(
GET_EXECUTION_BY_EXTERNAL_EXECUTION_ID,
queryParameters,
new CompositeTaskExecutionRowMapper()
new CompositeTaskExecutionRowMapper(true)
);
} catch (EmptyResultDataAccessException e) {
return null;
Expand All @@ -234,7 +238,7 @@ public AggregateTaskExecution getTaskExecution(long executionId, String schemaTa
return this.jdbcTemplate.queryForObject(
GET_EXECUTION_BY_ID,
queryParameters,
new CompositeTaskExecutionRowMapper()
new CompositeTaskExecutionRowMapper(true)
);
} catch (EmptyResultDataAccessException e) {
return null;
Expand All @@ -251,7 +255,7 @@ public List<AggregateTaskExecution> findChildTaskExecutions(long executionId, St
return this.jdbcTemplate.query(
GET_CHILD_EXECUTION_BY_ID,
queryParameters,
new CompositeTaskExecutionRowMapper()
new CompositeTaskExecutionRowMapper(true)
);
} catch (EmptyResultDataAccessException e) {
return null;
Expand All @@ -265,26 +269,44 @@ public List<AggregateTaskExecution> findChildTaskExecutions(Collection<Long> par
.addValue("schemaTarget", "--spring.cloud.task.parent-schema-target=" + schemaTarget);

try {
return this.jdbcTemplate.query(
GET_CHILD_EXECUTION_BY_IDS,
queryParameters,
new CompositeTaskExecutionRowMapper()
List<AggregateTaskExecution> result = this.jdbcTemplate.query(
GET_CHILD_EXECUTION_BY_IDS,
queryParameters,
new CompositeTaskExecutionRowMapper(false)
);
populateArguments(schemaTarget, result);
return result;
} catch (EmptyResultDataAccessException e) {
return null;
}
}

private void populateArguments(String schemaTarget, List<AggregateTaskExecution> result) {
List<Long> ids = result.stream().map(AggregateTaskExecution::getExecutionId).collect(Collectors.toList());
Map<Long, List<String>> paramMap = getTaskArgumentsForTasks(ids, schemaTarget);
result.forEach(aggregateTaskExecution -> {
List<String> params = paramMap.get(aggregateTaskExecution.getExecutionId());
if(params != null) {
aggregateTaskExecution.setArguments(params);
}
});
}

@Override
public List<AggregateTaskExecution> findTaskExecutions(String taskName, boolean completed) {
List<AggregateTaskExecution> result;
if (StringUtils.hasLength(taskName)) {
final SqlParameterSource queryParameters = new MapSqlParameterSource()
.addValue("taskName", taskName);
String query = completed ? GET_EXECUTIONS_BY_NAME_COMPLETED : GET_EXECUTIONS_BY_NAME;
return this.jdbcTemplate.query(query, queryParameters, new CompositeTaskExecutionRowMapper());
result = this.jdbcTemplate.query(query, queryParameters, new CompositeTaskExecutionRowMapper(false));
} else {
return this.jdbcTemplate.query(completed ? GET_EXECUTIONS_COMPLETED : GET_EXECUTIONS, Collections.emptyMap(), new CompositeTaskExecutionRowMapper());
result = this.jdbcTemplate.query(completed ? GET_EXECUTIONS_COMPLETED : GET_EXECUTIONS, Collections.emptyMap(), new CompositeTaskExecutionRowMapper(false));
}
result.stream()
.collect(Collectors.groupingBy(AggregateTaskExecution::getSchemaTarget))
.forEach(this::populateArguments);
return result;
}

@Override
Expand All @@ -294,7 +316,11 @@ public List<AggregateTaskExecution> findTaskExecutionsBeforeEndTime(String taskN
.addValue("endTime", endTime);
String query;
query = taskName.isEmpty() ? GET_EXECUTIONS_COMPLETED_BEFORE_END_TIME : GET_EXECUTION_BY_NAME_COMPLETED_BEFORE_END_TIME;
return this.jdbcTemplate.query(query, queryParameters, new CompositeTaskExecutionRowMapper());
List<AggregateTaskExecution> result = this.jdbcTemplate.query(query, queryParameters, new CompositeTaskExecutionRowMapper(false));
result.stream()
.collect(Collectors.groupingBy(AggregateTaskExecution::getSchemaTarget))
.forEach(this::populateArguments);
return result;
}

@Override
Expand Down Expand Up @@ -404,7 +430,11 @@ public List<AggregateTaskExecution> getLatestTaskExecutionsByTaskNames(String...
try {
final Map<String, List<String>> paramMap = Collections
.singletonMap("taskNames", taskNamesAsList);
return this.jdbcTemplate.query(LAST_TASK_EXECUTIONS_BY_TASK_NAMES, paramMap, new CompositeTaskExecutionRowMapper());
List<AggregateTaskExecution> result = this.jdbcTemplate.query(LAST_TASK_EXECUTIONS_BY_TASK_NAMES, paramMap, new CompositeTaskExecutionRowMapper(false));
result.stream()
.collect(Collectors.groupingBy(AggregateTaskExecution::getSchemaTarget))
.forEach(this::populateArguments);
return result;
} catch (EmptyResultDataAccessException e) {
return Collections.emptyList();
}
Expand Down Expand Up @@ -509,15 +539,19 @@ private Page<AggregateTaskExecution> queryForPageableResults(
}
String query = pagingQueryProvider.getPageQuery(pageable);
List<AggregateTaskExecution> resultList = this.jdbcTemplate.query(query,
queryParameters, new CompositeTaskExecutionRowMapper());
queryParameters, new CompositeTaskExecutionRowMapper(false));
resultList.stream()
.collect(Collectors.groupingBy(AggregateTaskExecution::getSchemaTarget))
.forEach(this::populateArguments);
return new PageImpl<>(resultList, pageable, totalCount);
}


private class CompositeTaskExecutionRowMapper implements RowMapper<AggregateTaskExecution> {

private CompositeTaskExecutionRowMapper() {
}
final boolean mapRow;
private CompositeTaskExecutionRowMapper(boolean mapRow) {
this.mapRow = mapRow;
}

@Override
public AggregateTaskExecution mapRow(ResultSet rs, int rowNum) throws SQLException {
Expand All @@ -536,7 +570,7 @@ public AggregateTaskExecution mapRow(ResultSet rs, int rowNum) throws SQLExcepti
rs.getTimestamp("START_TIME"),
rs.getTimestamp("END_TIME"),
rs.getString("EXIT_MESSAGE"),
getTaskArguments(id, schemaTarget),
mapRow ? getTaskArguments(id, schemaTarget) : Collections.emptyList(),
rs.getString("ERROR_MESSAGE"),
rs.getString("EXTERNAL_EXECUTION_ID"),
parentExecutionId,
Expand All @@ -554,11 +588,25 @@ private Integer getNullableExitCode(ResultSet rs) throws SQLException {
private List<String> getTaskArguments(long taskExecutionId, String schemaTarget) {
final List<String> params = new ArrayList<>();
RowCallbackHandler handler = rs -> params.add(rs.getString(2));
MapSqlParameterSource parameterSource = new MapSqlParameterSource("taskExecutionId", taskExecutionId)
.addValue("schemaTarget", schemaTarget);
this.jdbcTemplate.query(
FIND_TASK_ARGUMENTS,
new MapSqlParameterSource("taskExecutionId", taskExecutionId)
.addValue("schemaTarget", schemaTarget),
parameterSource,
handler);
return params;
}
private Map<Long, List<String>> getTaskArgumentsForTasks(Collection<Long> taskExecutionIds, String schemaTarget) {
if(taskExecutionIds.isEmpty()) {
return Collections.emptyMap();
} else {
final Map<Long, List<String>> result = new HashMap<>();
RowCallbackHandler handler = rs -> result.computeIfAbsent(rs.getLong(1), a -> new ArrayList<>())
.add(rs.getString(2));
MapSqlParameterSource parameterSource = new MapSqlParameterSource("taskExecutionIds", taskExecutionIds)
.addValue("schemaTarget", schemaTarget);
this.jdbcTemplate.query(FIND_TASKS_ARGUMENTS, parameterSource, handler);
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
* @author Gunnar Hillert
* @author Christian Tzolov
* @author Ilayaperumal Gopinathan
* @author Corneil du Plessis
*/
@SuppressWarnings("NewClassNamingConvention")
public class ApiDocumentation extends BaseDocumentation {
Expand Down Expand Up @@ -117,6 +118,7 @@ public void index() throws Exception {
linkWithRel("tasks/executions/launch").description("Provides for launching a Task execution"),
linkWithRel("tasks/executions/external").description("Returns Task execution by external id"),
linkWithRel("tasks/executions/current").description("Provides the current count of running tasks"),
linkWithRel("tasks/thinexecutions").description("Returns thin Task executions"),
linkWithRel("tasks/info/executions").description("Provides the task executions info"),
linkWithRel("tasks/schedules").description("Provides schedule information of tasks"),
linkWithRel("tasks/schedules/instances").description("Provides schedule information of a specific task "),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,39 @@ public void listTaskExecutions() throws Exception {
subsectionWithPath("page").description("Pagination properties"))));
}

@Test
public void listTaskThinExecutions() throws Exception {
documentation.dontDocument(() -> this.mockMvc.perform(
post("/tasks/executions")
.param("name", "taskB")
.param("properties", "app.my-task.foo=bar,deployer.my-task.something-else=3")
.param("arguments", "--server.port=8080 --foo=bar")
)
.andExpect(status().isCreated()));

this.mockMvc.perform(
get("/tasks/thinexecutions")
.param("page", "1")
.param("size", "2"))
.andDo(print())
.andExpect(status().isOk()).andDo(this.documentationHandler.document(
requestParameters(
parameterWithName("page")
.description("The zero-based page number (optional)"),
parameterWithName("size")
.description("The requested page size (optional)")
),
responseFields(
subsectionWithPath("_embedded.taskExecutionThinResourceList")
.description("Contains a collection of thin Task Executions/"),
subsectionWithPath("_links.self").description("Link to the task execution resource"),
subsectionWithPath("_links.first").description("Link to the first page of task execution resources").optional(),
subsectionWithPath("_links.last").description("Link to the last page of task execution resources").optional(),
subsectionWithPath("_links.next").description("Link to the next page of task execution resources").optional(),
subsectionWithPath("_links.prev").description("Link to the previous page of task execution resources").optional(),
subsectionWithPath("page").description("Pagination properties"))));
}

@Test
public void listTaskExecutionsByName() throws Exception {
this.mockMvc.perform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.springframework.cloud.dataflow.rest.resource.TaskAppStatusResource;
import org.springframework.cloud.dataflow.rest.resource.TaskDefinitionResource;
import org.springframework.cloud.dataflow.rest.resource.TaskExecutionResource;
import org.springframework.cloud.dataflow.rest.resource.TaskExecutionThinResource;
import org.springframework.hateoas.PagedModel;

/**
Expand All @@ -37,6 +38,7 @@
* @author Michael Minella
* @author Gunnar Hillert
* @author David Turanski
* @author Corneil du Plessis
*/
public interface TaskOperations {

Expand Down Expand Up @@ -108,6 +110,11 @@ public interface TaskOperations {
*/
PagedModel<TaskExecutionResource> executionList();

/**
* @return the list of thin task executions known to the system.
*/
PagedModel<TaskExecutionThinResource> thinExecutionList();

/**
* List task executions known to the system filtered by task name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@

package org.springframework.cloud.dataflow.rest.client;

import javax.naming.OperationNotSupportedException;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.naming.OperationNotSupportedException;

import org.springframework.cloud.dataflow.rest.client.support.VersionUtils;
import org.springframework.cloud.dataflow.rest.resource.CurrentTaskExecutionsResource;
Expand All @@ -31,12 +30,14 @@
import org.springframework.cloud.dataflow.rest.resource.TaskAppStatusResource;
import org.springframework.cloud.dataflow.rest.resource.TaskDefinitionResource;
import org.springframework.cloud.dataflow.rest.resource.TaskExecutionResource;
import org.springframework.cloud.dataflow.rest.resource.TaskExecutionThinResource;
import org.springframework.cloud.dataflow.rest.resource.TaskExecutionsInfoResource;
import org.springframework.cloud.dataflow.rest.resource.about.AboutResource;
import org.springframework.cloud.dataflow.rest.util.DeploymentPropertiesUtils;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.hateoas.Link;
import org.springframework.hateoas.PagedModel;
import org.springframework.hateoas.RepresentationModel;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
Expand All @@ -53,6 +54,7 @@
* @author Michael Minella
* @author Gunnar Hillert
* @author David Turanski
* @author Corneil du Plessis
*/
public class TaskTemplate implements TaskOperations {

Expand All @@ -66,6 +68,8 @@ public class TaskTemplate implements TaskOperations {

private static final String EXECUTIONS_RELATION = "tasks/executions";

private static final String THIN_EXECUTIONS_RELATION = "tasks/thinexecutions";

private static final String EXECUTIONS_CURRENT_RELATION = "tasks/executions/current";

private static final String EXECUTION_RELATION = "tasks/executions/execution";
Expand All @@ -90,6 +94,8 @@ public class TaskTemplate implements TaskOperations {

private final Link executionsLink;

private final Link thinExecutionsLink;

private final Link executionLink;

private final Link executionLaunchLink;
Expand All @@ -114,6 +120,7 @@ public class TaskTemplate implements TaskOperations {
Assert.notNull(restTemplate, "RestTemplate must not be null");
Assert.isTrue(resources.getLink("about").isPresent(), "Expected about relation");
Assert.isTrue(resources.getLink(EXECUTIONS_RELATION).isPresent(), "Executions relation is required");
Assert.isTrue(resources.getLink(THIN_EXECUTIONS_RELATION).isPresent(), "Executions relation is required");
Assert.isTrue(resources.getLink(DEFINITIONS_RELATION).isPresent(), "Definitions relation is required");
Assert.isTrue(resources.getLink(DEFINITION_RELATION).isPresent(), "Definition relation is required");
Assert.isTrue(resources.getLink(EXECUTIONS_RELATION).isPresent(), "Executions relation is required");
Expand All @@ -133,22 +140,26 @@ public class TaskTemplate implements TaskOperations {
if (VersionUtils.isDataFlowServerVersionGreaterThanOrEqualToRequiredVersion(
VersionUtils.getThreePartVersion(dataFlowServerVersion),
EXECUTIONS_CURRENT_RELATION_VERSION)) {
Assert.isTrue(resources.getLink(EXECUTIONS_CURRENT_RELATION).isPresent(), "Current Executions relation is required");
Assert.notNull(resources.getLink(EXECUTIONS_CURRENT_RELATION), "Executions current relation is required");
this.executionsCurrentLink = resources.getLink(EXECUTIONS_CURRENT_RELATION).get();
} else {
this.executionsCurrentLink = null;
}

this.restTemplate = restTemplate;
this.aboutLink = resources.getLink("about").get();
this.definitionsLink = resources.getLink(DEFINITIONS_RELATION).get();
this.definitionLink = resources.getLink(DEFINITION_RELATION).get();
this.executionsLink = resources.getLink(EXECUTIONS_RELATION).get();
this.thinExecutionsLink = resources.getLink(THIN_EXECUTIONS_RELATION).get();
this.executionLink = resources.getLink(EXECUTION_RELATION).get();
if(resources.getLink(EXECUTION_LAUNCH_RELATION).isPresent()) {
this.executionLaunchLink = resources.getLink(EXECUTION_LAUNCH_RELATION).get();
} else {
this.executionLaunchLink = null;
}
this.executionByNameLink = resources.getLink(EXECUTION_RELATION_BY_NAME).get();
this.executionsCurrentLink = resources.getLink(EXECUTIONS_CURRENT_RELATION).get();
if (resources.getLink(EXECUTIONS_INFO_RELATION).isPresent()) {
this.executionsInfoLink = resources.getLink(EXECUTIONS_INFO_RELATION).get();
}
Expand Down Expand Up @@ -256,6 +267,11 @@ public TaskExecutionResource.Page executionList() {
return restTemplate.getForObject(executionsLink.getHref(), TaskExecutionResource.Page.class);
}

@Override
public PagedModel<TaskExecutionThinResource> thinExecutionList() {
return restTemplate.getForObject(thinExecutionsLink.getHref(), TaskExecutionThinResource.Page.class);
}

@Override
public TaskExecutionResource.Page executionListByTaskName(String taskName) {
return restTemplate.getForObject(executionByNameLink.expand(taskName).getHref(),
Expand Down
Loading

0 comments on commit 27d8a64

Please sign in to comment.