Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix broker load job hanging when it meets a resource group pending timeout (backport #51072) #51127

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public void onTaskFailed(long taskId, FailMsg failMsg) {
return;
}

if (!failMsg.getMsg().contains("timeout")) {
if (!failMsg.getMsg().contains("timeout") || failMsg.getCancelType() == FailMsg.CancelType.USER_CANCEL) {
unprotectedExecuteCancel(failMsg, true);
logFinalOperation();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ protected void exec() {
// callback on pending task finished
callback.onTaskFinished(attachment);
isFinished = true;
} catch (LoadException e) {
failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage());
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
.add("error_msg", "Failed to execute load task").build(), e);
} catch (UserException e) {
failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage());
failMsg.setCancelType(FailMsg.CancelType.USER_CANCEL);
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
.add("error_msg", "Failed to execute load task").build(), e);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,24 @@ public void logEndLoadJob(LoadJobFinalOperation loadJobFinalOperation) {
Assert.assertEquals(0, idToTasks.size());
}

/**
 * Checks that reporting a task failure whose message contains "timeout" but whose
 * cancel type is {@code USER_CANCEL} leaves the job's {@code idToTasks} map empty.
 *
 * @param taskId  injected task id passed straight through to {@code onTaskFailed}
 * @param failMsg injected placeholder; the test builds its own real {@link FailMsg} instead
 */
@Test
public void testTaskOnResourceGroupTaskFailed(@Injectable long taskId, @Injectable FailMsg failMsg) {
    // Install an edit log and replace logEndLoadJob with a no-op for the duration of the test.
    GlobalStateMgr.getCurrentState().setEditLog(new EditLog(new ArrayBlockingQueue<>(100)));
    new MockUp<EditLog>() {
        @Mock
        public void logEndLoadJob(LoadJobFinalOperation loadJobFinalOperation) {
            // intentionally empty
        }
    };

    // The message mentions "timeout", yet the cancel type marks it as a user cancel.
    FailMsg pendingTimeoutMsg =
            new FailMsg(FailMsg.CancelType.USER_CANCEL, "Failed to allocate resource to query: pending timeout");

    BrokerLoadJob job = new BrokerLoadJob();
    job.onTaskFailed(taskId, pendingTimeoutMsg);

    // Read the private task registry reflectively and verify nothing is left in it.
    Map<Long, LoadTask> remainingTasks = Deencapsulation.getField(job, "idToTasks");
    Assert.assertEquals(0, remainingTasks.size());
}

@Test
public void testPendingTaskOnFinishedWithJobCancelled(@Injectable BrokerPendingTaskAttachment attachment) {
BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
Expand Down
Loading