Skip to content

Commit

Permalink
Allow to set max operation numbers in a single rocksdb batch (#4044)
Browse files Browse the repository at this point in the history
---

In rocksdb, the memory usage is related to the batch size.
The more operations in a single batch, the more memory is consumed.
Expose a configuration option to allow controlling the batch size.

(cherry picked from commit ad0ed21)
  • Loading branch information
zymap authored and hangc0276 committed Aug 17, 2023
1 parent 2f3ab1c commit a2c3aee
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,9 @@ public interface Batch extends Closeable {
void clear();

void flush() throws IOException;

/**
 * Returns the number of operations currently buffered in this batch.
 *
 * @return the buffered operation count, or {@code -1} if the
 *         implementation does not track a per-batch count
 */
default int batchCount() {
return -1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public class KeyValueStorageRocksDB implements KeyValueStorage {

private final ReadOptions optionCache;
private final ReadOptions optionDontCache;

private final WriteBatch emptyBatch;
private final int writeBatchMaxSize;

private static final String ROCKSDB_LOG_PATH = "dbStorage_rocksDB_logPath";
private static final String ROCKSDB_LOG_LEVEL = "dbStorage_rocksDB_logLevel";
Expand Down Expand Up @@ -206,6 +206,7 @@ public KeyValueStorageRocksDB(String basePath, String subPath, DbConfigType dbCo

optionCache.setFillCache(true);
optionDontCache.setFillCache(false);
this.writeBatchMaxSize = conf.getMaxOperationNumbersInSingleRocksDBBatch();
}

@Override
Expand Down Expand Up @@ -403,21 +404,29 @@ public long count() throws IOException {

@Override
public Batch newBatch() {
return new RocksDBBatch();
return new RocksDBBatch(writeBatchMaxSize);
}

private class RocksDBBatch implements Batch {
private final WriteBatch writeBatch = new WriteBatch();
private final int batchSize;
private int batchCount = 0;

// batchSize: max operations buffered before the batch auto-flushes and
// clears itself; sourced from
// ServerConfiguration#getMaxOperationNumbersInSingleRocksDBBatch.
RocksDBBatch(int batchSize) {
this.batchSize = batchSize;
}

@Override
public void close() {
// Release the native WriteBatch memory and reset the counter so
// batchCount() reports 0 after close.
writeBatch.close();
batchCount = 0;
}

@Override
public void put(byte[] key, byte[] value) throws IOException {
try {
writeBatch.put(key, value);
countBatchAndFlushIfNeeded();
} catch (RocksDBException e) {
throw new IOException("Failed to flush RocksDB batch", e);
}
Expand All @@ -427,6 +436,7 @@ public void put(byte[] key, byte[] value) throws IOException {
public void remove(byte[] key) throws IOException {
try {
writeBatch.delete(key);
countBatchAndFlushIfNeeded();
} catch (RocksDBException e) {
throw new IOException("Failed to flush RocksDB batch", e);
}
Expand All @@ -435,17 +445,31 @@ public void remove(byte[] key) throws IOException {
@Override
public void clear() {
// Drop all buffered operations and reset the counter to keep
// batchCount() consistent with the (now empty) native batch.
writeBatch.clear();
batchCount = 0;
}

/**
 * Buffer a range deletion [beginKey, endKey) into this batch, then
 * auto-flush if the configured max operation count has been reached.
 *
 * @param beginKey first key of the range (inclusive)
 * @param endKey end key of the range (exclusive, per RocksDB deleteRange)
 * @throws IOException if RocksDB rejects the operation, or if the
 *         triggered flush fails
 */
@Override
public void deleteRange(byte[] beginKey, byte[] endKey) throws IOException {
try {
writeBatch.deleteRange(beginKey, endKey);
countBatchAndFlushIfNeeded();
} catch (RocksDBException e) {
// The RocksDBException originates from deleteRange itself — flush
// failures already surface as IOException from
// countBatchAndFlushIfNeeded() — so name the failing operation
// instead of the copy-pasted "Failed to flush" message.
throw new IOException("Failed to deleteRange in RocksDB batch", e);
}
}

/**
 * Record one buffered operation; once the batch reaches the configured
 * limit, write it out and reset it so memory usage stays bounded.
 */
private void countBatchAndFlushIfNeeded() throws IOException {
batchCount++;
if (batchCount < batchSize) {
return;
}
flush();
clear();
}

@Override
public int batchCount() {
// Number of operations buffered since the last flush/clear/close.
return batchCount;
}

@Override
public void flush() throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String AUTHORIZED_ROLES = "authorizedRoles";
protected static final String ROCKSDB_DELETE_ENTRIES_BATCH_SIZE = "rocksDBDeleteEntriesBatchSize";

protected static final String MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH =
"maxOperationNumbersInSingleRocksdbWriteBatch";

protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = "skipReplayJournalInvalidRecord";

/**
Expand Down Expand Up @@ -3664,4 +3667,25 @@ public ServerConfiguration setRocksDBDeleteEntriesBatchSize(int rocksDBDeleteEnt
this.setProperty(ROCKSDB_DELETE_ENTRIES_BATCH_SIZE, rocksDBDeleteEntriesBatchSize);
return this;
}

/**
 * Set the max operation numbers in a single rocksdb write batch.
 * The rocksdb write batch is related to the memory usage. If the batch is too large, it may cause OOM.
 *
 * @param maxNumbersInSingleRocksDBBatch maximum number of operations to
 *        buffer in one RocksDB write batch before it is flushed
 * @return this {@code ServerConfiguration}, for call chaining
 */
public ServerConfiguration setOperationMaxNumbersInSingleRocksDBWriteBatch(int maxNumbersInSingleRocksDBBatch) {
this.setProperty(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, maxNumbersInSingleRocksDBBatch);
return this;
}

/**
 * Get the max operation numbers in a single rocksdb write batch.
 *
 * @return the configured limit, or the default of 100000 when unset
 */
public int getMaxOperationNumbersInSingleRocksDBBatch() {
return getInt(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, 100000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,41 @@ public void simple() throws Exception {
db.close();
FileUtils.deleteDirectory(tmpDir);
}

@Test
public void testBatch() throws Exception {

// Cap a batch at 5 buffered operations so the auto-flush path
// (flush + counter reset once the limit is hit) is exercised quickly.
configuration.setOperationMaxNumbersInSingleRocksDBWriteBatch(5);

File tmpDir = Files.createTempDirectory("junitTemporaryFolder").toFile();
Files.createDirectory(Paths.get(tmpDir.toString(), "subDir"));

KeyValueStorage db = storageFactory.newKeyValueStorage(tmpDir.toString(), "subDir", DbConfigType.Huge,
configuration);

assertEquals(null, db.getFloor(toArray(3)));
assertEquals(0, db.count());

// A fresh batch starts with an empty operation counter.
Batch batch = db.newBatch();
assertEquals(0, batch.batchCount());

batch.put(toArray(1), toArray(1));
batch.put(toArray(2), toArray(2));
assertEquals(2, batch.batchCount());

// The 5th put reaches the configured limit, so the batch auto-flushes
// and the counter drops back to 0.
batch.put(toArray(3), toArray(3));
batch.put(toArray(4), toArray(4));
batch.put(toArray(5), toArray(5));
assertEquals(0, batch.batchCount());
batch.put(toArray(6), toArray(6));
assertEquals(1, batch.batchCount());

// An explicit flush() leaves the counter untouched; only clear() and
// close() reset it.
batch.flush();
assertEquals(1, batch.batchCount());
batch.close();
assertEquals(0, batch.batchCount());

db.close();
FileUtils.deleteDirectory(tmpDir);
}
}

0 comments on commit a2c3aee

Please sign in to comment.