[Feature][CONNECTORS-V2-Paimon] flink task parallelism
hawk9821 committed Sep 12, 2024
1 parent cbe8a22 commit e0cd7d8
Showing 6 changed files with 37 additions and 11 deletions.

@@ -56,13 +56,16 @@ public class SinkExecuteProcessor
         extends FlinkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {

     private static final String PLUGIN_TYPE = PluginType.SINK.getType();
+    private final int parallelism;

     protected SinkExecuteProcessor(
             List<URL> jarPaths,
             Config envConfig,
             List<? extends Config> pluginConfigs,
-            JobContext jobContext) {
+            JobContext jobContext,
+            int parallelism) {
         super(jarPaths, envConfig, pluginConfigs, jobContext);
+        this.parallelism = parallelism;
     }

     @Override
@@ -139,10 +142,11 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
             }
             DataStreamSink<Row> dataStreamSink =
                     stream.getDataStream()
-                            .sinkTo(new FlinkSink<>(sink, stream.getCatalogTables().get(0)))
+                            .sinkTo(
+                                    new FlinkSink<>(
+                                            sink, stream.getCatalogTables().get(0), parallelism))
                             .name(String.format("%s-Sink", sink.getPluginName()));
             if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 int parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());
                 dataStreamSink.setParallelism(parallelism);
             }
         }

@@ -85,7 +85,7 @@ public FlinkExecution(Config config) {
         registerPlugin(envConfig);
         JobContext jobContext = new JobContext();
         jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
-
+        int parallelism = envConfig.hasPath("parallelism") ? envConfig.getInt("parallelism") : 1;
         this.sourcePluginExecuteProcessor =
                 new SourceExecuteProcessor(
                         jarPaths, envConfig, config.getConfigList(Constants.SOURCE), jobContext);
@@ -98,7 +98,11 @@ public FlinkExecution(Config config) {
                         jobContext);
         this.sinkPluginExecuteProcessor =
                 new SinkExecuteProcessor(
-                        jarPaths, envConfig, config.getConfigList(Constants.SINK), jobContext);
+                        jarPaths,
+                        envConfig,
+                        config.getConfigList(Constants.SINK),
+                        jobContext,
+                        parallelism);

         this.flinkRuntimeEnvironment =
                 FlinkRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths));
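
Note on the lookup above: the env-level parallelism is resolved once in FlinkExecution and falls back to 1 when the env block sets no "parallelism" key. A minimal standalone sketch of that behavior, assuming the plain Typesafe Config API (SeaTunnel uses a shaded copy of it); the class and method names here are illustrative only:

    // Illustrative sketch of: envConfig.hasPath("parallelism") ? envConfig.getInt("parallelism") : 1
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class ParallelismLookupSketch {
        static int resolveParallelism(Config envConfig) {
            // Fall back to parallelism 1 when the env block does not set "parallelism".
            return envConfig.hasPath("parallelism") ? envConfig.getInt("parallelism") : 1;
        }

        public static void main(String[] args) {
            System.out.println(resolveParallelism(ConfigFactory.parseString("parallelism = 4"))); // 4
            System.out.println(resolveParallelism(ConfigFactory.empty()));                        // 1
        }
    }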

@@ -57,13 +57,16 @@ public class SinkExecuteProcessor
         extends FlinkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {

     private static final String PLUGIN_TYPE = PluginType.SINK.getType();
+    private final int parallelism;

     protected SinkExecuteProcessor(
             List<URL> jarPaths,
             Config envConfig,
             List<? extends Config> pluginConfigs,
-            JobContext jobContext) {
+            JobContext jobContext,
+            int parallelism) {
         super(jarPaths, envConfig, pluginConfigs, jobContext);
+        this.parallelism = parallelism;
     }

     @Override
@@ -143,7 +146,9 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
                             .sinkTo(
                                     SinkV1Adapter.wrap(
                                             new FlinkSink<>(
-                                                    sink, stream.getCatalogTables().get(0))))
+                                                    sink,
+                                                    stream.getCatalogTables().get(0),
+                                                    parallelism)))
                             .name(String.format("%s-Sink", sink.getPluginName()));
             if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
                 int parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());

@@ -39,17 +39,24 @@ public class FlinkSinkWriterContext implements SinkWriter.Context {

     private final InitContext writerContext;
     private final EventListener eventListener;
+    private final int parallelism;

-    public FlinkSinkWriterContext(InitContext writerContext) {
+    public FlinkSinkWriterContext(InitContext writerContext, int parallelism) {
         this.writerContext = writerContext;
         this.eventListener = new DefaultEventProcessor(getJobIdForV14(writerContext));
+        this.parallelism = parallelism;
     }

     @Override
     public int getIndexOfSubtask() {
         return writerContext.getSubtaskId();
     }

+    @Override
+    public int getNumberOfParallelSubtasks() {
+        return parallelism;
+    }
+
     @Override
     public MetricsContext getMetricsContext() {
         try {

@@ -50,19 +50,23 @@ public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT>

     private final CatalogTable catalogTable;

+    private final int parallelism;
+
     public FlinkSink(
             SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink,
-            CatalogTable catalogTable) {
+            CatalogTable catalogTable,
+            int parallelism) {
         this.sink = sink;
         this.catalogTable = catalogTable;
+        this.parallelism = parallelism;
     }

     @Override
     public SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> createWriter(
             Sink.InitContext context, List<FlinkWriterState<WriterStateT>> states)
             throws IOException {
         org.apache.seatunnel.api.sink.SinkWriter.Context stContext =
-                new FlinkSinkWriterContext(context);
+                new FlinkSinkWriterContext(context, parallelism);
         if (states == null || states.isEmpty()) {
             return new FlinkSinkWriter<>(
                     sink.createWriter(stContext), 1, catalogTable.getSeaTunnelRowType(), stContext);

@@ -36,10 +36,12 @@ public class FlinkSinkWriterContext implements SinkWriter.Context {

     private final Sink.InitContext writerContext;
     private final EventListener eventListener;
+    private final int parallelism;

-    public FlinkSinkWriterContext(InitContext writerContext) {
+    public FlinkSinkWriterContext(InitContext writerContext, int parallelism) {
         this.writerContext = writerContext;
         this.eventListener = new DefaultEventProcessor(getFlinkJobId(writerContext));
+        this.parallelism = parallelism;
     }

     @Override
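
With the parallelism threaded into FlinkSinkWriterContext, getNumberOfParallelSubtasks() now reports the real sink parallelism to connector writers through SinkWriter.Context (the Paimon sink being the motivating case). A hypothetical helper, for illustration only, showing how a writer could combine it with getIndexOfSubtask():

    import org.apache.seatunnel.api.sink.SinkWriter;

    public final class SubtaskAwareHelper {
        // Hypothetical example: a round-robin ownership check a sink writer could
        // perform now that the context exposes the true number of parallel subtasks.
        public static boolean ownsBucket(SinkWriter.Context context, int bucket) {
            int subtask = context.getIndexOfSubtask();
            int parallelism = context.getNumberOfParallelSubtasks();
            return bucket % parallelism == subtask;
        }
    }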
