From 2ac740b19aa3f55b577510a3732bcbb4a3af2ccf Mon Sep 17 00:00:00 2001 From: huangli Date: Tue, 10 Dec 2024 22:43:02 +0800 Subject: [PATCH] chore: make apply fiber daemon --- .../com/github/dtprj/dongting/fiber/FiberGroup.java | 13 ------------- .../dtprj/dongting/raft/impl/ApplyManager.java | 8 +++----- .../dtprj/dongting/raft/impl/CommitManager.java | 4 ++-- .../dtprj/dongting/raft/rpc/AppendProcessor.java | 2 +- .../dongting/raft/rpc/TransferLeaderProcessor.java | 2 +- 5 files changed, 7 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/com/github/dtprj/dongting/fiber/FiberGroup.java b/server/src/main/java/com/github/dtprj/dongting/fiber/FiberGroup.java index 85995027f..3fdf165e0 100644 --- a/server/src/main/java/com/github/dtprj/dongting/fiber/FiberGroup.java +++ b/server/src/main/java/com/github/dtprj/dongting/fiber/FiberGroup.java @@ -48,7 +48,6 @@ public class FiberGroup { @SuppressWarnings("FieldMayBeFinal") private volatile boolean shouldStop = false; private final static VarHandle SHOULD_STOP; - private long markStopNanos; final FiberChannel sysChannel; @@ -113,7 +112,6 @@ public FrameCallResult execute(Void input) { return Fiber.frameReturn(); } log.info("request shutdown group: {}", name); - markStopNanos = dispatcher.ts.getNanoTime(); SHOULD_STOP.setVolatile(FiberGroup.this, true); shouldStopCondition.signalAll(); return Fiber.frameReturn(); @@ -359,17 +357,6 @@ public ExecutorService getExecutor() { return executor; } - /** - * should call in dispatcher thread. - */ - public long timeAfterRequestStop(TimeUnit unit) { - checkGroup(); - if (!isShouldStopPlain()) { - return -1; - } - return unit.convert(dispatcher.ts.getNanoTime() - markStopNanos, TimeUnit.NANOSECONDS); - } - public FiberCondition getShouldStopCondition() { return shouldStopCondition; } diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/impl/ApplyManager.java b/server/src/main/java/com/github/dtprj/dongting/raft/impl/ApplyManager.java index d7ccc118c..b82331d41 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/impl/ApplyManager.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/impl/ApplyManager.java @@ -130,11 +130,11 @@ public FrameCallResult execute(Void input) { } private void startApplyFiber(FiberGroup fiberGroup) { - applyFiber = new Fiber("apply", fiberGroup, new ApplyFrame(), false, 50); + applyFiber = new Fiber("apply", fiberGroup, new ApplyFrame(), true, 50); applyFiber.start(); } - public void apply() { + public void wakeupApply() { needApplyCond.signal(); } @@ -311,9 +311,7 @@ private void tryApplyHeartBeat(long appliedIndex) { } private boolean shouldStopApply() { - return raftStatus.isInstallSnapshot() || (fiberGroup.isShouldStop() && - (fiberGroup.timeAfterRequestStop(TimeUnit.MILLISECONDS) > 300 - || raftStatus.getLastApplied() == raftStatus.getCommitIndex())); + return raftStatus.isInstallSnapshot() || fiberGroup.isShouldStop(); } public Fiber getApplyFiber() { diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/impl/CommitManager.java b/server/src/main/java/com/github/dtprj/dongting/raft/impl/CommitManager.java index 6fb902343..07f5a2275 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/impl/CommitManager.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/impl/CommitManager.java @@ -108,7 +108,7 @@ public void finish(long lastPersistIndex) { long newCommitIndex = Math.min(lastPersistIndex, raftStatus.getLeaderCommit()); if (newCommitIndex > raftStatus.getCommitIndex()) { raftStatus.setCommitIndex(newCommitIndex); - applyManager.apply(); + applyManager.wakeupApply(); } } } @@ -125,7 +125,7 @@ public void tryCommit(long recentMatchIndex) { return; } raftStatus.setCommitIndex(recentMatchIndex); - applyManager.apply(); + applyManager.wakeupApply(); } private static boolean needCommit(long recentMatchIndex, RaftStatusImpl raftStatus) { diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/rpc/AppendProcessor.java b/server/src/main/java/com/github/dtprj/dongting/raft/rpc/AppendProcessor.java index 03ef19b1a..4160fc4c7 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/rpc/AppendProcessor.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/rpc/AppendProcessor.java @@ -491,7 +491,7 @@ private FrameCallResult startInstall(RaftStatusImpl raftStatus) { } log.info("start install snapshot, groupId={}", groupId); raftStatus.setInstallSnapshot(true); - gc.getApplyManager().apply(); // signal apply fiber to exit + gc.getApplyManager().wakeupApply(); // wakeup apply fiber to exit gc.getStatusManager().persistAsync(true); return gc.getStatusManager().waitUpdateFinish(this::afterStatusPersist); } diff --git a/server/src/main/java/com/github/dtprj/dongting/raft/rpc/TransferLeaderProcessor.java b/server/src/main/java/com/github/dtprj/dongting/raft/rpc/TransferLeaderProcessor.java index 2e7225bfa..51d881ce4 100644 --- a/server/src/main/java/com/github/dtprj/dongting/raft/rpc/TransferLeaderProcessor.java +++ b/server/src/main/java/com/github/dtprj/dongting/raft/rpc/TransferLeaderProcessor.java @@ -71,7 +71,7 @@ protected FiberFrame processInFiberGroup(ReqInfoEx reqI throw new RaftException("transfer leader fail, logIndex check fail"); } raftStatus.setCommitIndex(req.logIndex); - gc.getApplyManager().apply(); + gc.getApplyManager().wakeupApply(); RaftUtil.changeToLeader(raftStatus); gc.getVoteManager().cancelVote("transfer leader");