Skip to content

Commit

Permalink
[DSIP-87] Remove cache configuration of task
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Dec 12, 2024
1 parent 566651c commit 7ab60ba
Show file tree
Hide file tree
Showing 74 changed files with 240 additions and 782 deletions.
16 changes: 0 additions & 16 deletions docs/docs/en/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -752,20 +752,4 @@ start API server. If you want disabled when Python gateway service you could cha

---

## Q:How to determine whether a task has been cached when the cache is executed, that is, how to determine whether a task can use the running result of another task?

A: For the task identified as `Cache Execution`, when the task starts, a cache key will be generated, and the key is composed of the following fields and hashed:

- task definition: the id of the task definition corresponding to the task instance
- task version: the version of the task definition corresponding to the task instance
- task input parameters: including the parameters passed in by the upstream node and the global parameter, the parameters referenced by the parameter list of the task definition and the parameters used by the task definition using `${}`
- environment configuration: the actual configuration content of the environment configuration under the environment name, that is, the actual configuration content in the `security` - `environment management`

If the task with cache identification runs, it will find whether there is data with the same cache key in the database,

- If there is, copy the task instance and update the corresponding data
- If not, the task runs as usual, and the task instance data is stored in the cache when the task is completed

If you do not need to cache, you can right-click the node to run `Clear cache` in the workflow instance to clear the cache, which will clear the cache data of the current input parameters under this version.

We will collect more FAQ later
1 change: 0 additions & 1 deletion docs/docs/en/guide/task/appendix.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ DolphinScheduler task plugins share some common default parameters. Each type of
|--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Node Name | The name of the task. Node names within the same workflow must be unique. |
| Run Flag | Indicating whether to schedule the task. If you do not need to execute the task, you can turn on the `Prohibition execution` switch. |
| Cache Execution | Indicating whether this node needs to be cached. If it is cached, the same identifier (same task version, same task definition, same parameter input) task is cached. When the task has been cached, it will not be executed again, and the result will be reused directly. |
| Description | Describing the function of this node. |
| Task Priority | When the number of the worker threads is insufficient, the worker executes task according to the priority. When two tasks have the same priority, the worker will execute them in `first come first served` fashion. |
| Worker Group | Machines which execute the tasks. If you choose `default`, scheduler will send the task to a random worker. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
"taskParamList" : [ ],
"taskParamMap" : null,
"flag" : "YES",
"isCache" : "NO",
"taskPriority" : "MEDIUM",
"userName" : null,
"projectName" : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@

import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.REMOVE_TASK_INSTANCE_CACHE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.TASK_SAVEPOINT_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.TASK_STOP_ERROR;

import org.apache.dolphinscheduler.api.audit.OperatorLog;
import org.apache.dolphinscheduler.api.audit.enums.AuditType;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.utils.Result;
Expand All @@ -37,7 +35,6 @@

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
Expand Down Expand Up @@ -202,24 +199,4 @@ public Result<Object> stopTask(@Parameter(hidden = true) @RequestAttribute(value
return taskInstanceService.stopTask(loginUser, projectCode, id);
}

/**
* remove task instance cache
*
* @param loginUser login user
* @param projectCode project code
* @param id task instance id
* @return the result code and msg
*/
@Operation(summary = "remove-task-instance-cache", description = "REMOVE_TASK_INSTANCE_CACHE")
@Parameters({
@Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12"))
})
@DeleteMapping(value = "/{id}/remove-cache")
@ResponseStatus(HttpStatus.OK)
@ApiException(REMOVE_TASK_INSTANCE_CACHE_ERROR)
public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "id") Integer id) {
return taskInstanceService.removeTaskInstanceCache(loginUser, projectCode, id);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,6 @@ public enum Status {
"资源文件已授权其他用户[{0}],后缀不允许修改"),
RESOURCE_HAS_FOLDER(20018, "There are files or folders in the current directory:{0}", "当前目录下有文件或文件夹[{0}]"),

REMOVE_TASK_INSTANCE_CACHE_ERROR(20019, "remove task instance cache error", "删除任务实例缓存错误"),

ILLEGAL_RESOURCE_PATH(20020, "Resource file [{0}] is illegal", "非法的资源路径[{0}]"),

USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"),
Expand Down Expand Up @@ -520,7 +518,6 @@ public enum Status {
CLOSE_TASK_GROUP_ERROR(130011, "close task group error", "关闭任务组错误"),
START_TASK_GROUP_ERROR(130012, "start task group error", "启动任务组错误"),
QUERY_TASK_GROUP_QUEUE_LIST_ERROR(130013, "query task group queue list error", "查询任务组队列列表错误"),
TASK_GROUP_CACHE_START_FAILED(130014, "cache start failed", "任务组相关的缓存启动失败"),
ENVIRONMENT_WORKER_GROUPS_IS_INVALID(130015, "environment worker groups is invalid format", "环境关联的工作组参数解析错误"),
UPDATE_ENVIRONMENT_WORKER_GROUP_RELATION_ERROR(130016,
"You can't modify the worker group, because the worker group [{0}] and this environment [{1}] already be used in the task [{2}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
Expand Down Expand Up @@ -101,14 +100,5 @@ void forceTaskSuccess(User loginUser,
*/
TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId);

/**
* remove task instance cache
* @param loginUser
* @param projectCode
* @param taskInstanceId
* @return
*/
TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode, Integer taskInstanceId);

void deleteByWorkflowInstanceId(Integer workflowInstanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package org.apache.dolphinscheduler.api.service.impl;

import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;

import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService;
Expand All @@ -43,7 +41,6 @@
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.worker.IPhysicalTaskExecutorOperator;
Expand All @@ -56,7 +53,6 @@
import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillResponse;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.util.Date;
import java.util.HashSet;
Expand Down Expand Up @@ -325,31 +321,6 @@ public TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long
return taskInstance;
}

@Override
public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode,
Integer taskInstanceId) {
Result result = new Result();

Project project = projectMapper.queryByCode(projectCode);
projectService.checkProjectAndAuthThrowException(loginUser, project, INSTANCE_UPDATE);

TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId);
if (taskInstance == null) {
log.error("Task definition can not be found, projectCode:{}, taskInstanceId:{}.", projectCode,
taskInstanceId);
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return new TaskInstanceRemoveCacheResponse(result);
}
String tagCacheKey = taskInstance.getCacheKey();
Pair<Integer, String> taskIdAndCacheKey = TaskCacheUtils.revertCacheKey(tagCacheKey);
String cacheKey = taskIdAndCacheKey.getRight();
if (StringUtils.isNotEmpty(cacheKey)) {
taskInstanceDao.clearCacheByCacheKey(cacheKey);
}
putMsg(result, Status.SUCCESS);
return new TaskInstanceRemoveCacheResponse(result, cacheKey);
}

@Override
public void deleteByWorkflowInstanceId(Integer workflowInstanceId) {
List<TaskInstance> needToDeleteTaskInstances =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;

import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
Expand Down Expand Up @@ -410,29 +409,4 @@ public void testForceTaskSuccess_withTaskInstanceNotFinished() {
() -> taskInstanceService.forceTaskSuccess(user, task.getProjectCode(), task.getId()));
}

@Test
public void testRemoveTaskInstanceCache() {
User user = getAdminUser();
long projectCode = 1L;
Project project = getProject(projectCode);
int taskId = 1;
TaskInstance task = getTaskInstance();
String cacheKey = "950311f3597f9198976cd3fd69e208e5b9ba6750";
task.setCacheKey(cacheKey);

when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(taskInstanceMapper.selectById(1)).thenReturn(task);
when(taskInstanceDao.queryByCacheKey(cacheKey)).thenReturn(task, null);
when(taskInstanceDao.updateById(task)).thenReturn(true);

TaskInstanceRemoveCacheResponse response =
taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId);
Assertions.assertEquals(Status.SUCCESS.getCode(), response.getCode());

when(taskInstanceMapper.selectById(1)).thenReturn(null);
TaskInstanceRemoveCacheResponse responseNotFoundTask =
taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId);
Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), responseNotFoundTask.getCode());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
* have_arr_variables
* have_map_variables
* have_alert
* is_cache
*/
public enum Flag {

Expand Down
3 changes: 0 additions & 3 deletions dolphinscheduler-common/src/test/resources/sql/mysql_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ ALTER TABLE `t_ds_plugin_define` AUTO_INCREMENT 2;
ALTER TABLE `t_ds_workflow_instance` MODIFY COLUMN `state_history` text NULL COMMENT 'state history desc';
ALTER TABLE `t_ds_project` MODIFY COLUMN `description` varchar(255) NULL;
ALTER TABLE `t_ds_task_group` MODIFY COLUMN `description` varchar(255) NULL;
ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `app_link` text NULL COMMENT 'yarn app id', MODIFY COLUMN `cache_key` varchar(200) NULL COMMENT 'cache_key', MODIFY COLUMN `executor_name` varchar(64) NULL;
ALTER TABLE `t_ds_worker_group` MODIFY COLUMN `description` text NULL COMMENT 'description';
ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `cache_key` varchar(200) NULL COMMENT 'cache_key', MODIFY COLUMN `executor_name` varchar(64) NULL;
ALTER TABLE `t_ds_fav_task` MODIFY COLUMN `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id';
ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `cache_key` varchar(200) NULL COMMENT 'cache_key', MODIFY COLUMN `executor_name` varchar(64) NULL;
SET FOREIGN_KEY_CHECKS = 1;
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,6 @@ public class TaskDefinition {
*/
private Flag flag;

/**
* task is cache: yes/no
*/
private Flag isCache;

/**
* task priority
*/
Expand Down Expand Up @@ -291,7 +286,6 @@ public boolean equals(Object o) {
&& Objects.equals(taskType, that.taskType)
&& Objects.equals(taskParams, that.taskParams)
&& flag == that.flag
&& isCache == that.isCache
&& taskPriority == that.taskPriority
&& Objects.equals(workerGroup, that.workerGroup)
&& timeoutFlag == that.timeoutFlag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public TaskDefinitionLog(TaskDefinition taskDefinition) {
this.setFailRetryInterval(taskDefinition.getFailRetryInterval());
this.setFailRetryTimes(taskDefinition.getFailRetryTimes());
this.setFlag(taskDefinition.getFlag());
this.setIsCache(taskDefinition.getIsCache());
this.setModifyBy(taskDefinition.getModifyBy());
this.setCpuQuota(taskDefinition.getCpuQuota());
this.setMemoryMax(taskDefinition.getMemoryMax());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import lombok.Data;

import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
Expand Down Expand Up @@ -95,11 +94,6 @@ public class TaskInstance implements Serializable {

private Flag flag;

private Flag isCache;

@TableField(updateStrategy = FieldStrategy.IGNORED)
private String cacheKey;

@TableField(exist = false)
private String duration;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ List<TaskInstance> findValidTaskListByWorkflowInstanceId(@Param("workflowInstanc
TaskInstance queryByInstanceIdAndCode(@Param("workflowInstanceId") int workflowInstanceId,
@Param("taskCode") Long taskCode);

TaskInstance queryByCacheKey(@Param("cacheKey") String cacheKey);

Boolean clearCacheByCacheKey(@Param("cacheKey") String cacheKey);

List<TaskInstance> queryByWorkflowInstanceIdsAndTaskCodes(@Param("workflowInstanceIds") List<Integer> workflowInstanceIds,
@Param("taskCodes") List<Long> taskCodes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,22 +79,6 @@ public interface TaskInstanceDao extends IDao<TaskInstance> {
*/
List<TaskInstance> queryPreviousTaskListByWorkflowInstanceId(Integer workflowInstanceId);

/**
* find task instance by cache_key
*
* @param cacheKey cache key
* @return task instance
*/
TaskInstance queryByCacheKey(String cacheKey);

/**
* clear task instance cache by cache_key
*
* @param cacheKey cache key
* @return task instance
*/
Boolean clearCacheByCacheKey(String cacheKey);

void deleteByWorkflowInstanceId(int workflowInstanceId);

List<TaskInstance> queryByWorkflowInstanceId(Integer workflowInstanceId);
Expand Down
Loading

0 comments on commit 7ab60ba

Please sign in to comment.