Skip to content

Commit

Permalink
improve: Enrich GC metrics to better analyze GC behavior and the time…
Browse files Browse the repository at this point in the history
… consumption of each phase. (#4384)

### Motivation
Enrich GC metrics to better analyze GC behavior and the time consumption of each phase.

In our online environment, we found that some clusters spend a lot of time scanning metadata during GC, so the newly added metrics help us analyze cluster GC behavior more conveniently.

This PR adds 4 new metrics:
bookie_GC_LEDGER_RUNTIME
bookie_EXTRACT_META_RUNTIME
bookie_COMPACT_RUNTIME
bookie_ENTRY_LOG_COMPACT_RATIO

### Changes
- bookie_GC_LEDGER_RUNTIME
Operation stats of garbage-collecting ledgers based on the metadata store: time consumed comparing ledger metadata between the local bookie and the metadata store (ZooKeeper).

- bookie_EXTRACT_META_RUNTIME
Time consumption for extracting Meta from entryLogs.

- bookie_COMPACT_RUNTIME
Time consumption of entry log compaction.

- bookie_ENTRY_LOG_COMPACT_RATIO
Current proportion of entry log files that have been compacted (provides a reference point for users when setting CompactionThreshold, so they can configure more reasonable values).

<img width="904" alt="image" src="https://github.com/apache/bookkeeper/assets/16517186/79870ed5-5191-429c-b6e3-e8a08054bcd1">

Co-authored-by: qunzhong <[email protected]>
  • Loading branch information
ethqunzhong and qunzhong authored Sep 12, 2024
1 parent 1f1df81 commit d0adb60
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ public interface BookKeeperServerStats {
String MINOR_COMPACTION_COUNT = "MINOR_COMPACTION_TOTAL";
String ACTIVE_LEDGER_COUNT = "ACTIVE_LEDGER_TOTAL";
String DELETED_LEDGER_COUNT = "DELETED_LEDGER_TOTAL";
String GC_LEDGER_RUNTIME = "GC_LEDGER_RUNTIME";
String COMPACT_RUNTIME = "COMPACT_RUNTIME";
String EXTRACT_META_RUNTIME = "EXTRACT_META_RUNTIME";
String ENTRY_LOG_COMPACT_RATIO = "ENTRY_LOG_COMPACT_RATIO";

// Index Related Counters
String INDEX_INMEM_ILLEGAL_STATE_RESET = "INDEX_INMEM_ILLEGAL_STATE_RESET";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -59,6 +60,7 @@
public class GarbageCollectorThread implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThread.class);
private static final int SECOND = 1000;
private static final int ENTRY_LOG_USAGE_SEGMENT_COUNT = 10;
private static final long MINUTE = TimeUnit.MINUTES.toMillis(1);

// Maps entry log files to the set of ledgers that comprise the file and the size usage per ledger
Expand Down Expand Up @@ -97,6 +99,8 @@ public class GarbageCollectorThread implements Runnable {

private volatile long totalEntryLogSize;
private volatile int numActiveEntryLogs;
private volatile double entryLogCompactRatio;
private volatile int[] currentEntryLogUsageBuckets;

final CompactableLedgerStorage ledgerStorage;

Expand Down Expand Up @@ -172,12 +176,16 @@ public GarbageCollectorThread(ServerConfiguration conf,

this.numActiveEntryLogs = 0;
this.totalEntryLogSize = 0L;
this.entryLogCompactRatio = 0.0;
this.currentEntryLogUsageBuckets = new int[ENTRY_LOG_USAGE_SEGMENT_COUNT];
this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage, conf, statsLogger);
this.gcStats = new GarbageCollectorStats(
statsLogger,
() -> numActiveEntryLogs,
() -> totalEntryLogSize,
() -> garbageCollector.getNumActiveLedgers()
() -> garbageCollector.getNumActiveLedgers(),
() -> entryLogCompactRatio,
() -> currentEntryLogUsageBuckets
);

this.garbageCleaner = ledgerId -> {
Expand Down Expand Up @@ -413,12 +421,22 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
// this is used in extractMetaFromEntryLogs to calculate the usage of entry log
doGcLedgers();

// Extract all of the ledger ID's that comprise all of the entry logs
// (except for the current new one which is still being written to).
extractMetaFromEntryLogs();

// gc entry logs
doGcEntryLogs();
long extractMetaStart = MathUtils.nowInNano();
try {
// Extract all of the ledger ID's that comprise all of the entry logs
// (except for the current new one which is still being written to).
extractMetaFromEntryLogs();

// gc entry logs
doGcEntryLogs();
gcStats.getExtractMetaRuntime()
.registerSuccessfulEvent(MathUtils.elapsedNanos(extractMetaStart), TimeUnit.NANOSECONDS);
} catch (EntryLogMetadataMapException e) {
gcStats.getExtractMetaRuntime()
.registerFailedEvent(MathUtils.elapsedNanos(extractMetaStart), TimeUnit.NANOSECONDS);
throw e;
}

if (suspendMajor) {
LOG.info("Disk almost full, suspend major compaction to slow down filling disk.");
Expand All @@ -428,14 +446,20 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
}

long curTime = System.currentTimeMillis();
long compactStart = MathUtils.nowInNano();
if (((isForceMajorCompactionAllow && force) || (enableMajorCompaction
&& (force || curTime - lastMajorCompactionTime > majorCompactionInterval)))
&& (!suspendMajor)) {
// enter major compaction
LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
LOG.info("Enter major compaction, suspendMajor {}, lastMajorCompactionTime {}", suspendMajor,
lastMajorCompactionTime);
majorCompacting.set(true);
try {
doCompactEntryLogs(majorCompactionThreshold, majorCompactionMaxTimeMillis);
} catch (EntryLogMetadataMapException e) {
gcStats.getCompactRuntime()
.registerFailedEvent(MathUtils.elapsedNanos(compactStart), TimeUnit.NANOSECONDS);
throw e;
} finally {
lastMajorCompactionTime = System.currentTimeMillis();
// and also move minor compaction time
Expand All @@ -447,23 +471,29 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
&& (force || curTime - lastMinorCompactionTime > minorCompactionInterval)))
&& (!suspendMinor)) {
// enter minor compaction
LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
LOG.info("Enter minor compaction, suspendMinor {}, lastMinorCompactionTime {}", suspendMinor,
lastMinorCompactionTime);
minorCompacting.set(true);
try {
doCompactEntryLogs(minorCompactionThreshold, minorCompactionMaxTimeMillis);
} catch (EntryLogMetadataMapException e) {
gcStats.getCompactRuntime()
.registerFailedEvent(MathUtils.elapsedNanos(compactStart), TimeUnit.NANOSECONDS);
throw e;
} finally {
lastMinorCompactionTime = System.currentTimeMillis();
gcStats.getMinorCompactionCounter().inc();
minorCompacting.set(false);
}
}
gcStats.getCompactRuntime()
.registerSuccessfulEvent(MathUtils.elapsedNanos(compactStart), TimeUnit.NANOSECONDS);
gcStats.getGcThreadRuntime().registerSuccessfulEvent(
MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS);
} catch (EntryLogMetadataMapException e) {
LOG.error("Error in entryLog-metadatamap, Failed to complete GC/Compaction due to entry-log {}",
e.getMessage(), e);
gcStats.getGcThreadRuntime().registerFailedEvent(
MathUtils.nowInNano() - threadStart, TimeUnit.NANOSECONDS);
gcStats.getGcThreadRuntime().registerFailedEvent(MathUtils.elapsedNanos(threadStart), TimeUnit.NANOSECONDS);
} finally {
if (force && forceGarbageCollection.compareAndSet(true, false)) {
LOG.info("{} Set forceGarbageCollection to false after force GC to make it forceGC-able again.",
Expand All @@ -477,7 +507,16 @@ public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMin
* Do garbage collection ledger index files.
*/
private void doGcLedgers() {
garbageCollector.gc(garbageCleaner);
long gcLedgersStart = MathUtils.nowInNano();
try {
garbageCollector.gc(garbageCleaner);
gcStats.getGcLedgerRuntime()
.registerSuccessfulEvent(MathUtils.elapsedNanos(gcLedgersStart), TimeUnit.NANOSECONDS);
} catch (Throwable t) {
LOG.warn("Exception when doing gc ledger.", t);
gcStats.getGcLedgerRuntime()
.registerFailedEvent(MathUtils.elapsedNanos(gcLedgersStart), TimeUnit.NANOSECONDS);
}
}

/**
Expand Down Expand Up @@ -550,7 +589,7 @@ private boolean removeIfLedgerNotExists(EntryLogMetadata meta) throws EntryLogMe
void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMetadataMapException {
LOG.info("Do compaction to compact those files lower than {}", threshold);

final int numBuckets = 10;
final int numBuckets = ENTRY_LOG_USAGE_SEGMENT_COUNT;
int[] entryLogUsageBuckets = new int[numBuckets];
int[] compactedBuckets = new int[numBuckets];

Expand Down Expand Up @@ -585,7 +624,8 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet

compactableBuckets.get(bucketIndex).add(meta.getEntryLogId());
});

currentEntryLogUsageBuckets = entryLogUsageBuckets;
gcStats.setEntryLogUsageBuckets(currentEntryLogUsageBuckets);
LOG.info(
"Compaction: entry log usage buckets before compaction [10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}",
entryLogUsageBuckets);
Expand Down Expand Up @@ -649,9 +689,11 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet
LOG.debug("Compaction ran for {}ms but was limited by {}ms", timeDiff, maxTimeMillis);
}
}
LOG.info(
"Compaction: entry log usage buckets[10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}, compacted {}",
entryLogUsageBuckets, compactedBuckets);
int totalEntryLogNum = Arrays.stream(entryLogUsageBuckets).sum();
int compactedEntryLogNum = Arrays.stream(compactedBuckets).sum();
this.entryLogCompactRatio = totalEntryLogNum == 0 ? 0 : (double) compactedEntryLogNum / totalEntryLogNum;
LOG.info("Compaction: entry log usage buckets[10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}, compacted {}, "
+ "compacted entry log ratio {}", entryLogUsageBuckets, compactedBuckets, entryLogCompactRatio);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ACTIVE_LEDGER_COUNT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.COMPACT_RUNTIME;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.DELETED_LEDGER_COUNT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRY_LOG_COMPACT_RATIO;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.EXTRACT_META_RUNTIME;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GC_LEDGER_RUNTIME;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.MAJOR_COMPACTION_COUNT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.MINOR_COMPACTION_COUNT;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.RECLAIMED_COMPACTION_SPACE_BYTES;
Expand Down Expand Up @@ -102,11 +106,36 @@ public class GarbageCollectorStats {
help = "Current number of active ledgers"
)
private final Gauge<Integer> activeLedgerCountGauge;
@StatsDoc(
name = GC_LEDGER_RUNTIME,
help = "Operation stats of doing gc ledgers base on metaStore"
)
private final OpStatsLogger gcLedgerRuntime;
@StatsDoc(
name = COMPACT_RUNTIME,
help = "Operation stats of entry log compaction"
)
private final OpStatsLogger compactRuntime;
@StatsDoc(
name = EXTRACT_META_RUNTIME,
help = "Operation stats of extracting Meta from entryLogs"
)
private final OpStatsLogger extractMetaRuntime;
@StatsDoc(
name = ENTRY_LOG_COMPACT_RATIO,
help = "Current proportion of compacted entry log files that have been executed"
)
private final Gauge<Double> entryLogCompactRatioGauge;
private volatile int[] entryLogUsageBuckets;
private final Gauge<Integer>[] entryLogUsageBucketsLeGauges;


public GarbageCollectorStats(StatsLogger statsLogger,
Supplier<Integer> activeEntryLogCountSupplier,
Supplier<Long> activeEntryLogSpaceBytesSupplier,
Supplier<Integer> activeLedgerCountSupplier) {
Supplier<Integer> activeLedgerCountSupplier,
Supplier<Double> entryLogCompactRatioSupplier,
Supplier<int[]> usageBuckets) {
this.statsLogger = statsLogger;

this.minorCompactionCounter = statsLogger.getCounter(MINOR_COMPACTION_COUNT);
Expand All @@ -116,6 +145,10 @@ public GarbageCollectorStats(StatsLogger statsLogger,
this.reclaimFailedToDelete = statsLogger.getCounter(RECLAIM_FAILED_TO_DELETE);
this.gcThreadRuntime = statsLogger.getOpStatsLogger(THREAD_RUNTIME);
this.deletedLedgerCounter = statsLogger.getCounter(DELETED_LEDGER_COUNT);
this.gcLedgerRuntime = statsLogger.getOpStatsLogger(GC_LEDGER_RUNTIME);
this.compactRuntime = statsLogger.getOpStatsLogger(COMPACT_RUNTIME);
this.extractMetaRuntime = statsLogger.getOpStatsLogger(EXTRACT_META_RUNTIME);
this.entryLogUsageBuckets = usageBuckets.get();

this.activeEntryLogCountGauge = new Gauge<Integer>() {
@Override
Expand Down Expand Up @@ -153,6 +186,44 @@ public Integer getSample() {
}
};
statsLogger.registerGauge(ACTIVE_LEDGER_COUNT, activeLedgerCountGauge);
this.entryLogCompactRatioGauge = new Gauge<Double>() {
@Override
public Double getDefaultValue() {
return 0.0;
}

@Override
public Double getSample() {
return entryLogCompactRatioSupplier.get();
}
};
statsLogger.registerGauge(ENTRY_LOG_COMPACT_RATIO, entryLogCompactRatioGauge);

this.entryLogUsageBucketsLeGauges = new Gauge[entryLogUsageBuckets.length];
for (int i = 0; i < entryLogUsageBucketsLeGauges.length; i++) {
entryLogUsageBucketsLeGauges[i] =
registerEntryLogUsageBucketsLeGauge("entry_log_usage_buckets_le_" + (i + 1) * 10, i);
}
}

/**
 * Creates and registers a gauge that samples one slot of the entry log usage
 * histogram.
 *
 * <p>The gauge reads {@code entryLogUsageBuckets[index]} on every sample, so it
 * always reflects the most recently published snapshot. The {@code name} is
 * presumably of the form {@code entry_log_usage_buckets_le_N}, exposing the
 * count of entry logs whose usage is at or below N percent — TODO confirm
 * against the caller.
 *
 * @param name  metric name under which the gauge is registered
 * @param index slot of {@code entryLogUsageBuckets} this gauge exposes
 * @return the gauge that was registered
 */
private Gauge<Integer> registerEntryLogUsageBucketsLeGauge(String name, int index) {
    final Gauge<Integer> bucketGauge = new Gauge<Integer>() {
        @Override
        public Integer getDefaultValue() {
            return 0;
        }

        @Override
        public Integer getSample() {
            // Reads the volatile bucket array so new snapshots are visible.
            return entryLogUsageBuckets[index];
        }
    };
    statsLogger.registerGauge(name, bucketGauge);
    return bucketGauge;
}


/**
 * Publishes the latest entry log usage histogram.
 *
 * <p>The field holding the buckets is {@code volatile}, so the per-bucket
 * gauges observe the new snapshot on their next sample; the array itself is
 * replaced wholesale rather than mutated in place.
 *
 * @param usageBuckets freshly computed usage histogram to expose
 */
public void setEntryLogUsageBuckets(int[] usageBuckets) {
    this.entryLogUsageBuckets = usageBuckets;
}
}

0 comments on commit d0adb60

Please sign in to comment.