Skip to content

Commit

Permalink
test: FileLogLoader
Browse files Browse the repository at this point in the history
areyouok committed Nov 19, 2024

Verified

This commit was signed with the committer’s verified signature.
areyouok huangli
1 parent a46df5c commit 82c23bf
Showing 2 changed files with 161 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -162,7 +162,7 @@ protected FrameCallResult doFinally() {
@Override
public FrameCallResult execute(Void input) {
if (nextIndex == -1) {
return Fiber.call(idxFiles.loadLogPos(startIndex), this::afterStartIndexPosLoad);
return Fiber.call(idxFiles.loadLogPos(startIndex), this::resumeAfterFirstPosLoad);
} else {
if (readBuffer.hasRemaining()) {
return parseContent();
@@ -174,7 +174,10 @@ public FrameCallResult execute(Void input) {
}
}

private FrameCallResult afterStartIndexPosLoad(Long startIndexPos) {
private FrameCallResult resumeAfterFirstPosLoad(Long startIndexPos) {
if (cancelIndicator != null && cancelIndicator.get()) {
throw new RaftCancelException("canceled");
}
nextPos = startIndexPos;
nextIndex = startIndex;
readBuffer.clear();
Original file line number Diff line number Diff line change
@@ -22,22 +22,28 @@
import com.github.dtprj.dongting.fiber.FiberFuture;
import com.github.dtprj.dongting.fiber.FrameCallResult;
import com.github.dtprj.dongting.raft.impl.InitFiberFrame;
import com.github.dtprj.dongting.raft.impl.RaftCancelException;
import com.github.dtprj.dongting.raft.impl.RaftStatusImpl;
import com.github.dtprj.dongting.raft.impl.RaftTask;
import com.github.dtprj.dongting.raft.impl.TailCache;
import com.github.dtprj.dongting.raft.server.ChecksumException;
import com.github.dtprj.dongting.raft.server.LogItem;
import com.github.dtprj.dongting.raft.server.RaftGroupConfigEx;
import com.github.dtprj.dongting.raft.server.RaftInput;
import com.github.dtprj.dongting.raft.server.RaftServerConfig;
import com.github.dtprj.dongting.raft.test.MockExecutors;
import com.github.dtprj.dongting.raft.test.TestUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.*;

/**
* @author huangli
@@ -63,8 +69,8 @@ private void init() throws Exception {
config.setDataDir(dataDir);
config.setBlockIoExecutor(MockExecutors.ioExecutor());
config.setTs(raftStatus.getTs());

config.setRaftStatus(raftStatus);
raftStatus.setTailCache(new TailCache(config, raftStatus));
statusManager = new StatusManager(config);
doInFiber(new FiberFrame<>() {
@Override
@@ -251,4 +257,151 @@ private FrameCallResult resume3(Void unused) {
}
});
}

@Test
void testFileLogLoader() throws Exception {
// file 1
// 1: 256 bytes, no header
// 2: 256 bytes, no body
// 3: LogHeader.ITEM_HEADER_SIZE bytes, no header, no body
// 4: 512 - LogHeader.ITEM_HEADER_SIZE bytes, fill rest file
int[] totalSizes = new int[]{256, 256, LogHeader.ITEM_HEADER_SIZE, 512 - LogHeader.ITEM_HEADER_SIZE};
int[] bizHeaderLen = new int[]{0, 256 - LogHeader.ITEM_HEADER_SIZE - 4, 0, 100};
append(1, totalSizes, bizHeaderLen);
// file 2, started from 5, with (LogHeader.ITEM_HEADER_SIZE - 1) bytes not used
append(5, new int[]{150, 200, 250, 1024 - 600 - (LogHeader.ITEM_HEADER_SIZE - 1)}, new int[]{10, 20, 150, 100});
// file 3, started from 9, with end magic item
append(9, new int[]{600}, new int[]{300});
// file 4, started from 10, with end magic item just fill the file
append(10, new int[]{1024 - LogHeader.ITEM_HEADER_SIZE}, new int[]{300});
// file 5, started from 11, total 12 items
append(11, new int[]{100, 100}, new int[]{10, 10});

testLoader(12, () -> raftLog.openIterator(() -> false));
testLoader(12, () -> new FileLogLoader(raftLog.idxFiles, raftLog.logFiles, config,
null, () -> false, 99));
doInFiber(new FiberFrame<>() {
RaftLog.LogIterator it = raftLog.openIterator(() -> true);

@Override
public FrameCallResult execute(Void input) {
return Fiber.call(it.next(1, 1, 500000), this::afterNext);
}

private FrameCallResult afterNext(List<LogItem> logItems) {
Assertions.fail();
return Fiber.frameReturn();
}

@Override
protected FrameCallResult handle(Throwable ex) {
assertTrue(ex instanceof RaftCancelException);
return Fiber.frameReturn();
}
});
doInFiber(new FiberFrame<>() {
int count;
RaftLog.LogIterator it = raftLog.openIterator(() -> count++ >= 1);

@Override
public FrameCallResult execute(Void input) {
return Fiber.call(it.next(1, 1, 500000), this::afterNext);
}

private FrameCallResult afterNext(List<LogItem> logItems) {
Assertions.fail();
return Fiber.frameReturn();
}

@Override
protected FrameCallResult handle(Throwable ex) {
assertTrue(ex instanceof RaftCancelException);
return Fiber.frameReturn();
}
});

RaftInput input = new RaftInput(0, null, null, null, false);
raftStatus.getTailCache().put(3, new RaftTask(raftStatus.getTs(), 0, input ,null));
doInFiber(new FiberFrame<>() {
RaftLog.LogIterator it = raftLog.openIterator(() -> false);

@Override
public FrameCallResult execute(Void input) {
return Fiber.call(it.next(1, 1000, 500000), this::afterNext);
}

private FrameCallResult afterNext(List<LogItem> logItems) {
assertEquals(2, logItems.size());
return Fiber.frameReturn();
}
});
}

private void testLoader(int total, Supplier<RaftLog.LogIterator> creator) throws Exception {
doInFiber(new FiberFrame<>() {
RaftLog.LogIterator it = creator.get();

@Override
public FrameCallResult execute(Void input) {
return Fiber.call(it.next(1, total, 500000), this::afterNext);
}

private FrameCallResult afterNext(List<LogItem> logItems) throws Exception {
assertEquals(total, logItems.size());
return Fiber.call(it.next(total + 1, 1, 500000), this::afterNext);
}

@Override
protected FrameCallResult handle(Throwable ex) throws Exception {
assertTrue(ex instanceof ChecksumException);
it.close();
return Fiber.frameReturn();
}
});
doInFiber(new FiberFrame<>() {
RaftLog.LogIterator it = creator.get();
int index = 1;

@Override
public FrameCallResult execute(Void input) {
return Fiber.call(it.next(index, 2, 500000), this::afterNext);
}

private FrameCallResult afterNext(List<LogItem> logItems) throws Exception {
assertEquals(2, logItems.size());
index += 2;
if (index <= total) {
return Fiber.resume(null, this);
} else {
it.close();
return Fiber.frameReturn();
}
}
});
doInFiber(new FiberFrame<>() {
RaftLog.LogIterator it = creator.get();
int index = 1;

@Override
public FrameCallResult execute(Void input) {
int limit = total - index + 1;
return Fiber.call(it.next(index, limit, 300), this::afterNext);
}

private FrameCallResult afterNext(List<LogItem> logItems) throws Exception {
index += logItems.size();
if (index <= total) {
return Fiber.resume(null, this);
} else {
it.close();
return Fiber.frameReturn();
}
}

@Override
protected FrameCallResult handle(Throwable ex) throws Throwable {
return super.handle(ex);
}
});
}
}

0 comments on commit 82c23bf

Please sign in to comment.