From f9774cac263640225f169fe258652af301f907fc Mon Sep 17 00:00:00 2001 From: huangli Date: Sun, 5 Jan 2025 19:14:11 +0800 Subject: [PATCH] refactor: make cancelRetryIndicator called in dispatcher thread and simplification. --- .../dongting/raft/store/AsyncIoTask.java | 47 +++++++------------ 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/store/AsyncIoTask.java b/server/src/main/java/com/github/dtprj/dongting/raft/store/AsyncIoTask.java index f44e2837..0f2c2f21 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/store/AsyncIoTask.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/store/AsyncIoTask.java @@ -63,10 +63,6 @@ public AsyncIoTask(FiberGroup fiberGroup, DtFile dtFile) { this(fiberGroup, dtFile, null, false, null); } - public AsyncIoTask(FiberGroup fiberGroup, DtFile dtFile, int[] retryInterval, boolean retryForever) { - this(fiberGroup, dtFile, retryInterval, retryForever, null); - } - public AsyncIoTask(FiberGroup fiberGroup, DtFile dtFile, int[] retryInterval, boolean retryForever, Supplier cancelRetryIndicator) { this.fiberGroup = fiberGroup; @@ -140,26 +136,16 @@ protected void fireComplete(Throwable ex) { } private void retry(Throwable ioEx) { - if (retryInterval == null) { + long sleepTime = StoreUtil.calcRetryInterval(retryCount, retryInterval); + if(sleepTime <= 0) { fireComplete(ioEx); return; } - if (shouldCancelRetry()) { + // assert retryInterval is not null since StoreUtil.calcRetryInterval checked it + if (retryCount >= retryInterval.length && !retryForever) { fireComplete(ioEx); return; } - long sleepTime; - if (retryCount >= retryInterval.length) { - if (retryForever) { - sleepTime = retryInterval[retryInterval.length - 1]; - retryCount++; - } else { - fireComplete(ioEx); - return; - } - } else { - sleepTime = retryInterval[retryCount++]; - } Fiber retryFiber = new Fiber("io-retry-fiber", fiberGroup, new FiberFrame<>() { @Override @@ -173,6 +159,7 @@ private FrameCallResult resume(Void v) { fireComplete(ioEx); return Fiber.frameReturn(); } + retryCount++; if (ioBuffer == null) { submitForceTask(); } else { @@ -188,22 +175,22 @@ protected FrameCallResult handle(Throwable ex) { fireComplete(ex); return Fiber.frameReturn(); } + + private boolean shouldCancelRetry() { + if (isGroupShouldStopPlain()) { + // if fiber group is stopped, ignore cancelIndicator and retryForever + return true; + } + if (cancelRetryIndicator != null && cancelRetryIndicator.get()) { + log.warn("retry canceled by cancelIndicator"); + return true; + } + return false; + } }); fiberGroup.fireFiber(retryFiber); } - private boolean shouldCancelRetry() { - if (fiberGroup.isShouldStop()) { - // if fiber group is stopped, ignore cancelIndicator and retryForever - return true; - } - if (cancelRetryIndicator != null && cancelRetryIndicator.get()) { - log.warn("retry canceled by cancelIndicator"); - return true; - } - return false; - } - // this method set to protected for mock error in unit test protected void exec(long pos) { try {