Skip to content

Commit

Permalink
refactor: add apply fiber monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Nov 18, 2024
1 parent 3f3798c commit c951599
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class ApplyManager implements Comparator<Pair<DtTime, CompletableFuture<L
private FiberCondition needApplyCond;
private boolean waitApply;
private FiberCondition applyFinishCond;
private FiberCondition applyMonitorCond;
private final LinkedList<RaftTask> heartBeatQueue = new LinkedList<>();

private long initCommitIndex;
Expand All @@ -80,6 +81,8 @@ public class ApplyManager implements Comparator<Pair<DtTime, CompletableFuture<L

private final PerfCallback perfCallback;

private Fiber applyFiber;

public ApplyManager(GroupComponents gc) {
this.ts = gc.getRaftStatus().getTs();
this.raftStatus = gc.getRaftStatus();
Expand All @@ -103,8 +106,19 @@ public void postInit() {
public void init(FiberGroup fiberGroup) {
this.needApplyCond = fiberGroup.newCondition("needApply");
this.applyFinishCond = fiberGroup.newCondition("applyFinish");
this.applyMonitorCond = fiberGroup.newCondition("applyMonitor");
this.initCommitIndex = raftStatus.getCommitIndex();
startApplyFiber(fiberGroup);
new Fiber("waitGroupReadyTimeout", fiberGroup, new WaitGroupReadyTimeoutFrame(), true).start();
new Fiber("applyFiberMonitor", fiberGroup, new FiberFrame<>() {
@Override
public FrameCallResult execute(Void input) {
if (applyFiber.isFinished() && !shouldStopApply()) {
startApplyFiber(fiberGroup);
}
return applyMonitorCond.await(1000, this);
}
}, true).start();
if (raftStatus.getLastApplied() >= raftStatus.getCommitIndex()) {
log.info("apply manager init complete");
raftStatus.getInitFuture().complete(null);
Expand All @@ -117,10 +131,8 @@ public void init(FiberGroup fiberGroup) {
}

private void startApplyFiber(FiberGroup fiberGroup) {
Fiber f = new Fiber("apply", fiberGroup, new ApplyFrame(), false, 50);
f.start();
Fiber f2 = new Fiber("waitGroupReadyTimeout", fiberGroup, new WaitGroupReadyTimeoutFrame(), true);
f2.start();
applyFiber = new Fiber("apply", fiberGroup, new ApplyFrame(), false, 50);
applyFiber.start();
}

public void apply() {
Expand Down Expand Up @@ -300,9 +312,17 @@ private void tryApplyHeartBeat(long appliedIndex) {
}

private boolean shouldStopApply() {
return fiberGroup.isShouldStop() && (raftStatus.isInstallSnapshot()
|| fiberGroup.timeAfterRequestStop(TimeUnit.MILLISECONDS) > 300
|| raftStatus.getLastApplied() == raftStatus.getCommitIndex());
return raftStatus.isInstallSnapshot() || (fiberGroup.isShouldStop() &&
(fiberGroup.timeAfterRequestStop(TimeUnit.MILLISECONDS) > 300
|| raftStatus.getLastApplied() == raftStatus.getCommitIndex()));
}

public Fiber getApplyFiber() {
return applyFiber;
}

public void signalStartApply() {
applyMonitorCond.signal();
}

private class ApplyFrame extends FiberFrame<Void> {
Expand All @@ -314,22 +334,12 @@ private class ApplyFrame extends FiberFrame<Void> {
@Override
protected FrameCallResult handle(Throwable ex) {
log.error("apply failed", ex);
if (!shouldStopApply()) {
return Fiber.sleepUntilShouldStop(1000, this::restartApplyFiber);
} else {
return Fiber.frameReturn();
}
}

private FrameCallResult restartApplyFiber(Void unused) {
if (!shouldStopApply()) {
startApplyFiber(getFiberGroup());
}
return Fiber.frameReturn();
}

@Override
protected FrameCallResult doFinally() {
log.info("apply fiber exit: groupId={}", raftStatus.getGroupId());
closeIterator();
return Fiber.frameReturn();
}
Expand All @@ -339,9 +349,6 @@ public FrameCallResult execute(Void input) {
if (shouldStopApply()) {
return Fiber.frameReturn();
}
if (raftStatus.isInstallSnapshot()) {
return Fiber.sleepUntilShouldStop(10, this);
}
execCount = 1;
return execLoop(null);
}
Expand Down Expand Up @@ -408,10 +415,6 @@ public FrameCallResult execute(Void input) {
if (shouldStopApply()) {
return Fiber.frameReturn();
}
if (raftStatus.isInstallSnapshot()) {
log.warn("install snapshot, ignore load result");
return Fiber.frameReturn();
}
if (items == null || items.isEmpty()) {
log.error("load log failed, items is null");
return Fiber.frameReturn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ protected int getRemoteTerm() {
}

@Override
protected FrameCallResult process() throws Exception {
protected FrameCallResult process() {
RaftStatusImpl raftStatus = gc.getRaftStatus();
InstallSnapshotReq req = reqInfo.getReqFrame().getBody();
if (req.offset == 0 && req.members != null) {
Expand All @@ -477,11 +477,22 @@ protected FrameCallResult process() throws Exception {
return doInstall(raftStatus, req);
}

private FrameCallResult startInstall(RaftStatusImpl raftStatus) throws Exception {
private FrameCallResult startInstall(RaftStatusImpl raftStatus) {
if (RaftUtil.writeNotFinished(raftStatus)) {
return RaftUtil.waitWriteFinish(raftStatus, this);
}
return Fiber.call(gc.getRaftLog().beginInstall(), this::applyConfigChange);
raftStatus.setInstallSnapshot(true);
gc.getStatusManager().persistAsync(true);
return gc.getStatusManager().waitUpdateFinish(this::afterStatusPersist);
}

private FrameCallResult afterStatusPersist(Void v) throws Exception {
Fiber applyFiber = gc.getApplyManager().getApplyFiber();
if (applyFiber.isFinished()) {
return Fiber.call(gc.getRaftLog().beginInstall(), this::applyConfigChange);
} else {
return applyFiber.join(this::afterStatusPersist);
}
}

private FrameCallResult applyConfigChange(Void unused) {
Expand Down Expand Up @@ -525,7 +536,10 @@ private FrameCallResult finishInstall(InstallSnapshotReq req, RaftStatusImpl raf

FiberFrame<Void> finishFrame = gc.getRaftLog().finishInstall(
req.lastIncludedIndex + 1, req.nextWritePos);
return Fiber.call(finishFrame, v -> writeResp(null));
return Fiber.call(finishFrame, v -> {
gc.getApplyManager().signalStartApply();
return writeResp(null);
});
}

private FrameCallResult writeResp(Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,7 @@ public void markTruncateByTimestamp(long timestampBound, long delayMillis) {
public FiberFrame<Void> beginInstall() {
return new FiberFrame<>() {
@Override
public FrameCallResult execute(Void input) {
raftStatus.setInstallSnapshot(true);
statusManager.persistAsync(true);
return statusManager.waitUpdateFinish(this::afterPersist);
}

private FrameCallResult afterPersist(Void unused) {
public FrameCallResult execute(Void unused) {
return Fiber.call(logFiles.beginInstall(), this::afterLogBeginInstall);
}

Expand Down

0 comments on commit c951599

Please sign in to comment.