diff --git a/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobExecutionsDocumentation.java b/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobExecutionsDocumentation.java index 4f4db4ebae..cb18140ab8 100644 --- a/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobExecutionsDocumentation.java +++ b/spring-cloud-dataflow-classic-docs/src/test/java/org/springframework/cloud/dataflow/server/rest/documentation/JobExecutionsDocumentation.java @@ -334,6 +334,7 @@ public void jobStop() throws Exception { public void jobRestart() throws Exception { this.mockMvc.perform(put("/jobs/executions/{id}", "2") .queryParam("restart", "true") + .queryParam("useJsonJobParameters", "true") ) .andDo(print()) .andExpect(status().isOk()) @@ -341,6 +342,9 @@ public void jobRestart() throws Exception { pathParameters(parameterWithName("id") .description("The id of an existing job execution (required)")) , queryParameters( + parameterWithName("useJsonJobParameters").description("If true dataflow will " + + "serialize job parameters as JSON. Default is null, and the default " + + "configuration will be used to determine serialization method.").optional(), parameterWithName("restart") .description("Sends signal to restart the job if set to true") ) diff --git a/spring-cloud-dataflow-docs/src/main/asciidoc/api-guide.adoc b/spring-cloud-dataflow-docs/src/main/asciidoc/api-guide.adoc index c96804b6da..658c890a29 100644 --- a/spring-cloud-dataflow-docs/src/main/asciidoc/api-guide.adoc +++ b/spring-cloud-dataflow-docs/src/main/asciidoc/api-guide.adoc @@ -2666,7 +2666,7 @@ include::{snippets}/job-executions-documentation/job-restart/path-parameters.ado [[api-guide-resources-job-executions-restart-request-parameters]] ===== Request Parameters -include::{snippets}/job-executions-documentation/job-restart/request-parameters.adoc[] +include::{snippets}/job-executions-documentation/job-restart/query-parameters.adoc[] diff --git a/spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/JobOperations.java b/spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/JobOperations.java index e8d6af8bca..8bbd8dc325 100644 --- a/spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/JobOperations.java +++ b/spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/JobOperations.java @@ -42,6 +42,16 @@ public interface JobOperations { */ void executionRestart(long id); + /** + * Restarts a job by id + * + * @param id job execution id + * @param useJsonJobParameters if true {@link org.springframework.batch.core.JobParameters} will be serialized to JSON. + * Default is {@code Null} which will serialize the {@link org.springframework.batch.core.JobParameters} + * to the default specified in SCDF's configuration. + */ + void executionRestart(long id, Boolean useJsonJobParameters); + /** * @return the list job executions without step executions known to the system. */ diff --git a/spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/JobTemplate.java b/spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/JobTemplate.java index d71e3a5db7..027b7510f9 100644 --- a/spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/JobTemplate.java +++ b/spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/JobTemplate.java @@ -117,6 +117,14 @@ public void executionRestart(long id) { restTemplate.put(builder.toUriString(), null); } + @Override + public void executionRestart(long id, Boolean useJsonJobParameters) { + UriComponentsBuilder builder = UriComponentsBuilder.fromUriString(executionLink.expand(id).getHref()).queryParam("restart", "true") + .queryParam("useJsonJobParameters", useJsonJobParameters); + + restTemplate.put(builder.toUriString(), null); + } + @Override public PagedModel executionThinList() { UriComponentsBuilder builder = UriComponentsBuilder.fromUriString(thinExecutionsLink.getHref()).queryParam("size", "2000"); diff --git a/spring-cloud-dataflow-rest-client/src/test/java/org/springframework/cloud/dataflow/rest/client/DataflowTemplateTests.java b/spring-cloud-dataflow-rest-client/src/test/java/org/springframework/cloud/dataflow/rest/client/DataflowTemplateTests.java index 221e406941..d47b01d61e 100644 --- a/spring-cloud-dataflow-rest-client/src/test/java/org/springframework/cloud/dataflow/rest/client/DataflowTemplateTests.java +++ b/spring-cloud-dataflow-rest-client/src/test/java/org/springframework/cloud/dataflow/rest/client/DataflowTemplateTests.java @@ -22,9 +22,8 @@ import java.util.List; import java.util.Optional; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -34,15 +33,14 @@ import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameter; import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.StepExecution; import org.springframework.batch.item.ExecutionContext; import org.springframework.cloud.dataflow.rest.Version; import org.springframework.cloud.dataflow.rest.job.StepExecutionHistory; import org.springframework.cloud.dataflow.rest.resource.RootResource; -import org.springframework.cloud.dataflow.rest.support.jackson.Jackson2DataflowModule; import org.springframework.hateoas.Link; import org.springframework.hateoas.LinkRelation; -import org.springframework.hateoas.mediatype.hal.Jackson2HalModule; import org.springframework.http.converter.HttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.web.client.ResourceAccessException; @@ -69,10 +67,7 @@ public class DataflowTemplateTests { @Before public void setup() { mapper = new ObjectMapper(); - mapper.registerModule(new Jdk8Module()); - mapper.registerModule(new Jackson2HalModule()); - mapper.registerModule(new JavaTimeModule()); - mapper.registerModule(new Jackson2DataflowModule()); + DataFlowTemplate.prepareObjectMapper(mapper); System.setProperty("sun.net.client.defaultConnectTimeout", String.valueOf(100)); } @@ -102,9 +97,22 @@ public void testDataFlowTemplateContructorWithNonExistingUri() throws URISyntaxE @Test public void testThatObjectMapperGetsPrepared() { - final ObjectMapper objectMapper = new ObjectMapper(); - DataFlowTemplate.prepareObjectMapper(objectMapper); - assertCorrectMixins(objectMapper); + assertCorrectMixins(this.mapper); + } + + @Test + public void testJobParameters() throws JsonProcessingException { + JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); + jobParametersBuilder.addString("foo", "foo"); + jobParametersBuilder.addString("bar", "bar"); + + JobParameters jobParameters = jobParametersBuilder.toJobParameters(); + assertCorrectMixins(this.mapper); + String jobParametersSerialized = this.mapper.writeValueAsString(jobParameters); + jobParameters = this.mapper.readValue(jobParametersSerialized, JobParameters.class); + assertEquals(jobParameters.getParameter("foo").getValue(), "foo"); + assertEquals(jobParameters.getParameter("bar").getValue(), "bar"); + assertEquals(jobParameters.getParameters().size(), 2); } @Test diff --git a/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/support/jackson/JobParameterJacksonDeserializer.java b/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/support/jackson/JobParameterJacksonDeserializer.java index 08833bb0af..64441e7100 100644 --- a/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/support/jackson/JobParameterJacksonDeserializer.java +++ b/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/support/jackson/JobParameterJacksonDeserializer.java @@ -50,24 +50,16 @@ public JobParameter deserialize(JsonParser jsonParser, DeserializationContext de String type = node.get("type").asText(); JobParameter jobParameter; - //TODO: Boot3x followup Verify that Job Parameters setup properly for Batch 5 - if (!type.isEmpty() && !type.equalsIgnoreCase("STRING")) { - if ("DATE".equalsIgnoreCase(type)) { - jobParameter = new JobParameter(LocalDateTime.parse(value), LocalDateTime.class, identifying); - } - else if ("DOUBLE".equalsIgnoreCase(type)) { - jobParameter = new JobParameter(Double.valueOf(value), Double.class, identifying); - } - else if ("LONG".equalsIgnoreCase(type)) { - jobParameter = new JobParameter(Long.valueOf(value), Long.class, identifying); - } - else { - throw new IllegalStateException("Unsupported JobParameter type: " + type); + if (!type.isEmpty()) { + try { + jobParameter = new JobParameter(value, Class.forName(type), identifying); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("JobParameter type %s is not supported by DataFlow".formatted(type), e); } } else { - jobParameter = new JobParameter(value, String.class, identifying); - } + jobParameter = new JobParameter(value, String.class, identifying); + } if (logger.isDebugEnabled()) { logger.debug("jobParameter - value: {} (type: {}, isIdentifying: {})", diff --git a/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/support/jackson/JobParametersJacksonMixIn.java b/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/support/jackson/JobParametersJacksonMixIn.java index d13606f656..1eb69b93f6 100644 --- a/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/support/jackson/JobParametersJacksonMixIn.java +++ b/spring-cloud-dataflow-rest-resource/src/main/java/org/springframework/cloud/dataflow/rest/support/jackson/JobParametersJacksonMixIn.java @@ -16,9 +16,12 @@ package org.springframework.cloud.dataflow.rest.support.jackson; +import java.util.Map; + import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import org.springframework.batch.core.JobParameter; import org.springframework.batch.core.JobParameters; /** @@ -27,9 +30,12 @@ * @author Gunnar Hillert * @since 1.0 */ -@JsonIgnoreProperties("empty") +@JsonIgnoreProperties({"empty", "identifyingParameters"}) public abstract class JobParametersJacksonMixIn { @JsonProperty abstract boolean isEmpty(); + + @JsonProperty + abstract Map> getIdentifyingParameters(); } diff --git a/spring-cloud-dataflow-rest-resource/src/test/java/org/springframework/cloud/dataflow/rest/support/jackson/JobParameterJacksonDeserializerTests.java b/spring-cloud-dataflow-rest-resource/src/test/java/org/springframework/cloud/dataflow/rest/support/jackson/JobParameterJacksonDeserializerTests.java new file mode 100644 index 0000000000..447d3eeb00 --- /dev/null +++ b/spring-cloud-dataflow-rest-resource/src/test/java/org/springframework/cloud/dataflow/rest/support/jackson/JobParameterJacksonDeserializerTests.java @@ -0,0 +1,62 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.dataflow.rest.support.jackson; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.json.UTF8StreamJsonParser; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; +import org.springframework.batch.core.JobParameter; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; + +public class JobParameterJacksonDeserializerTests { + + @Test + public void validJobParameter() throws IOException { + JobParameterJacksonDeserializer jobParameterJacksonDeserializer = new JobParameterJacksonDeserializer(); + String json = "{\"value\":\"BAR\",\"type\":\"java.lang.String\",\"identifying\":true}"; + JobParameter jobParameter = jobParameterJacksonDeserializer.deserialize(getJsonParser(json), null); + assertThat(jobParameter.getType()).isEqualTo(String.class); + assertThat(jobParameter.getValue()).isEqualTo("BAR"); + assertThat(jobParameter.isIdentifying()).isTrue(); + } + + @Test + public void inValidJobParameter() throws IOException { + JobParameterJacksonDeserializer jobParameterJacksonDeserializer = new JobParameterJacksonDeserializer(); + String json = "{\"value\":\"BAR\",\"type\":\"java.lang.FOO\",\"identifying\":true}"; + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> { + jobParameterJacksonDeserializer.deserialize(getJsonParser(json), null); + }) + .withMessage("JobParameter type java.lang.FOO is not supported by DataFlow"); + } + + private JsonParser getJsonParser(String json) throws IOException { + JsonFactory factory = new JsonFactory(); + byte[] jsonData = json.getBytes(); + ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonData); + UTF8StreamJsonParser jsonParser = (UTF8StreamJsonParser) factory.createParser(inputStream); + jsonParser.setCodec(new ObjectMapper()); + return jsonParser; + } +} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JdbcSearchableStepExecutionDao.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JdbcSearchableStepExecutionDao.java index 8f394e02f6..e5c7d25513 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JdbcSearchableStepExecutionDao.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/batch/JdbcSearchableStepExecutionDao.java @@ -17,6 +17,7 @@ import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -201,8 +202,11 @@ private static class StepExecutionRowMapper implements RowMapper public StepExecution mapRow(ResultSet rs, int rowNum) throws SQLException { StepExecution stepExecution = new StepExecution(rs.getString(2), null); stepExecution.setId(rs.getLong(1)); - stepExecution.setStartTime(rs.getTimestamp(3).toLocalDateTime()); - stepExecution.setEndTime(rs.getTimestamp(4).toLocalDateTime()); + Timestamp startTimeStamp = rs.getTimestamp(3); + Timestamp endTimeStamp = rs.getTimestamp(4); + + stepExecution.setStartTime((startTimeStamp == null) ? null : startTimeStamp.toLocalDateTime()); + stepExecution.setEndTime((endTimeStamp == null) ? null : endTimeStamp.toLocalDateTime()); stepExecution.setStatus(BatchStatus.valueOf(rs.getString(5))); stepExecution.setCommitCount(rs.getInt(6)); stepExecution.setReadCount(rs.getInt(7)); diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/features/TaskConfiguration.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/features/TaskConfiguration.java index a2bae4aed2..8723573df3 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/features/TaskConfiguration.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/config/features/TaskConfiguration.java @@ -272,13 +272,14 @@ public TaskJobService taskJobExecutionRepository( DataflowTaskExplorer taskExplorer, TaskDefinitionRepository taskDefinitionRepository, TaskExecutionService taskExecutionService, - LauncherRepository launcherRepository) { + LauncherRepository launcherRepository, TaskConfigurationProperties taskConfigurationProperties) { return new DefaultTaskJobService( service, taskExplorer, taskDefinitionRepository, taskExecutionService, - launcherRepository + launcherRepository, + taskConfigurationProperties ); } } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobExecutionController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobExecutionController.java index 8ee6dd75a6..fd0471ab5b 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobExecutionController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobExecutionController.java @@ -18,6 +18,8 @@ import java.util.TimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.launch.JobExecutionNotRunningException; @@ -39,7 +41,6 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.util.Assert; -import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -63,6 +64,8 @@ @ExposesResourceFor(JobExecutionResource.class) public class JobExecutionController { + private static final Logger logger = LoggerFactory.getLogger(JobExecutionController.class); + private final Assembler jobAssembler = new Assembler(); private final TaskJobService taskJobService; @@ -148,9 +151,16 @@ public ResponseEntity stopJobExecution( @RequestMapping(value = {"/{executionId}"}, method = RequestMethod.PUT, params = "restart=true") @ResponseStatus(HttpStatus.OK) public ResponseEntity restartJobExecution( - @PathVariable("executionId") long jobExecutionId) throws NoSuchJobExecutionException { - taskJobService.restartJobExecution(jobExecutionId); - return ResponseEntity.ok().build(); + @PathVariable("executionId") long jobExecutionId, + @RequestParam(value = "useJsonJobParameters", required = false) Boolean useJsonJobParameters) + throws NoSuchJobExecutionException { + try { + taskJobService.restartJobExecution(jobExecutionId, useJsonJobParameters); + } catch (NoSuchJobExecutionException e) { + logger.warn(e.getMessage(), e); + throw e; + } + return ResponseEntity.ok().build(); } /** @@ -188,7 +198,8 @@ public JobExecutionResource instantiateModel(TaskJobExecution taskJobExecution) resource.add(linkTo(methodOn(JobExecutionController.class).stopJobExecution(taskJobExecution.getJobExecution().getJobId())).withRel("stop")); } if (!taskJobExecution.getJobExecution().getStatus().equals(BatchStatus.COMPLETED)) { - resource.add(linkTo(methodOn(JobExecutionController.class).restartJobExecution(taskJobExecution.getJobExecution().getJobId())).withRel("restart")); + // In this case we use null for the useJsonJobParameters parameter, so we use the configured job parameter serialization method specified by dataflow. + resource.add(linkTo(methodOn(JobExecutionController.class).restartJobExecution(taskJobExecution.getJobExecution().getJobId(), null)).withRel("restart")); } } catch (NoSuchJobExecutionException | JobExecutionNotRunningException e) { throw new RuntimeException(e); diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobExecutionThinController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobExecutionThinController.java index b8e740ca91..bc0eae2c11 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobExecutionThinController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/JobExecutionThinController.java @@ -216,7 +216,8 @@ public JobExecutionThinResource instantiateModel(TaskJobExecution taskJobExecuti resource.add(linkTo(methodOn(JobExecutionController.class).stopJobExecution(taskJobExecution.getJobExecution().getJobId())).withRel("stop")); } if (taskJobExecution.getJobExecution().getEndTime() != null && !taskJobExecution.getJobExecution().isRunning()) { - resource.add(linkTo(methodOn(JobExecutionController.class).restartJobExecution(taskJobExecution.getJobExecution().getJobId())).withRel("restart")); + // In this case we use null for the useJsonJobParameters parameter so we use the configured job parameter serialization method specified by dataflow. + resource.add(linkTo(methodOn(JobExecutionController.class).restartJobExecution(taskJobExecution.getJobExecution().getJobId(), null)).withRel("restart")); } } catch (NoSuchJobExecutionException | JobExecutionNotRunningException e) { throw new RuntimeException(e); diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/job/support/StepExecutionProgressInfo.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/job/support/StepExecutionProgressInfo.java index 2ea291d9c4..fe34abc6c2 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/job/support/StepExecutionProgressInfo.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/job/support/StepExecutionProgressInfo.java @@ -111,7 +111,7 @@ private double calculatePercentageComplete() { double result = 0.0; if (readHistory.getMean() == 0) { percentCompleteBasis = PercentCompleteBasis.DURATION; - result = getDurationBasedEstimate(duration); + result = getDurationBasedEstimate(); } else { percentCompleteBasis = PercentCompleteBasis.READCOUNT; @@ -120,7 +120,7 @@ private double calculatePercentageComplete() { return result; } - private double getDurationBasedEstimate(double duration) { + private double getDurationBasedEstimate() { CumulativeHistory durationHistory = stepExecutionHistory.getDuration(); if (durationHistory.getMean() == 0) { diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskJobService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskJobService.java index e52bf79313..f5e4e55cfa 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskJobService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskJobService.java @@ -17,7 +17,6 @@ package org.springframework.cloud.dataflow.server.service; import java.util.Date; -import java.util.List; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.Job; @@ -31,6 +30,7 @@ import org.springframework.cloud.dataflow.rest.job.TaskJobExecution; import org.springframework.cloud.dataflow.server.batch.JobExecutionWithStepCount; import org.springframework.cloud.dataflow.server.job.support.JobNotRestartableException; +import org.springframework.cloud.dataflow.server.service.impl.TaskConfigurationProperties; import org.springframework.cloud.task.repository.TaskExecution; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -99,8 +99,10 @@ public interface TaskJobService { JobInstanceExecutions getJobInstance(long id) throws NoSuchJobInstanceException, NoSuchJobException; /** - * Restarts a {@link JobExecution} IF the respective {@link JobExecution} is actually + * Restarts a {@link JobExecution} if the respective {@link JobExecution} is actually * deemed restartable. Otherwise a {@link JobNotRestartableException} is being thrown. + * The system will use {@link TaskConfigurationProperties#isUseJsonJobParameters()} to + * determine the {@link org.springframework.batch.core.JobParameter} serializer. * * @param jobExecutionId The id of the JobExecution to restart. * @throws NoSuchJobExecutionException if the JobExecution for the provided id does not @@ -108,6 +110,21 @@ public interface TaskJobService { */ void restartJobExecution(long jobExecutionId) throws NoSuchJobExecutionException; + /** + * Restarts a {@link JobExecution} if the respective {@link JobExecution} is actually + * deemed restartable. Otherwise, a {@link JobNotRestartableException} is being thrown. + * + * @param jobExecutionId The id of the JobExecution to restart. + * @param useJsonJobParameters if set to true, dataflow will serialize job parameters to the command line using the + * format provided by {@code JsonJobParametersConverter}. + * If set to false dataflow will use {@code DefaultParametersConverter}. + * If null dataflow will use {@link TaskConfigurationProperties#isUseJsonJobParameters()} + * to determine the {@link org.springframework.batch.core.JobParameter} serializer. + * @throws NoSuchJobExecutionException if the JobExecution for the provided id does not + * exist. + */ + void restartJobExecution(long jobExecutionId, Boolean useJsonJobParameters) throws NoSuchJobExecutionException; + /** * Requests a {@link JobExecution} to stop. *

diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobService.java index 4b618e4004..41e652e9e8 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobService.java @@ -29,7 +29,6 @@ import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobInstance; -import org.springframework.batch.core.JobParameter; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.launch.JobExecutionNotRunningException; @@ -84,13 +83,16 @@ public class DefaultTaskJobService implements TaskJobService { private final LauncherRepository launcherRepository; + private final TaskConfigurationProperties taskConfigurationProperties; + public DefaultTaskJobService( - JobService jobService, - DataflowTaskExplorer taskExplorer, - TaskDefinitionRepository taskDefinitionRepository, - TaskExecutionService taskExecutionService, - LauncherRepository launcherRepository) { + JobService jobService, + DataflowTaskExplorer taskExplorer, + TaskDefinitionRepository taskDefinitionRepository, + TaskExecutionService taskExecutionService, + LauncherRepository launcherRepository, + TaskConfigurationProperties taskConfigurationProperties) { Assert.notNull(jobService, "jobService must not be null"); Assert.notNull(taskExplorer, "taskExplorer must not be null"); Assert.notNull(taskDefinitionRepository, "taskDefinitionRepository must not be null"); @@ -101,6 +103,7 @@ public DefaultTaskJobService( this.taskDefinitionRepository = taskDefinitionRepository; this.taskExecutionService = taskExecutionService; this.launcherRepository = launcherRepository; + this.taskConfigurationProperties = taskConfigurationProperties; } @Override @@ -218,6 +221,11 @@ public JobInstanceExecutions getJobInstance(long id) throws NoSuchJobInstanceExc @Override public void restartJobExecution(long jobExecutionId) throws NoSuchJobExecutionException { + restartJobExecution(jobExecutionId, null); + } + + @Override + public void restartJobExecution(long jobExecutionId, Boolean useJsonJobParameters) throws NoSuchJobExecutionException { logger.info("restarting job:{}", jobExecutionId); final TaskJobExecution taskJobExecution = this.getJobExecution(jobExecutionId); final JobExecution jobExecution = taskJobExecution.getJobExecution(); @@ -253,7 +261,7 @@ public void restartJobExecution(long jobExecutionId) throws NoSuchJobExecutionEx deploymentProperties.put(DefaultTaskExecutionService.TASK_PLATFORM_NAME, platformName); taskExecutionService.executeTask(taskDefinition.getName(), deploymentProperties, restartExecutionArgs(taskExecution.getArguments(), - taskJobExecution.getJobExecution().getJobParameters())); + taskJobExecution.getJobExecution().getJobParameters(), useJsonJobParameters)); } else { throw new IllegalStateException(String.format("Did not find platform for taskName=[%s] , taskId=[%s]", taskExecution.getTaskName(), taskJobExecution.getTaskId())); @@ -269,28 +277,23 @@ public void restartJobExecution(long jobExecutionId) throws NoSuchJobExecutionEx * * @param taskExecutionArgs original set of task execution arguments * @param jobParameters for the job to be restarted. + * @param useJsonJobParameters determine what converter to use to serialize the job parameter to the command line arguments. * @return deduped list of arguments that contains the original arguments and any * identifying job parameters not in the original task execution arguments. */ - private List restartExecutionArgs(List taskExecutionArgs, JobParameters jobParameters) { - List result = new ArrayList<>(taskExecutionArgs); - String type; - Map> jobParametersMap = jobParameters.getParameters(); - for (String key : jobParametersMap.keySet()) { - if (!key.startsWith("-")) { - boolean existsFlag = false; - for (String arg : taskExecutionArgs) { - if (arg.startsWith(key)) { - existsFlag = true; - break; - } - } - if (!existsFlag) { - type = jobParametersMap.get(key).getType().getCanonicalName(); - result.add(String.format("%s=%s,%s", key, jobParametersMap.get(key).getValue(), type)); - } - } + private List restartExecutionArgs(List taskExecutionArgs, JobParameters jobParameters, + Boolean useJsonJobParameters) { + if (useJsonJobParameters == null) { + useJsonJobParameters = taskConfigurationProperties.isUseJsonJobParameters(); } + var jobParamsConverter = useJsonJobParameters ? new ScdfJsonJobParametersConverter() + : new ScdfDefaultJobParametersConverter(); + List result = new ArrayList<>(taskExecutionArgs); + jobParameters.getParameters().entrySet().stream() + .filter((e) -> !e.getKey().startsWith("-")) + .filter((e) -> taskExecutionArgs.stream().noneMatch((arg) -> arg.startsWith(e.getKey()))) + .map((e) -> e.getKey() + "=" + jobParamsConverter.deserializeJobParameter(e.getValue())) + .forEach(result::add); return result; } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/ScdfDefaultJobParametersConverter.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/ScdfDefaultJobParametersConverter.java new file mode 100644 index 0000000000..42631128c7 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/ScdfDefaultJobParametersConverter.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.dataflow.server.service.impl; + +import org.springframework.batch.core.JobParameter; +import org.springframework.batch.core.converter.DefaultJobParametersConverter; + +/** + * Provides methods to serialize a Spring Batch {@link JobParameter} to the Spring Batch's default format. + */ +public class ScdfDefaultJobParametersConverter extends DefaultJobParametersConverter implements ScdfJobParametersConverter { + + public ScdfDefaultJobParametersConverter() { + super(); + } + + @Override + public String deserializeJobParameter(JobParameter jobParameter) { + return encode(jobParameter); + } +} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/ScdfJobParametersConverter.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/ScdfJobParametersConverter.java new file mode 100644 index 0000000000..1c2fd785fc --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/ScdfJobParametersConverter.java @@ -0,0 +1,32 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.dataflow.server.service.impl; + +import org.springframework.batch.core.JobParameter; + +/** + * Provides methods to serialize a Spring Batch {@link JobParameter} to the proper format. + */ +public interface ScdfJobParametersConverter { + + /** + * Serializes a Spring Batch {@link JobParameter} to the proper format. + * @param jobParameter to be serialized + * @return Serialized job parameter + */ + String deserializeJobParameter(JobParameter jobParameter); +} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/ScdfJsonJobParametersConverter.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/ScdfJsonJobParametersConverter.java new file mode 100644 index 0000000000..c13fcc3c31 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/ScdfJsonJobParametersConverter.java @@ -0,0 +1,34 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.dataflow.server.service.impl; + +import org.springframework.batch.core.JobParameter; +import org.springframework.batch.core.converter.JsonJobParametersConverter; + +/** + * Provides methods to serialize a Spring Batch {@link JobParameter} to JSON. + */ +public class ScdfJsonJobParametersConverter extends JsonJobParametersConverter implements ScdfJobParametersConverter { + + public ScdfJsonJobParametersConverter() { + super(); + } + + @Override + public String deserializeJobParameter(JobParameter jobParameter) { + return encode(jobParameter); + } +} diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/TaskConfigurationProperties.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/TaskConfigurationProperties.java index b7ddcfe2ee..45f0657825 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/TaskConfigurationProperties.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/TaskConfigurationProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,6 +68,14 @@ public class TaskConfigurationProperties { */ private boolean useKubernetesSecretsForDbCredentials; + /** + * Controls the style that Dataflow reconstitutes job parameters when re-running a + * failed batch job. The style will be taken from Spring Batch's + * DefaultJobParametersConverter when set to false or JsonJobParametersConverter when true. + */ + + private boolean useJsonJobParameters = false; + @Deprecated public String getComposedTaskRunnerUri() { logDeprecationWarning("getUri"); @@ -189,4 +197,12 @@ public int getExecutionDeleteChunkSize() { public void setExecutionDeleteChunkSize(int executionDeleteChunkSize) { this.executionDeleteChunkSize = executionDeleteChunkSize; } + + public boolean isUseJsonJobParameters() { + return useJsonJobParameters; + } + + public void setUseJsonJobParameters(boolean useJsonJobParameters) { + this.useJsonJobParameters = useJsonJobParameters; + } } diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java index deb86a8bd9..d00b26c72b 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java @@ -297,14 +297,16 @@ public TaskJobService taskJobExecutionRepository( DataflowTaskExplorer taskExplorer, TaskDefinitionRepository taskDefinitionRepository, TaskExecutionService taskExecutionService, - LauncherRepository launcherRepository + LauncherRepository launcherRepository, + TaskConfigurationProperties taskConfigurationProperties ) { return new DefaultTaskJobService( jobService, taskExplorer, taskDefinitionRepository, taskExecutionService, - launcherRepository); + launcherRepository, + taskConfigurationProperties); } @Bean diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionControllerTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionControllerTests.java index 3af6171ff8..de7c2516fc 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionControllerTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionControllerTests.java @@ -188,7 +188,6 @@ public void testGetExecution() throws Exception { @Test public void testGetExecutionWithJobProperties() throws Exception { MvcResult result = mockMvc.perform(get("/jobs/executions/10").accept(MediaType.APPLICATION_JSON)) - .andDo(print()) .andExpect(status().isOk()) .andExpect(jsonPath("$.executionId", is(10))) .andExpect(jsonPath("$.jobExecution.jobParameters.parameters", Matchers.hasKey(("javaUtilDate")))) @@ -197,6 +196,16 @@ public void testGetExecutionWithJobProperties() throws Exception { assertThat(result.getResponse().getContentAsString()).contains("\"type\":\"java.lang.String\""); } + @Test + public void testGetExecutionWithJobPropertiesOverrideJobParam() throws Exception { + MvcResult result = mockMvc.perform(get("/jobs/executions/10?useJsonJobParameters=true").accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.executionId", is(10))) + .andExpect(jsonPath("$.jobExecution.jobParameters.parameters", Matchers.hasKey(("javaUtilDate")))) + .andExpect(jsonPath("$.jobExecution.stepExecutions", hasSize(1))).andReturn(); + assertThat(result.getResponse().getContentAsString()).contains("\"identifying\":true", "\"type\":\"java.lang.String\""); + } + @Test public void testGetAllExecutionsFailed() throws Exception { createDirtyJob(); @@ -214,8 +223,6 @@ public void testGetAllExecutions() throws Exception { .andExpect(jsonPath("$._embedded.jobExecutionResourceList[*].executionId", containsInRelativeOrder(10, 9, 8, 7, 6, 5, 4, 3, 2, 1))); } - //TODO: Boot3x followup - @Disabled("TODO: Boot3x followup Until we implement the paging capabilities this tests is disabled.") @Test public void testGetAllExecutionsPageOffsetLargerThanIntMaxValue() throws Exception { verify5XXErrorIsThrownForPageOffsetError(get("/jobs/executions")); @@ -233,8 +240,6 @@ public void testGetExecutionsByName() throws Exception { .andExpect(jsonPath("$._embedded.jobExecutionResourceList", hasSize(1))); } - //TODO: Boot3x followup - @Disabled("TODO: Boot3x followup Until we implement the paging capabilities this tests is disabled.") @Test public void testGetExecutionsByNamePageOffsetLargerThanIntMaxValue() throws Exception { verify5XXErrorIsThrownForPageOffsetError( diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobServiceTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobServiceTests.java index f122af78aa..9c1af2e895 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobServiceTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskJobServiceTests.java @@ -160,8 +160,19 @@ public void testRestart() throws Exception { final ArgumentCaptor argument = ArgumentCaptor.forClass(AppDeploymentRequest.class); verify(this.taskLauncher, times(1)).launch(argument.capture()); AppDeploymentRequest appDeploymentRequest = argument.getAllValues().get(0); + assertThat(appDeploymentRequest.getCommandlineArguments()).contains("identifying.param=testparam,java.lang.String,true"); + } - assertThat(appDeploymentRequest.getCommandlineArguments()).contains("identifying.param=testparam,java.lang.String"); + @Test + public void testRestartWithJsonParameters() throws Exception { + createBaseLaunchers(); + initializeJobs(true); + + this.taskJobService.restartJobExecution(jobInstanceCount, true); + ArgumentCaptor argument = ArgumentCaptor.forClass(AppDeploymentRequest.class); + verify(this.taskLauncher, times(1)).launch(argument.capture()); + AppDeploymentRequest appDeploymentRequest = argument.getAllValues().get(0); + assertThat(appDeploymentRequest.getCommandlineArguments()).contains("identifying.param={\"value\":\"testparam\",\"type\":\"java.lang.String\",\"identifying\":\"true\"}"); } @Test @@ -184,7 +195,7 @@ public void testRestartOnePlatform() throws Exception { final ArgumentCaptor argument = ArgumentCaptor.forClass(AppDeploymentRequest.class); verify(this.taskLauncher, times(1)).launch(argument.capture()); AppDeploymentRequest appDeploymentRequest = argument.getAllValues().get(0); - assertThat(appDeploymentRequest.getCommandlineArguments()).contains("identifying.param=testparam,java.lang.String"); + assertThat(appDeploymentRequest.getCommandlineArguments()).contains("identifying.param=testparam,java.lang.String,true"); } private void initializeJobs(boolean insertTaskExecutionMetadata) diff --git a/spring-cloud-dataflow-shell-core/src/main/java/org/springframework/cloud/dataflow/shell/command/JobCommands.java b/spring-cloud-dataflow-shell-core/src/main/java/org/springframework/cloud/dataflow/shell/command/JobCommands.java index 2964f45fe4..af11a71117 100644 --- a/spring-cloud-dataflow-shell-core/src/main/java/org/springframework/cloud/dataflow/shell/command/JobCommands.java +++ b/spring-cloud-dataflow-shell-core/src/main/java/org/springframework/cloud/dataflow/shell/command/JobCommands.java @@ -107,8 +107,17 @@ public Table executionList( @ShellMethod(key = EXECUTION_RESTART, value = "Restart a failed job by jobExecutionId") @ShellMethodAvailability("availableWithViewRole") public String executionRestart( - @ShellOption(help = "the job execution id") long id) { - jobOperations().executionRestart(id); + @ShellOption(help = "the job executiond id") long id, + @ShellOption(value = "--useJsonJobParameters", + help = "boolean value serialize job parameter as Json. " + + "Default is null, meaning SCDF default will be used.", + defaultValue = ShellOption.NULL) String useJsonJobParameters) { + if(useJsonJobParameters == null) { + jobOperations().executionRestart(id); + } + else { + jobOperations().executionRestart(id, Boolean.valueOf(useJsonJobParameters)); + } return String.format("Restart request has been sent for job execution '%s'", id); } diff --git a/spring-cloud-dataflow-shell-core/src/test/java/org/springframework/cloud/dataflow/shell/AbstractShellIntegrationTest.java b/spring-cloud-dataflow-shell-core/src/test/java/org/springframework/cloud/dataflow/shell/AbstractShellIntegrationTest.java index abe9b4aa3f..5de37be3fe 100644 --- a/spring-cloud-dataflow-shell-core/src/test/java/org/springframework/cloud/dataflow/shell/AbstractShellIntegrationTest.java +++ b/spring-cloud-dataflow-shell-core/src/test/java/org/springframework/cloud/dataflow/shell/AbstractShellIntegrationTest.java @@ -119,7 +119,12 @@ public static void startUp() { "--spring.jmx.default-domain=" + System.currentTimeMillis(), "--spring.jmx.enabled=false", "--security.basic.enabled=false", "--spring.main.show_banner=false", "--spring.cloud.config.enabled=false", - "--spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration,org.springframework.boot.autoconfigure.security.servlet.SecurityFilterAutoConfiguration,org.springframework.boot.actuate.autoconfigure.security.servlet.ManagementWebSecurityAutoConfiguration,org.springframework.boot.autoconfigure.session.SessionAutoConfiguration,org.springframework.cloud.deployer.spi.cloudfoundry.CloudFoundryDeployerAutoConfiguration,org.springframework.cloud.deployer.spi.kubernetes.KubernetesAutoConfiguration", + "--spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration," + + "org.springframework.boot.autoconfigure.security.servlet.SecurityFilterAutoConfiguration," + + "org.springframework.boot.actuate.autoconfigure.security.servlet.ManagementWebSecurityAutoConfiguration," + + "org.springframework.boot.autoconfigure.session.SessionAutoConfiguration," + + "org.springframework.cloud.deployer.spi.cloudfoundry.CloudFoundryDeployerAutoConfiguration," + + "org.springframework.cloud.deployer.spi.kubernetes.KubernetesAutoConfiguration", "--spring.datasource.url=" + dataSourceUrl, "--spring.cloud.dataflow.features.schedules-enabled=true"); Shell shell = applicationContext.getBean(Shell.class); diff --git a/spring-cloud-dataflow-shell-core/src/test/java/org/springframework/cloud/dataflow/shell/command/JobCommandTests.java b/spring-cloud-dataflow-shell-core/src/test/java/org/springframework/cloud/dataflow/shell/command/JobCommandTests.java index f8e66f1701..081a173bb7 100644 --- a/spring-cloud-dataflow-shell-core/src/test/java/org/springframework/cloud/dataflow/shell/command/JobCommandTests.java +++ b/spring-cloud-dataflow-shell-core/src/test/java/org/springframework/cloud/dataflow/shell/command/JobCommandTests.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -29,11 +30,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameter; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRepository; @@ -80,6 +84,7 @@ public static void setUp() throws Exception { Thread.sleep(2000); taskBatchDao = applicationContext.getBean(TaskBatchDao.class); jobRepository = applicationContext.getBean(JobRepository.class); + taskExecutionDao = applicationContext.getBean(TaskExecutionDao.class); taskExecutionIds.add(createSampleJob(JOB_NAME_ORIG, 1)); taskExecutionIds.add(createSampleJob(JOB_NAME_FOO, 1)); @@ -94,30 +99,28 @@ public static void tearDown() { } JdbcTemplate template = new JdbcTemplate(applicationContext.getBean(DataSource.class)); template.afterPropertiesSet(); - final String TASK_EXECUTION_FORMAT = "DELETE FROM task_execution WHERE task_execution_id = %d"; - final String TASK_BATCH_FORMAT = "DELETE FROM task_task_batch WHERE task_execution_id = %d"; - - for (Long id : taskExecutionIds) { - template.execute(String.format(TASK_BATCH_FORMAT, id)); - template.execute(String.format(TASK_EXECUTION_FORMAT, id)); - } } private static long createSampleJob(String jobName, int jobExecutionCount) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobRestartException { - JobInstance instance = jobRepository.createJobInstance(jobName, new JobParameters()); - jobInstances.add(instance); TaskExecution taskExecution = taskExecutionDao.createTaskExecution(jobName, LocalDateTime.now(), new ArrayList<>(), null); + Map> jobParameterMap = new HashMap<>(); - jobParameterMap.put("foo", new JobParameter("FOO", String.class, true)); - jobParameterMap.put("bar", new JobParameter("BAR", String.class, false)); + jobParameterMap.put("foo", new JobParameter("FOO", String.class, false)); + jobParameterMap.put("bar", new JobParameter("BAR", String.class, true)); + jobParameterMap.put("baz", new JobParameter("55", Long.class, true)); JobParameters jobParameters = new JobParameters(jobParameterMap); JobExecution jobExecution; for (int i = 0; i < jobExecutionCount; i++) { jobExecution = jobRepository.createJobExecution(jobName, jobParameters); + JobInstance instance = jobExecution.getJobInstance(); + jobInstances.add(instance); taskBatchDao.saveRelationship(taskExecution, jobExecution); StepExecution stepExecution = new StepExecution("foobar", jobExecution); jobRepository.add(stepExecution); + jobExecution.setStatus(BatchStatus.FAILED); + jobExecution.setExitStatus(ExitStatus.FAILED); + jobRepository.update(jobExecution); } return taskExecution.getExecutionId(); } @@ -133,7 +136,6 @@ public void testJobExecutionList() { checkCell(table, 0, 3, "Start Time "); checkCell(table, 0, 4, "Step Execution Count "); checkCell(table, 0, 5, "Definition Status "); - } @Test @@ -152,10 +154,9 @@ public void testJobExecutionListByName() { @Test public void testViewExecution() { logger.info("Retrieve Job Execution Detail by Id"); - Table table = getTable(job().executionDisplay(getFirstJobExecutionIdFromTable())); verifyColumnNumber(table, 2); - assertEquals("Number of expected rows returned from the table is incorrect", 18, + assertEquals("Number of expected rows returned from the table is incorrect", 19, table.getModel().getRowCount()); int rowNumber = 0; checkCell(table, rowNumber++, 0, "Key "); @@ -174,22 +175,33 @@ public void testViewExecution() { checkCell(table, rowNumber++, 0, "Exit Message "); checkCell(table, rowNumber++, 0, "Definition Status "); checkCell(table, rowNumber++, 0, "Job Parameters "); - int paramRowOne = rowNumber++; - int paramRowTwo = rowNumber++; - boolean jobParamsPresent = false; - if ((table.getModel().getValue(paramRowOne, 0).equals("foo(STRING) ") - && table.getModel().getValue(paramRowTwo, 0).equals("-bar(STRING) ")) - || (table.getModel().getValue(paramRowOne, 0).equals("-bar(STRING) ") - && table.getModel().getValue(paramRowTwo, 0).equals("foo(STRING) "))) { - jobParamsPresent = true; + int paramRowOne = rowNumber; + + assertTrue("the table did not contain the correct job parameters for job parameter value foo", + checkModelColumn(paramRowOne, table, "-foo(java.lang.String) ")); + + assertTrue("the table did not contain the correct job parameters for job parameter value bar", + checkModelColumn(paramRowOne, table, "bar(java.lang.String) ")); + + assertTrue("the table did not contain the correct job parameters for job parameter value baz", + checkModelColumn(paramRowOne, table, "baz(java.lang.Long) ")); + + } + + private boolean checkModelColumn(int rowNumber, Table table, String value) { + boolean result = false; + int paramRowNumber = rowNumber; + if (table.getModel().getValue(paramRowNumber++, 0).equals(value) || + table.getModel().getValue(paramRowNumber++, 0).equals(value) || + table.getModel().getValue(paramRowNumber, 0).equals(value)) { + result = true; } - assertTrue("the table did not contain the correct job parameters ", jobParamsPresent); + return result; } @Test public void testViewInstance() { logger.info("Retrieve Job Instance Detail by Id"); - Table table = getTable(job().instanceDisplay(jobInstances.get(0).getInstanceId())); verifyColumnNumber(table, 5); checkCell(table, 0, 0, "Name "); @@ -198,8 +210,9 @@ public void testViewInstance() { checkCell(table, 0, 3, "Status "); checkCell(table, 0, 4, "Job Parameters "); boolean isValidCell = false; - if (table.getModel().getValue(1, 4).equals("foo=FOO,-bar=BAR") - || table.getModel().getValue(1, 4).equals("-bar=BAR,foo=FOO")) { + if (table.getModel().getValue(1, 4).toString().contains("-foo={value=FOO, type=class java.lang.String, identifying=false},java.lang.String,true") && + table.getModel().getValue(1, 4).toString().contains("bar={value=BAR, type=class java.lang.String, identifying=true},java.lang.String,true") && + table.getModel().getValue(1, 4).toString().contains("baz=55,java.lang.Long,true")) { isValidCell = true; } assertTrue("Job Parameters does match expected.", isValidCell);