diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BulkLoadJob.java index c02467bda6637..c7921f1474458 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BulkLoadJob.java @@ -258,7 +258,7 @@ public void onTaskFailed(long taskId, FailMsg failMsg) { return; } - if (!failMsg.getMsg().contains("timeout")) { + if (!failMsg.getMsg().contains("timeout") || failMsg.getCancelType() == FailMsg.CancelType.USER_CANCEL) { unprotectedExecuteCancel(failMsg, true); logFinalOperation(); } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadTask.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadTask.java index 9ea43c204ab4a..496465555cf0a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadTask.java @@ -73,8 +73,13 @@ protected void exec() { // callback on pending task finished callback.onTaskFinished(attachment); isFinished = true; + } catch (LoadException e) { + failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage()); + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) + .add("error_msg", "Failed to execute load task").build(), e); } catch (UserException e) { failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage()); + failMsg.setCancelType(FailMsg.CancelType.USER_CANCEL); LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) .add("error_msg", "Failed to execute load task").build(), e); } catch (Exception e) { diff --git a/fe/fe-core/src/test/java/com/starrocks/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/com/starrocks/load/loadv2/BrokerLoadJobTest.java index 28d74a13d75f0..63ad8a265e734 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/loadv2/BrokerLoadJobTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/loadv2/BrokerLoadJobTest.java @@ -434,6 +434,24 @@ public void logEndLoadJob(LoadJobFinalOperation loadJobFinalOperation) { Assert.assertEquals(0, idToTasks.size()); } + @Test + public void testTaskOnResourceGroupTaskFailed(@Injectable long taskId, @Injectable FailMsg failMsg) { + GlobalStateMgr.getCurrentState().setEditLog(new EditLog(new ArrayBlockingQueue<>(100))); + new MockUp() { + @Mock + public void logEndLoadJob(LoadJobFinalOperation loadJobFinalOperation) { + + } + }; + + BrokerLoadJob brokerLoadJob = new BrokerLoadJob(); + failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "Failed to allocate resource to query: pending timeout"); + brokerLoadJob.onTaskFailed(taskId, failMsg); + + Map idToTasks = Deencapsulation.getField(brokerLoadJob, "idToTasks"); + Assert.assertEquals(0, idToTasks.size()); + } + @Test public void testPendingTaskOnFinishedWithJobCancelled(@Injectable BrokerPendingTaskAttachment attachment) { BrokerLoadJob brokerLoadJob = new BrokerLoadJob();