Skip to content

Commit

Permalink
Use boot3 notation when restarting a batch job
Browse files Browse the repository at this point in the history
Update code to fit code review requests
  • Loading branch information
cppwfs committed Dec 18, 2023
1 parent ef151d4 commit 3c8d6c0
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,8 @@ protected JobParameters getJobParameters(Long executionId, String schemaTarget)
value = new JobParameter((Date) typedValue, identifying);
} else if (typedValue instanceof Double) {
value = new JobParameter((Double) typedValue, identifying);
} else if (typedValue instanceof Long) {
value = new JobParameter((Long) typedValue, identifying);
} else if (typedValue instanceof Number) {
value = new JobParameter(((Number) typedValue).doubleValue(), identifying);
} else if (typedValue instanceof Instant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.springframework.cloud.dataflow.rest.job.TaskJobExecution;
import org.springframework.cloud.dataflow.rest.job.support.JobUtils;
import org.springframework.cloud.dataflow.schema.AggregateTaskExecution;
import org.springframework.cloud.dataflow.schema.AppBootSchemaVersion;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.batch.JobExecutionWithStepCount;
import org.springframework.cloud.dataflow.server.batch.JobService;
Expand Down Expand Up @@ -234,7 +235,7 @@ public void restartJobExecution(long jobExecutionId, String schemaTarget) throws
deploymentProperties.put(DefaultTaskExecutionService.TASK_PLATFORM_NAME, platformName);
taskExecutionService.executeTask(taskDefinition.getName(), deploymentProperties,
restartExecutionArgs(taskExecution.getArguments(),
taskJobExecution.getJobExecution().getJobParameters()));
taskJobExecution.getJobExecution().getJobParameters(), schemaTarget));
} else {
throw new IllegalStateException(String.format("Did not find platform for taskName=[%s] , taskId=[%s]",
taskExecution.getTaskName(), taskJobExecution.getTaskId()));
Expand All @@ -252,8 +253,10 @@ public void restartJobExecution(long jobExecutionId, String schemaTarget) throws
* @return deduped list of arguments that contains the original arguments and any
* identifying job parameters not in the original task execution arguments.
*/
private List<String> restartExecutionArgs(List<String> taskExecutionArgs, JobParameters jobParameters) {
private List<String> restartExecutionArgs(List<String> taskExecutionArgs, JobParameters jobParameters, String schemaTarget) {
List<String> result = new ArrayList<>(taskExecutionArgs);
String boot3Version = SchemaVersionTarget.createDefault(AppBootSchemaVersion.BOOT3).getName();
String type;
Map<String, JobParameter> jobParametersMap = jobParameters.getParameters();
for (String key : jobParametersMap.keySet()) {
if (!key.startsWith("-")) {
Expand All @@ -265,9 +268,27 @@ private List<String> restartExecutionArgs(List<String> taskExecutionArgs, JobPar
}
}
if (!existsFlag) {
result.add(String.format("%s(%s)=%s", key,
String param;
if (boot3Version.equals(schemaTarget)) {
if (JobParameter.ParameterType.LONG.equals(jobParametersMap.get(key).getType())) {
type = Long.class.getCanonicalName();
} else if (JobParameter.ParameterType.DATE.equals(jobParametersMap.get(key).getType())) {
type = Date.class.getCanonicalName();
} else if (JobParameter.ParameterType.DOUBLE.equals(jobParametersMap.get(key).getType())) {
type = Double.class.getCanonicalName();
} else if (JobParameter.ParameterType.STRING.equals(jobParametersMap.get(key).getType())) {
type = String.class.getCanonicalName();
} else {
throw new IllegalArgumentException("Unable to convert " +
jobParametersMap.get(key).getType() + " to known type of JobParameters");
}
param = String.format("%s=%s,%s", key, jobParametersMap.get(key).getValue(), type);
} else {
param = String.format("%s(%s)=%s", key,
jobParametersMap.get(key).getType().toString().toLowerCase(),
jobParameters.getString(key)));
jobParameters.getString(key));
}
result.add(param);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import javax.sql.DataSource;
import java.net.MalformedURLException;
import java.net.URI;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
Expand All @@ -32,6 +33,7 @@
import org.mockito.ArgumentCaptor;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameter;
Expand All @@ -51,6 +53,7 @@
import org.springframework.cloud.dataflow.core.TaskDefinition;
import org.springframework.cloud.dataflow.core.TaskPlatformFactory;
import org.springframework.cloud.dataflow.registry.service.AppRegistryService;
import org.springframework.cloud.dataflow.schema.AppBootSchemaVersion;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.configuration.JobDependencies;
import org.springframework.cloud.dataflow.server.configuration.TaskServiceDependencies;
Expand All @@ -72,7 +75,7 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -89,12 +92,21 @@
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.ANY)
public class DefaultTaskJobServiceTests {

private static final String SAVE_JOB_EXECUTION = "INSERT INTO BOOT3_BATCH_JOB_EXECUTION(JOB_EXECUTION_ID, " +
"JOB_INSTANCE_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, VERSION, CREATE_TIME, LAST_UPDATED) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
private static final String SAVE_JOB_EXECUTION_PARAM = "INSERT INTO BOOT3_BATCH_JOB_EXECUTION_PARAMS (" +
"job_execution_id, parameter_name, parameter_type, parameter_value, identifying) " +
"VALUES (?, ?, ?, ?, ?)";

private final static String BASE_JOB_NAME = "myJob";

private final static String JOB_NAME_ORIG = BASE_JOB_NAME + "_ORIG";

private static long jobInstanceCount = 0;

private static long boot3JobInstanceCount = 0;

@Autowired
TaskDefinitionRepository taskDefinitionRepository;

Expand All @@ -110,6 +122,8 @@ public class DefaultTaskJobServiceTests {
@Autowired
DataSource dataSource;

JdbcTemplate jdbcTemplate;

@Autowired
DataSourceProperties dataSourceProperties;

Expand Down Expand Up @@ -139,20 +153,26 @@ public void setup() {
jobParameterMap.put("identifying.param", new JobParameter("testparam"));
this.jobParameters = new JobParameters(jobParameterMap);

JdbcTemplate template = new JdbcTemplate(this.dataSource);
template.execute("DELETE FROM TASK_EXECUTION_PARAMS");
template.execute("DELETE FROM TASK_TASK_BATCH");
template.execute("DELETE FROM TASK_EXECUTION_METADATA");
template.execute("DELETE FROM TASK_EXECUTION;");
template.execute("ALTER SEQUENCE TASK_EXECUTION_METADATA_SEQ RESTART WITH 50");
template.execute("ALTER SEQUENCE TASK_SEQ RESTART WITH 1");
this.jdbcTemplate = new JdbcTemplate(this.dataSource);
resetTaskTables("TASK_");
initializeSuccessfulRegistry(this.appRegistry);
template.execute("INSERT INTO TASK_EXECUTION (TASK_EXECUTION_ID, TASK_NAME) VALUES (0, 'myTask_ORIG');");
resetTaskTables("BOOT3_TASK_");

reset(this.taskLauncher);
when(this.taskLauncher.launch(any())).thenReturn("1234");
clearLaunchers();
}

private void resetTaskTables(String prefix) {
this.jdbcTemplate.execute("DELETE FROM " + prefix + "EXECUTION_PARAMS");
this.jdbcTemplate.execute("DELETE FROM " + prefix + "TASK_BATCH");
this.jdbcTemplate.execute("DELETE FROM " + prefix + "EXECUTION_METADATA");
this.jdbcTemplate.execute("DELETE FROM " + prefix + "EXECUTION;");
this.jdbcTemplate.execute("ALTER SEQUENCE " + prefix + "EXECUTION_METADATA_SEQ RESTART WITH 50");
this.jdbcTemplate.execute("ALTER SEQUENCE " + prefix + "SEQ RESTART WITH 1");
this.jdbcTemplate.execute("INSERT INTO " + prefix + "EXECUTION (TASK_EXECUTION_ID, TASK_NAME) VALUES (0, 'myTask_ORIG');");
}

@Test
public void testRestart() throws Exception {
createBaseLaunchers();
Expand All @@ -166,6 +186,20 @@ public void testRestart() throws Exception {
assertTrue(appDeploymentRequest.getCommandlineArguments().contains("identifying.param(string)=testparam"));
}

@Test
public void testRestartBoot3() throws Exception {
SchemaVersionTarget schemaVersionTarget = new SchemaVersionTarget("boot3", AppBootSchemaVersion.BOOT3,
"BOOT3_TASK_", "BOOT3_BATCH_", "H2");
createBaseLaunchers();
initializeJobs(true, schemaVersionTarget);
this.taskJobService.restartJobExecution(boot3JobInstanceCount,
SchemaVersionTarget.createDefault(AppBootSchemaVersion.BOOT3).getName());
final ArgumentCaptor<AppDeploymentRequest> argument = ArgumentCaptor.forClass(AppDeploymentRequest.class);
verify(this.taskLauncher, times(1)).launch(argument.capture());
AppDeploymentRequest appDeploymentRequest = argument.getAllValues().get(0);
assertTrue(appDeploymentRequest.getCommandlineArguments().contains("identifying.param=testparm,java.lang.String"));
}

@Test
public void testRestartNoPlatform() {
createBaseLaunchers();
Expand All @@ -189,8 +223,14 @@ public void testRestartOnePlatform() throws Exception {
}

private void initializeJobs(boolean insertTaskExecutionMetadata) {
this.taskDefinitionRepository.save(new TaskDefinition(JOB_NAME_ORIG + jobInstanceCount, "some-name"));
SchemaVersionTarget schemaVersionTarget = aggregateExecutionSupport.findSchemaVersionTarget("some-name", taskDefinitionReader);
initializeJobs(insertTaskExecutionMetadata,
new SchemaVersionTarget("boot2", AppBootSchemaVersion.BOOT2, "TASK_",
"BATCH_", "H2"));
}
private void initializeJobs(boolean insertTaskExecutionMetadata, SchemaVersionTarget schemaVersionTarget) {
String definitionName = (AppBootSchemaVersion.BOOT3.equals(schemaVersionTarget.getSchemaVersion())) ?
"some-name-boot3" : "some-name";
this.taskDefinitionRepository.save(new TaskDefinition(JOB_NAME_ORIG + jobInstanceCount, definitionName ));
JobRepository jobRepository = jobRepositoryContainer.get(schemaVersionTarget.getName());
TaskBatchDao taskBatchDao = taskBatchDaoContainer.get(schemaVersionTarget.getName());
TaskExecutionDao taskExecutionDao = taskExecutionDaoContainer.get(schemaVersionTarget.getName());
Expand All @@ -200,9 +240,15 @@ private void initializeJobs(boolean insertTaskExecutionMetadata) {
taskExecutionDao,
JOB_NAME_ORIG + jobInstanceCount,
BatchStatus.FAILED,
insertTaskExecutionMetadata
insertTaskExecutionMetadata,
schemaVersionTarget
);
jobInstanceCount++;
if(AppBootSchemaVersion.BOOT2.equals(schemaVersionTarget.getSchemaVersion())) {
jobInstanceCount++;
}
else {
boot3JobInstanceCount++;
}

}

Expand All @@ -212,7 +258,8 @@ private void createSampleJob(
TaskExecutionDao taskExecutionDao,
String jobName,
BatchStatus status,
boolean insertTaskExecutionMetadata
boolean insertTaskExecutionMetadata,
SchemaVersionTarget schemaVersionTarget
) {
JobInstance instance = jobRepository.createJobInstance(jobName, new JobParameters());

Expand All @@ -221,13 +268,28 @@ private void createSampleJob(
JdbcTemplate template = new JdbcTemplate(this.dataSource);

if (insertTaskExecutionMetadata) {
template.execute(String.format("INSERT INTO TASK_EXECUTION_METADATA (ID, TASK_EXECUTION_ID, TASK_EXECUTION_MANIFEST) VALUES (%s, %s, '{\"taskDeploymentRequest\":{\"definition\":{\"name\":\"bd0917a\",\"properties\":{\"spring.datasource.username\":\"root\",\"spring.cloud.task.name\":\"bd0917a\",\"spring.datasource.url\":\"jdbc:mariadb://localhost:3306/task\",\"spring.datasource.driverClassName\":\"org.mariadb.jdbc.Driver\",\"spring.datasource.password\":\"password\"}},\"resource\":\"file:/Users/glennrenfro/tmp/batchdemo-0.0.1-SNAPSHOT.jar\",\"deploymentProperties\":{},\"commandlineArguments\":[\"run.id_long=1\",\"--spring.cloud.task.executionid=201\"]},\"platformName\":\"demo\"}')", taskExecution.getExecutionId(), taskExecution.getExecutionId()));
template.execute(String.format("INSERT INTO " + schemaVersionTarget.getTaskPrefix() + "EXECUTION_METADATA (ID, TASK_EXECUTION_ID, TASK_EXECUTION_MANIFEST) VALUES (%s, %s, '{\"taskDeploymentRequest\":{\"definition\":{\"name\":\"bd0917a\",\"properties\":{\"spring.datasource.username\":\"root\",\"spring.cloud.task.name\":\"bd0917a\",\"spring.datasource.url\":\"jdbc:mariadb://localhost:3306/task\",\"spring.datasource.driverClassName\":\"org.mariadb.jdbc.Driver\",\"spring.datasource.password\":\"password\"}},\"resource\":\"file:/Users/glennrenfro/tmp/batchdemo-0.0.1-SNAPSHOT.jar\",\"deploymentProperties\":{},\"commandlineArguments\":[\"run.id_long=1\",\"--spring.cloud.task.executionid=201\"]},\"platformName\":\"demo\"}')", taskExecution.getExecutionId(), taskExecution.getExecutionId()));
}
jobExecution = jobRepository.createJobExecution(instance,
if(AppBootSchemaVersion.BOOT3.equals(schemaVersionTarget.getSchemaVersion())) {
jobExecution = new JobExecution(instance, 1L, this.jobParameters, "foo");
jobExecution.setCreateTime(new Date());
jobExecution.setVersion(1);
Object[] jobExecutionParameters = new Object[] { 1, 1, new Date(), new Date(),
BatchStatus.COMPLETED, ExitStatus.COMPLETED,
ExitStatus.COMPLETED.getExitDescription(), 1, new Date(), new Date() };
Object[] jobExecutionParmParameters = new Object[] { 1, "identifying.param", "java.lang.String", "testparm", "Y"};
this.jdbcTemplate.update(SAVE_JOB_EXECUTION, jobExecutionParameters,
new int[] { Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP });
this.jdbcTemplate.update(SAVE_JOB_EXECUTION_PARAM, jobExecutionParmParameters, new int[] { Types.BIGINT,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.CHAR});
} else {
jobExecution = jobRepository.createJobExecution(instance,
this.jobParameters, null);
StepExecution stepExecution = new StepExecution("foo", jobExecution, 1L);
stepExecution.setId(null);
jobRepository.add(stepExecution);
StepExecution stepExecution = new StepExecution("foo", jobExecution, 1L);
stepExecution.setId(null);
jobRepository.add(stepExecution);
}
taskBatchDao.saveRelationship(taskExecution, jobExecution);
jobExecution.setStatus(status);
jobExecution.setStartTime(new Date());
Expand All @@ -248,8 +310,10 @@ private void createBaseLaunchers() {
}

private static void initializeSuccessfulRegistry(AppRegistryService appRegistry) {
when(appRegistry.find(anyString(), any(ApplicationType.class))).thenReturn(
when(appRegistry.find(eq("some-name"), any(ApplicationType.class))).thenReturn(
new AppRegistration("some-name", ApplicationType.task, URI.create("https://helloworld")));
when(appRegistry.find(eq("some-name-boot3"), any(ApplicationType.class))).thenReturn(
new AppRegistration("some-name-boot3", ApplicationType.task, "", URI.create("https://helloworld"), URI.create("https://helloworld"), AppBootSchemaVersion.fromBootVersion("3")));
try {
when(appRegistry.getAppResource(any())).thenReturn(new FileUrlResource("src/test/resources/apps/foo-task"));
}
Expand Down

0 comments on commit 3c8d6c0

Please sign in to comment.