Skip to content

Commit

Permalink
Rename PipelineJobUpdateProgress (#32775)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Sep 2, 2024
1 parent 7f495cc commit 62cf7a3
Show file tree
Hide file tree
Showing 14 changed files with 40 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;

import java.util.Collection;
Expand Down Expand Up @@ -61,8 +61,8 @@ public final class ConsistencyCheckJobItemProgressContext implements PipelineJob
private final String sourceDatabaseType;

@Override
public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
checkedRecordsCount.addAndGet(param.getProcessedRecordsCount());
public void onProgressUpdated(final PipelineJobUpdateProgress updateProgress) {
checkedRecordsCount.addAndGet(updateProgress.getProcessedRecordsCount());
PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
Expand Down Expand Up @@ -113,7 +113,7 @@ private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iter
if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().toString(), targetCalculatedResult.getMaxUniqueKeyValue().get());
}
param.getProgressContext().onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
param.getProgressContext().onProgressUpdated(new PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
}
if (sourceCalculatedResults.hasNext()) {
checkResult.setMatched(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

import java.util.List;
Expand Down Expand Up @@ -52,9 +52,9 @@ protected void runBlocking() {
if (records.isEmpty()) {
continue;
}
PipelineJobProgressUpdatedParameter updatedParam = sink.write("", records);
PipelineJobUpdateProgress updateProgress = sink.write("", records);
channel.ack(records);
jobProgressListener.onProgressUpdated(updatedParam);
jobProgressListener.onProgressUpdated(updateProgress);
if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.importer.sink;

import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;

import java.io.Closeable;
import java.util.Collection;
Expand All @@ -35,5 +35,5 @@ public interface PipelineSink extends Closeable {
* @param records records
* @return job progress updated parameter
*/
PipelineJobProgressUpdatedParameter write(String ackId, Collection<Record> records);
PipelineJobUpdateProgress write(String ackId, Collection<Record> records);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
Expand Down Expand Up @@ -73,17 +73,17 @@ public PipelineDataSourceSink(final ImporterConfiguration importerConfig, final
}

@Override
public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
public PipelineJobUpdateProgress write(final String ackId, final Collection<Record> records) {
List<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
if (dataRecords.isEmpty()) {
return new PipelineJobProgressUpdatedParameter(0);
return new PipelineJobUpdateProgress(0);
}
for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
batchWrite(each.getDeleteDataRecords());
batchWrite(each.getInsertDataRecords());
batchWrite(each.getUpdateDataRecords());
}
return new PipelineJobProgressUpdatedParameter((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
return new PipelineJobUpdateProgress((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
}

@SuppressWarnings("BusyWait")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
public interface PipelineJobProgressListener {

/**
* Emit on progress updated.
* Emit on pipeline job progress updated.
*
* @param param process update parameter
* @param updateProgress pipeline job update process
*/
void onProgressUpdated(PipelineJobProgressUpdatedParameter param);
void onProgressUpdated(PipelineJobUpdateProgress updateProgress);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import lombok.RequiredArgsConstructor;

/**
* Pipeline job process update parameter.
* Pipeline job update progress.
*/
@RequiredArgsConstructor
@Getter
public final class PipelineJobProgressUpdatedParameter {
public final class PipelineJobUpdateProgress {

private final int processedRecordsCount;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
Expand Down Expand Up @@ -117,8 +117,8 @@ public String getDataSourceName() {
}

@Override
public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
public void onProgressUpdated(final PipelineJobUpdateProgress updateProgress) {
processedRecordsCount.addAndGet(updateProgress.getProcessedRecordsCount());
PipelineJobProgressPersistService.notifyPersist(jobConfig.getJobId(), shardingItem);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;

import java.util.ArrayList;
Expand Down Expand Up @@ -77,7 +77,7 @@ public final class CDCImporter extends AbstractPipelineLifecycleRunnable impleme
protected void runBlocking() {
CDCImporterManager.putImporter(this);
for (CDCChannelProgressPair each : channelProgressPairs) {
each.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
each.getJobProgressListener().onProgressUpdated(new PipelineJobUpdateProgress(0));
}
while (isRunning()) {
if (needSorting) {
Expand Down Expand Up @@ -223,7 +223,7 @@ private void doWithoutSorting(final CDCChannelProgressPair channelProgressPair)
Record lastRecord = records.get(records.size() - 1);
if (records.stream().noneMatch(DataRecord.class::isInstance)) {
channel.ack(records);
channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobUpdateProgress(0));
if (lastRecord instanceof FinishedRecord) {
channelProgressPairs.remove(channelProgressPair);
}
Expand Down Expand Up @@ -255,7 +255,7 @@ public void ack(final String ackId) {
if (lastRecord instanceof FinishedRecord) {
channelProgressPairs.remove(each.getKey());
}
each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
each.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobUpdateProgress(ackPosition.getDataRecordCount()));
}
ackCache.invalidate(ackId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;

Expand Down Expand Up @@ -70,20 +70,20 @@ public PipelineCDCSocketSink(final Channel channel, final ShardingSphereDatabase
}

@Override
public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
public PipelineJobUpdateProgress write(final String ackId, final Collection<Record> records) {
if (records.isEmpty()) {
return new PipelineJobProgressUpdatedParameter(0);
return new PipelineJobUpdateProgress(0);
}
while (!channel.isWritable() && channel.isActive()) {
doAwait();
}
if (!channel.isActive()) {
return new PipelineJobProgressUpdatedParameter(0);
return new PipelineJobUpdateProgress(0);
}
Collection<DataRecordResult.Record> resultRecords = getResultRecords(records);
DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
channel.writeAndFlush(CDCResponseUtils.succeed("", ResponseCase.DATA_RECORD_RESULT, dataRecordResult));
return new PipelineJobProgressUpdatedParameter(resultRecords.size());
return new PipelineJobUpdateProgress(resultRecords.size());
}

@SneakyThrows(InterruptedException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.junit.jupiter.api.Test;

Expand All @@ -44,7 +44,7 @@ void assertWrite() throws IOException {
ShardingSphereDatabase mockDatabase = mock(ShardingSphereDatabase.class);
when(mockDatabase.getName()).thenReturn("test");
try (PipelineCDCSocketSink sink = new PipelineCDCSocketSink(mockChannel, mockDatabase, Collections.singletonList("test.t_order"))) {
PipelineJobProgressUpdatedParameter actual = sink.write("ack", Collections.singletonList(new FinishedRecord(new IngestPlaceholderPosition())));
PipelineJobUpdateProgress actual = sink.write("ack", Collections.singletonList(new FinishedRecord(new IngestPlaceholderPosition())));
assertThat(actual.getProcessedRecordsCount(), is(0));
actual = sink.write("ack", Collections.singletonList(new DataRecord(PipelineSQLOperationType.DELETE, "t_order", new IngestPlaceholderPosition(), 1)));
assertThat(actual.getProcessedRecordsCount(), is(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
Expand Down Expand Up @@ -89,7 +89,7 @@ public Map<String, TableDataConsistencyCheckResult> check(final String algorithm
.forEach(dataNode -> sourceTableNames.add(DataNodeUtils.formatWithSchema(dataNode)))));
progressContext.setRecordsCount(getRecordsCount());
progressContext.getTableNames().addAll(sourceTableNames);
progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
progressContext.onProgressUpdated(new PipelineJobUpdateProgress(0));
Map<CaseInsensitiveQualifiedTable, TableDataConsistencyCheckResult> result = new LinkedHashMap<>();
try (
PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
Expand Down Expand Up @@ -140,8 +140,8 @@ public boolean isSourceTargetDatabaseTheSame() {
}

@Override
public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
public void onProgressUpdated(final PipelineJobUpdateProgress updateProgress) {
processedRecordsCount.addAndGet(updateProgress.getProcessedRecordsCount());
PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;

Expand All @@ -32,7 +32,7 @@
public final class FixtureTransmissionJobItemContext implements TransmissionJobItemContext {

@Override
public void onProgressUpdated(final PipelineJobProgressUpdatedParameter param) {
public void onProgressUpdated(final PipelineJobUpdateProgress updateProgress) {
}

@Override
Expand Down

0 comments on commit 62cf7a3

Please sign in to comment.