Skip to content

Commit

Permalink
test: add DefaultSnapshotManagerTest
Browse files Browse the repository at this point in the history
  • Loading branch information
areyouok committed Nov 14, 2024
1 parent 6178b0b commit f1e131b
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ public FrameCallResult execute(Void input) throws Throwable {
throw firstEx;
} else {
if (readFinish) {
log.info("snapshot read finished");
log.info("snapshot read finished: {}", snapshot.getClass().getSimpleName());
return Fiber.frameReturn();
} else {
log.info("snapshot read canceled");
log.info("snapshot read canceled: {}", snapshot.getClass().getSimpleName());
throw new RaftCancelException("snapshot read canceled");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public class DefaultSnapshotManager implements SnapshotManager {

private static final DtLog log = DtLogs.getLogger(DefaultSnapshotManager.class);

private static final int KEEP = 2;
static final int KEEP = 2;
public static final String SNAPSHOT_DIR = "snapshot";

private static final String DATA_SUFFIX = ".data";
private static final String IDX_SUFFIX = ".idx";
Expand Down Expand Up @@ -110,7 +111,7 @@ private class InitFrame extends FiberFrame<Snapshot> {
@Override
public FrameCallResult execute(Void input) {
File dataDir = FileUtil.ensureDir(groupConfig.getDataDir());
snapshotDir = FileUtil.ensureDir(dataDir, "snapshot");
snapshotDir = FileUtil.ensureDir(dataDir, SNAPSHOT_DIR);
File[] files = snapshotDir.listFiles(f -> f.isFile() && f.getName().endsWith(IDX_SUFFIX));
if (files == null || files.length == 0) {
setResult(null);
Expand All @@ -119,12 +120,14 @@ public FrameCallResult execute(Void input) {
Arrays.sort(files);
for (int i = files.length - 1; i >= 0; i--) {
File f = files[i];
String baseName = FileUtil.baseName(f);
File dataFile = new File(snapshotDir, baseName + DATA_SUFFIX);
if (f.length() == 0) {
log.warn("empty status file: {}", f.getPath());
deleteInIoExecutor(f);
deleteInIoExecutor(dataFile);
continue;
}
String baseName = FileUtil.baseName(f);
File dataFile = new File(snapshotDir, baseName + DATA_SUFFIX);
if (dataFile.exists()) {
snapshotFiles.addFirst(new Pair<>(f, dataFile));
} else {
Expand Down Expand Up @@ -190,9 +193,16 @@ public void startFiber() {
f.start();
}

@Override
public void stopFiber() {
saveLoopFrame.stopLoop = true;
saveLoopFrame.saveSnapshotCond.signal();
}

class SaveSnapshotLoopFrame extends FiberFrame<Void> {

final FiberCondition saveSnapshotCond;
private boolean stopLoop;

@Override
protected FrameCallResult handle(Throwable ex) throws Throwable {
Expand All @@ -205,6 +215,9 @@ protected FrameCallResult handle(Throwable ex) throws Throwable {

@Override
public FrameCallResult execute(Void input) throws Throwable {
if (stopLoop) {
return Fiber.frameReturn();
}
deleteOldFiles();
if (saveRequest.isEmpty()) {
return saveSnapshotCond.await(groupConfig.getSaveSnapshotMillis(), this::doSave);
Expand All @@ -214,6 +227,9 @@ public FrameCallResult execute(Void input) throws Throwable {
}

private FrameCallResult doSave(Void unused) {
if (stopLoop) {
return Fiber.frameReturn();
}
SaveFrame f = new SaveFrame(nextId++);
return Fiber.call(f, this::afterSave);
}
Expand Down Expand Up @@ -386,10 +402,10 @@ private FrameCallResult writeIdxFile(Void v) {
log.info("snapshot {} data file write success: {}", id, newDataFile.getFile().getPath());

statusFile = new StatusFile(newIdxFile, groupConfig);
return Fiber.call(statusFile.init(), this::afterStatusFileInit);
return Fiber.call(statusFile.init(), this::saveIdxFile);
}

private FrameCallResult afterStatusFileInit(Void unused) {
private FrameCallResult saveIdxFile(Void unused) {
SnapshotInfo si = readSnapshot.getSnapshotInfo();
Map<String, String> p = statusFile.getProperties();
p.put(KEY_LAST_INDEX, String.valueOf(si.getLastIncludedIndex()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ public interface SnapshotManager {

void startFiber();

void stopFiber();

FiberFuture<Long> saveSnapshot();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright The Dongting Project
*
* The Dongting Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.github.dtprj.dongting.raft.sm;

import com.github.dtprj.dongting.common.ByteArray;
import com.github.dtprj.dongting.common.DtTime;
import com.github.dtprj.dongting.common.Pair;
import com.github.dtprj.dongting.dtkv.KvCodes;
import com.github.dtprj.dongting.dtkv.KvResult;
import com.github.dtprj.dongting.dtkv.server.DtKV;
import com.github.dtprj.dongting.dtkv.server.KvConfig;
import com.github.dtprj.dongting.fiber.BaseFiberTest;
import com.github.dtprj.dongting.fiber.Fiber;
import com.github.dtprj.dongting.fiber.FiberFrame;
import com.github.dtprj.dongting.fiber.FiberFuture;
import com.github.dtprj.dongting.fiber.FrameCallResult;
import com.github.dtprj.dongting.raft.impl.RaftStatusImpl;
import com.github.dtprj.dongting.raft.server.RaftGroupConfigEx;
import com.github.dtprj.dongting.raft.server.RaftInput;
import com.github.dtprj.dongting.raft.store.TestDir;
import com.github.dtprj.dongting.raft.test.MockExecutors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.*;

/**
* @author huangli
*/
public class DefaultSnapshotManagerTest extends BaseFiberTest {
private DefaultSnapshotManager m;
private DtKV kv;
private RaftStatusImpl raftStatus;
private String dataDir;

@BeforeEach
void setUp() throws Exception {
dataDir = TestDir.createTestDir(DefaultSnapshotManager.class.getSimpleName()).getAbsolutePath();
createManager();
doInFiber(() -> {
kv.start();
m.startFiber();
});
}

private void createManager() {
raftStatus = new RaftStatusImpl(dispatcher.getTs());
raftStatus.setNodeIdOfMembers(Set.of(1));
raftStatus.setNodeIdOfObservers(Set.of());
raftStatus.setNodeIdOfPreparedMembers(Set.of());
raftStatus.setNodeIdOfPreparedObservers(Set.of());
raftStatus.setLastAppliedTerm(1);
RaftGroupConfigEx groupConfig = new RaftGroupConfigEx(0, "1", "");
groupConfig.setFiberGroup(fiberGroup);
groupConfig.setRaftStatus(raftStatus);
groupConfig.setTs(dispatcher.getTs());
groupConfig.setDataDir(dataDir);
groupConfig.setBlockIoExecutor(MockExecutors.ioExecutor());
KvConfig kvConfig = new KvConfig();
kvConfig.setUseSeparateExecutor(true);
kvConfig.setInitMapCapacity(16);
kv = new DtKV(groupConfig, kvConfig);
m = new DefaultSnapshotManager(groupConfig, kv);
}

@AfterEach
void tearDown() throws Exception {
doInFiber(() -> {
kv.stop(new DtTime(1, TimeUnit.SECONDS));
m.stopFiber();
});
}

@Test
void test() throws Exception {
doInFiber(new FiberFrame<>() {
private long index = 1;
private static final int LOOP = 10;

@Override
public FrameCallResult execute(Void input) {
return Fiber.call(m.init(), this::afterInit);
}

private FrameCallResult afterInit(Snapshot snapshot) {
assertNull(snapshot);
return beforePut(null);
}

private FrameCallResult beforePut(Void v) {
if (index > LOOP) {
return afterLoop();
}
ByteArray key = new ByteArray(("key" + index).getBytes());
ByteArray value = new ByteArray(("value" + index).getBytes());
RaftInput i = new RaftInput(DtKV.BIZ_TYPE_PUT, key, value,
new DtTime(1, TimeUnit.SECONDS), false);
FiberFuture<Object> f = kv.exec(index++, i);
return f.await(this::afterPut);
}

private FrameCallResult afterPut(Object result) {
assertEquals(KvCodes.CODE_SUCCESS, ((KvResult) result).getBizCode());
raftStatus.setLastApplied(index - 1);
if ((index - 1) % 2 == 0) {
FiberFuture<Long> f = m.saveSnapshot();
return f.await(this::afterSave);
} else {
return Fiber.resume(null, this::beforePut);
}
}

private FrameCallResult afterSave(Long idx) {
assertEquals(index - 1, idx);
return Fiber.resume(null, this::beforePut);
}

private FrameCallResult afterLoop() {
kv.stop(new DtTime(1, TimeUnit.SECONDS));
m.stopFiber();

createManager();
kv.start();
m.startFiber();
return Fiber.call(m.init(), this::afterInit2);
}

private FrameCallResult afterInit2(Snapshot snapshot) {
assertNotNull(snapshot);
FiberFrame<Pair<Integer, Long>> f = m.recover(snapshot);
return Fiber.call(f, this::afterRecover);
}

private FrameCallResult afterRecover(Pair<Integer, Long> p) {
assertEquals(1, p.getLeft());
assertEquals(LOOP, p.getRight().longValue());
for (index = 1; index <= LOOP; index++) {
ByteArray key = new ByteArray(("key" + index).getBytes());
KvResult r = kv.get(key);
assertEquals(KvCodes.CODE_SUCCESS, r.getBizCode());
assertEquals("value" + index, new String(r.getNode().getData()));
}

File dir = new File(dataDir);
dir = new File(dir, DefaultSnapshotManager.SNAPSHOT_DIR);
File[] files = dir.listFiles();
assertEquals(DefaultSnapshotManager.KEEP * 2, files == null ? 0 : files.length);

return Fiber.frameReturn();
}

});
}

}

0 comments on commit f1e131b

Please sign in to comment.