Skip to content

Commit

Permalink
[Feature][CONNECTORS-V2-Paimon] Dynamic bucket splitting improves Pai…
Browse files Browse the repository at this point in the history
…mon writing efficiency
  • Loading branch information
hawk9821 committed Sep 20, 2024
1 parent a1b351d commit b34a78a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> createWri
int index = context.getIndexOfSubtask() * replicaNum + i;
writers.put(
SinkIdentifier.of(tableIdentifier, index),
sink.createWriter(new SinkContextProxy(index, context)));
sink.createWriter(new SinkContextProxy(index, replicaNum, context)));
sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, index), context);
}
}
Expand Down Expand Up @@ -100,11 +100,12 @@ public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> restoreWr
if (state.isEmpty()) {
writers.put(
sinkIdentifier,
sink.createWriter(new SinkContextProxy(index, context)));
sink.createWriter(new SinkContextProxy(index, replicaNum, context)));
} else {
writers.put(
sinkIdentifier,
sink.restoreWriter(new SinkContextProxy(index, context), state));
sink.restoreWriter(
new SinkContextProxy(index, replicaNum, context), state));
}
sinkWritersContext.put(SinkIdentifier.of(tableIdentifier, index), context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ public class SinkContextProxy implements SinkWriter.Context {

private final int index;

private final int replicaNum;

private final SinkWriter.Context context;

public SinkContextProxy(int index, SinkWriter.Context context) {
public SinkContextProxy(int index, int replicaNum, SinkWriter.Context context) {
this.index = index;
this.replicaNum = replicaNum;
this.context = context;
}

Expand All @@ -39,7 +42,7 @@ public int getIndexOfSubtask() {

@Override
public int getNumberOfParallelSubtasks() {
return context.getNumberOfParallelSubtasks();
return context.getNumberOfParallelSubtasks() * replicaNum;
}

@Override
Expand Down

0 comments on commit b34a78a

Please sign in to comment.