Skip to content

Commit

Permalink
[flink] Add local sample magnification option for sort compaction to …
Browse files Browse the repository at this point in the history
…prevent sampling long time. (#3081)
  • Loading branch information
wg1026688210 authored Apr 2, 2024
1 parent 2a3607a commit 6921d41
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 21 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,12 @@
<td>Duration</td>
<td>In watermarking, if a source remains idle beyond the specified timeout duration, it triggers snapshot advancement and facilitates tag creation.</td>
</tr>
<tr>
<td><h5>sort-compaction.local-sample.magnification</h5></td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
<td>The magnification of local sample for sort-compaction.The size of local sample is sink parallelism * magnification.</td>
</tr>
<tr>
<td><h5>sort-compaction.range-strategy</h5></td>
<td style="word-wrap: break-word;">QUANTITY</td>
Expand Down
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,12 @@ public class CoreOptions implements Serializable {
+ "If the data size allocated for the sorting task is uneven,which may lead to performance bottlenecks, "
+ "the config can be set to size.");

public static final ConfigOption<Integer> SORT_COMPACTION_SAMPLE_MAGNIFICATION =
key("sort-compaction.local-sample.magnification")
.intType()
.defaultValue(1000)
.withDescription(
"The magnification of local sample for sort-compaction.The size of local sample is sink parallelism * magnification.");
private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down Expand Up @@ -1146,6 +1152,10 @@ public boolean sortBySize() {
return options.get(SORT_RANG_STRATEGY) == RangeStrategy.SIZE;
}

public Integer getLocalSampleMagnification() {
return options.get(SORT_COMPACTION_SAMPLE_MAGNIFICATION);
}

public static FileFormat createFileFormat(
Options options, ConfigOption<FileFormatType> formatOption) {
String formatIdentifier = options.get(formatOption).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
DataStream<Tuple2<T, RowData>> inputDataStream,
SerializableSupplier<Comparator<T>> keyComparator,
TypeInformation<T> keyTypeInformation,
int sampleSize,
int localSampleSize,
int globalSampleSize,
int rangeNum,
int outParallelism,
RowType valueRowType,
Expand All @@ -116,7 +117,7 @@ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
new OneInputTransformation<>(
keyInput,
"LOCAL SAMPLE",
new LocalSampleOperator<>(sampleSize),
new LocalSampleOperator<>(localSampleSize),
new TupleTypeInfo<>(
BasicTypeInfo.DOUBLE_TYPE_INFO,
keyTypeInformation,
Expand All @@ -128,7 +129,7 @@ public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
new OneInputTransformation<>(
localSample,
"GLOBAL SAMPLE",
new GlobalSampleOperator<>(sampleSize, keyComparator, rangeNum),
new GlobalSampleOperator<>(globalSampleSize, keyComparator, rangeNum),
new ListTypeInfo<>(keyTypeInformation),
1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,17 @@ public static <KEY> DataStream<RowData> sortStreamByKey(
"The adaptive batch scheduler is not supported. Please set the sink parallelism using the key: "
+ FlinkConnectorOptions.SINK_PARALLELISM.key());
}
final int sampleSize = sinkParallelism * 1000;
int localSampleMagnification = options.getLocalSampleMagnification();
if (localSampleMagnification < 20) {
throw new IllegalArgumentException(
String.format(
"the config '%s=%d' should not be set too small,greater than or equal to 20 is needed.",
CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
localSampleMagnification));
}
final int localSampleSize = sinkParallelism * localSampleMagnification;
final int globalSampleSize = sinkParallelism * 1000;
final int rangeNum = sinkParallelism * 10;

int keyFieldCount = sortKeyType.getFieldCount();
int valueFieldCount = valueRowType.getFieldCount();
final int[] valueProjectionMap = new int[valueFieldCount];
Expand Down Expand Up @@ -144,7 +152,8 @@ public Tuple2<KEY, RowData> map(RowData value) {
inputWithKey,
shuffleKeyComparator,
keyTypeInformation,
sampleSize,
localSampleSize,
globalSampleSize,
rangeNum,
sinkParallelism,
valueRowType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataTypes;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -347,22 +349,55 @@ private void order(List<String> columns) throws Exception {

private SortCompactAction createAction(
String orderStrategy, String rangeStrategy, List<String> columns) {
return createAction(orderStrategy, rangeStrategy, columns, Lists.newArrayList());
}

private SortCompactAction createAction(
String orderStrategy,
String rangeStrategy,
List<String> columns,
List<String> extraConfigs) {
ArrayList<String> args =
Lists.newArrayList(
"compact",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--order_strategy",
orderStrategy,
"--order_by",
String.join(",", columns),
"--table_conf",
"sort-compaction.range-strategy=" + rangeStrategy);
args.addAll(extraConfigs);
return createAction(SortCompactAction.class, args.toArray(new String[0]));
}

return createAction(
SortCompactAction.class,
"compact",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--order_strategy",
orderStrategy,
"--order_by",
String.join(",", columns),
"--table_conf sort-compaction.range-strategy=" + rangeStrategy,
rangeStrategy);
@Test
public void testvalidSampleConfig() throws Exception {
prepareData(300, 1);
{
ArrayList<String> extraCompactionConfig =
Lists.newArrayList(
"--table_conf", "sort-compaction.local-sample.magnification=1");
Assertions.assertThatCode(
() -> {
createAction(
"order",
"size",
Arrays.asList(
"f0", "f1", "f2", "f3", "f4", "f5", "f6",
"f7", "f8", "f9", "f10", "f11", "f12",
"f13", "f14", "f15"),
extraCompactionConfig)
.run();
})
.hasMessage(
"the config 'sort-compaction.local-sample.magnification=1' should not be set too small,greater than or equal to 20 is needed.");
}
}

private void createTable() throws Exception {
Expand Down

0 comments on commit 6921d41

Please sign in to comment.