Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 committed Mar 27, 2024
1 parent 5405ab3 commit 17e242f
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 20 deletions.
6 changes: 3 additions & 3 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -564,10 +564,10 @@
<td>In watermarking, if a source remains idle beyond the specified timeout duration, it triggers snapshot advancement and facilitates tag creation.</td>
</tr>
<tr>
<td><h5>sort-compaction.local-sample.size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td><h5>sort-compaction.local-sample.magnification</h5></td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
<td>The size of local sample for sort-compaction.By default,the value is the sink parallelism * 1000.</td>
<td>The magnification of local sample for sort-compaction. The size of local sample is sink parallelism * magnification.</td>
</tr>
<tr>
<td><h5>sort-compaction.range-strategy</h5></td>
Expand Down
12 changes: 6 additions & 6 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1056,12 +1056,12 @@ public class CoreOptions implements Serializable {
+ "If the data size allocated for the sorting task is uneven,which may lead to performance bottlenecks, "
+ "the config can be set to size.");

public static final ConfigOption<Integer> SORT_COMPACTION_LOCAL_SAMPLE_SIZE =
key("sort-compaction.local-sample.size")
public static final ConfigOption<Integer> SORT_COMPACTION_SAMPLE_MAGNIFICATION =
key("sort-compaction.local-sample.magnification")
.intType()
.noDefaultValue()
.defaultValue(1000)
.withDescription(
"The size of local sample for sort-compaction.By default,the value is the sink parallelism * 1000.");
"The magnification of local sample for sort-compaction.The size of local sample is sink parallelism * magnification.");
private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down Expand Up @@ -1132,8 +1132,8 @@ public boolean sortBySize() {
return options.get(SORT_RANG_STRATEGY) == RangeStrategy.SIZE;
}

public Optional<Integer> getLocalSampleSize() {
return options.getOptional(SORT_COMPACTION_LOCAL_SAMPLE_SIZE);
public Integer getLocalSampleMagnification() {
return options.get(SORT_COMPACTION_SAMPLE_MAGNIFICATION);
}

public static FileFormat createFileFormat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,17 @@ public static <KEY> DataStream<RowData> sortStreamByKey(
"The adaptive batch scheduler is not supported. Please set the sink parallelism using the key: "
+ FlinkConnectorOptions.SINK_PARALLELISM.key());
}
final int localSampleSize =
options.getLocalSampleSize().orElseGet(() -> sinkParallelism * 1000);
final int globalSampleSize = sinkParallelism * 1000;
final int rangeNum = sinkParallelism * 10;

if (localSampleSize < 20) {
int localSampleMagnification = options.getLocalSampleMagnification();
if (localSampleMagnification < 20) {
throw new IllegalArgumentException(
String.format(
"the config %s=%d is set too small,greater than or equal to 20 is needed.",
CoreOptions.SORT_COMPACTION_LOCAL_SAMPLE_SIZE.key(), localSampleSize));
"the config '%s=%d' should not be set too small,greater than or equal to 20 is needed.",
CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
localSampleMagnification));
}

final int localSampleSize = sinkParallelism * localSampleMagnification;
final int globalSampleSize = sinkParallelism * 1000;
final int rangeNum = sinkParallelism * 10;
int keyFieldCount = sortKeyType.getFieldCount();
int valueFieldCount = valueRowType.getFieldCount();
final int[] valueProjectionMap = new int[valueFieldCount];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ public void testvalidSampleConfig() throws Exception {
prepareData(300, 1);
{
ArrayList<String> extraCompactionConfig =
Lists.newArrayList("--table_conf", "sort-compaction.local-sample.size=1");
Lists.newArrayList("--table_conf", "sort-compaction.local-sample.magnification=1");
Assertions.assertThatCode(
() -> {
createAction(
Expand All @@ -407,7 +407,7 @@ public void testvalidSampleConfig() throws Exception {
.run();
})
.hasMessage(
"the config sort-compaction.local-sample.size=1 is set too small,greater than or equal to 20 is needed.");
"the config 'sort-compaction.local-sample.magnification=1' should not be set too small,greater than or equal to 20 is needed.");
}
}

Expand Down

0 comments on commit 17e242f

Please sign in to comment.