Skip to content

Commit

Permalink
feat: add saveSnapshotWhenClose
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Oct 29, 2024
1 parent 543bb4e commit 6931d77
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,7 @@ protected void afterWarmup() {
}

@Override
public void shutdown() throws Exception {
//noinspection unchecked
CompletableFuture<Long>[] saveFutures = new CompletableFuture[NODE_COUNT];
for (int i = 0; i < raftServers.size(); i++) {
saveFutures[i] = raftServers.get(i).getRaftGroup(GROUP_ID).fireSaveSnapshot();
}
CompletableFuture.allOf(saveFutures).get(10, TimeUnit.SECONDS);
public void shutdown() {
DtTime timeout = new DtTime(10, TimeUnit.SECONDS);
DtUtil.stop(timeout, clients);
DtUtil.stop(timeout, raftServers.toArray(new RaftServer[0]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.dtprj.dongting.common.FlowControlException;
import com.github.dtprj.dongting.common.Timestamp;
import com.github.dtprj.dongting.fiber.FiberFrame;
import com.github.dtprj.dongting.fiber.FiberFuture;
import com.github.dtprj.dongting.fiber.FiberGroup;
import com.github.dtprj.dongting.log.DtLog;
import com.github.dtprj.dongting.log.DtLogs;
Expand Down Expand Up @@ -72,7 +73,7 @@ public int getGroupId() {
@Override
public boolean isLeader() {
RaftMember leader = raftStatus.getShareStatus().currentLeader;
return leader !=null && leader.getNode().isSelf();
return leader != null && leader.getNode().isSelf();
}

@Override
Expand Down Expand Up @@ -170,7 +171,20 @@ public void markTruncateByTimestamp(long timestampMillis, long delayMillis) {
public CompletableFuture<Long> fireSaveSnapshot() {
checkStatus();
CompletableFuture<Long> f = new CompletableFuture<>();
gc.getFiberGroup().getExecutor().execute(() -> gc.getSnapshotManager().fireSaveSnapshot(f));
gc.getFiberGroup().getExecutor().execute(() -> {
try {
FiberFuture<Long> ff = gc.getSnapshotManager().saveSnapshot();
ff.registerCallback((idx, ex) -> {
if (ex != null) {
f.completeExceptionally(ex);
} else {
f.complete(idx);
}
});
} catch (Exception e) {
f.completeExceptionally(e);
}
});
return f;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
/**
* @author huangli
*/
@SuppressWarnings("unused")
public class RaftGroupConfig {
private final int groupId;
private final String nodeIdOfMembers;
Expand Down Expand Up @@ -53,6 +54,8 @@ public class RaftGroupConfig {
private int replicateSnapshotConcurrency = 4;
private int replicateSnapshotBufferSize = 64 * 1024;

private boolean saveSnapshotWhenClose = true;

RaftGroupConfig(int groupId, String nodeIdOfMembers, String nodeIdOfObservers) {
this.groupId = groupId;
this.nodeIdOfMembers = nodeIdOfMembers;
Expand Down Expand Up @@ -234,4 +237,12 @@ public int getSnapshotConcurrency() {
public void setSnapshotConcurrency(int snapshotConcurrency) {
this.snapshotConcurrency = snapshotConcurrency;
}

public boolean isSaveSnapshotWhenClose() {
return saveSnapshotWhenClose;
}

public void setSaveSnapshotWhenClose(boolean saveSnapshotWhenClose) {
this.saveSnapshotWhenClose = saveSnapshotWhenClose;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.github.dtprj.dongting.fiber.Fiber;
import com.github.dtprj.dongting.fiber.FiberChannel;
import com.github.dtprj.dongting.fiber.FiberFrame;
import com.github.dtprj.dongting.fiber.FiberFuture;
import com.github.dtprj.dongting.fiber.FiberGroup;
import com.github.dtprj.dongting.fiber.FrameCallResult;
import com.github.dtprj.dongting.log.DtLog;
Expand Down Expand Up @@ -462,7 +463,8 @@ protected void doStop(DtTime timeout, boolean force) {
serviceNioServer.stop(timeout);
}
ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();
raftGroups.forEach((groupId, g) -> futures.add(stopGroup(g, timeout)));
raftGroups.forEach((groupId, g) -> futures.add(stopGroup(g, timeout,
g.getGroupComponents().getGroupConfig().isSaveSnapshotWhenClose())));
nodeManager.stop(timeout);

try {
Expand All @@ -487,16 +489,26 @@ protected void doStop(DtTime timeout, boolean force) {
}
}

private CompletableFuture<Void> stopGroup(RaftGroupImpl g, DtTime timeout) {
private CompletableFuture<Void> stopGroup(RaftGroupImpl g, DtTime timeout, boolean saveSnapshot) {
FiberGroup fiberGroup = g.getFiberGroup();
if (fiberGroup.isShouldStop()) {
return g.getShutdownFuture();
}
fiberGroup.requestShutdown();
GroupComponents gc = g.getGroupComponents();
fiberGroup.fireFiber("shutdown" + g.getGroupId(), new FiberFrame<>() {
@Override
public FrameCallResult execute(Void input) {
GroupComponents gc = g.getGroupComponents();
FiberFuture<Long> f;
if (saveSnapshot) {
f = gc.getSnapshotManager().saveSnapshot();
}else{
f = FiberFuture.completedFuture(getFiberGroup(), 0L);
}
return f.await(this::afterSaveSnapshot);
}

private FrameCallResult afterSaveSnapshot(Long notUsed) {
gc.getApplyManager().shutdown(timeout);
return gc.getRaftLog().close().await(this::afterRaftLogClose);
}
Expand Down Expand Up @@ -623,15 +635,16 @@ public CompletableFuture<Void> addGroup(RaftGroupConfig groupConfig, long acquir
* ADMIN API. This method is idempotent.
*/
@SuppressWarnings("unused")
public CompletableFuture<Void> removeGroup(int groupId, long acquireLockTimeoutMillis, DtTime shutdownTimeout) {
public CompletableFuture<Void> removeGroup(int groupId, boolean saveSnapshot, long acquireLockTimeoutMillis,
DtTime shutdownTimeout) {
return doChange(acquireLockTimeoutMillis, () -> {
try {
RaftGroupImpl g = raftGroups.get(groupId);
if (g == null) {
log.warn("removeGroup failed: group not exist, groupId={}", groupId);
return CompletableFuture.failedFuture(new RaftException("group not exist: " + groupId));
}
return stopGroup(g, shutdownTimeout);
return stopGroup(g, shutdownTimeout, saveSnapshot);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.zip.CRC32C;
Expand Down Expand Up @@ -89,7 +88,7 @@ public class DefaultSnapshotManager implements SnapshotManager {
private File snapshotDir;

private final LinkedList<Pair<File, File>> snapshotFiles = new LinkedList<>();
private final LinkedList<Pair<Long, CompletableFuture<Long>>> saveRequest = new LinkedList<>();
private final LinkedList<Pair<Long, FiberFuture<Long>>> saveRequest = new LinkedList<>();

public DefaultSnapshotManager(RaftGroupConfigEx groupConfig, StateMachine stateMachine) {
this.groupConfig = groupConfig;
Expand Down Expand Up @@ -234,9 +233,11 @@ private void deleteOldFiles() {
}

@Override
public void fireSaveSnapshot(CompletableFuture<Long> f) {
public FiberFuture<Long> saveSnapshot() {
FiberFuture<Long> f = groupConfig.getFiberGroup().newFuture("saveSnapshot-" + groupConfig.getGroupId());
saveRequest.addLast(new Pair<>(raftStatus.getLastApplied(), f));
saveLoopFrame.saveSnapshotCond.signal();
return f;
}

private class SaveFrame extends FiberFrame<Void> {
Expand Down Expand Up @@ -358,15 +359,10 @@ private FiberFuture<Void> writeCallback(RefBuffer rb, Integer readBytes) {
}

private boolean checkCancel() {
if (isGroupShouldStopPlain()) {
if (cancel) {
log.info("snapshot save task is cancelled");
cancel = true;
}
return true;
}
// do not check isGroupShouldStopPlain() here

if (raftStatus.isInstallSnapshot()) {
if (cancel) {
if (!cancel) {
log.warn("install snapshot, cancel save snapshot task");
cancel = true;
}
Expand Down Expand Up @@ -423,7 +419,7 @@ private FrameCallResult finish2(Void unused) {

private void complete(Throwable ex) {
long raftIndex = snapshotInfo.getLastIncludedIndex();
Pair<Long, CompletableFuture<Long>> req;
Pair<Long, FiberFuture<Long>> req;
while ((req = saveRequest.peek()) != null) {
if (req.getLeft() <= raftIndex) {
if (ex == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

import com.github.dtprj.dongting.common.Pair;
import com.github.dtprj.dongting.fiber.FiberFrame;

import java.util.concurrent.CompletableFuture;
import com.github.dtprj.dongting.fiber.FiberFuture;

/**
* @author huangli
Expand All @@ -31,5 +30,5 @@ public interface SnapshotManager {

void startFiber();

void fireSaveSnapshot(CompletableFuture<Long> f);
FiberFuture<Long> saveSnapshot();
}

0 comments on commit 6931d77

Please sign in to comment.