Skip to content

Commit

Permalink
chore: make apply fiber daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Dec 10, 2024
1 parent f548790 commit 2ac740b
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class FiberGroup {
@SuppressWarnings("FieldMayBeFinal")
private volatile boolean shouldStop = false;
private final static VarHandle SHOULD_STOP;
private long markStopNanos;

final FiberChannel<Runnable> sysChannel;

Expand Down Expand Up @@ -113,7 +112,6 @@ public FrameCallResult execute(Void input) {
return Fiber.frameReturn();
}
log.info("request shutdown group: {}", name);
markStopNanos = dispatcher.ts.getNanoTime();
SHOULD_STOP.setVolatile(FiberGroup.this, true);
shouldStopCondition.signalAll();
return Fiber.frameReturn();
Expand Down Expand Up @@ -359,17 +357,6 @@ public ExecutorService getExecutor() {
return executor;
}

/**
* should call in dispatcher thread.
*/
public long timeAfterRequestStop(TimeUnit unit) {
checkGroup();
if (!isShouldStopPlain()) {
return -1;
}
return unit.convert(dispatcher.ts.getNanoTime() - markStopNanos, TimeUnit.NANOSECONDS);
}

public FiberCondition getShouldStopCondition() {
return shouldStopCondition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ public FrameCallResult execute(Void input) {
}

private void startApplyFiber(FiberGroup fiberGroup) {
applyFiber = new Fiber("apply", fiberGroup, new ApplyFrame(), false, 50);
applyFiber = new Fiber("apply", fiberGroup, new ApplyFrame(), true, 50);
applyFiber.start();
}

public void apply() {
public void wakeupApply() {
needApplyCond.signal();
}

Expand Down Expand Up @@ -311,9 +311,7 @@ private void tryApplyHeartBeat(long appliedIndex) {
}

private boolean shouldStopApply() {
return raftStatus.isInstallSnapshot() || (fiberGroup.isShouldStop() &&
(fiberGroup.timeAfterRequestStop(TimeUnit.MILLISECONDS) > 300
|| raftStatus.getLastApplied() == raftStatus.getCommitIndex()));
return raftStatus.isInstallSnapshot() || fiberGroup.isShouldStop();
}

public Fiber getApplyFiber() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void finish(long lastPersistIndex) {
long newCommitIndex = Math.min(lastPersistIndex, raftStatus.getLeaderCommit());
if (newCommitIndex > raftStatus.getCommitIndex()) {
raftStatus.setCommitIndex(newCommitIndex);
applyManager.apply();
applyManager.wakeupApply();
}
}
}
Expand All @@ -125,7 +125,7 @@ public void tryCommit(long recentMatchIndex) {
return;
}
raftStatus.setCommitIndex(recentMatchIndex);
applyManager.apply();
applyManager.wakeupApply();
}

private static boolean needCommit(long recentMatchIndex, RaftStatusImpl raftStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ private FrameCallResult startInstall(RaftStatusImpl raftStatus) {
}
log.info("start install snapshot, groupId={}", groupId);
raftStatus.setInstallSnapshot(true);
gc.getApplyManager().apply(); // signal apply fiber to exit
gc.getApplyManager().wakeupApply(); // wakeup apply fiber to exit
gc.getStatusManager().persistAsync(true);
return gc.getStatusManager().waitUpdateFinish(this::afterStatusPersist);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected FiberFrame<Void> processInFiberGroup(ReqInfoEx<TransferLeaderReq> reqI
throw new RaftException("transfer leader fail, logIndex check fail");
}
raftStatus.setCommitIndex(req.logIndex);
gc.getApplyManager().apply();
gc.getApplyManager().wakeupApply();

RaftUtil.changeToLeader(raftStatus);
gc.getVoteManager().cancelVote("transfer leader");
Expand Down

0 comments on commit 2ac740b

Please sign in to comment.