Skip to content

Commit

Permalink
Merge branch 'main-3' into corneil/main3-junit5
Browse files Browse the repository at this point in the history
# Conflicts:
#	spring-cloud-dataflow-rest-client/src/test/java/org/springframework/cloud/dataflow/rest/client/DataflowTemplateTests.java
#	spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/JobExecutionControllerTests.java
#	spring-cloud-dataflow-shell-core/src/test/java/org/springframework/cloud/dataflow/shell/command/JobCommandTests.java
  • Loading branch information
corneil committed Aug 19, 2024
2 parents 7d1037e + e17b570 commit 89b8932
Show file tree
Hide file tree
Showing 24 changed files with 380 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,17 @@ public void jobStop() throws Exception {
public void jobRestart() throws Exception {
this.mockMvc.perform(put("/jobs/executions/{id}", "2")
.queryParam("restart", "true")
.queryParam("useJsonJobParameters", "true")
)
.andDo(print())
.andExpect(status().isOk())
.andDo(this.documentationHandler.document(
pathParameters(parameterWithName("id")
.description("The id of an existing job execution (required)"))
, queryParameters(
parameterWithName("useJsonJobParameters").description("If true dataflow will " +
"serialize job parameters as JSON. Default is null, and the default " +
"configuration will be used to determine serialization method.").optional(),
parameterWithName("restart")
.description("Sends signal to restart the job if set to true")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2666,7 +2666,7 @@ include::{snippets}/job-executions-documentation/job-restart/path-parameters.ado
[[api-guide-resources-job-executions-restart-request-parameters]]
===== Request Parameters

include::{snippets}/job-executions-documentation/job-restart/request-parameters.adoc[]
include::{snippets}/job-executions-documentation/job-restart/query-parameters.adoc[]



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ public interface JobOperations {
*/
void executionRestart(long id);

/**
* Restarts a job by id
*
* @param id job execution id
* @param useJsonJobParameters if true {@link org.springframework.batch.core.JobParameters} will be serialized to JSON.
* Default is {@code Null} which will serialize the {@link org.springframework.batch.core.JobParameters}
* to the default specified in SCDF's configuration.
*/
void executionRestart(long id, Boolean useJsonJobParameters);

/**
* @return the list job executions without step executions known to the system.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ public void executionRestart(long id) {
restTemplate.put(builder.toUriString(), null);
}

@Override
public void executionRestart(long id, Boolean useJsonJobParameters) {
UriComponentsBuilder builder = UriComponentsBuilder.fromUriString(executionLink.expand(id).getHref()).queryParam("restart", "true")
.queryParam("useJsonJobParameters", useJsonJobParameters);

restTemplate.put(builder.toUriString(), null);
}

@Override
public PagedModel<JobExecutionThinResource> executionThinList() {
UriComponentsBuilder builder = UriComponentsBuilder.fromUriString(thinExecutionsLink.getHref()).queryParam("size", "2000");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,16 @@ public JobParameter deserialize(JsonParser jsonParser, DeserializationContext de
String type = node.get("type").asText();

JobParameter jobParameter;
//TODO: Boot3x followup Verify that Job Parameters setup properly for Batch 5
if (!type.isEmpty() && !type.equalsIgnoreCase("STRING")) {
if ("DATE".equalsIgnoreCase(type)) {
jobParameter = new JobParameter(LocalDateTime.parse(value), LocalDateTime.class, identifying);
}
else if ("DOUBLE".equalsIgnoreCase(type)) {
jobParameter = new JobParameter(Double.valueOf(value), Double.class, identifying);
}
else if ("LONG".equalsIgnoreCase(type)) {
jobParameter = new JobParameter(Long.valueOf(value), Long.class, identifying);
}
else {
throw new IllegalStateException("Unsupported JobParameter type: " + type);
if (!type.isEmpty()) {
try {
jobParameter = new JobParameter(value, Class.forName(type), identifying);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("JobParameter type %s is not supported by DataFlow".formatted(type), e);
}
}
else {
jobParameter = new JobParameter(value, String.class, identifying);
}
jobParameter = new JobParameter(value, String.class, identifying);
}

if (logger.isDebugEnabled()) {
logger.debug("jobParameter - value: {} (type: {}, isIdentifying: {})",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

package org.springframework.cloud.dataflow.rest.support.jackson;

import java.util.Map;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;

/**
Expand All @@ -27,9 +30,12 @@
* @author Gunnar Hillert
* @since 1.0
*/
@JsonIgnoreProperties("empty")
@JsonIgnoreProperties({"empty", "identifyingParameters"})
public abstract class JobParametersJacksonMixIn {

@JsonProperty
abstract boolean isEmpty();

@JsonProperty
abstract Map<String, JobParameter<?>> getIdentifyingParameters();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.dataflow.rest.support.jackson;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.json.UTF8StreamJsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.springframework.batch.core.JobParameter;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;

public class JobParameterJacksonDeserializerTests {

@Test
public void validJobParameter() throws IOException {
JobParameterJacksonDeserializer jobParameterJacksonDeserializer = new JobParameterJacksonDeserializer();
String json = "{\"value\":\"BAR\",\"type\":\"java.lang.String\",\"identifying\":true}";
JobParameter jobParameter = jobParameterJacksonDeserializer.deserialize(getJsonParser(json), null);
assertThat(jobParameter.getType()).isEqualTo(String.class);
assertThat(jobParameter.getValue()).isEqualTo("BAR");
assertThat(jobParameter.isIdentifying()).isTrue();
}

@Test
public void inValidJobParameter() throws IOException {
JobParameterJacksonDeserializer jobParameterJacksonDeserializer = new JobParameterJacksonDeserializer();
String json = "{\"value\":\"BAR\",\"type\":\"java.lang.FOO\",\"identifying\":true}";
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> {
jobParameterJacksonDeserializer.deserialize(getJsonParser(json), null);
})
.withMessage("JobParameter type java.lang.FOO is not supported by DataFlow");
}

private JsonParser getJsonParser(String json) throws IOException {
JsonFactory factory = new JsonFactory();
byte[] jsonData = json.getBytes();
ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonData);
UTF8StreamJsonParser jsonParser = (UTF8StreamJsonParser) factory.createParser(inputStream);
jsonParser.setCodec(new ObjectMapper());
return jsonParser;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -201,8 +202,11 @@ private static class StepExecutionRowMapper implements RowMapper<StepExecution>
public StepExecution mapRow(ResultSet rs, int rowNum) throws SQLException {
StepExecution stepExecution = new StepExecution(rs.getString(2), null);
stepExecution.setId(rs.getLong(1));
stepExecution.setStartTime(rs.getTimestamp(3).toLocalDateTime());
stepExecution.setEndTime(rs.getTimestamp(4).toLocalDateTime());
Timestamp startTimeStamp = rs.getTimestamp(3);
Timestamp endTimeStamp = rs.getTimestamp(4);

stepExecution.setStartTime((startTimeStamp == null) ? null : startTimeStamp.toLocalDateTime());
stepExecution.setEndTime((endTimeStamp == null) ? null : endTimeStamp.toLocalDateTime());
stepExecution.setStatus(BatchStatus.valueOf(rs.getString(5)));
stepExecution.setCommitCount(rs.getInt(6));
stepExecution.setReadCount(rs.getInt(7));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,14 @@ public TaskJobService taskJobExecutionRepository(
DataflowTaskExplorer taskExplorer,
TaskDefinitionRepository taskDefinitionRepository,
TaskExecutionService taskExecutionService,
LauncherRepository launcherRepository) {
LauncherRepository launcherRepository, TaskConfigurationProperties taskConfigurationProperties) {
return new DefaultTaskJobService(
service,
taskExplorer,
taskDefinitionRepository,
taskExecutionService,
launcherRepository
launcherRepository,
taskConfigurationProperties
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.TimeZone;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
Expand All @@ -39,7 +41,6 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
Expand All @@ -63,6 +64,8 @@
@ExposesResourceFor(JobExecutionResource.class)
public class JobExecutionController {

private static final Logger logger = LoggerFactory.getLogger(JobExecutionController.class);

private final Assembler jobAssembler = new Assembler();

private final TaskJobService taskJobService;
Expand Down Expand Up @@ -148,9 +151,16 @@ public ResponseEntity<Void> stopJobExecution(
@RequestMapping(value = {"/{executionId}"}, method = RequestMethod.PUT, params = "restart=true")
@ResponseStatus(HttpStatus.OK)
public ResponseEntity<Void> restartJobExecution(
@PathVariable("executionId") long jobExecutionId) throws NoSuchJobExecutionException {
taskJobService.restartJobExecution(jobExecutionId);
return ResponseEntity.ok().build();
@PathVariable("executionId") long jobExecutionId,
@RequestParam(value = "useJsonJobParameters", required = false) Boolean useJsonJobParameters)
throws NoSuchJobExecutionException {
try {
taskJobService.restartJobExecution(jobExecutionId, useJsonJobParameters);
} catch (NoSuchJobExecutionException e) {
logger.warn(e.getMessage(), e);
throw e;
}
return ResponseEntity.ok().build();
}

/**
Expand Down Expand Up @@ -188,7 +198,8 @@ public JobExecutionResource instantiateModel(TaskJobExecution taskJobExecution)
resource.add(linkTo(methodOn(JobExecutionController.class).stopJobExecution(taskJobExecution.getJobExecution().getJobId())).withRel("stop"));
}
if (!taskJobExecution.getJobExecution().getStatus().equals(BatchStatus.COMPLETED)) {
resource.add(linkTo(methodOn(JobExecutionController.class).restartJobExecution(taskJobExecution.getJobExecution().getJobId())).withRel("restart"));
// In this case we use null for the useJsonJobParameters parameter, so we use the configured job parameter serialization method specified by dataflow.
resource.add(linkTo(methodOn(JobExecutionController.class).restartJobExecution(taskJobExecution.getJobExecution().getJobId(), null)).withRel("restart"));
}
} catch (NoSuchJobExecutionException | JobExecutionNotRunningException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ public JobExecutionThinResource instantiateModel(TaskJobExecution taskJobExecuti
resource.add(linkTo(methodOn(JobExecutionController.class).stopJobExecution(taskJobExecution.getJobExecution().getJobId())).withRel("stop"));
}
if (taskJobExecution.getJobExecution().getEndTime() != null && !taskJobExecution.getJobExecution().isRunning()) {
resource.add(linkTo(methodOn(JobExecutionController.class).restartJobExecution(taskJobExecution.getJobExecution().getJobId())).withRel("restart"));
// In this case we use null for the useJsonJobParameters parameter so we use the configured job parameter serialization method specified by dataflow.
resource.add(linkTo(methodOn(JobExecutionController.class).restartJobExecution(taskJobExecution.getJobExecution().getJobId(), null)).withRel("restart"));
}
} catch (NoSuchJobExecutionException | JobExecutionNotRunningException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private double calculatePercentageComplete() {
double result = 0.0;
if (readHistory.getMean() == 0) {
percentCompleteBasis = PercentCompleteBasis.DURATION;
result = getDurationBasedEstimate(duration);
result = getDurationBasedEstimate();
}
else {
percentCompleteBasis = PercentCompleteBasis.READCOUNT;
Expand All @@ -120,7 +120,7 @@ private double calculatePercentageComplete() {
return result;
}

private double getDurationBasedEstimate(double duration) {
private double getDurationBasedEstimate() {

CumulativeHistory durationHistory = stepExecutionHistory.getDuration();
if (durationHistory.getMean() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.springframework.cloud.dataflow.server.service;

import java.util.Date;
import java.util.List;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
Expand All @@ -31,6 +30,7 @@
import org.springframework.cloud.dataflow.rest.job.TaskJobExecution;
import org.springframework.cloud.dataflow.server.batch.JobExecutionWithStepCount;
import org.springframework.cloud.dataflow.server.job.support.JobNotRestartableException;
import org.springframework.cloud.dataflow.server.service.impl.TaskConfigurationProperties;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
Expand Down Expand Up @@ -99,15 +99,32 @@ public interface TaskJobService {
JobInstanceExecutions getJobInstance(long id) throws NoSuchJobInstanceException, NoSuchJobException;

/**
* Restarts a {@link JobExecution} IF the respective {@link JobExecution} is actually
* Restarts a {@link JobExecution} if the respective {@link JobExecution} is actually
* deemed restartable. Otherwise a {@link JobNotRestartableException} is being thrown.
* The system will use {@link TaskConfigurationProperties#isUseJsonJobParameters()} to
* determine the {@link org.springframework.batch.core.JobParameter} serializer.
*
* @param jobExecutionId The id of the JobExecution to restart.
* @throws NoSuchJobExecutionException if the JobExecution for the provided id does not
* exist.
*/
void restartJobExecution(long jobExecutionId) throws NoSuchJobExecutionException;

/**
* Restarts a {@link JobExecution} if the respective {@link JobExecution} is actually
* deemed restartable. Otherwise, a {@link JobNotRestartableException} is being thrown.
*
* @param jobExecutionId The id of the JobExecution to restart.
* @param useJsonJobParameters if set to true, dataflow will serialize job parameters to the command line using the
* format provided by {@code JsonJobParametersConverter}.
* If set to false dataflow will use {@code DefaultParametersConverter}.
* If null dataflow will use {@link TaskConfigurationProperties#isUseJsonJobParameters()}
* to determine the {@link org.springframework.batch.core.JobParameter} serializer.
* @throws NoSuchJobExecutionException if the JobExecution for the provided id does not
* exist.
*/
void restartJobExecution(long jobExecutionId, Boolean useJsonJobParameters) throws NoSuchJobExecutionException;

/**
* Requests a {@link JobExecution} to stop.
* <p>
Expand Down
Loading

0 comments on commit 89b8932

Please sign in to comment.