Skip to content

Commit

Permalink
feat: optimize key manifest (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Jan 24, 2025
1 parent 6995fe5 commit acb4e41
Show file tree
Hide file tree
Showing 14 changed files with 302 additions and 262 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netease.nim.camellia.redis.proxy.monitor;

import com.netease.nim.camellia.redis.proxy.upstream.local.storage.command.LocalStorageExecutors;
import com.netease.nim.camellia.redis.proxy.util.ExecutorUtils;
import com.netease.nim.camellia.redis.proxy.util.Utils;
import com.netease.nim.camellia.tools.utils.CamelliaMapUtils;
Expand All @@ -18,90 +19,23 @@ public class LocalStorageMonitor {

private static final Logger logger = LoggerFactory.getLogger(LocalStorageMonitor.class);

private static final Time compactTime = new Time();
private static final Time flushTime = new Time();
private static final Time keyFlushTime = new Time();
private static final Time valueFlushTime = new Time();
private static final Time walFlushTime = new Time();
private static final Time walAppendTime = new Time();
private static final Time valueWaitFlushTime = new Time();
private static final Time keyWaitFlushTime = new Time();
private static final ConcurrentHashMap<String, FileReadWriteCollector> fileMap = new ConcurrentHashMap<>();

private static final Time fileReadTime = new Time();
private static final Time fileWriteTime = new Time();

private static final ConcurrentHashMap<String, LongAdder> fileReadMap = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, LongAdder> fileWriteMap = new ConcurrentHashMap<>();

private static final LongAdder slotInit= new LongAdder();
private static final LongAdder slotInit = new LongAdder();
private static final LongAdder slotExpand = new LongAdder();

public static void compactTime(long time) {
if (time < 0) return;
time = time / 10000;
compactTime.update(time);
}

public static void flushTime(long time) {
if (time < 0) return;
time = time / 10000;
flushTime.update(time);
}

public static void keyFlushTime(long time) {
if (time < 0) return;
time = time / 10000;
keyFlushTime.update(time);
}

public static void valueFlushTime(long time) {
if (time < 0) return;
time = time / 10000;
valueFlushTime.update(time);
}

public static void walFlushTime(long time) {
if (time < 0) return;
time = time / 10000;
walFlushTime.update(time);
}

public static void walAppendTime(long time) {
if (time < 0) return;
time = time / 10000;
walAppendTime.update(time);
}

public static void valueWaitFlushTime(long time) {
if (time < 0) return;
time = time / 10000;
valueWaitFlushTime.update(time);
}

public static void keyWaitFlushTime(long time) {
if (time < 0) return;
time = time / 10000;
keyWaitFlushTime.update(time);
}
private static final ConcurrentHashMap<String, TimeCollector> timeMap = new ConcurrentHashMap<>();

public static void fileReadTime(long time) {
public static void fileRead(String file, long size, long time) {
if (time < 0) return;
time = time / 10000;
fileReadTime.update(time);
CamelliaMapUtils.computeIfAbsent(fileMap, file, k -> new FileReadWriteCollector()).updateRead(size, time);
}

public static void fileWriteTime(long time) {
public static void fileWrite(String file, long size, long time) {
if (time < 0) return;
time = time / 10000;
fileWriteTime.update(time);
}

public static void fileRead(String file, long size) {
CamelliaMapUtils.computeIfAbsent(fileReadMap, file, k -> new LongAdder()).add(size);
}

public static void fileWrite(String file, long size) {
CamelliaMapUtils.computeIfAbsent(fileWriteMap, file, k -> new LongAdder()).add(size);
CamelliaMapUtils.computeIfAbsent(fileMap, file, k -> new FileReadWriteCollector()).updateWrite(size, time);
}

public static void slotInit() {
Expand All @@ -112,74 +46,46 @@ public static void slotExpand() {
slotExpand.increment();
}

public static void time(String item, long time) {
if (time < 0) return;
time = time / 10000;
CamelliaMapUtils.computeIfAbsent(timeMap, item, k -> new TimeCollector()).update(time);
}

static {
ExecutorUtils.scheduleAtFixedRate(() -> {
logger.info("##############");
{
Stats stats = compactTime.getStats();
logger.info("compact stats, count = {}, avg = {}", stats.count, stats.avg);
}
{
Stats stats = flushTime.getStats();
logger.info("flush stats, count = {}, avg = {}", stats.count, stats.avg);
}
{
Stats stats = keyFlushTime.getStats();
logger.info("key flush stats, count = {}, avg = {}", stats.count, stats.avg);
}
{
Stats stats = valueFlushTime.getStats();
logger.info("value flush stats, count = {}, avg = {}", stats.count, stats.avg);
}
{
Stats stats = walFlushTime.getStats();
logger.info("wal flush stats, count = {}, avg = {}", stats.count, stats.avg);
}
{
Stats stats = walAppendTime.getStats();
logger.info("wal append stats, count = {}, avg = {}", stats.count, stats.avg);
}
{
Stats stats = keyWaitFlushTime.getStats();
logger.info("key wait flush stats, count = {}, avg = {}", stats.count, stats.avg);
}
{
Stats stats = valueWaitFlushTime.getStats();
logger.info("value wait flush stats, count = {}, avg = {}", stats.count, stats.avg);
}
{
Stats stats = fileReadTime.getStats();
logger.info("file read stats, count = {}, avg = {}", stats.count, stats.avg);
}
{
Stats stats = fileWriteTime.getStats();
logger.info("file write stats, count = {}, avg = {}", stats.count, stats.avg);
logger.info("slot init, count = {}", slotInit.sumThenReset());
logger.info("slot expand, count = {}", slotExpand.sumThenReset());
logger.info("flush executor, queue.size = {}", LocalStorageExecutors.getInstance().getFlushExecutor().size());
}
{
for (Map.Entry<String, LongAdder> entry : fileWriteMap.entrySet()) {
long size = entry.getValue().sumThenReset();
if (size == 0) {
for (Map.Entry<String, TimeCollector> entry : timeMap.entrySet()) {
TimeCollector time = entry.getValue();
TimeStats stats = time.getStats();
if (stats.count == 0) {
continue;
}
logger.info("file write, file = {}, size = {}", entry.getKey(), Utils.humanReadableByteCountBin(size));
logger.info("time monitor, item = {}, count = {}, avg = {}", entry.getKey(), stats.count, stats.avg);
}
}
{
for (Map.Entry<String, LongAdder> entry : fileReadMap.entrySet()) {
long size = entry.getValue().sumThenReset();
if (size == 0) {
continue;
}
logger.info("file read, file = {}, size = {}", entry.getKey(), Utils.humanReadableByteCountBin(size));
for (Map.Entry<String, FileReadWriteCollector> entry : fileMap.entrySet()) {
FileReadWriteCollector collector = entry.getValue();
FileReadWriteStats stats = collector.getStats();
if (stats.readSize == 0 && stats.writeSize == 0) {
continue;
}
logger.info("file read, file = {}, read.count = {}, read.size = {}, read.time.avg = {}",
entry.getKey(), stats.readTime.count, Utils.humanReadableByteCountBin(stats.readSize), stats.readTime.avg);
logger.info("file write, file = {}, write.count = {}, write.size = {}, write.time.avg = {}",
entry.getKey(), stats.writeTime.count, Utils.humanReadableByteCountBin(stats.writeSize), stats.writeTime.avg);
}
{
logger.info("slot init, count = {}", slotInit.sumThenReset());
logger.info("slot expand, count = {}", slotExpand.sumThenReset());
}
logger.info("##############");
}, 10, 10, TimeUnit.SECONDS);
}

private static class Time {
private static class TimeCollector {
LongAdder time = new LongAdder();
LongAdder count = new LongAdder();

Expand All @@ -188,21 +94,55 @@ void update(long time) {
this.count.increment();
}

Stats getStats() {
TimeStats getStats() {
long time = this.time.sumThenReset();
long count = this.count.sumThenReset();
if (count == 0) {
return new Stats();
return new TimeStats();
}
Stats stats = new Stats();
TimeStats stats = new TimeStats();
stats.avg = ((double) time / count) / 100;
stats.count = count;
return stats;
}
}

private static class Stats {
private static class FileReadWriteCollector {
TimeCollector readTime = new TimeCollector();
LongAdder readSize = new LongAdder();

TimeCollector writeTime = new TimeCollector();
LongAdder writeSize = new LongAdder();

void updateRead(long size, long time) {
readTime.update(time);
readSize.add(size);
}

void updateWrite(long size, long time) {
writeTime.update(time);
writeSize.add(size);
}

FileReadWriteStats getStats() {
FileReadWriteStats stats = new FileReadWriteStats();
stats.readTime = readTime.getStats();
stats.writeTime = writeTime.getStats();
stats.readSize = readSize.sumThenReset();
stats.writeSize = writeSize.sumThenReset();
return stats;
}
}

private static class TimeStats {
long count;
double avg;
}

private static class FileReadWriteStats {
TimeStats readTime;
long readSize;
TimeStats writeTime;
long writeSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private void afterWrite(short slot) throws IOException {
try {
wal.flush(slot, slotWalOffset);
} finally {
LocalStorageMonitor.flushTime(System.nanoTime() - startTime);
LocalStorageMonitor.time("flush", System.nanoTime() - startTime);
}
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void compact(short slot) {
logger.error("compact error, slot = {}, blockType = {}, offset = {}, limit = {}", slot, blockType, offset, limit, e);
} finally {
lastCompactTimeMap.put(slot, TimeCache.currentMillis);
LocalStorageMonitor.compactTime(System.nanoTime() - startTime);
LocalStorageMonitor.time("compact", System.nanoTime() - startTime);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ public void write(String file, long offset, byte[] data) throws IOException {
int write = fileChannel.write(buffer, position);
position += write;
}
LocalStorageMonitor.fileWrite(file, data.length);
} finally {
LocalStorageMonitor.fileWriteTime(System.nanoTime() - startTime);
LocalStorageMonitor.fileWrite(file, data.length, System.nanoTime() - startTime);
}
}

Expand All @@ -63,10 +62,9 @@ public byte[] read(String file, long offset, int size) throws IOException {
FileChannel fileChannel = getFileChannel(file);
ByteBuffer buffer = ByteBuffer.allocate(size);
fileChannel.read(buffer, offset);
LocalStorageMonitor.fileRead(file, size);
return buffer.array();
} finally {
LocalStorageMonitor.fileReadTime(System.nanoTime() - startTime);
LocalStorageMonitor.fileRead(file, size, System.nanoTime() - startTime);
}
}

Expand All @@ -80,10 +78,9 @@ public int readInt(String file, long offset) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(4);
fileChannel.read(buffer, offset);
buffer.flip();
LocalStorageMonitor.fileRead(file, 4);
return buffer.getInt();
} finally {
LocalStorageMonitor.fileReadTime(System.nanoTime() - startTime);
LocalStorageMonitor.fileRead(file, 4, System.nanoTime() - startTime);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.flush;


import com.netease.nim.camellia.redis.proxy.util.ExecutorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -15,17 +10,15 @@
*/
public class FlushExecutor {

private static final Logger logger = LoggerFactory.getLogger(FlushExecutor.class);

private final ThreadPoolExecutor executor;

public FlushExecutor(int poolSize, int queueSize) {
this.executor = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueSize), new FlushThreadFactory("flush", false));
ExecutorUtils.scheduleAtFixedRate(() -> {
int size = executor.getQueue().size();
logger.info("flush executor, size = {}", size);
}, 10, 10, TimeUnit.SECONDS);
}

public int size() {
return executor.getQueue().size();
}

public boolean isInFlushThread() {
Expand Down
Loading

0 comments on commit acb4e41

Please sign in to comment.