Skip to content

Commit

Permalink
remove remote_host_code from taskInstance and TaskDefinition, and add…
Browse files Browse the repository at this point in the history
… BashTaskExecutionContext
  • Loading branch information
DarkAssassinator committed Nov 12, 2022
1 parent 87a5859 commit fb3ae31
Show file tree
Hide file tree
Showing 29 changed files with 224 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ public int deleteByCode(long code, User loginUser) {
throw new ServiceException(Status.TASK_REMOTE_HOST_NOT_FOUND, code);
}

List<TaskInstance> relatedTaskInstances =
taskInstanceMapper.queryByTaskRemoteHostCodeAndStatus(code, Constants.TASK_NOT_TERMINATED_STATES);
String searchVal = "\"remoteHostCode\":" + taskRemoteHost.getCode();
List<TaskInstance> relatedTaskInstances = taskInstanceMapper.queryTaskInstanceByTaskParamsAndStatus(searchVal,
Constants.TASK_NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(relatedTaskInstances)) {
logger.error("delete task remote code {} failed, because there are {} task instances are using it.", code,
relatedTaskInstances.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,16 @@ public void test_deleteByCode() {
.thenReturn(taskRemoteHost);
List<TaskInstance> taskInstanceList = new ArrayList<>();
taskInstanceList.add(createTaskInstance());
Mockito.when(taskInstanceMapper.queryByTaskRemoteHostCodeAndStatus(Mockito.any(Long.class),

Mockito.when(taskInstanceMapper.queryTaskInstanceByTaskParamsAndStatus(Mockito.anyString(),
Mockito.eq(Constants.TASK_NOT_TERMINATED_STATES))).thenReturn(taskInstanceList);
exception = Assertions.assertThrows(ServiceException.class, () -> {
taskRemoteHostService.deleteByCode(1L, user);
});
Assertions.assertEquals(Status.DELETE_TASK_REMOTE_HOST_FAIL.getCode(),
((ServiceException) exception).getCode());

Mockito.when(taskInstanceMapper.queryByTaskRemoteHostCodeAndStatus(Mockito.any(Long.class),
Mockito.when(taskInstanceMapper.queryTaskInstanceByTaskParamsAndStatus(Mockito.anyString(),
Mockito.eq(Constants.TASK_NOT_TERMINATED_STATES))).thenReturn(null);
Mockito.when(taskRemoteHostMapper.deleteByCode(Mockito.any(Long.class))).thenReturn(1);
int result = taskRemoteHostService.deleteByCode(1L, user);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,6 @@ public class TaskDefinition {
*/
private Integer memoryMax;

/**
* task remote host code
*/
private long remoteHostCode;

/**
* task execute type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public TaskDefinitionLog(TaskDefinition taskDefinition) {
this.setModifyBy(taskDefinition.getModifyBy());
this.setCpuQuota(taskDefinition.getCpuQuota());
this.setMemoryMax(taskDefinition.getMemoryMax());
this.setRemoteHostCode(taskDefinition.getRemoteHostCode());
}

public int getOperator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.SSHSessionHost;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;

Expand Down Expand Up @@ -296,17 +295,6 @@ public class TaskInstance implements Serializable {
*/
private Integer memoryMax;

/**
* task remote host code
*/
private long remoteHostCode;

/**
* task remote host
*/
@TableField(exist = false)
private SSHSessionHost sshSessionHost;

/**
* task execute type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,12 @@ List<TaskInstance> loadAllInfosNoRelease(@Param("processInstanceId") int process
@Param("status") int status);

/**
* Query task instance list by remote host code and instance status list
* @param remoteHostCode task remote host code
* @param states task instance state list
* @return task instance list
* Query task instances by task params search string
* @param searchValue search string
* @param states states
* @return list of task instances
*/
List<TaskInstance> queryByTaskRemoteHostCodeAndStatus(@Param("remoteHostCode") long remoteHostCode,
@Param("states") int[] states);
List<TaskInstance> queryTaskInstanceByTaskParamsAndStatus(@Param("searchValue") String searchValue,
@Param("states") int[] states);

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<sql id="baseSql">
id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, operator, operate_time, create_time, update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type, remote_host_code
resource_ids, operator, operate_time, create_time, update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type
</sql>
<select id="queryMaxVersionForDefinition" resultType="java.lang.Integer">
select max(version)
Expand Down Expand Up @@ -52,7 +52,7 @@
insert into t_ds_task_definition_log (code, name, version, description, project_code, user_id,
task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time,
update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type, remote_host_code)
update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type)
values
<foreach collection="taskDefinitionLogs" item="taskDefinitionLog" separator=",">
(#{taskDefinitionLog.code},#{taskDefinitionLog.name},#{taskDefinitionLog.version},#{taskDefinitionLog.description},
Expand All @@ -61,7 +61,7 @@
#{taskDefinitionLog.failRetryTimes},#{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy},
#{taskDefinitionLog.timeout},#{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime},
#{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}, #{taskDefinitionLog.taskGroupId},#{taskDefinitionLog.taskGroupPriority},
#{taskDefinitionLog.cpuQuota},#{taskDefinitionLog.memoryMax},#{taskDefinitionLog.taskExecuteType},#{taskDefinitionLog.remoteHostCode})
#{taskDefinitionLog.cpuQuota},#{taskDefinitionLog.memoryMax},#{taskDefinitionLog.taskExecuteType})
</foreach>
</insert>
<delete id="deleteByCodeAndVersion">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<sql id="baseSql">
id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, create_time, update_time, task_group_id,task_group_priority, cpu_quota, memory_max, task_execute_type, remote_host_code
resource_ids, create_time, update_time, task_group_id,task_group_priority, cpu_quota, memory_max, task_execute_type
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.code, ${alias}.name, ${alias}.version, ${alias}.description, ${alias}.project_code, ${alias}.user_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
first_submit_time, delay_time, task_params, var_pool, dry_run, test_flag, task_group_id, cpu_quota, memory_max, task_execute_type, remote_host_code
first_submit_time, delay_time, task_params, var_pool, dry_run, test_flag, task_group_id, cpu_quota, memory_max, task_execute_type
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id,
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.test_flag, ${alias}.task_group_id, ${alias}.task_execute_type, ${alias}.remote_host_code
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.test_flag, ${alias}.task_group_id, ${alias}.task_execute_type
</sql>
<update id="setFailoverByHostAndStateArray">
update t_ds_task_instance
Expand Down Expand Up @@ -174,20 +174,20 @@
</if>
</select>

<select id="queryByTaskRemoteHostCodeAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
<select id="queryTaskInstanceByTaskParamsAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
where 1=1
<if test="remoteHostCode != ''">
and remote_host_code =#{remoteHostCode}
</if>
where 1 = 1
<if test="states != null and states.length != 0">
and state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
<if test="searchValue != null and searchValue != '' ">
and task_params like concat('%', #{searchValue}, '%')
</if>
order by id asc
</select>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,6 @@ CREATE TABLE t_ds_task_definition
task_priority tinyint(4) DEFAULT '2',
worker_group varchar(200) DEFAULT NULL,
environment_code bigint(20) DEFAULT '-1',
remote_host_code bigint(20) DEFAULT '-1',
fail_retry_times int(11) DEFAULT NULL,
fail_retry_interval int(11) DEFAULT NULL,
timeout_flag tinyint(2) DEFAULT '0',
Expand Down Expand Up @@ -521,7 +520,6 @@ CREATE TABLE t_ds_task_definition_log
task_priority tinyint(4) DEFAULT '2',
worker_group varchar(200) DEFAULT NULL,
environment_code bigint(20) DEFAULT '-1',
remote_host_code bigint(20) DEFAULT '-1',
fail_retry_times int(11) DEFAULT NULL,
fail_retry_interval int(11) DEFAULT NULL,
timeout_flag tinyint(2) DEFAULT '0',
Expand Down Expand Up @@ -890,7 +888,6 @@ CREATE TABLE t_ds_task_instance
task_instance_priority int(11) DEFAULT NULL,
worker_group varchar(64) DEFAULT NULL,
environment_code bigint(20) DEFAULT '-1',
remote_host_code bigint(20) DEFAULT '-1',
environment_config text DEFAULT '',
executor_id int(11) DEFAULT NULL,
first_submit_time datetime DEFAULT NULL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,6 @@ CREATE TABLE `t_ds_task_definition` (
`task_priority` tinyint(4) DEFAULT '2' COMMENT 'job priority',
`worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping',
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`remote_host_code` bigint(20) DEFAULT '-1' COMMENT 'task remote host code',
`fail_retry_times` int(11) DEFAULT NULL COMMENT 'number of failed retries',
`fail_retry_interval` int(11) DEFAULT NULL COMMENT 'failed retry interval',
`timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open',
Expand Down Expand Up @@ -520,7 +519,6 @@ CREATE TABLE `t_ds_task_definition_log` (
`task_priority` tinyint(4) DEFAULT '2' COMMENT 'job priority',
`worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping',
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`remote_host_code` bigint(20) DEFAULT '-1' COMMENT 'task remote host code',
`fail_retry_times` int(11) DEFAULT NULL COMMENT 'number of failed retries',
`fail_retry_interval` int(11) DEFAULT NULL COMMENT 'failed retry interval',
`timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open',
Expand Down Expand Up @@ -884,7 +882,6 @@ CREATE TABLE `t_ds_task_instance` (
`task_instance_priority` int(11) DEFAULT NULL COMMENT 'task instance priority:0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id',
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`remote_host_code` bigint(20) DEFAULT '-1' COMMENT 'task remote host code',
`environment_config` text COMMENT 'this config contains many environment variables config',
`executor_id` int(11) DEFAULT NULL,
`first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,6 @@ CREATE TABLE t_ds_task_definition (
task_priority int DEFAULT '2' ,
worker_group varchar(255) DEFAULT NULL ,
environment_code bigint DEFAULT '-1',
remote_host_code bigint DEFAULT '-1',
fail_retry_times int DEFAULT NULL ,
fail_retry_interval int DEFAULT NULL ,
timeout_flag int DEFAULT NULL ,
Expand Down Expand Up @@ -440,7 +439,6 @@ CREATE TABLE t_ds_task_definition_log (
task_priority int DEFAULT '2' ,
worker_group varchar(255) DEFAULT NULL ,
environment_code bigint DEFAULT '-1',
remote_host_code bigint DEFAULT '-1',
fail_retry_times int DEFAULT NULL ,
fail_retry_interval int DEFAULT NULL ,
timeout_flag int DEFAULT NULL ,
Expand Down Expand Up @@ -784,7 +782,6 @@ CREATE TABLE t_ds_task_instance (
task_instance_priority int DEFAULT NULL ,
worker_group varchar(64),
environment_code bigint DEFAULT '-1',
remote_host_code bigint DEFAULT '-1',
environment_config text,
executor_id int DEFAULT NULL ,
first_submit_time timestamp DEFAULT NULL ,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,3 @@ CREATE TABLE `t_ds_task_remote_host`
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

-- Add remote_host_code column
ALTER TABLE `t_ds_task_definition` ADD COLUMN `remote_host_code` bigint(20) DEFAULT '-1' COMMENT 'task remote host code';
ALTER TABLE `t_ds_task_definition_log` ADD COLUMN `remote_host_code` bigint(20) DEFAULT '-1' COMMENT 'task remote host code';
ALTER TABLE `t_ds_task_instance` ADD COLUMN `remote_host_code` bigint(20) DEFAULT '-1' COMMENT 'task remote host code';
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,3 @@ EXECUTE 'CREATE TABLE IF NOT EXISTS' || quote_ident(v_schema) ||'."t_ds_task_rem
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
)';

-- Add resource limit column
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_definition ADD COLUMN IF NOT EXISTS remote_host_code int NOT NULL DEFAULT ''-1'' ';
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_definition_log ADD COLUMN IF NOT EXISTS remote_host_code int NOT NULL DEFAULT ''-1'' ';
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_task_instance ADD COLUMN IF NOT EXISTS remote_host_code remote_host_code bigint DEFAULT ''-1'' ';
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public void testQueryTaskInstanceListPaging() {
}

@Test
public void testQueryByTaskRemoteHostCodeAndStatus() {
public void testQueryTaskInstanceByTaskParamsAndStatus() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setFlag(Flag.YES);
taskInstance.setName("us task");
Expand All @@ -397,13 +397,18 @@ public void testQueryByTaskRemoteHostCodeAndStatus() {
taskInstance.setEndTime(new Date());
taskInstance.setProcessInstanceId(1);
taskInstance.setTaskType("shel");
taskInstance.setRemoteHostCode(1L);
taskInstance.setTaskParams(
"{\"localParams\":[],\"rawScript\":\"echo \\\"hello yann\\\"\",\"resourceList\":[],\"remoteHostCode\":7529232540352,\"conditionResult\":\"null\",\"dependence\":\"null\",\"switchResult\":\"null\",\"waitStartTimeout\":null}");
taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION);

taskInstanceMapper.insert(taskInstance);
List<TaskInstance> taskInstances = taskInstanceMapper
.queryTaskInstanceByTaskParamsAndStatus("\"remoteHostCode\":7529232540352", TASK_NOT_TERMINATED_STATES);
Assertions.assertEquals(1, taskInstances.size());

List<TaskInstance> taskInstances =
taskInstanceMapper.queryByTaskRemoteHostCodeAndStatus(1L, TASK_NOT_TERMINATED_STATES);
Assertions.assertEquals(taskInstances.size(), 1);
List<TaskInstance> nullTaskInstances = taskInstanceMapper
.queryTaskInstanceByTaskParamsAndStatus("\"remoteHostCode\":null", TASK_NOT_TERMINATED_STATES);
Assertions.assertEquals(0, nullTaskInstances.size());
}

public static final int[] TASK_NOT_TERMINATED_STATES = new int[]{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.BashTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
Expand Down Expand Up @@ -79,7 +80,6 @@ public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance tas
taskExecutionContext.setCpuQuota(taskInstance.getCpuQuota());
taskExecutionContext.setMemoryMax(taskInstance.getMemoryMax());
taskExecutionContext.setAppIds(taskInstance.getAppLink());
taskExecutionContext.setSshSessionHost(taskInstance.getSshSessionHost());
return this;
}

Expand Down Expand Up @@ -137,6 +137,11 @@ public TaskExecutionContextBuilder buildResourceParametersInfo(ResourceParameter
return this;
}

public TaskExecutionContextBuilder buildBashTaskExecutionContext(BashTaskExecutionContext bashTaskExecutionContext) {
taskExecutionContext.setBashTaskExecutionContext(bashTaskExecutionContext);
return this;
}

/**
* build k8sTask related info
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskRemoteHostDao;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
Expand Down Expand Up @@ -82,9 +81,6 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
@Autowired
private TaskDefinitionLogDao taskDefinitionLogDao;

@Autowired
private TaskRemoteHostDao taskRemoteHostDao;

@Autowired
private MasterConfig masterConfig;

Expand Down Expand Up @@ -200,8 +196,7 @@ public void run() {
stateWheelExecuteThread,
curingGlobalParamsService,
taskInstanceDao,
taskDefinitionLogDao,
taskRemoteHostDao);
taskDefinitionLogDao);
processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));
Expand Down
Loading

0 comments on commit fb3ae31

Please sign in to comment.