Skip to content

Commit

Permalink
[flink] Add metrics for unaware append table compaction thread (#4160)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 committed Sep 17, 2024
1 parent 4e457ba commit cc573ea
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,22 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.operation.metrics.MetricUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.TableCommitImpl;

import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
Expand All @@ -39,6 +49,8 @@
/** The Compactor of unaware bucket table to execute {@link UnawareAppendCompactionTask}. */
public class UnawareBucketCompactor {

private static final Logger LOG = LoggerFactory.getLogger(UnawareBucketCompactor.class);

private final FileStoreTable table;
private final String commitUser;

Expand All @@ -47,29 +59,82 @@ public class UnawareBucketCompactor {
protected final transient Queue<Future<CommitMessage>> result;

private final transient Supplier<ExecutorService> compactExecutorsupplier;
@Nullable private final transient CompactionMetrics compactionMetrics;
@Nullable private final transient CompactionMetrics.Reporter metricsReporter;

public UnawareBucketCompactor(
FileStoreTable table,
String commitUser,
Supplier<ExecutorService> lazyCompactExecutor) {
Supplier<ExecutorService> lazyCompactExecutor,
@Nullable MetricGroup metricGroup) {
this.table = table;
this.commitUser = commitUser;
this.write = (AppendOnlyFileStoreWrite) table.store().newWrite(commitUser);
this.result = new LinkedList<>();
this.compactExecutorsupplier = lazyCompactExecutor;
this.compactionMetrics =
metricGroup == null
? null
: new CompactionMetrics(new FlinkMetricRegistry(metricGroup), table.name());
this.metricsReporter =
compactionMetrics == null
? null
// partition and bucket fields are no use.
: this.compactionMetrics.createReporter(BinaryRow.EMPTY_ROW, 0);
}

public void processElement(UnawareAppendCompactionTask task) throws Exception {
result.add(compactExecutorsupplier.get().submit(() -> task.doCompact(table, write)));
result.add(
compactExecutorsupplier
.get()
.submit(
() -> {
MetricUtils.safeCall(this::startTimer, LOG);

try {
long startMillis = System.currentTimeMillis();
CommitMessage commitMessage = task.doCompact(table, write);
MetricUtils.safeCall(
() -> {
if (metricsReporter != null) {
metricsReporter.reportCompactionTime(
System.currentTimeMillis()
- startMillis);
}
},
LOG);
return commitMessage;
} finally {
MetricUtils.safeCall(this::stopTimer, LOG);
}
}));
}

private void startTimer() {
if (metricsReporter != null) {
metricsReporter.getCompactTimer().start();
}
}

private void stopTimer() {
if (metricsReporter != null) {
metricsReporter.getCompactTimer().finish();
}
}

public void close() throws Exception {
shutdown();
if (metricsReporter != null) {
MetricUtils.safeCall(metricsReporter::unregister, LOG);
}

if (compactionMetrics != null) {
MetricUtils.safeCall(compactionMetrics::close, LOG);
}
}

@VisibleForTesting
void shutdown() throws Exception {

List<CommitMessage> messages = new ArrayList<>();
for (Future<CommitMessage> resultFuture : result) {
if (!resultFuture.isDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ Iterable<Future<CommitMessage>> result() {
public void open() throws Exception {
LOG.debug("Opened a append-only table compaction worker.");
this.unawareBucketCompactor =
new UnawareBucketCompactor(table, commitUser, this::workerExecutor);
new UnawareBucketCompactor(
table, commitUser, this::workerExecutor, getMetricGroup());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ public void processElement(StreamRecord<MultiTableUnawareAppendCompactionTask> e
private UnawareBucketCompactor compactor(Identifier tableId) {
try {
return new UnawareBucketCompactor(
(FileStoreTable) catalog.getTable(tableId), commitUser, this::workerExecutor);
(FileStoreTable) catalog.getTable(tableId),
commitUser,
this::workerExecutor,
getMetricGroup());
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact;

import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.source.FileStoreSourceReaderTest;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.ExecutorThreadFactory;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.paimon.operation.metrics.CompactionMetrics.AVG_COMPACTION_TIME;
import static org.apache.paimon.operation.metrics.CompactionMetrics.COMPACTION_THREAD_BUSY;

/** Test for {@link UnawareBucketCompactor}. */
public class UnawareBucketCompactorTest {

@TempDir private Path dir;
private String tableName = "Orders1";
private String dataBaseName = "my_db";
private Catalog catalog;

@Test
public void testGaugeCollection() throws Exception {
createTable();
ExecutorService executorService =
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory(
Thread.currentThread().getName() + "-append-only-compact-worker"));
Map<String, Gauge> map = new HashMap<>();
UnawareBucketCompactor unawareBucketCompactor =
new UnawareBucketCompactor(
(FileStoreTable) catalog.getTable(identifier()),
"10086",
() -> executorService,
new FileStoreSourceReaderTest.DummyMetricGroup() {
@Override
public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
map.put(name, gauge);
return null;
}

@Override
public MetricGroup addGroup(String name) {
return this;
}

@Override
public MetricGroup addGroup(String key, String value) {
return this;
}
});

for (int i = 0; i < 320; i++) {
unawareBucketCompactor.processElement(new MockCompactionTask());
Thread.sleep(250);
}

double compactionThreadBusy = (double) map.get(COMPACTION_THREAD_BUSY).getValue();
double compactionAvrgTime = (double) map.get(AVG_COMPACTION_TIME).getValue();

Assertions.assertThat(compactionThreadBusy).isGreaterThan(45).isLessThan(55);
Assertions.assertThat(compactionAvrgTime).isGreaterThan(120).isLessThan(140);
}

protected Catalog getCatalog() {
if (catalog == null) {
Options options = new Options();
options.set(
CatalogOptions.WAREHOUSE,
new org.apache.paimon.fs.Path(dir.toString()).toUri().toString());
catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
}
return catalog;
}

protected void createTable() throws Exception {
getCatalog().createDatabase(dataBaseName, true);
getCatalog().createTable(identifier(), schema(), true);
}

protected Identifier identifier() {
return Identifier.create(dataBaseName, tableName);
}

protected static Schema schema() {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
schemaBuilder.column("f1", DataTypes.INT());
schemaBuilder.column("f2", DataTypes.SMALLINT());
schemaBuilder.column("f3", DataTypes.STRING());
schemaBuilder.column("f4", DataTypes.DOUBLE());
schemaBuilder.column("f5", DataTypes.CHAR(100));
schemaBuilder.column("f6", DataTypes.VARCHAR(100));
schemaBuilder.column("f7", DataTypes.BOOLEAN());
schemaBuilder.column("f8", DataTypes.DATE());
schemaBuilder.column("f10", DataTypes.TIMESTAMP(9));
schemaBuilder.column("f11", DataTypes.DECIMAL(10, 2));
schemaBuilder.column("f12", DataTypes.BYTES());
schemaBuilder.column("f13", DataTypes.FLOAT());
schemaBuilder.column("f14", DataTypes.BINARY(10));
schemaBuilder.column("f15", DataTypes.VARBINARY(10));
return schemaBuilder.build();
}

/** Mock compaction task for test. */
private static class MockCompactionTask extends UnawareAppendCompactionTask {

public MockCompactionTask() {
super(BinaryRow.EMPTY_ROW, Collections.emptyList());
}

@Override
public CommitMessage doCompact(FileStoreTable table, AppendOnlyFileStoreWrite write)
throws Exception {
Thread.sleep(125);
return null;
}
}
}

0 comments on commit cc573ea

Please sign in to comment.