diff --git a/spring-cloud-dataflow-docs/src/main/asciidoc/tasks.adoc b/spring-cloud-dataflow-docs/src/main/asciidoc/tasks.adoc index 78f2941c11..5614987909 100644 --- a/spring-cloud-dataflow-docs/src/main/asciidoc/tasks.adoc +++ b/spring-cloud-dataflow-docs/src/main/asciidoc/tasks.adoc @@ -288,6 +288,30 @@ NOTE: Properties configured by using this mechanism have lower precedence than t They are overridden if a property with the same key is specified at task launch time (for example, `app.trigger.prop2` overrides the common property). +==== Launching tasks with a specific application version + +When launching a task you can specify the specific version of the application. +If no version is specified Spring Cloud Data Flow will use the default version of the application. +To specify a version of the application to be used at launch time use the deployer property `version.`. +For example: + +==== +[source,bash,subs=attributes] +---- +task launch my-task --properties 'version.timestamp=3.0.0' +---- +==== + +Similarly, when scheduling a task you will use the same format of `version.`. For example: + +==== +[source,bash,subs=attributes] +---- +task schedule create --name my-schedule --definitionName my-task --expression '*/1 * * * *' --properties 'version.timestamp=3.0.0' +---- +==== + + [[spring-cloud-dataflow-task-limit-concurrent-executions]] === Limit the number concurrent task launches diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerService.java index 0d2f8af316..0d8a95cd8f 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerService.java @@ -224,10 +224,12 @@ public void schedule( String taskAppName = taskDefinition.getRegisteredAppName(); String taskLabel = taskDefinition.getAppDefinition().getName(); - if(!StringUtils.hasText(taskLabel)) { - taskLabel = taskAppName; - } String version = taskDeploymentProperties.get("version." + taskLabel); + if (version == null) { + version = taskDeploymentProperties.get("version." + taskAppName); + } + + SchemaVersionTarget schemaVersionTarget = aggregateExecutionSupport.findSchemaVersionTarget(taskAppName, version, taskDefinition); Assert.notNull(schemaVersionTarget, "schemaVersionTarget not found for " + taskAppName); TaskParser taskParser = new TaskParser(taskDefinition.getName(), taskDefinition.getDslText(), true, true); diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerServiceTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerServiceTests.java index 828965625c..a09c98f352 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerServiceTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultSchedulerServiceTests.java @@ -31,6 +31,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; @@ -60,7 +61,6 @@ import org.springframework.cloud.dataflow.server.service.SchedulerServiceProperties; import org.springframework.cloud.dataflow.server.service.TaskExecutionInfoService; import org.springframework.cloud.deployer.resource.docker.DockerResource; -import org.springframework.cloud.deployer.spi.core.AppDefinition; import org.springframework.cloud.deployer.spi.scheduler.CreateScheduleException; import org.springframework.cloud.deployer.spi.scheduler.ScheduleInfo; import org.springframework.cloud.deployer.spi.scheduler.ScheduleRequest; @@ -68,6 +68,7 @@ import org.springframework.cloud.deployer.spi.task.TaskLauncher; import org.springframework.cloud.task.listener.TaskException; import org.springframework.core.env.PropertyResolver; +import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; import org.springframework.core.io.ResourceLoader; import org.springframework.data.domain.Page; @@ -75,10 +76,12 @@ import org.springframework.test.annotation.DirtiesContext; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -105,7 +108,9 @@ public class DefaultSchedulerServiceTests { private static final String BASE_DEFINITION_NAME = "myTaskDefinition"; - private static final String CTR_DEFINITION_NAME= "myCtrDefinition"; + private static final String CTR_DEFINITION_NAME = "myCtrDefinition"; + + private static final String DEMO_APP_NAME = "demoAppName"; @Autowired private Scheduler simpleTestScheduler; @@ -431,18 +436,51 @@ public void testScheduleWithoutCommandLineArguments() { @Test public void testGetDefaultCTR() { ScheduleRequest request = getScheduleRequest(new ArrayList<>(), "springcloudtask/composed-task-runner:latest", "1: timestamp && 2: timestamp"); - AppDefinition definition = request.getDefinition(); assertEquals("Docker Resource [docker:springcloudtask/composed-task-runner:latest]", request.getResource().toString()); } + @Test + public void testVersionWithResource() { + String validVersionNumber = "3.0.0"; + ScheduleRequest request = scheduleRequest(validVersionNumber); + assertThat(request.getResource().toString()).contains("file:src/test/resources/apps/foo-task"); + } + + @Test + public void testVersionWithResourceInvalidVersion() { + String invalidVersionNumber = "2.0.0"; + assertThatIllegalArgumentException() + .isThrownBy(() -> { + scheduleRequest(invalidVersionNumber); + }).withMessage("Unknown task app: demo"); + } + + private ScheduleRequest scheduleRequest(String appVersionToTest) { + String definition = "demo"; + Map resourceTestProps = new HashMap<>(testProperties); + resourceTestProps.put("version.demo", appVersionToTest); + AppRegistryService mockAppRegistryService = mock(AppRegistryService.class); + TaskDefinition taskDefinition = new TaskDefinition(BASE_DEFINITION_NAME, definition); + AppRegistration demoRegistration = new AppRegistration(); + demoRegistration.setName(DEMO_APP_NAME); + + when(mockAppRegistryService.find(taskDefinition.getRegisteredAppName(), ApplicationType.task, "3.0.0")) + .thenReturn(demoRegistration); + return getScheduleRequest(new ArrayList<>(), + "springcloudtask/composed-task-runner:latest", + definition, resourceTestProps, mockAppRegistryService); + } private List getCommandLineArguments(List commandLineArguments) { return getScheduleRequest(commandLineArguments,"springcloudtask/timestamp-task:latest", "timestamp").getCommandlineArguments(); } private ScheduleRequest getScheduleRequest(List commandLineArguments, String resourceToReturn, String definition) { + AppRegistryService mockAppRegistryService = mock(AppRegistryService.class); + return getScheduleRequest(commandLineArguments, resourceToReturn, definition, this.testProperties, mockAppRegistryService); + } + private ScheduleRequest getScheduleRequest(List commandLineArguments, String resourceToReturn, String definition, Map testProperties, AppRegistryService appRegistryService) { Scheduler mockScheduler = mock(SimpleTestScheduler.class); TaskDefinitionRepository mockTaskDefinitionRepository = mock(TaskDefinitionRepository.class); - AppRegistryService mockAppRegistryService = mock(AppRegistryService.class); Launcher launcher = new Launcher("default", "defaultType", null, mockScheduler); List launchers = new ArrayList<>(); @@ -452,7 +490,7 @@ private ScheduleRequest getScheduleRequest(List commandLineArguments, St mock(CommonApplicationProperties.class), taskPlatform, mockTaskDefinitionRepository, - mockAppRegistryService, + appRegistryService, mock(ResourceLoader.class), this.taskConfigurationProperties, mock(DataSourceProperties.class), @@ -470,10 +508,20 @@ private ScheduleRequest getScheduleRequest(List commandLineArguments, St TaskDefinition taskDefinition = new TaskDefinition(BASE_DEFINITION_NAME, definition); when(mockTaskDefinitionRepository.findById(BASE_DEFINITION_NAME)).thenReturn(Optional.of(taskDefinition)); - when(mockAppRegistryService.getAppResource(any())).thenReturn(new DockerResource(resourceToReturn)); - when(mockAppRegistryService.find(taskDefinition.getRegisteredAppName(), ApplicationType.task)) + doAnswer((Answer) invocation -> { + AppRegistration appRegistration = invocation.getArgument(0, AppRegistration.class); + String name = appRegistration.getName(); + Resource resource = new DockerResource(resourceToReturn); + if(name != null && name.equals(DEMO_APP_NAME)) { + resource = new FileSystemResource("file:src/test/resources/apps/foo-task"); + } + return resource; + }).when(appRegistryService).getAppResource(any()); + when(appRegistryService.find(taskDefinition.getRegisteredAppName(), ApplicationType.task)) .thenReturn(new AppRegistration()); - mockSchedulerService.schedule(BASE_SCHEDULE_NAME, BASE_DEFINITION_NAME, this.testProperties, + + + mockSchedulerService.schedule(BASE_SCHEDULE_NAME, BASE_DEFINITION_NAME, testProperties, commandLineArguments, null); ArgumentCaptor scheduleRequestArgumentCaptor = ArgumentCaptor.forClass(ScheduleRequest.class);