Skip to content

Commit

Permalink
[flink] Fix compact actions not handling scan parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Mar 22, 2024
1 parent 6378848 commit d38fc4c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
specifiedPartitions != null
? PredicateBuilder.partitions(specifiedPartitions, table.rowType())
: null);

return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
Integer scanParallelism =
Options.fromMap(table.options()).get(FlinkConnectorOptions.SCAN_PARALLELISM);
return BucketUnawareCompactSource.buildSource(
env, source, isContinuous, scanParallelism, tableIdentifier);
}

private void sinkFromSource(DataStreamSource<AppendOnlyCompactionTask> input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,21 @@ public static DataStreamSource<AppendOnlyCompactionTask> buildSource(
StreamExecutionEnvironment env,
BucketUnawareCompactSource source,
boolean streaming,
@Nullable Integer parallelism,
String tableIdentifier) {
final StreamSource<AppendOnlyCompactionTask, BucketUnawareCompactSource> sourceOperator =
new StreamSource<>(source);
return new DataStreamSource<>(
env,
new CompactionTaskTypeInfo(),
sourceOperator,
false,
COMPACTION_COORDINATOR_NAME + " : " + tableIdentifier,
streaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED);
DataStreamSource<AppendOnlyCompactionTask> dataStream =
new DataStreamSource<>(
env,
new CompactionTaskTypeInfo(),
sourceOperator,
false,
COMPACTION_COORDINATOR_NAME + " : " + tableIdentifier,
streaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED);
if (parallelism != null) {
dataStream.setParallelism(parallelism);
}
return dataStream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,18 @@ public DataStreamSource<RowData> build() {

BucketsTable bucketsTable = new BucketsTable(table, isContinuous);
RowType produceType = bucketsTable.rowType();
return env.fromSource(
buildSource(bucketsTable),
WatermarkStrategy.noWatermarks(),
tableIdentifier + "-compact-source",
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)));
DataStreamSource<RowData> dataStream =
env.fromSource(
buildSource(bucketsTable),
WatermarkStrategy.noWatermarks(),
tableIdentifier + "-compact-source",
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)));
Integer parallelism =
Options.fromMap(table.options()).get(FlinkConnectorOptions.SCAN_PARALLELISM);
if (parallelism != null) {
dataStream.setParallelism(parallelism);
}
return dataStream;
}

private Map<String, String> streamingCompactOptions() {
Expand Down

0 comments on commit d38fc4c

Please sign in to comment.