Skip to content

Commit

Permalink
refactor: make cancelRetryIndicator called in dispatcher thread and s…
Browse files Browse the repository at this point in the history
…implification.
  • Loading branch information
areyouok committed Jan 5, 2025
1 parent e76a5eb commit f9774ca
Showing 1 changed file with 17 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ public AsyncIoTask(FiberGroup fiberGroup, DtFile dtFile) {
this(fiberGroup, dtFile, null, false, null);
}

public AsyncIoTask(FiberGroup fiberGroup, DtFile dtFile, int[] retryInterval, boolean retryForever) {
this(fiberGroup, dtFile, retryInterval, retryForever, null);
}

public AsyncIoTask(FiberGroup fiberGroup, DtFile dtFile, int[] retryInterval, boolean retryForever,
Supplier<Boolean> cancelRetryIndicator) {
this.fiberGroup = fiberGroup;
Expand Down Expand Up @@ -140,26 +136,16 @@ protected void fireComplete(Throwable ex) {
}

private void retry(Throwable ioEx) {
if (retryInterval == null) {
long sleepTime = StoreUtil.calcRetryInterval(retryCount, retryInterval);
if(sleepTime <= 0) {
fireComplete(ioEx);
return;
}
if (shouldCancelRetry()) {
// assert retryInterval is not null since StoreUtil.calcRetryInterval checked it
if (retryCount >= retryInterval.length && !retryForever) {
fireComplete(ioEx);
return;
}
long sleepTime;
if (retryCount >= retryInterval.length) {
if (retryForever) {
sleepTime = retryInterval[retryInterval.length - 1];
retryCount++;
} else {
fireComplete(ioEx);
return;
}
} else {
sleepTime = retryInterval[retryCount++];
}

Fiber retryFiber = new Fiber("io-retry-fiber", fiberGroup, new FiberFrame<>() {
@Override
Expand All @@ -173,6 +159,7 @@ private FrameCallResult resume(Void v) {
fireComplete(ioEx);
return Fiber.frameReturn();
}
retryCount++;
if (ioBuffer == null) {
submitForceTask();
} else {
Expand All @@ -188,22 +175,22 @@ protected FrameCallResult handle(Throwable ex) {
fireComplete(ex);
return Fiber.frameReturn();
}

private boolean shouldCancelRetry() {
if (isGroupShouldStopPlain()) {
// if fiber group is stopped, ignore cancelIndicator and retryForever
return true;
}
if (cancelRetryIndicator != null && cancelRetryIndicator.get()) {
log.warn("retry canceled by cancelIndicator");
return true;
}
return false;
}
});
fiberGroup.fireFiber(retryFiber);
}

private boolean shouldCancelRetry() {
if (fiberGroup.isShouldStop()) {
// if fiber group is stopped, ignore cancelIndicator and retryForever
return true;
}
if (cancelRetryIndicator != null && cancelRetryIndicator.get()) {
log.warn("retry canceled by cancelIndicator");
return true;
}
return false;
}

// this method set to protected for mock error in unit test
protected void exec(long pos) {
try {
Expand Down

0 comments on commit f9774ca

Please sign in to comment.