Skip to content

Commit

Permalink
Need to select the serialization method for JobParameters for the com…
Browse files Browse the repository at this point in the history
…mandline

User needs ability to set the default serialization technique for SCDF when restarting a job
User needs ability to select a serialization technique for a specific job restart
When a user restarts a job from the list that is derived from thin job executions, it should use the default technique

Add restful documentation.

Add support to allow user to set useJsonJobParameters for job relaunch via the shell.

Note: there are no tests for the shell update in this commit. This is because the current
set of tests rely on @EnableDataflowServer which does not work.   But before we fix
@EnableDataflowServer we need to make sure we want to carry it forward per Issue #1040

Polish PR before push to repo

JobCommand should ignore identifyingParameters when deserializing JobParameters

This is a generated list and will cause deserialization to fail if it is not skipped

SCDF needs to support any type of class for JobParameters

However, if the class is not a base java type or one provided by dataflow the user has to build dataflow with their class included.

Add tests for JobParameterJacksonDeserializer

Remove Disabled annotation from JobExecutionController tests that are no longer needed

Update per code review request.

Added test for JobParameterMixin via the JobTemplateTests
Removed the duration parameter from getDurationBasedEstimate method
Rebased

Reset the duration calculation from nanos back to millis.

Optimized restartExecutionArgs routine that removes duplicates.

This was per a comment in code review

Remove unnecessary exclusions from AbstractShellIntegrationTest

These changes address code review requests.

Add warn log message when job restart id is invalid.

* Initialize ObjectMapper with the tools provided by DataFlowTemplate when testing

* These changes were identified during code review
  • Loading branch information
cppwfs committed Aug 19, 2024
1 parent 50f1e20 commit e17b570
Show file tree
Hide file tree
Showing 25 changed files with 395 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -334,13 +334,17 @@ public void jobStop() throws Exception {
public void jobRestart() throws Exception {
this.mockMvc.perform(put("/jobs/executions/{id}", "2")
.queryParam("restart", "true")
.queryParam("useJsonJobParameters", "true")
)
.andDo(print())
.andExpect(status().isOk())
.andDo(this.documentationHandler.document(
pathParameters(parameterWithName("id")
.description("The id of an existing job execution (required)"))
, queryParameters(
parameterWithName("useJsonJobParameters").description("If true dataflow will " +
"serialize job parameters as JSON. Default is null, and the default " +
"configuration will be used to determine serialization method.").optional(),
parameterWithName("restart")
.description("Sends signal to restart the job if set to true")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2666,7 +2666,7 @@ include::{snippets}/job-executions-documentation/job-restart/path-parameters.ado
[[api-guide-resources-job-executions-restart-request-parameters]]
===== Request Parameters

include::{snippets}/job-executions-documentation/job-restart/request-parameters.adoc[]
include::{snippets}/job-executions-documentation/job-restart/query-parameters.adoc[]



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ public interface JobOperations {
*/
void executionRestart(long id);

/**
 * Restarts the job execution identified by {@code id}.
 *
 * @param id job execution id
 * @param useJsonJobParameters if {@code true}, the
 * {@link org.springframework.batch.core.JobParameters} will be serialized to JSON.
 * Default is {@code null}, which serializes the
 * {@link org.springframework.batch.core.JobParameters} using the method specified
 * in SCDF's configuration.
 */
void executionRestart(long id, Boolean useJsonJobParameters);

/**
* @return the list job executions without step executions known to the system.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ public void executionRestart(long id) {
restTemplate.put(builder.toUriString(), null);
}

@Override
public void executionRestart(long id, Boolean useJsonJobParameters) {
    // Build the restart URI for this execution, propagating the caller's
    // serialization preference (may be null, meaning "use server default").
    String executionHref = executionLink.expand(id).getHref();
    UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromUriString(executionHref)
            .queryParam("restart", "true")
            .queryParam("useJsonJobParameters", useJsonJobParameters);
    restTemplate.put(uriBuilder.toUriString(), null);
}

@Override
public PagedModel<JobExecutionThinResource> executionThinList() {
UriComponentsBuilder builder = UriComponentsBuilder.fromUriString(thinExecutionsLink.getHref()).queryParam("size", "2000");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import java.util.List;
import java.util.Optional;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -34,15 +33,14 @@
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.cloud.dataflow.rest.Version;
import org.springframework.cloud.dataflow.rest.job.StepExecutionHistory;
import org.springframework.cloud.dataflow.rest.resource.RootResource;
import org.springframework.cloud.dataflow.rest.support.jackson.Jackson2DataflowModule;
import org.springframework.hateoas.Link;
import org.springframework.hateoas.LinkRelation;
import org.springframework.hateoas.mediatype.hal.Jackson2HalModule;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.client.ResourceAccessException;
Expand All @@ -69,10 +67,7 @@ public class DataflowTemplateTests {
@Before
public void setup() {
mapper = new ObjectMapper();
mapper.registerModule(new Jdk8Module());
mapper.registerModule(new Jackson2HalModule());
mapper.registerModule(new JavaTimeModule());
mapper.registerModule(new Jackson2DataflowModule());
DataFlowTemplate.prepareObjectMapper(mapper);
System.setProperty("sun.net.client.defaultConnectTimeout", String.valueOf(100));
}

Expand Down Expand Up @@ -102,9 +97,22 @@ public void testDataFlowTemplateContructorWithNonExistingUri() throws URISyntaxE

@Test
public void testThatObjectMapperGetsPrepared() {
final ObjectMapper objectMapper = new ObjectMapper();
DataFlowTemplate.prepareObjectMapper(objectMapper);
assertCorrectMixins(objectMapper);
assertCorrectMixins(this.mapper);
}

@Test
public void testJobParameters() throws JsonProcessingException {
    // Verify that JobParameters survive a serialize/deserialize round trip
    // through the DataFlowTemplate-prepared ObjectMapper (mixins registered in setup()).
    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    jobParametersBuilder.addString("foo", "foo");
    jobParametersBuilder.addString("bar", "bar");

    JobParameters jobParameters = jobParametersBuilder.toJobParameters();
    assertCorrectMixins(this.mapper);
    String jobParametersSerialized = this.mapper.writeValueAsString(jobParameters);
    jobParameters = this.mapper.readValue(jobParametersSerialized, JobParameters.class);
    // JUnit assertEquals takes (expected, actual) — keep that order so failure
    // messages report the values correctly.
    assertEquals("foo", jobParameters.getParameter("foo").getValue());
    assertEquals("bar", jobParameters.getParameter("bar").getValue());
    assertEquals(2, jobParameters.getParameters().size());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,16 @@ public JobParameter deserialize(JsonParser jsonParser, DeserializationContext de
String type = node.get("type").asText();

JobParameter jobParameter;
//TODO: Boot3x followup Verify that Job Parameters setup properly for Batch 5
if (!type.isEmpty() && !type.equalsIgnoreCase("STRING")) {
if ("DATE".equalsIgnoreCase(type)) {
jobParameter = new JobParameter(LocalDateTime.parse(value), LocalDateTime.class, identifying);
}
else if ("DOUBLE".equalsIgnoreCase(type)) {
jobParameter = new JobParameter(Double.valueOf(value), Double.class, identifying);
}
else if ("LONG".equalsIgnoreCase(type)) {
jobParameter = new JobParameter(Long.valueOf(value), Long.class, identifying);
}
else {
throw new IllegalStateException("Unsupported JobParameter type: " + type);
if (!type.isEmpty()) {
try {
jobParameter = new JobParameter(value, Class.forName(type), identifying);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("JobParameter type %s is not supported by DataFlow".formatted(type), e);
}
}
else {
jobParameter = new JobParameter(value, String.class, identifying);
}
jobParameter = new JobParameter(value, String.class, identifying);
}

if (logger.isDebugEnabled()) {
logger.debug("jobParameter - value: {} (type: {}, isIdentifying: {})",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

package org.springframework.cloud.dataflow.rest.support.jackson;

import java.util.Map;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;

/**
Expand All @@ -27,9 +30,12 @@
* @author Gunnar Hillert
* @since 1.0
*/
@JsonIgnoreProperties("empty")
// "identifyingParameters" is a derived (generated) view of the parameters map;
// including it in the payload causes deserialization to fail, so it is skipped
// alongside the synthetic "empty" property.
@JsonIgnoreProperties({"empty", "identifyingParameters"})
public abstract class JobParametersJacksonMixIn {

	@JsonProperty
	abstract boolean isEmpty();

	@JsonProperty
	abstract Map<String, JobParameter<?>> getIdentifyingParameters();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.dataflow.rest.support.jackson;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.json.UTF8StreamJsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.JobParameter;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;

/**
 * Tests for {@code JobParameterJacksonDeserializer}: a resolvable class name in the
 * {@code type} field yields a typed {@code JobParameter}; an unresolvable one fails
 * with {@link IllegalArgumentException}.
 */
public class JobParameterJacksonDeserializerTests {

	@Test
	public void validJobParameter() throws IOException {
		JobParameterJacksonDeserializer jobParameterJacksonDeserializer = new JobParameterJacksonDeserializer();
		String json = "{\"value\":\"BAR\",\"type\":\"java.lang.String\",\"identifying\":true}";
		// Use the generic JobParameter<?> type rather than the raw type.
		JobParameter<?> jobParameter = jobParameterJacksonDeserializer.deserialize(getJsonParser(json), null);
		assertThat(jobParameter.getType()).isEqualTo(String.class);
		assertThat(jobParameter.getValue()).isEqualTo("BAR");
		assertThat(jobParameter.isIdentifying()).isTrue();
	}

	@Test
	public void inValidJobParameter() throws IOException {
		JobParameterJacksonDeserializer jobParameterJacksonDeserializer = new JobParameterJacksonDeserializer();
		String json = "{\"value\":\"BAR\",\"type\":\"java.lang.FOO\",\"identifying\":true}";
		assertThatExceptionOfType(IllegalArgumentException.class)
			.isThrownBy(() -> jobParameterJacksonDeserializer.deserialize(getJsonParser(json), null))
			.withMessage("JobParameter type java.lang.FOO is not supported by DataFlow");
	}

	/**
	 * Builds a {@link JsonParser} over the given JSON string with a codec attached,
	 * since the deserializer reads the payload as a tree.
	 */
	private JsonParser getJsonParser(String json) throws IOException {
		JsonFactory factory = new JsonFactory();
		byte[] jsonData = json.getBytes();
		ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonData);
		// Program to the JsonParser interface; no need to cast to the
		// concrete UTF8StreamJsonParser implementation.
		JsonParser jsonParser = factory.createParser(inputStream);
		jsonParser.setCodec(new ObjectMapper());
		return jsonParser;
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -201,8 +202,11 @@ private static class StepExecutionRowMapper implements RowMapper<StepExecution>
public StepExecution mapRow(ResultSet rs, int rowNum) throws SQLException {
StepExecution stepExecution = new StepExecution(rs.getString(2), null);
stepExecution.setId(rs.getLong(1));
stepExecution.setStartTime(rs.getTimestamp(3).toLocalDateTime());
stepExecution.setEndTime(rs.getTimestamp(4).toLocalDateTime());
Timestamp startTimeStamp = rs.getTimestamp(3);
Timestamp endTimeStamp = rs.getTimestamp(4);

stepExecution.setStartTime((startTimeStamp == null) ? null : startTimeStamp.toLocalDateTime());
stepExecution.setEndTime((endTimeStamp == null) ? null : endTimeStamp.toLocalDateTime());
stepExecution.setStatus(BatchStatus.valueOf(rs.getString(5)));
stepExecution.setCommitCount(rs.getInt(6));
stepExecution.setReadCount(rs.getInt(7));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,14 @@ public TaskJobService taskJobExecutionRepository(
DataflowTaskExplorer taskExplorer,
TaskDefinitionRepository taskDefinitionRepository,
TaskExecutionService taskExecutionService,
LauncherRepository launcherRepository) {
LauncherRepository launcherRepository, TaskConfigurationProperties taskConfigurationProperties) {
return new DefaultTaskJobService(
service,
taskExplorer,
taskDefinitionRepository,
taskExecutionService,
launcherRepository
launcherRepository,
taskConfigurationProperties
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.TimeZone;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
Expand All @@ -39,7 +41,6 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
Expand All @@ -63,6 +64,8 @@
@ExposesResourceFor(JobExecutionResource.class)
public class JobExecutionController {

private static final Logger logger = LoggerFactory.getLogger(JobExecutionController.class);

private final Assembler jobAssembler = new Assembler();

private final TaskJobService taskJobService;
Expand Down Expand Up @@ -148,9 +151,16 @@ public ResponseEntity<Void> stopJobExecution(
@RequestMapping(value = {"/{executionId}"}, method = RequestMethod.PUT, params = "restart=true")
@ResponseStatus(HttpStatus.OK)
public ResponseEntity<Void> restartJobExecution(
@PathVariable("executionId") long jobExecutionId) throws NoSuchJobExecutionException {
taskJobService.restartJobExecution(jobExecutionId);
return ResponseEntity.ok().build();
@PathVariable("executionId") long jobExecutionId,
@RequestParam(value = "useJsonJobParameters", required = false) Boolean useJsonJobParameters)
throws NoSuchJobExecutionException {
try {
taskJobService.restartJobExecution(jobExecutionId, useJsonJobParameters);
} catch (NoSuchJobExecutionException e) {
logger.warn(e.getMessage(), e);
throw e;
}
return ResponseEntity.ok().build();
}

/**
Expand Down Expand Up @@ -188,7 +198,8 @@ public JobExecutionResource instantiateModel(TaskJobExecution taskJobExecution)
resource.add(linkTo(methodOn(JobExecutionController.class).stopJobExecution(taskJobExecution.getJobExecution().getJobId())).withRel("stop"));
}
if (!taskJobExecution.getJobExecution().getStatus().equals(BatchStatus.COMPLETED)) {
resource.add(linkTo(methodOn(JobExecutionController.class).restartJobExecution(taskJobExecution.getJobExecution().getJobId())).withRel("restart"));
// In this case we use null for the useJsonJobParameters parameter, so we use the configured job parameter serialization method specified by dataflow.
resource.add(linkTo(methodOn(JobExecutionController.class).restartJobExecution(taskJobExecution.getJobExecution().getJobId(), null)).withRel("restart"));
}
} catch (NoSuchJobExecutionException | JobExecutionNotRunningException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ public JobExecutionThinResource instantiateModel(TaskJobExecution taskJobExecuti
resource.add(linkTo(methodOn(JobExecutionController.class).stopJobExecution(taskJobExecution.getJobExecution().getJobId())).withRel("stop"));
}
if (taskJobExecution.getJobExecution().getEndTime() != null && !taskJobExecution.getJobExecution().isRunning()) {
resource.add(linkTo(methodOn(JobExecutionController.class).restartJobExecution(taskJobExecution.getJobExecution().getJobId())).withRel("restart"));
// In this case we use null for the useJsonJobParameters parameter so we use the configured job parameter serialization method specified by dataflow.
resource.add(linkTo(methodOn(JobExecutionController.class).restartJobExecution(taskJobExecution.getJobExecution().getJobId(), null)).withRel("restart"));
}
} catch (NoSuchJobExecutionException | JobExecutionNotRunningException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private double calculatePercentageComplete() {
double result = 0.0;
if (readHistory.getMean() == 0) {
percentCompleteBasis = PercentCompleteBasis.DURATION;
result = getDurationBasedEstimate(duration);
result = getDurationBasedEstimate();
}
else {
percentCompleteBasis = PercentCompleteBasis.READCOUNT;
Expand All @@ -120,7 +120,7 @@ private double calculatePercentageComplete() {
return result;
}

private double getDurationBasedEstimate(double duration) {
private double getDurationBasedEstimate() {

CumulativeHistory durationHistory = stepExecutionHistory.getDuration();
if (durationHistory.getMean() == 0) {
Expand Down
Loading

0 comments on commit e17b570

Please sign in to comment.