Skip to content

Commit

Permalink
Improve pipeline incremental task persist position (#29749)
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy authored Jan 17, 2024
1 parent 9590654 commit a8e57a0
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@
import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

import java.util.List;
import java.util.stream.Collectors;

/**
* Single channel consumer importer.
Expand All @@ -50,7 +48,7 @@ public final class SingleChannelConsumerImporter extends AbstractPipelineLifecyc
@Override
protected void runBlocking() {
while (isRunning()) {
List<Record> records = channel.fetch(batchSize, timeoutMillis).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
List<Record> records = channel.fetch(batchSize, timeoutMillis);
if (records.isEmpty()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L
return;
}
if (binlogEvent.get() instanceof PlaceholderEvent) {
out.add(binlogEvent);
out.add(binlogEvent.get());
skipChecksum(binlogEventHeader.getEventType(), in);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;

Expand Down Expand Up @@ -217,15 +216,17 @@ private void doWithoutSorting() {

private void doWithoutSorting(final CDCChannelProgressPair progressPair) {
PipelineChannel channel = progressPair.getChannel();
List<Record> records = channel.fetch(batchSize, timeoutMillis).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
List<Record> records = channel.fetch(batchSize, timeoutMillis);
if (records.isEmpty()) {
return;
}
Record lastRecord = records.get(records.size() - 1);
if (lastRecord instanceof FinishedRecord && records.stream().noneMatch(DataRecord.class::isInstance)) {
if (records.stream().noneMatch(DataRecord.class::isInstance)) {
channel.ack(records);
progressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
originalChannelProgressPairs.remove(progressPair);
if (lastRecord instanceof FinishedRecord) {
originalChannelProgressPairs.remove(progressPair);
}
return;
}
if (null != rateLimitAlgorithm) {
Expand Down

0 comments on commit a8e57a0

Please sign in to comment.