diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRemoteHostServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRemoteHostServiceImpl.java index 3d61837d3827f..3a1b7021e9e81 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRemoteHostServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRemoteHostServiceImpl.java @@ -140,8 +140,9 @@ public int deleteByCode(long code, User loginUser) { throw new ServiceException(Status.TASK_REMOTE_HOST_NOT_FOUND, code); } - List relatedTaskInstances = - taskInstanceMapper.queryByTaskRemoteHostCodeAndStatus(code, Constants.TASK_NOT_TERMINATED_STATES); + String searchVal = "\"remoteHostCode\":" + taskRemoteHost.getCode(); + List relatedTaskInstances = taskInstanceMapper.queryTaskInstanceByTaskParamsAndStatus(searchVal, + Constants.TASK_NOT_TERMINATED_STATES); if (CollectionUtils.isNotEmpty(relatedTaskInstances)) { logger.error("delete task remote code {} failed, because there are {} task instances are using it.", code, relatedTaskInstances.size()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskRemoteHostServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskRemoteHostServiceTest.java index 79f678a31db26..6933a6223dc1b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskRemoteHostServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskRemoteHostServiceTest.java @@ -200,7 +200,8 @@ public void test_deleteByCode() { .thenReturn(taskRemoteHost); List taskInstanceList = new ArrayList<>(); taskInstanceList.add(createTaskInstance()); - Mockito.when(taskInstanceMapper.queryByTaskRemoteHostCodeAndStatus(Mockito.any(Long.class), + + Mockito.when(taskInstanceMapper.queryTaskInstanceByTaskParamsAndStatus(Mockito.anyString(), Mockito.eq(Constants.TASK_NOT_TERMINATED_STATES))).thenReturn(taskInstanceList); exception = Assertions.assertThrows(ServiceException.class, () -> { taskRemoteHostService.deleteByCode(1L, user); @@ -208,7 +209,7 @@ public void test_deleteByCode() { Assertions.assertEquals(Status.DELETE_TASK_REMOTE_HOST_FAIL.getCode(), ((ServiceException) exception).getCode()); - Mockito.when(taskInstanceMapper.queryByTaskRemoteHostCodeAndStatus(Mockito.any(Long.class), + Mockito.when(taskInstanceMapper.queryTaskInstanceByTaskParamsAndStatus(Mockito.anyString(), Mockito.eq(Constants.TASK_NOT_TERMINATED_STATES))).thenReturn(null); Mockito.when(taskRemoteHostMapper.deleteByCode(Mockito.any(Long.class))).thenReturn(1); int result = taskRemoteHostService.deleteByCode(1L, user); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java index b524f91e96fc7..3da375dd0bc7e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java @@ -213,11 +213,6 @@ public class TaskDefinition { */ private Integer memoryMax; - /** - * task remote host code - */ - private long remoteHostCode; - /** * task execute type */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java index e69b5b6eef9c0..3701956612032 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java @@ -71,7 +71,6 @@ public TaskDefinitionLog(TaskDefinition taskDefinition) { this.setModifyBy(taskDefinition.getModifyBy()); this.setCpuQuota(taskDefinition.getCpuQuota()); this.setMemoryMax(taskDefinition.getMemoryMax()); - this.setRemoteHostCode(taskDefinition.getRemoteHostCode()); } public int getOperator() { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index a463196f6c966..7f9c0b7e5f54f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; -import org.apache.dolphinscheduler.plugin.task.api.model.SSHSessionHost; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; @@ -296,17 +295,6 @@ public class TaskInstance implements Serializable { */ private Integer memoryMax; - /** - * task remote host code - */ - private long remoteHostCode; - - /** - * task remote host - */ - @TableField(exist = false) - private SSHSessionHost sshSessionHost; - /** * task execute type */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index fe30cc66b6a9f..64be98ac6bf95 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -120,11 +120,12 @@ List loadAllInfosNoRelease(@Param("processInstanceId") int process @Param("status") int status); /** - * Query task instance list by remote host code and instance status list - * @param remoteHostCode task remote host code - * @param states task instance state list - * @return task instance list + * Query task instances by task params search string + * @param searchValue search string + * @param states states + * @return list of task instances */ - List queryByTaskRemoteHostCodeAndStatus(@Param("remoteHostCode") long remoteHostCode, - @Param("states") int[] states); + List queryTaskInstanceByTaskParamsAndStatus(@Param("searchValue") String searchValue, + @Param("states") int[] states); + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml index 5d8c0833d450f..d914f9db08616 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml @@ -21,7 +21,7 @@ id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, - resource_ids, operator, operate_time, create_time, update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type, remote_host_code + resource_ids, operator, operate_time, create_time, update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type - select from t_ds_task_instance - where 1=1 - - and remote_host_code =#{remoteHostCode} - + where 1 = 1 and state in #{i} + + and task_params like concat('%', #{searchValue}, '%') + order by id asc diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index 2d250edfeb180..5591db8825265 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -484,7 +484,6 @@ CREATE TABLE t_ds_task_definition task_priority tinyint(4) DEFAULT '2', worker_group varchar(200) DEFAULT NULL, environment_code bigint(20) DEFAULT '-1', - remote_host_code bigint(20) DEFAULT '-1', fail_retry_times int(11) DEFAULT NULL, fail_retry_interval int(11) DEFAULT NULL, timeout_flag tinyint(2) DEFAULT '0', @@ -521,7 +520,6 @@ CREATE TABLE t_ds_task_definition_log task_priority tinyint(4) DEFAULT '2', worker_group varchar(200) DEFAULT NULL, environment_code bigint(20) DEFAULT '-1', - remote_host_code bigint(20) DEFAULT '-1', fail_retry_times int(11) DEFAULT NULL, fail_retry_interval int(11) DEFAULT NULL, timeout_flag tinyint(2) DEFAULT '0', @@ -890,7 +888,6 @@ CREATE TABLE t_ds_task_instance task_instance_priority int(11) DEFAULT NULL, worker_group varchar(64) DEFAULT NULL, environment_code bigint(20) DEFAULT '-1', - remote_host_code bigint(20) DEFAULT '-1', environment_config text DEFAULT '', executor_id int(11) DEFAULT NULL, first_submit_time datetime DEFAULT NULL, diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 7a4d33c1aa066..ca92ef11bcecb 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -484,7 +484,6 @@ CREATE TABLE `t_ds_task_definition` ( `task_priority` tinyint(4) DEFAULT '2' COMMENT 'job priority', `worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping', `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code', - `remote_host_code` bigint(20) DEFAULT '-1' COMMENT 'task remote host code', `fail_retry_times` int(11) DEFAULT NULL COMMENT 'number of failed retries', `fail_retry_interval` int(11) DEFAULT NULL COMMENT 'failed retry interval', `timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open', @@ -520,7 +519,6 @@ CREATE TABLE `t_ds_task_definition_log` ( `task_priority` tinyint(4) DEFAULT '2' COMMENT 'job priority', `worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping', `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code', - `remote_host_code` bigint(20) DEFAULT '-1' COMMENT 'task remote host code', `fail_retry_times` int(11) DEFAULT NULL COMMENT 'number of failed retries', `fail_retry_interval` int(11) DEFAULT NULL COMMENT 'failed retry interval', `timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open', @@ -884,7 +882,6 @@ CREATE TABLE `t_ds_task_instance` ( `task_instance_priority` int(11) DEFAULT NULL COMMENT 'task instance priority:0 Highest,1 High,2 Medium,3 Low,4 Lowest', `worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id', `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code', - `remote_host_code` bigint(20) DEFAULT '-1' COMMENT 'task remote host code', `environment_config` text COMMENT 'this config contains many environment variables config', `executor_id` int(11) DEFAULT NULL, `first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time', diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index b934be54f78a8..16bfe20bc275c 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -401,7 +401,6 @@ CREATE TABLE t_ds_task_definition ( task_priority int DEFAULT '2' , worker_group varchar(255) DEFAULT NULL , environment_code bigint DEFAULT '-1', - remote_host_code bigint DEFAULT '-1', fail_retry_times int DEFAULT NULL , fail_retry_interval int DEFAULT NULL , timeout_flag int DEFAULT NULL , @@ -440,7 +439,6 @@ CREATE TABLE t_ds_task_definition_log ( task_priority int DEFAULT '2' , worker_group varchar(255) DEFAULT NULL , environment_code bigint DEFAULT '-1', - remote_host_code bigint DEFAULT '-1', fail_retry_times int DEFAULT NULL , fail_retry_interval int DEFAULT NULL , timeout_flag int DEFAULT NULL , @@ -784,7 +782,6 @@ CREATE TABLE t_ds_task_instance ( task_instance_priority int DEFAULT NULL , worker_group varchar(64), environment_code bigint DEFAULT '-1', - remote_host_code bigint DEFAULT '-1', environment_config text, executor_id int DEFAULT NULL , first_submit_time timestamp DEFAULT NULL , diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql index f1b34aec90565..f5b60e99970f7 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql @@ -139,8 +139,3 @@ CREATE TABLE `t_ds_task_remote_host` `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; - --- Add remote_host_code column -ALTER TABLE `t_ds_task_definition` ADD COLUMN `remote_host_code` bigint(20) DEFAULT '-1' COMMENT 'task remote host code'; -ALTER TABLE `t_ds_task_definition_log` ADD COLUMN `remote_host_code` bigint(20) DEFAULT '-1' COMMENT 'task remote host code'; -ALTER TABLE `t_ds_task_instance` ADD COLUMN `remote_host_code` bigint(20) DEFAULT '-1' COMMENT 'task remote host code'; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql index 8373beb97d2bf..0dc6e4e753c9a 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql @@ -134,8 +134,3 @@ EXECUTE 'CREATE TABLE IF NOT EXISTS' || quote_ident(v_schema) ||'."t_ds_task_rem update_time timestamp DEFAULT NULL , PRIMARY KEY (id) )'; - --- Add resource limit column -EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_definition ADD COLUMN IF NOT EXISTS remote_host_code int NOT NULL DEFAULT ''-1'' '; -EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_definition_log ADD COLUMN IF NOT EXISTS remote_host_code int NOT NULL DEFAULT ''-1'' '; -EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_instance ADD COLUMN IF NOT EXISTS remote_host_code remote_host_code bigint DEFAULT ''-1'' '; diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java index d8931945d7214..b0ff2b4e7df28 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java @@ -388,7 +388,7 @@ public void testQueryTaskInstanceListPaging() { } @Test - public void testQueryByTaskRemoteHostCodeAndStatus() { + public void testQueryTaskInstanceByTaskParamsAndStatus() { TaskInstance taskInstance = new TaskInstance(); taskInstance.setFlag(Flag.YES); taskInstance.setName("us task"); @@ -397,13 +397,18 @@ public void testQueryByTaskRemoteHostCodeAndStatus() { taskInstance.setEndTime(new Date()); taskInstance.setProcessInstanceId(1); taskInstance.setTaskType("shel"); - taskInstance.setRemoteHostCode(1L); + taskInstance.setTaskParams( + "{\"localParams\":[],\"rawScript\":\"echo \\\"hello yann\\\"\",\"resourceList\":[],\"remoteHostCode\":7529232540352,\"conditionResult\":\"null\",\"dependence\":\"null\",\"switchResult\":\"null\",\"waitStartTimeout\":null}"); taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); + taskInstanceMapper.insert(taskInstance); + List taskInstances = taskInstanceMapper + .queryTaskInstanceByTaskParamsAndStatus("\"remoteHostCode\":7529232540352", TASK_NOT_TERMINATED_STATES); + Assertions.assertEquals(1, taskInstances.size()); - List taskInstances = - taskInstanceMapper.queryByTaskRemoteHostCodeAndStatus(1L, TASK_NOT_TERMINATED_STATES); - Assertions.assertEquals(taskInstances.size(), 1); + List nullTaskInstances = taskInstanceMapper + .queryTaskInstanceByTaskParamsAndStatus("\"remoteHostCode\":null", TASK_NOT_TERMINATED_STATES); + Assertions.assertEquals(0, nullTaskInstances.size()); } public static final int[] TASK_NOT_TERMINATED_STATES = new int[]{ diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java index e18c4532f419e..7725feb6ea3e7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.BashTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; @@ -79,7 +80,6 @@ public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance tas taskExecutionContext.setCpuQuota(taskInstance.getCpuQuota()); taskExecutionContext.setMemoryMax(taskInstance.getMemoryMax()); taskExecutionContext.setAppIds(taskInstance.getAppLink()); - taskExecutionContext.setSshSessionHost(taskInstance.getSshSessionHost()); return this; } @@ -137,6 +137,11 @@ public TaskExecutionContextBuilder buildResourceParametersInfo(ResourceParameter return this; } + public TaskExecutionContextBuilder buildBashTaskExecutionContext(BashTaskExecutionContext bashTaskExecutionContext) { + taskExecutionContext.setBashTaskExecutionContext(bashTaskExecutionContext); + return this; + } + /** * build k8sTask related info * diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index 1d14305e74d38..32b4ec9b967b6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; -import org.apache.dolphinscheduler.dao.repository.TaskRemoteHostDao; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; @@ -82,9 +81,6 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl @Autowired private TaskDefinitionLogDao taskDefinitionLogDao; - @Autowired - private TaskRemoteHostDao taskRemoteHostDao; - @Autowired private MasterConfig masterConfig; @@ -200,8 +196,7 @@ public void run() { stateWheelExecuteThread, curingGlobalParamsService, taskInstanceDao, - taskDefinitionLogDao, - taskRemoteHostDao); + taskDefinitionLogDao); processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable); workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstance.getId())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java index 7680081068da5..ec55b1ec349c3 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java @@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.dao.entity.TaskRemoteHost; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; @@ -39,7 +38,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; -import org.apache.dolphinscheduler.plugin.task.api.model.SSHSessionHost; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; @@ -81,7 +79,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.BeanUtils; /** * stream task execute @@ -318,19 +315,6 @@ public TaskInstance newTaskInstance(TaskDefinition taskDefinition) { } } - taskInstance - .setRemoteHostCode(taskDefinition.getRemoteHostCode() <= 0 ? -1 : taskDefinition.getRemoteHostCode()); - if (taskInstance.getRemoteHostCode() > 0) { - TaskRemoteHost taskRemoteHost = taskRemoteHostDao.getTaskRemoteHostByCode(taskInstance.getRemoteHostCode()); - if (taskRemoteHost != null) { - SSHSessionHost sshSessionHost = new SSHSessionHost(); - BeanUtils.copyProperties(taskRemoteHost, sshSessionHost); - taskInstance.setSshSessionHost(sshSessionHost); - } else { - logger.error("cannot find task remote host, code: {}", taskInstance.getRemoteHostCode()); - } - } - if (taskInstance.getSubmitTime() == null) { taskInstance.setSubmitTime(new Date()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 602453455e4a6..c04f3404ba68f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -57,16 +57,13 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.dao.entity.TaskRemoteHost; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; -import org.apache.dolphinscheduler.dao.repository.TaskRemoteHostDao; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.plugin.task.api.model.SSHSessionHost; import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -139,8 +136,6 @@ public class WorkflowExecuteRunnable implements Callable { private ProcessInstanceDao processInstanceDao; - private TaskRemoteHostDao taskRemoteHostDao; - private TaskInstanceDao taskInstanceDao; private TaskDefinitionLogDao taskDefinitionLogDao; @@ -258,13 +253,11 @@ public WorkflowExecuteRunnable( @NonNull StateWheelExecuteThread stateWheelExecuteThread, @NonNull CuringParamsService curingParamsService, @NonNull TaskInstanceDao taskInstanceDao, - @NonNull TaskDefinitionLogDao taskDefinitionLogDao, - @NonNull TaskRemoteHostDao taskRemoteHostDao) { + @NonNull TaskDefinitionLogDao taskDefinitionLogDao) { this.processService = processService; this.commandService = commandService; this.processInstanceDao = processInstanceDao; this.processInstance = processInstance; - this.taskRemoteHostDao = taskRemoteHostDao; this.nettyExecutorManager = nettyExecutorManager; this.processAlertManager = processAlertManager; this.stateWheelExecuteThread = stateWheelExecuteThread; @@ -1226,18 +1219,6 @@ public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode ta } } - taskInstance.setRemoteHostCode(taskNode.getRemoteHostCode()); - if (taskInstance.getRemoteHostCode() > 0) { - TaskRemoteHost taskRemoteHost = taskRemoteHostDao.getTaskRemoteHostByCode(taskInstance.getRemoteHostCode()); - if (taskRemoteHost != null) { - SSHSessionHost sshSessionHost = new SSHSessionHost(); - BeanUtils.copyProperties(taskRemoteHost, sshSessionHost); - taskInstance.setSshSessionHost(sshSessionHost); - } else { - logger.error("cannot find task remote host, code: {}", taskInstance.getRemoteHostCode()); - } - } - // delay execution time taskInstance.setDelayTime(taskNode.getDelayTime()); taskInstance.setTaskExecuteType(taskNode.getTaskExecuteType()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index a47df6979a0e3..72a3d9b9fe31d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -76,6 +76,7 @@ import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.storage.impl.HadoopUtils; +import org.apache.dolphinscheduler.service.task.TaskInstanceHandler; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.service.utils.LoggerUtils; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -132,6 +133,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected String threadLoggerInfoName; + protected TaskInstanceHandler taskInstanceHandler; + @Override public void init(@NonNull TaskInstance taskInstance, @NonNull ProcessInstance processInstance) { processService = SpringApplicationContext.getBean(ProcessService.class); @@ -140,6 +143,7 @@ public void init(@NonNull TaskInstance taskInstance, @NonNull ProcessInstance pr taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class); curingParamsService = SpringApplicationContext.getBean(CuringParamsService.class); taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class); + taskInstanceHandler = SpringApplicationContext.getBean(TaskInstanceHandler.class); this.taskInstance = taskInstance; this.processInstance = processInstance; this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes(); @@ -267,7 +271,7 @@ protected boolean stop() { @Override public String getType() { - throw new UnsupportedOperationException("This abstract class doesn's has type"); + throw new UnsupportedOperationException("This abstract class doesn't has type"); } @Override @@ -343,6 +347,7 @@ protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance .buildResourceParametersInfo(resources) .buildDataQualityTaskExecutionContext(dataQualityTaskExecutionContext) .buildK8sTaskRelatedInfo(k8sTaskExecutionContext) + .buildBashTaskExecutionContext(taskInstanceHandler.createBashTaskExecutionContext(taskInstance)) .buildBusinessParamsMap(businessParamsMap) .buildParamInfo(propertyMap) .create(); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index b433d0d71a26d..52035d0ef4462 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; -import org.apache.dolphinscheduler.dao.repository.TaskRemoteHostDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; @@ -77,8 +76,6 @@ public class WorkflowExecuteRunnableTest { private TaskInstanceDao taskInstanceDao; private TaskDefinitionLogDao taskDefinitionLogDao; - - private TaskRemoteHostDao taskRemoteHostDao; private ProcessService processService; private CommandService commandService; @@ -107,7 +104,6 @@ public void init() throws Exception { taskInstanceDao = Mockito.mock(TaskInstanceDao.class); taskDefinitionLogDao = Mockito.mock(TaskDefinitionLogDao.class); taskInstanceDao = Mockito.mock(TaskInstanceDao.class); - taskRemoteHostDao = Mockito.mock(TaskRemoteHostDao.class); Map cmdParam = new HashMap<>(); cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00"); cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, "2020-01-20 23:00:00"); @@ -124,7 +120,7 @@ public void init() throws Exception { new WorkflowExecuteRunnable(processInstance, commandService, processService, processInstanceDao, nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService, - taskInstanceDao, taskDefinitionLogDao, taskRemoteHostDao)); + taskInstanceDao, taskDefinitionLogDao)); Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag"); dag.setAccessible(true); dag.set(workflowExecuteThread, new DAG()); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java index 20a13d14b8fff..0208593fdc265 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java @@ -189,11 +189,6 @@ public class TaskNode { */ private Integer memoryMax; - /** - * task remote host code - */ - private Long remoteHostCode; - /** * task execute type */ @@ -481,7 +476,6 @@ public String toString() { + ", taskInstancePriority=" + taskInstancePriority + ", workerGroup='" + workerGroup + '\'' + ", environmentCode=" + environmentCode - + ", remoteHostCode=" + remoteHostCode + ", timeout='" + timeout + '\'' + ", delayTime=" + delayTime + '\'' + ", taskExecuteType=" + taskExecuteType @@ -552,11 +546,4 @@ public void setTaskExecuteType(TaskExecuteType taskExecuteType) { this.taskExecuteType = taskExecuteType; } - public Long getRemoteHostCode() { - return remoteHostCode; - } - - public void setRemoteHostCode(Long remoteHostCode) { - this.remoteHostCode = remoteHostCode; - } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index ce3614ce4847e..a71edd61c0d3f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -2318,7 +2318,6 @@ public List transformTask(List taskRelationList, taskNode.setTaskInstancePriority(taskDefinitionLog.getTaskPriority()); taskNode.setWorkerGroup(taskDefinitionLog.getWorkerGroup()); taskNode.setEnvironmentCode(taskDefinitionLog.getEnvironmentCode()); - taskNode.setRemoteHostCode(taskDefinitionLog.getRemoteHostCode()); taskNode.setTimeout(JSONUtils .toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN, taskDefinitionLog.getTimeoutNotifyStrategy(), diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskInstanceHandler.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskInstanceHandler.java new file mode 100644 index 0000000000000..9695e579d4a07 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskInstanceHandler.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.task; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.TaskRemoteHost; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.TaskRemoteHostDao; +import org.apache.dolphinscheduler.plugin.task.api.BashTaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.SSHSessionHost; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.core.type.TypeReference; + +@Component +public class TaskInstanceHandler { + + private static final Logger logger = LoggerFactory.getLogger(TaskInstanceHandler.class); + + private static final String REMOTE_HOST_CODE = "remoteHostCode"; + + @Autowired + private TaskDefinitionDao taskDefinitionDao; + + @Autowired + private TaskRemoteHostDao taskRemoteHostDao; + + /** + * Determine whether the task needs to be run in the bash environment + * @param taskInstance task instance {@link TaskInstance} + * @return result + */ + public boolean needRunningOnBash(TaskInstance taskInstance) { + // TODO The first step is only for SHELL and Python tasks + return "SHELL".equals(taskInstance.getTaskType()) || "PYTHON".equals(taskInstance.getTaskType()); + } + + /** + * Generate bash execution context {@link BashTaskExecutionContext} + * @param taskInstance task instance {@link TaskInstance} + * @return BashTaskExecutionContext + */ + public BashTaskExecutionContext createBashTaskExecutionContext(TaskInstance taskInstance) { + if (!needRunningOnBash(taskInstance)) { + return null; + } + + BashTaskExecutionContext bashTaskExecutionContext = new BashTaskExecutionContext(); + bashTaskExecutionContext.setSessionHost(createSessionHost(taskInstance)); + + return bashTaskExecutionContext; + } + + /** + * Generate task ssh session host + * @param taskInstance task instance {@link TaskInstance} + * @return remote session host {@link SSHSessionHost} + */ + public SSHSessionHost createSessionHost(TaskInstance taskInstance) { + Map taskParameters = getTaskParameters(taskInstance); + + if (taskParameters == null || !taskParameters.containsKey(REMOTE_HOST_CODE) + || taskParameters.get(REMOTE_HOST_CODE) == null) { + logger.warn("cannot find task parameters or do not contain remote_host_code, task instance code: {}", + taskInstance.getTaskCode()); + return null; + } + + long remoteHostCode = (long) taskParameters.get(REMOTE_HOST_CODE); + TaskRemoteHost taskRemoteHost = taskRemoteHostDao.getTaskRemoteHostByCode(remoteHostCode); + SSHSessionHost sessionHost = new SSHSessionHost(); + BeanUtils.copyProperties(taskRemoteHost, sessionHost); + + return sessionHost; + } + + public Map getTaskParameters(TaskInstance taskInstance) { + TaskDefinition taskDefinition = taskDefinitionDao.findTaskDefinition( + taskInstance.getTaskCode(), + taskInstance.getTaskDefinitionVersion()); + + return JSONUtils.parseObject( + taskDefinition.getTaskParams(), + new TypeReference>() { + }); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index f75ff58b6a91f..7b5d2598d3a53 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -114,11 +114,17 @@ public abstract class AbstractCommandExecutor { */ protected SSHSessionHost sessionHost; + /** + * bash execution context + */ + protected BashTaskExecutionContext bashTaskExecutionContext; + public AbstractCommandExecutor(Consumer> logHandler, TaskExecutionContext taskRequest, Logger logger) { this.logHandler = logHandler; this.taskRequest = taskRequest; + this.bashTaskExecutionContext = taskRequest.getBashTaskExecutionContext(); this.logger = logger; this.logBuffer = new LinkedBlockingQueue<>(); } @@ -212,7 +218,7 @@ public TaskResponse run(String execCommand) throws IOException, InterruptedExcep return result; } - if (taskRequest.getSshSessionHost() != null) { + if (bashTaskExecutionContext.getSessionHost() != null) { runningOnSSH = true; } @@ -239,7 +245,7 @@ private TaskResponse runCommandFile(String commandFilePath) throws IOException, private TaskResponse runOnSSH(String commandFilePath) { TaskResponse result = new TaskResponse(); SSHSessionHolder sessionHolder = null; - this.sessionHost = taskRequest.getSshSessionHost(); + this.sessionHost = bashTaskExecutionContext.getSessionHost(); try { sessionHolder = SSHSessionPool.getSessionHolder(sessionHost); sessionHolder.setSftpConfig(SSHSessionPool.getSftpConfig()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/BashTaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/BashTaskExecutionContext.java new file mode 100644 index 0000000000000..c093e09cea215 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/BashTaskExecutionContext.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api; + +import org.apache.dolphinscheduler.plugin.task.api.model.SSHSessionHost; + +import java.io.Serializable; + +import lombok.Data; + +/** + * Execution context executed in the BASH environment, such as Shell Task + * This context mainly contains some advanced parameters executed in the BASH environment, + * not related to the parameters of the task plugin + * TODO: will migrate environment configuration and resource limitations to this context in the future + */ +@Data +public class BashTaskExecutionContext implements Serializable { + + /** + * remote SSH session host, see {@link SSHSessionHost} + * can ssh to the remote host's bash env to run commands or sftp files. + */ + private SSHSessionHost sessionHost; + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java index 258aa32d58751..8e2eb8110fe20 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java @@ -20,7 +20,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.plugin.task.api.model.SSHSessionHost; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import java.io.Serializable; @@ -238,6 +237,12 @@ public class TaskExecutionContext implements Serializable { * k8s TaskExecutionContext */ private K8sTaskExecutionContext k8sTaskExecutionContext; + + /** + * bash environment execution context + */ + private BashTaskExecutionContext bashTaskExecutionContext; + /** * resources full name and tenant code */ @@ -272,8 +277,4 @@ public class TaskExecutionContext implements Serializable { */ private int testFlag; - /** - * task remote host - */ - private SSHSessionHost sshSessionHost; } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 73698787f355b..f364c225589bb 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -61,6 +61,10 @@ export function formatParams(data: INodeData): { taskParams.others = data.others } + if (data.taskType && ['SHELL', 'PYTHON'].includes(data.taskType)) { + taskParams.remoteHostCode = data.remoteHostCode + } + if (data.taskType === 'SPARK') { taskParams.driverCores = data.driverCores taskParams.driverMemory = data.driverMemory @@ -478,7 +482,6 @@ export function formatParams(data: INodeData): { delayTime: data.delayTime ? String(data.delayTime) : '0', description: data.description, environmentCode: data.environmentCode || -1, - remoteHostCode: data.remoteHostCode || -1, failRetryInterval: data.failRetryInterval ? String(data.failRetryInterval) : '0', @@ -524,7 +527,6 @@ export function formatParams(data: INodeData): { export function formatModel(data: ITaskData) { const params = { ...omit(data, [ - 'remoteHostCode', 'environmentCode', 'timeoutFlag', 'timeoutNotifyStrategy', @@ -532,7 +534,6 @@ export function formatModel(data: ITaskData) { ]), ...omit(data.taskParams, ['resourceList', 'mainJar', 'localParams']), environmentCode: data.environmentCode === -1 ? null : data.environmentCode, - remoteHostCode: data.remoteHostCode === -1 ? null : data.remoteHostCode, timeoutFlag: data.timeoutFlag === 'OPEN', timeoutNotifyStrategy: data.timeoutNotifyStrategy ? [data.timeoutNotifyStrategy] @@ -700,6 +701,9 @@ export function formatModel(data: ITaskData) { if (data.taskParams?.jobType) { params.isCustomTask = data.taskParams.jobType === 'CUSTOM' } + if (data.taskParams?.remoteHostCode) { + params.remoteHostCode = data.taskParams.remoteHostCode === -1 ? null : data.taskParams.remoteHostCode + } return params } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index f9eae9637c1f7..97a1fdd8d65f7 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -401,6 +401,7 @@ interface ITaskParams { name?: string cloudWatchLogGroupArn?: string yamlContent?: string + remoteHostCode?: number | null } interface INodeData