Skip to content

Commit

Permalink
Improve DefaultEntryLogger read performance. (#4038)
Browse files Browse the repository at this point in the history
* Avoid system call to improve read performance.

* Fix ci.

* Add comments for getCurrentWritingLogId

* Fix ci.

* Consider compacting log.

* Fix checkstyle.

* Address the comment.

* Address comment.

* Address the comments.

* Add tests.

* Fix checkstyle.

* address the comments.

* Fix concurrency problem.
  • Loading branch information
horizonzy authored Feb 12, 2024
1 parent bb9da9b commit f5e4a98
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/**
* A Buffered channel without a write buffer. Only reads are buffered.
*/
public class BufferedReadChannel extends BufferedChannelBase {
public class BufferedReadChannel extends BufferedChannelBase {

// The capacity of the read buffer.
protected final int readCapacity;
Expand All @@ -43,9 +43,16 @@ public class BufferedReadChannel extends BufferedChannelBase {

long invocationCount = 0;
long cacheHitCount = 0;
private volatile long fileSize = -1;
final boolean sealed;

public BufferedReadChannel(FileChannel fileChannel, int readCapacity) {
this(fileChannel, readCapacity, false);
}

public BufferedReadChannel(FileChannel fileChannel, int readCapacity, boolean sealed) {
super(fileChannel);
this.sealed = sealed;
this.readCapacity = readCapacity;
this.readBuffer = Unpooled.buffer(readCapacity);
}
Expand All @@ -64,10 +71,26 @@ public int read(ByteBuf dest, long pos) throws IOException {
return read(dest, pos, dest.writableBytes());
}

@Override
public long size() throws IOException {
if (sealed) {
if (fileSize == -1) {
synchronized (this) {
if (fileSize == -1) {
fileSize = validateAndGetFileChannel().size();
}
}
}
return fileSize;
} else {
return validateAndGetFileChannel().size();
}
}

public synchronized int read(ByteBuf dest, long pos, int length) throws IOException {
invocationCount++;
long currentPosition = pos;
long eof = validateAndGetFileChannel().size();
long eof = size();
// return -1 if the given position is greater than or equal to the file's current size.
if (pos >= eof) {
return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,10 @@ private long readLastLogId(File f) {
}
}

void clearCompactingLogId() {
entryLoggerAllocator.clearCompactingLogId();
}

/**
* Flushes all rotated log channels. After log channels are flushed,
* move leastUnflushedLogId ptr to current logId.
Expand Down Expand Up @@ -894,7 +898,8 @@ private Header getHeaderForLogId(long entryLogId) throws IOException {
}
}

private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
@VisibleForTesting
BufferedReadChannel getChannelForLogId(long entryLogId) throws IOException {
BufferedReadChannel fc = getFromChannels(entryLogId);
if (fc != null) {
return fc;
Expand All @@ -910,7 +915,11 @@ private BufferedReadChannel getChannelForLogId(long entryLogId) throws IOExcepti
}
// We set the position of the write buffer of this buffered channel to Long.MAX_VALUE
// so that there are no overlaps with the write buffer while reading
fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes());
if (entryLogManager instanceof EntryLogManagerForSingleEntryLog) {
fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), entryLoggerAllocator.isSealed(entryLogId));
} else {
fc = new BufferedReadChannel(newFc, conf.getReadBufferBytes(), false);
}
putInReadChannels(entryLogId, fc);
return fc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,17 @@ void createNewLog(long ledgerId, String reason) throws IOException {
logChannel.appendLedgersMap();

BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
entryLoggerAllocator.setWritingLogId(newLogChannel.getLogId());
setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
log.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
logChannel.getLogId(), rotatedLogChannels);
for (EntryLogListener listener : listeners) {
listener.onRotateEntryLog();
}
} else {
setCurrentLogForLedgerAndAddToRotate(ledgerId,
entryLoggerAllocator.createNewLog(selectDirForNextEntryLog()));
BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog(selectDirForNextEntryLog());
entryLoggerAllocator.setWritingLogId(newLogChannel.getLogId());
setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IO

@Override
public DefaultEntryLogger.BufferedLogChannel createNewLogForCompaction() throws IOException {
return entryLoggerAllocator.createNewLogForCompaction(selectDirForNextEntryLog());
BufferedLogChannel newLogForCompaction = entryLoggerAllocator.createNewLogForCompaction(
selectDirForNextEntryLog());
entryLoggerAllocator.setWritingCompactingLogId(newLogForCompaction.getLogId());
return newLogForCompaction;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class EntryLoggerAllocator {
private final boolean entryLogPreAllocationEnabled;
private final ByteBufAllocator byteBufAllocator;
final ByteBuf logfileHeader = Unpooled.buffer(DefaultEntryLogger.LOGFILE_HEADER_SIZE);
private volatile long writingLogId = -1;
private volatile long writingCompactingLogId = -1;

EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId,
Expand Down Expand Up @@ -91,16 +93,19 @@ synchronized long getPreallocatedLogId() {
return preallocatedLogId;
}

public boolean isSealed(long logId) {
return logId != writingLogId && logId != writingCompactingLogId;
}

BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException {
synchronized (createEntryLogLock) {
BufferedLogChannel bc;
if (!entryLogPreAllocationEnabled){
if (!entryLogPreAllocationEnabled) {
// create a new log directly
bc = allocateNewLog(dirForNextEntryLog);
return bc;
return allocateNewLog(dirForNextEntryLog);
} else {
// allocate directly to response request
if (null == preallocation){
if (null == preallocation) {
bc = allocateNewLog(dirForNextEntryLog);
} else {
// has a preallocated entry log
Expand All @@ -116,7 +121,7 @@ BufferedLogChannel createNewLog(File dirForNextEntryLog) throws IOException {
throw new IOException("Task to allocate a new entry log is cancelled.", ce);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie);
throw new IOException("Interrupted when waiting a new entry log to be allocated.", ie);
}
}
// preallocate a new log in background upon every call
Expand All @@ -132,6 +137,18 @@ BufferedLogChannel createNewLogForCompaction(File dirForNextEntryLog) throws IOE
}
}

void setWritingLogId(long lodId) {
this.writingLogId = lodId;
}

void setWritingCompactingLogId(long logId) {
this.writingCompactingLogId = logId;
}

void clearCompactingLogId() {
writingCompactingLogId = -1;
}

private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog) throws IOException {
return allocateNewLog(dirForNextEntryLog, ".log");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ boolean complete() {
LOG.info("No valid entry is found in entry log after scan, removing entry log now.");
logRemovalListener.removeEntryLog(metadata.getEntryLogId());
compactionLog.abort();
compactingLogWriteDone();
return false;
}
return true;
Expand All @@ -209,6 +210,13 @@ void abort() {
offsets.clear();
// since we haven't flushed yet, we only need to delete the unflushed compaction file.
compactionLog.abort();
compactingLogWriteDone();
}
}

private void compactingLogWriteDone() {
if (entryLogger instanceof DefaultEntryLogger) {
((DefaultEntryLogger) entryLogger).clearCompactingLogId();
}
}

Expand Down Expand Up @@ -241,6 +249,8 @@ boolean complete() throws IOException {
} catch (IOException ioe) {
LOG.warn("Error marking compaction as done", ioe);
return false;
} finally {
compactingLogWriteDone();
}
}

Expand All @@ -249,6 +259,7 @@ void abort() {
offsets.clear();
// remove compaction log file and its hardlink
compactionLog.abort();
compactingLogWriteDone();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
Expand Down Expand Up @@ -154,6 +155,53 @@ public void testDeferCreateNewLog() throws Exception {
assertEquals(0L, entryLogManager.getCurrentLogId());
}

@Test
public void testEntryLogIsSealedWithPerLedgerDisabled() throws Exception {
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setEntryLogPerLedgerEnabled(false);
conf.setEntryLogFilePreAllocationEnabled(true);

TestStatsProvider statsProvider = new TestStatsProvider();
TestStatsProvider.TestStatsLogger statsLogger =
statsProvider.getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE);
DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, dirsMgr, null, statsLogger,
UnpooledByteBufAllocator.DEFAULT);
EntryLogManagerBase entrylogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
entrylogManager.createNewLog(0);
BufferedReadChannel channel = entryLogger.getChannelForLogId(0);
assertFalse(channel.sealed);
entrylogManager.createNewLog(1);
channel = entryLogger.getChannelForLogId(0);
assertFalse(channel.sealed);
entrylogManager.createNewLog(2);
channel = entryLogger.getChannelForLogId(1);
assertTrue(channel.sealed);
}

@Test
public void testEntryLogIsSealedWithPerLedgerEnabled() throws Exception {
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
//If entryLogPerLedgerEnabled is true, the buffer channel sealed flag always false.
conf.setEntryLogPerLedgerEnabled(true);
conf.setEntryLogFilePreAllocationEnabled(true);

TestStatsProvider statsProvider = new TestStatsProvider();
TestStatsProvider.TestStatsLogger statsLogger =
statsProvider.getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE);
DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, dirsMgr, null, statsLogger,
UnpooledByteBufAllocator.DEFAULT);
EntryLogManagerBase entrylogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
entrylogManager.createNewLog(0);
BufferedReadChannel channel = entryLogger.getChannelForLogId(0);
assertFalse(channel.sealed);
entrylogManager.createNewLog(1);
channel = entryLogger.getChannelForLogId(0);
assertFalse(channel.sealed);
entrylogManager.createNewLog(2);
channel = entryLogger.getChannelForLogId(1);
assertFalse(channel.sealed);
}

@Test
public void testDeferCreateNewLogWithoutEnoughDiskSpaces() throws Exception {
entryLogger.close();
Expand Down

0 comments on commit f5e4a98

Please sign in to comment.