Skip to content

Commit

Permalink
chore: FileLogLoader
Browse files Browse the repository at this point in the history
areyouok committed Nov 19, 2024
1 parent c951599 commit a46df5c
Showing 1 changed file with 16 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@
import com.github.dtprj.dongting.raft.sm.RaftCodecFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;
@@ -182,9 +183,19 @@ private FrameCallResult afterStartIndexPosLoad(Long startIndexPos) {

private FrameCallResult parseContent() {
while (true) {
int r = doParse(readBuffer);
int r;
int s = state;
if (s == STATE_ITEM_HEADER) {
r = processHeader(readBuffer);
} else if (s == STATE_BIZ_HEADER) {
r = extractBizHeader(readBuffer);
} else if (s == STATE_BIZ_BODY) {
r = extractBizBody(readBuffer);
} else {
throw new RaftException("error state:" + state);
}
if (r == RESULT_FINISH) {
setResult(result);
setResult(new ArrayList<>(result));
return Fiber.frameReturn();
} else if (r == RESULT_NEED_LOAD) {
return loadLogFromStore();
@@ -194,37 +205,22 @@ private FrameCallResult parseContent() {
}
}

private int doParse(ByteBuffer buf) {
int s = state;
if (s == STATE_ITEM_HEADER) {
return processHeader(buf);
} else if (s == STATE_BIZ_HEADER) {
return extractBizHeader(buf);
} else if (s == STATE_BIZ_BODY) {
return extractBizBody(buf);
} else {
throw new RaftException("error state:" + state);
}
}


private FrameCallResult loadLogFromStore() {
long pos = nextPos;
logFile = logFiles.getLogFile(pos);
if (logFile.isDeleted()) {
throw new RaftException("file " + logFile.getFile().getName() + " is deleted");
}
long rest = logFile.endPos - pos;
long fileStartPos = logFiles.filePos(pos);
ByteBuffer buf = readBuffer;
if (fileStartPos == 0 && buf.position() > 0) {
RaftException e = new RaftException("readBuffer not empty when load from file start position");
BugLog.log(e);
throw e;
}
int rest = (int)(logFile.endPos - pos);
if (rest < buf.remaining()) {
// not overflow
buf.limit((int) (buf.position() + rest));
buf.limit(buf.position() + rest);
}
bufferStartPos = pos - buf.position();
bufferEndPos = pos + buf.remaining();
@@ -256,7 +252,7 @@ private int processHeader(ByteBuffer buf) {
}
crc32c.reset();
state = STATE_BIZ_HEADER;
if (!result.isEmpty() && header.bodyLen + totalReadBytes >= bytesLimit) {
if (!result.isEmpty() && header.bodyLen + totalReadBytes > bytesLimit) {
buf.position(buf.position() - LogHeader.ITEM_HEADER_SIZE);
finishRead();
return RESULT_FINISH;

0 comments on commit a46df5c

Please sign in to comment.