From f274edf0b8161fdce2970e9bcf53dce083463d5e Mon Sep 17 00:00:00 2001 From: cyjseagull Date: Fri, 1 Nov 2024 10:32:12 +0800 Subject: [PATCH] support kill job --- .../components/loadbalancer/LoadBalancer.java | 4 ++ .../loadbalancer/impl/LoadBalancerImpl.java | 5 ++ .../wedpr/components/project/dao/JobDO.java | 4 ++ .../main/resources/mapper/ProjectMapper.xml | 2 +- .../scheduler/client/ModelClient.java | 9 ++- .../scheduler/config/SchedulerLoader.java | 2 +- .../components/scheduler/core/JobSyncer.java | 2 +- .../dag/DagWorkFlowSchedulerImpl.java | 11 ++-- .../scheduler/dag/worker/ModelWorker.java | 5 +- .../scheduler/dag/worker/MpcWorker.java | 3 +- .../scheduler/dag/worker/PsiWorker.java | 3 +- .../scheduler/dag/worker/Worker.java | 12 ++-- .../scheduler/dag/worker/WorkerStatus.java | 7 +- .../executor/impl/ExecutiveContext.java | 7 ++ .../impl/dag/DagSchedulerExecutor.java | 65 +++++++++++++++++-- .../executor/impl/ml/response/MLResponse.java | 7 ++ .../executor/manager/ExecutorManagerImpl.java | 25 ++++--- 17 files changed, 138 insertions(+), 35 deletions(-) diff --git a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/LoadBalancer.java b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/LoadBalancer.java index 5aafe97b..cd13c6e8 100644 --- a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/LoadBalancer.java +++ b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/LoadBalancer.java @@ -15,6 +15,8 @@ package com.webank.wedpr.components.loadbalancer; +import java.util.List; + public interface LoadBalancer { public static enum Policy { ROUND_ROBIN, @@ -22,4 +24,6 @@ public static enum Policy { } EntryPointInfo selectService(Policy policy, String serviceType); + + List selectAllEndPoint(String serviceType); } diff --git a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/LoadBalancerImpl.java b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/LoadBalancerImpl.java index d571626d..871fba85 100644 --- a/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/LoadBalancerImpl.java +++ b/wedpr-components/meta/loadbalancer/src/main/java/com/webank/wedpr/components/loadbalancer/impl/LoadBalancerImpl.java @@ -30,6 +30,11 @@ public LoadBalancerImpl(EntryPointFetcher entryPointFetcher) { this.entryPointFetcher = entryPointFetcher; } + @Override + public List selectAllEndPoint(String serviceType) { + return entryPointFetcher.getAliveEntryPoints(serviceType); + } + @Override public EntryPointInfo selectService(Policy policy, String serviceType) { List entryPointInfoList = diff --git a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/JobDO.java b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/JobDO.java index 4ea1f74a..63d8f148 100644 --- a/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/JobDO.java +++ b/wedpr-components/meta/project/src/main/java/com/webank/wedpr/components/project/dao/JobDO.java @@ -148,6 +148,7 @@ public static JobResult deserialize(String data) { @JsonIgnore private transient List taskParties; @JsonIgnore private transient Integer limitItems = -1; + @JsonIgnore private transient Boolean killed = false; // shouldSync or not private Boolean shouldSync; @@ -304,6 +305,9 @@ public Boolean isJobParty(String agency) { if (this.ownerAgency.compareToIgnoreCase(agency) == 0) { return Boolean.TRUE; } + if (taskParties == null) { + return Boolean.FALSE; + } for (FollowerDO followerDO : taskParties) { if (followerDO.getAgency().compareToIgnoreCase(agency) == 0) { return Boolean.TRUE; diff --git a/wedpr-components/meta/project/src/main/resources/mapper/ProjectMapper.xml b/wedpr-components/meta/project/src/main/resources/mapper/ProjectMapper.xml index ccca6a60..0614d812 100644 --- a/wedpr-components/meta/project/src/main/resources/mapper/ProjectMapper.xml +++ b/wedpr-components/meta/project/src/main/resources/mapper/ProjectMapper.xml @@ -198,7 +198,7 @@ select - `id`, `owner`, `owner_agency`, `job_type` + `id`, `owner`, `owner_agency`, `job_type`, `parties` * diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/client/ModelClient.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/client/ModelClient.java index 7254b7d7..3139beff 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/client/ModelClient.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/client/ModelClient.java @@ -9,6 +9,7 @@ import com.webank.wedpr.components.http.client.HttpClientImpl; import com.webank.wedpr.components.scheduler.dag.entity.JobWorker; import com.webank.wedpr.components.scheduler.dag.utils.WorkerUtils; +import com.webank.wedpr.components.scheduler.dag.worker.WorkerStatus; import com.webank.wedpr.components.scheduler.executor.impl.ml.MLExecutorConfig; import com.webank.wedpr.components.scheduler.executor.impl.ml.request.ModelJobRequest; import com.webank.wedpr.components.scheduler.executor.impl.ml.response.MLResponse; @@ -71,7 +72,7 @@ public String submitTask(String params, JobWorker jobWorker) throws Exception { } @SneakyThrows - public void pollTask(String taskId) throws WeDPRException { + public WorkerStatus pollTask(String taskId) throws WeDPRException { String requestUrl = MLExecutorConfig.getRunTaskApiUrl(url, taskId); @@ -102,9 +103,13 @@ public void pollTask(String taskId) throws WeDPRException { + " ,response: " + response); } + if (response.killed()) { + logger.info("The ml task {} has been killed, response: {}", taskId, response); + return response.getData().getWorkerStatus(); + } if (response.success()) { - return; + return response.getData().getWorkerStatus(); } // task is running diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/config/SchedulerLoader.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/config/SchedulerLoader.java index 5aaee532..a3853809 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/config/SchedulerLoader.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/config/SchedulerLoader.java @@ -132,7 +132,7 @@ protected void registerExecutors( new ExecutiveContextBuilder(projectMapperWrapper), threadPoolService); executorManager.registerExecutor(ExecutorType.DAG.getType(), dagSchedulerExecutor); - // register the pir executor, TODO: implement the taskFinishHandler + // register the pir executor executorManager.registerExecutor( ExecutorType.PIR.getType(), new PirExecutor( diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/core/JobSyncer.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/core/JobSyncer.java index 2138df78..f74de3bd 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/core/JobSyncer.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/core/JobSyncer.java @@ -146,7 +146,7 @@ public void onReceiveRunAction(ResourceSyncer.CommitArgs commitArgs) public void onReceiveKillAction(ResourceSyncer.CommitArgs commitArgs) throws JsonProcessingException { BatchJobList jobList = - BatchJobList.deserialize(commitArgs.getResourceActionRecord().getResourceAction()); + BatchJobList.deserialize(commitArgs.getResourceActionRecord().getResourceContent()); logger.info("onReceiveKillAction, job size: {}", jobList.getJobs()); this.scheduler.batchKillJobs(jobList.getJobs()); } diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/DagWorkFlowSchedulerImpl.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/DagWorkFlowSchedulerImpl.java index a2549d25..e98645a2 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/DagWorkFlowSchedulerImpl.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/DagWorkFlowSchedulerImpl.java @@ -189,10 +189,13 @@ public void executeWorker(Worker worker) throws Exception { workerId); return; } - worker.run(jobWorker.getStatus()); - - jobWorkerMapper.updateJobWorkerStatus(workerId, WorkerStatus.SUCCESS.name()); - logger.info("worker executed successfully, jobId: {}, workId: {}", jobId, workerId); + WorkerStatus status = worker.run(jobWorker.getStatus()); + if (status != WorkerStatus.KILLED) { + jobWorkerMapper.updateJobWorkerStatus(workerId, status.name()); + logger.info("worker executed successfully, jobId: {}, workId: {}", jobId, workerId); + } else { + logger.info("worker has been killed, jobId: {}, workId: {}", jobId, workerId); + } } catch (Exception e) { logger.error("worker executed failed, jobId: {}, workId: {}, e: ", jobId, workerId, e); jobWorkerMapper.updateJobWorkerStatus(workerId, WorkerStatus.FAILURE.name()); diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/ModelWorker.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/ModelWorker.java index 84cce402..bef54678 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/ModelWorker.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/ModelWorker.java @@ -24,7 +24,7 @@ public ModelWorker( } @Override - public void engineRun() throws Exception { + public WorkerStatus engineRun() throws Exception { String jobId = getJobId(); String workerId = getWorkerId(); @@ -52,8 +52,7 @@ public void engineRun() throws Exception { // submit task String taskId = modelClient.submitTask(args, getJobWorker()); // poll until the task finished - modelClient.pollTask(getJobWorker().getWorkerId()); - + return modelClient.pollTask(getJobWorker().getWorkerId()); } finally { long endTimeMillis = System.currentTimeMillis(); logger.info( diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/MpcWorker.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/MpcWorker.java index 85f18a84..dfa4dcf6 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/MpcWorker.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/MpcWorker.java @@ -23,7 +23,7 @@ public MpcWorker( } @Override - public void engineRun() throws WeDPRException { + public WorkerStatus engineRun() throws WeDPRException { EntryPointInfo entryPoint = getLoadBalancer() @@ -33,5 +33,6 @@ public void engineRun() throws WeDPRException { } logger.info("## getting mpc client: {}", entryPoint); + return WorkerStatus.SUCCESS; } } diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/PsiWorker.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/PsiWorker.java index 279db358..0df5bd8c 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/PsiWorker.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/PsiWorker.java @@ -24,7 +24,7 @@ public PsiWorker( } @Override - public void engineRun() throws Exception { + public WorkerStatus engineRun() throws Exception { String jobId = getJobId(); String workerId = getWorkerId(); @@ -57,6 +57,7 @@ public void engineRun() throws Exception { String taskId = psiClient.submitTask(workerArgs); // poll until the task finished psiClient.pollTask(taskId); + return WorkerStatus.SUCCESS; } finally { long endTimeMillis = System.currentTimeMillis(); logger.info( diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/Worker.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/Worker.java index 692080a3..d44cb019 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/Worker.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/Worker.java @@ -81,16 +81,16 @@ public void logWorker() { * * @return */ - public abstract void engineRun() throws Exception; + public abstract WorkerStatus engineRun() throws Exception; - public boolean run(String workerStatus) throws Exception { + public WorkerStatus run(String workerStatus) throws Exception { if (workerStatus.equals(WorkerStatus.SUCCESS.name())) { logger.info( "worker has been executed successfully, jobId: {}, workId: {}", jobId, workerId); - return false; + return WorkerStatus.SUCCESS; } logWorker(); @@ -101,9 +101,9 @@ public boolean run(String workerStatus) throws Exception { while (attemptTimes++ < retryTimes) { try { logger.info(workerStartLog(workerId)); - this.engineRun(); + WorkerStatus status = this.engineRun(); logger.info(workerEndLog(workerId)); - return true; + return status; } catch (Exception e) { if (attemptTimes >= retryTimes) { logger.error( @@ -125,7 +125,7 @@ public boolean run(String workerStatus) throws Exception { } } - return false; + return WorkerStatus.FAILURE; } String workerStartLog(String workId) { diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/WorkerStatus.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/WorkerStatus.java index ad784752..4f3227f9 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/WorkerStatus.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/WorkerStatus.java @@ -23,8 +23,11 @@ public String getStatus() { } public boolean isFailed() { - return ordinal() == WorkerStatus.FAILURE.ordinal() - || ordinal() == WorkerStatus.KILLED.ordinal(); + return ordinal() == WorkerStatus.FAILURE.ordinal(); + } + + public boolean isKilled() { + return ordinal() == WorkerStatus.KILLED.ordinal(); } public boolean isSuccess() { diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ExecutiveContext.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ExecutiveContext.java index e05c77a3..76070424 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ExecutiveContext.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ExecutiveContext.java @@ -52,6 +52,13 @@ public TaskFinishedHandler getTaskFinishedHandler() { } public void onTaskFinished(ExecuteResult result) { + // need to kill the job, no need to call the handler + if (!job.getKilled()) { + logger.info( + "onTaskFinished return directly for the job is been killed, job: {}", + job.toString()); + return; + } JobDO.JobResultItem subJobResult = new JobDO.JobResultItem( taskID, result.getResultStatus().success(), result.serialize()); diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/dag/DagSchedulerExecutor.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/dag/DagSchedulerExecutor.java index 2112f4f2..94340a15 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/dag/DagSchedulerExecutor.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/dag/DagSchedulerExecutor.java @@ -1,18 +1,25 @@ package com.webank.wedpr.components.scheduler.executor.impl.dag; import com.webank.wedpr.common.protocol.ExecutorType; +import com.webank.wedpr.common.utils.BaseResponse; import com.webank.wedpr.common.utils.ThreadPoolService; +import com.webank.wedpr.common.utils.WeDPRException; +import com.webank.wedpr.components.http.client.HttpClientImpl; +import com.webank.wedpr.components.loadbalancer.EntryPointInfo; import com.webank.wedpr.components.loadbalancer.LoadBalancer; import com.webank.wedpr.components.project.JobChecker; import com.webank.wedpr.components.project.dao.JobDO; import com.webank.wedpr.components.scheduler.api.WorkFlowOrchestratorApi; import com.webank.wedpr.components.scheduler.dag.DagWorkFlowSchedulerImpl; import com.webank.wedpr.components.scheduler.dag.api.WorkFlowScheduler; +import com.webank.wedpr.components.scheduler.dag.utils.ServiceName; import com.webank.wedpr.components.scheduler.executor.ExecuteResult; import com.webank.wedpr.components.scheduler.executor.Executor; import com.webank.wedpr.components.scheduler.executor.callback.TaskFinishedHandler; import com.webank.wedpr.components.scheduler.executor.impl.ExecutiveContext; import com.webank.wedpr.components.scheduler.executor.impl.ExecutiveContextBuilder; +import com.webank.wedpr.components.scheduler.executor.impl.ml.MLExecutorConfig; +import com.webank.wedpr.components.scheduler.executor.impl.ml.response.MLResponseFactory; import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder; import com.webank.wedpr.components.scheduler.executor.manager.ExecutorManager; import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper; @@ -20,6 +27,7 @@ import com.webank.wedpr.components.scheduler.workflow.WorkFlowOrchestrator; import com.webank.wedpr.components.scheduler.workflow.builder.JobWorkFlowBuilderManager; import com.webank.wedpr.components.storage.api.FileStorageInterface; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,10 +38,10 @@ public class DagSchedulerExecutor implements Executor { private final WorkFlowScheduler workFlowScheduler; private final WorkFlowOrchestratorApi workflowOrchestrator; private final ExecutorManager executorManager; - private final ExecutiveContextBuilder executiveContextBuilder; private final ThreadPoolService threadPoolService; + private final LoadBalancer loadBalancer; public DagSchedulerExecutor( LoadBalancer loadBalancer, @@ -44,6 +52,7 @@ public DagSchedulerExecutor( ExecutorManager executorManager, ExecutiveContextBuilder executiveContextBuilder, ThreadPoolService threadPoolService) { + this.loadBalancer = loadBalancer; this.executiveContextBuilder = executiveContextBuilder; this.threadPoolService = threadPoolService; this.executorManager = executorManager; @@ -89,7 +98,6 @@ public void innerExecute(JobDO jobDO) { WorkFlow workflow = workflowOrchestrator.buildWorkFlow(jobDO); this.workFlowScheduler.schedule(jobDO.getId(), workflow); - executiveContext.onTaskFinished(new ExecuteResult(ExecuteResult.ResultStatus.SUCCESS)); long endTimeMillis = System.currentTimeMillis(); @@ -100,10 +108,8 @@ public void innerExecute(JobDO jobDO) { (endTimeMillis - startTimeMillis)); } catch (Exception e) { - executiveContext.onTaskFinished( new ExecuteResult(e.getMessage(), ExecuteResult.ResultStatus.FAILED)); - long endTimeMillis = System.currentTimeMillis(); logger.warn( @@ -115,7 +121,56 @@ public void innerExecute(JobDO jobDO) { } @Override - public void kill(JobDO jobDO) throws Exception {} + public void kill(JobDO jobDO) throws Exception { + if (jobDO.getType().mlJob()) { + killModelJob(jobDO); + } + } + + // Note: since the job may exist in any node, establish kill command to all nodes + public void killModelJob(JobDO jobDO) throws Exception { + logger.info("killModelJob: {}", jobDO.getId()); + List aliveEntryPoint = + loadBalancer.selectAllEndPoint(ServiceName.MODEL.getValue()); + if (aliveEntryPoint == null || aliveEntryPoint.isEmpty()) { + return; + } + boolean failed = false; + String reason = ""; + for (EntryPointInfo entryPointInfo : aliveEntryPoint) { + try { + logger.info("kill job: {}, entrypoint: {}", jobDO.toString(), entryPointInfo); + HttpClientImpl httpClient = + new HttpClientImpl( + MLExecutorConfig.getRunTaskApiUrl( + entryPointInfo.getEntryPoint(), jobDO.getId()), + MLExecutorConfig.getMaxTotalConnection(), + MLExecutorConfig.buildConfig(), + new MLResponseFactory()); + BaseResponse response = httpClient.execute(httpClient.getUrl(), true); + if (response.statusOk()) { + logger.info( + "kill job success: {}, entrypoint: {}", + jobDO.getJobRequest(), + entryPointInfo); + return; + } + logger.error( + "kill job {} failed, response: {}, entrypoint: {}", + jobDO.getId(), + response.serialize(), + entryPointInfo); + throw new WeDPRException("kill job failed, response: " + response.serialize()); + } catch (Exception e) { + failed = true; + reason = e.getMessage(); + } + } + if (failed) { + throw new WeDPRException(reason); + } + logger.info("killModelJob: {} success", jobDO.getId()); + } @Override public ExecuteResult queryStatus(String jobID) throws Exception { diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/response/MLResponse.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/response/MLResponse.java index d1439623..7d9201f3 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/response/MLResponse.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/impl/ml/response/MLResponse.java @@ -86,6 +86,13 @@ public Boolean failed() { return data.getWorkerStatus().isFailed(); } + public Boolean killed() { + if (data == null) { + return Boolean.FALSE; + } + return data.getWorkerStatus().isKilled(); + } + public static MLResponse deserialize(String data) throws JsonProcessingException { return ObjectMapperFactory.getObjectMapper().readValue(data, MLResponse.class); } diff --git a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/manager/ExecutorManagerImpl.java b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/manager/ExecutorManagerImpl.java index 705028c7..3634d3f4 100644 --- a/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/manager/ExecutorManagerImpl.java +++ b/wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/executor/manager/ExecutorManagerImpl.java @@ -28,7 +28,6 @@ import com.webank.wedpr.components.scheduler.executor.impl.ExecutiveContext; import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder; import com.webank.wedpr.components.storage.api.FileStorageInterface; -import java.util.List; import java.util.Map; import java.util.concurrent.*; import lombok.SneakyThrows; @@ -41,7 +40,7 @@ public class ExecutorManagerImpl implements ExecutorManager { protected Map executors = new ConcurrentHashMap<>(); - protected List proceedingJobs = new CopyOnWriteArrayList<>(); + protected Map proceedingJobs = new ConcurrentHashMap<>(); protected ScheduledExecutorService queryStatusWorker = new ScheduledThreadPoolExecutor(1); private final Integer queryStatusIntervalMs; @@ -72,17 +71,18 @@ public void run() { } protected void queryAllJobStatus() { - for (ExecutiveContext context : proceedingJobs) { + for (ExecutiveContext context : proceedingJobs.values()) { querySingleJobStatus(context); } } protected void querySingleJobStatus(ExecutiveContext executiveContext) { + String jobId = executiveContext.getJob().getId(); try { Executor executor = getExecutor(executiveContext.getJob().getJobType()); // Note: unreachable here if (executor == null) { - proceedingJobs.remove(executiveContext); + proceedingJobs.remove(jobId); return; } @@ -91,7 +91,7 @@ protected void querySingleJobStatus(ExecutiveContext executiveContext) { if (jobResult != null && jobResult.getJobStatus() != null && jobResult.getJobStatus().finished()) { - proceedingJobs.remove(executiveContext); + proceedingJobs.remove(jobId); return; } ExecuteResult result = executor.queryStatus(executiveContext.getTaskID()); @@ -101,7 +101,7 @@ protected void querySingleJobStatus(ExecutiveContext executiveContext) { if (result.finished()) { executiveContext.onTaskFinished(result); - proceedingJobs.remove(executiveContext); + proceedingJobs.remove(jobId); } } catch (Exception e) { logger.error( @@ -115,7 +115,7 @@ protected void querySingleJobStatus(ExecutiveContext executiveContext) { + " failed for " + e.getMessage(), ExecuteResult.ResultStatus.FAILED)); - proceedingJobs.remove(executiveContext); + proceedingJobs.remove(jobId); } int proceedingJobsSize = proceedingJobs.size(); @@ -151,7 +151,8 @@ public void execute(JobDO jobDO) { Constant.WEDPR_SUCCESS_MSG, ExecuteResult.ResultStatus.SUCCESS)); return; } - proceedingJobs.add( + proceedingJobs.put( + jobDO.getId(), new ExecutiveContext( jobDO, getTaskFinishHandler(jobType), @@ -185,6 +186,14 @@ public void kill(JobDO jobDO) throws Exception { if (executor == null) { return; } + //// find out the proceeding job, update the status to kill + if (this.proceedingJobs.containsKey(jobDO.getId())) { + ExecutiveContext proceedingJobCtx = proceedingJobs.get(jobDO.getId()); + // set the job to kill status + proceedingJobCtx.getJob().setKilled(true); + logger.info("Remove job: {} from proceedingJobs", jobDO.getId()); + proceedingJobs.remove(jobDO.getId()); + } executor.kill(jobDO); }