From cc573eacfa466f0b4ad71bf4c578dc1a8b11b744 Mon Sep 17 00:00:00 2001 From: YeJunHao <41894543+leaves12138@users.noreply.github.com> Date: Tue, 17 Sep 2024 17:53:16 +0800 Subject: [PATCH] [flink] Add metrics for unaware append table compaction thread (#4160) --- .../flink/compact/UnawareBucketCompactor.java | 71 +++++++- .../sink/AppendCompactWorkerOperator.java | 3 +- ...nlyMultiTableCompactionWorkerOperator.java | 5 +- .../compact/UnawareBucketCompactorTest.java | 158 ++++++++++++++++++ 4 files changed, 232 insertions(+), 5 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketCompactorTest.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java index 8c2b56777c95..a34f009072be 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java @@ -20,12 +20,22 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.append.UnawareAppendCompactionTask; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.operation.AppendOnlyFileStoreWrite; +import org.apache.paimon.operation.metrics.CompactionMetrics; +import org.apache.paimon.operation.metrics.MetricUtils; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.flink.metrics.MetricGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import 
java.util.LinkedList; @@ -39,6 +49,8 @@ /** The Compactor of unaware bucket table to execute {@link UnawareAppendCompactionTask}. */ public class UnawareBucketCompactor { + private static final Logger LOG = LoggerFactory.getLogger(UnawareBucketCompactor.class); + private final FileStoreTable table; private final String commitUser; @@ -47,29 +59,82 @@ public class UnawareBucketCompactor { protected final transient Queue<Future<CommitMessage>> result; private final transient Supplier<ExecutorService> compactExecutorsupplier; + @Nullable private final transient CompactionMetrics compactionMetrics; + @Nullable private final transient CompactionMetrics.Reporter metricsReporter; public UnawareBucketCompactor( FileStoreTable table, String commitUser, - Supplier<ExecutorService> lazyCompactExecutor) { + Supplier<ExecutorService> lazyCompactExecutor, + @Nullable MetricGroup metricGroup) { this.table = table; this.commitUser = commitUser; this.write = (AppendOnlyFileStoreWrite) table.store().newWrite(commitUser); this.result = new LinkedList<>(); this.compactExecutorsupplier = lazyCompactExecutor; + this.compactionMetrics = + metricGroup == null + ? null + : new CompactionMetrics(new FlinkMetricRegistry(metricGroup), table.name()); + this.metricsReporter = + compactionMetrics == null + ? null + // partition and bucket fields are no use. 
+ : this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0); } public void processElement(UnawareAppendCompactionTask task) throws Exception { - result.add(compactExecutorsupplier.get().submit(() -> task.doCompact(table, write))); + result.add( + compactExecutorsupplier + .get() + .submit( + () -> { + MetricUtils.safeCall(this::startTimer, LOG); + + try { + long startMillis = System.currentTimeMillis(); + CommitMessage commitMessage = task.doCompact(table, write); + MetricUtils.safeCall( + () -> { + if (metricsReporter != null) { + metricsReporter.reportCompactionTime( + System.currentTimeMillis() + - startMillis); + } + }, + LOG); + return commitMessage; + } finally { + MetricUtils.safeCall(this::stopTimer, LOG); + } + })); + } + + private void startTimer() { + if (metricsReporter != null) { + metricsReporter.getCompactTimer().start(); + } + } + + private void stopTimer() { + if (metricsReporter != null) { + metricsReporter.getCompactTimer().finish(); + } } public void close() throws Exception { shutdown(); + if (metricsReporter != null) { + MetricUtils.safeCall(metricsReporter::unregister, LOG); + } + + if (compactionMetrics != null) { + MetricUtils.safeCall(compactionMetrics::close, LOG); + } } @VisibleForTesting void shutdown() throws Exception { - List<CommitMessage> messages = new ArrayList<>(); for (Future<CommitMessage> resultFuture : result) { if (!resultFuture.isDone()) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java index d9ee79480204..52ab75de6b2c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendCompactWorkerOperator.java @@ -68,7 +68,8 @@ Iterable<Future<CommitMessage>> result() { public void open() throws Exception { LOG.debug("Opened a append-only table 
compaction worker."); this.unawareBucketCompactor = - new UnawareBucketCompactor(table, commitUser, this::workerExecutor); + new UnawareBucketCompactor( + table, commitUser, this::workerExecutor, getMetricGroup()); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java index 9440e4a6551e..15e7b9746fe6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java @@ -111,7 +111,10 @@ public void processElement(StreamRecord e private UnawareBucketCompactor compactor(Identifier tableId) { try { return new UnawareBucketCompactor( - (FileStoreTable) catalog.getTable(tableId), commitUser, this::workerExecutor); + (FileStoreTable) catalog.getTable(tableId), + commitUser, + this::workerExecutor, + getMetricGroup()); } catch (Catalog.TableNotExistException e) { throw new RuntimeException(e); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketCompactorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketCompactorTest.java new file mode 100644 index 000000000000..69229ddce2f6 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketCompactorTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.append.UnawareAppendCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.source.FileStoreSourceReaderTest; +import org.apache.paimon.operation.AppendOnlyFileStoreWrite; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.ExecutorThreadFactory; + +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.paimon.operation.metrics.CompactionMetrics.AVG_COMPACTION_TIME; +import static org.apache.paimon.operation.metrics.CompactionMetrics.COMPACTION_THREAD_BUSY; + +/** Test for 
{@link UnawareBucketCompactor}. */ +public class UnawareBucketCompactorTest { + + @TempDir private Path dir; + private String tableName = "Orders1"; + private String dataBaseName = "my_db"; + private Catalog catalog; + + @Test + public void testGaugeCollection() throws Exception { + createTable(); + ExecutorService executorService = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory( + Thread.currentThread().getName() + "-append-only-compact-worker")); + Map<String, Gauge> map = new HashMap<>(); + UnawareBucketCompactor unawareBucketCompactor = + new UnawareBucketCompactor( + (FileStoreTable) catalog.getTable(identifier()), + "10086", + () -> executorService, + new FileStoreSourceReaderTest.DummyMetricGroup() { + @Override + public <T, G extends Gauge<T>> G gauge(String name, G gauge) { + map.put(name, gauge); + return null; + } + + @Override + public MetricGroup addGroup(String name) { + return this; + } + + @Override + public MetricGroup addGroup(String key, String value) { + return this; + } + }); + + for (int i = 0; i < 320; i++) { + unawareBucketCompactor.processElement(new MockCompactionTask()); + Thread.sleep(250); + } + + double compactionThreadBusy = (double) map.get(COMPACTION_THREAD_BUSY).getValue(); + double compactionAvrgTime = (double) map.get(AVG_COMPACTION_TIME).getValue(); + + Assertions.assertThat(compactionThreadBusy).isGreaterThan(45).isLessThan(55); + Assertions.assertThat(compactionAvrgTime).isGreaterThan(120).isLessThan(140); + } + + protected Catalog getCatalog() { + if (catalog == null) { + Options options = new Options(); + options.set( + CatalogOptions.WAREHOUSE, + new org.apache.paimon.fs.Path(dir.toString()).toUri().toString()); + catalog = CatalogFactory.createCatalog(CatalogContext.create(options)); + } + return catalog; + } + + protected void createTable() throws Exception { + getCatalog().createDatabase(dataBaseName, true); + getCatalog().createTable(identifier(), schema(), true); + } + + protected Identifier identifier() { + return 
Identifier.create(dataBaseName, tableName); + } + + protected static Schema schema() { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.INT()); + schemaBuilder.column("f2", DataTypes.SMALLINT()); + schemaBuilder.column("f3", DataTypes.STRING()); + schemaBuilder.column("f4", DataTypes.DOUBLE()); + schemaBuilder.column("f5", DataTypes.CHAR(100)); + schemaBuilder.column("f6", DataTypes.VARCHAR(100)); + schemaBuilder.column("f7", DataTypes.BOOLEAN()); + schemaBuilder.column("f8", DataTypes.DATE()); + schemaBuilder.column("f10", DataTypes.TIMESTAMP(9)); + schemaBuilder.column("f11", DataTypes.DECIMAL(10, 2)); + schemaBuilder.column("f12", DataTypes.BYTES()); + schemaBuilder.column("f13", DataTypes.FLOAT()); + schemaBuilder.column("f14", DataTypes.BINARY(10)); + schemaBuilder.column("f15", DataTypes.VARBINARY(10)); + return schemaBuilder.build(); + } + + /** Mock compaction task for test. */ + private static class MockCompactionTask extends UnawareAppendCompactionTask { + + public MockCompactionTask() { + super(BinaryRow.EMPTY_ROW, Collections.emptyList()); + } + + @Override + public CommitMessage doCompact(FileStoreTable table, AppendOnlyFileStoreWrite write) + throws Exception { + Thread.sleep(125); + return null; + } + } +}