Skip to content

Commit

Permalink
[BugFix] fix broker load job hang when meet resource group pending ti…
Browse files Browse the repository at this point in the history
…meout (#51072)

Signed-off-by: luohaha <[email protected]>
(cherry picked from commit d49cb08)
  • Loading branch information
luohaha authored and mergify[bot] committed Sep 18, 2024
1 parent 8f0a1e3 commit f926986
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public void onTaskFailed(long taskId, FailMsg failMsg) {
return;
}

if (!failMsg.getMsg().contains("timeout")) {
if (!failMsg.getMsg().contains("timeout") || failMsg.getCancelType() == FailMsg.CancelType.USER_CANCEL) {
unprotectedExecuteCancel(failMsg, true);
logFinalOperation();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ protected void exec() {
// callback on pending task finished
callback.onTaskFinished(attachment);
isFinished = true;
} catch (LoadException e) {
failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage());
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
.add("error_msg", "Failed to execute load task").build(), e);
} catch (UserException e) {
failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage());
failMsg.setCancelType(FailMsg.CancelType.USER_CANCEL);
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
.add("error_msg", "Failed to execute load task").build(), e);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,24 @@ public void logEndLoadJob(LoadJobFinalOperation loadJobFinalOperation) {
Assert.assertEquals(0, idToTasks.size());
}

@Test
public void testTaskOnResourceGroupTaskFailed(@Injectable long taskId, @Injectable FailMsg failMsg) {
GlobalStateMgr.getCurrentState().setEditLog(new EditLog(new ArrayBlockingQueue<>(100)));
new MockUp<EditLog>() {
@Mock
public void logEndLoadJob(LoadJobFinalOperation loadJobFinalOperation) {

}
};

BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "Failed to allocate resource to query: pending timeout");
brokerLoadJob.onTaskFailed(taskId, failMsg);

Map<Long, LoadTask> idToTasks = Deencapsulation.getField(brokerLoadJob, "idToTasks");
Assert.assertEquals(0, idToTasks.size());
}

@Test
public void testPendingTaskOnFinishedWithJobCancelled(@Injectable BrokerPendingTaskAttachment attachment) {
BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
Expand Down

0 comments on commit f926986

Please sign in to comment.