
Commit 7562fcf
Fix the mismatch between processed record count and actual data count for inventory dumper (apache#33996)
menghaoranss authored and jiangML committed Dec 11, 2024
1 parent 8cb4939 commit 7562fcf
Showing 3 changed files with 14 additions and 4 deletions.
--- File 1 of 3 ---
@@ -144,7 +144,8 @@ private void dumpByPage(final Connection connection, final PipelineTableMetaData
         AtomicLong rowCount = new AtomicLong();
         IngestPosition position = dumperContext.getCommonContext().getPosition();
         do {
-            QueryRange queryRange = new QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), firstQuery, ((PrimaryKeyIngestPosition<?>) position).getEndValue());
+            QueryRange queryRange = new QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), firstQuery && dumperContext.isFirstDump(),
+                    ((PrimaryKeyIngestPosition<?>) position).getEndValue());
             InventoryQueryParameter<?> queryParam = new InventoryRangeQueryParameter(queryRange);
             List<Record> dataRecords = dumpByPage(connection, queryParam, rowCount, tableMetaData);
             if (dataRecords.size() > 1 && Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0), getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
@@ -169,7 +170,8 @@ private List<Record> dumpByPage(final Connection connection,
                                         final InventoryQueryParameter<?> queryParam, final AtomicLong rowCount, final PipelineTableMetaData tableMetaData) throws SQLException {
         DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         int batchSize = dumperContext.getBatchSize();
-        try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildDumpByPageSQL(queryParam), batchSize)) {
+        String sql = buildDumpByPageSQL(queryParam);
+        try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, sql, batchSize)) {
             runningStatement.set(preparedStatement);
             setParameters(preparedStatement, queryParam);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
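Note on the first hunk: the second argument to QueryRange evidently controls whether the lower bound of the paged query is inclusive. On a resumed job, the restored position's begin value has already been dumped and counted, so including it reads the boundary row a second time and inflates the processed record count. A minimal sketch of the idea, with hypothetical helper and column names rather than the project's actual SQL builder:

// Sketch: pick the lower-bound operator for a paged primary-key range query.
// The bound is inclusive only on the very first page of the very first dump;
// a resumed job must skip the boundary row it has already counted.
final class LowerBoundSketch {

    static String buildWhereClause(final String pkColumn, final boolean firstQuery, final boolean firstDump) {
        String lowerOperator = firstQuery && firstDump ? ">=" : ">";
        return pkColumn + " " + lowerOperator + " ? AND " + pkColumn + " <= ?";
    }

    public static void main(final String[] args) {
        System.out.println(buildWhereClause("order_id", true, true));   // fresh job:   order_id >= ? AND order_id <= ?
        System.out.println(buildWhereClause("order_id", true, false));  // resumed job: order_id > ? AND order_id <= ?
    }
}

The second hunk only extracts the generated SQL into a local variable before building the streaming statement; behavior there is unchanged.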
--- File 2 of 3 ---
@@ -58,6 +58,8 @@ public final class InventoryDumperContext

     private JobRateLimitAlgorithm rateLimitAlgorithm;
 
+    private boolean firstDump = true;
+
     public InventoryDumperContext(final DumperCommonContext commonContext) {
         this.commonContext = new DumperCommonContext(
                 commonContext.getDataSourceName(), commonContext.getDataSourceConfig(), commonContext.getTableNameMapper(), commonContext.getTableAndSchemaNameMapper());
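The new flag defaults to true, so a freshly constructed dumper context behaves as a first dump unless the task splitter overrides it. The calls to dumperContext.isFirstDump() and result.setFirstDump(...) in the other two files imply accessors for this field; a sketch of the equivalent, assuming they are generated (e.g. by Lombok @Getter/@Setter) rather than written by hand:

// Sketch: accessors implied by the new field; the real class presumably
// generates these instead of declaring them explicitly.
final class FirstDumpFlagSketch {

    // Defaults to true: a context is treated as a first dump unless told otherwise.
    private boolean firstDump = true;

    public boolean isFirstDump() {
        return firstDump;
    }

    public void setFirstDump(final boolean firstDump) {
        this.firstDump = firstDump;
    }
}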
--- File 3 of 3 ---
@@ -93,7 +93,7 @@ private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDump
         JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
         int i = 0;
         for (IngestPosition each : getInventoryPositions(dumperContext, jobItemContext)) {
-            result.add(createPrimaryKeySplitDumperContext(dumperContext, each, i++, batchSize, rateLimitAlgorithm));
+            result.add(createPrimaryKeySplitDumperContext(dumperContext, each, i++, batchSize, rateLimitAlgorithm, jobItemContext));
         }
         return result;
     }
@@ -149,7 +149,8 @@ private Range<Long> getUniqueKeyValuesRange(final TransmissionJobItemContext job
     }
 
     private InventoryDumperContext createPrimaryKeySplitDumperContext(final InventoryDumperContext dumperContext, final IngestPosition position,
-                                                                      final int shardingItem, final int batchSize, final JobRateLimitAlgorithm rateLimitAlgorithm) {
+                                                                      final int shardingItem, final int batchSize, final JobRateLimitAlgorithm rateLimitAlgorithm,
+                                                                      final TransmissionJobItemContext jobItemContext) {
         InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext());
         result.getCommonContext().setPosition(position);
         result.setShardingItem(shardingItem);
@@ -159,6 +160,11 @@ private InventoryDumperContext createPrimaryKeySplitDumperContext(final Inventor
         result.setInsertColumnNames(dumperContext.getInsertColumnNames());
         result.setBatchSize(batchSize);
         result.setRateLimitAlgorithm(rateLimitAlgorithm);
+        result.setFirstDump(isFirstDump(jobItemContext));
         return result;
     }
+
+    private boolean isFirstDump(final TransmissionJobItemContext jobItemContext) {
+        return null == jobItemContext.getInitProgress() && jobItemContext.getProcessedRecordsCount() == 0;
+    }
 }
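Taken together: the splitter now marks a dumper context as a first dump only when the job has no persisted progress and has counted zero records; otherwise the dumper excludes its begin value from the first page. A toy simulation of the count mismatch this fixes, using assumed data rather than the project's API:

import java.util.List;

// Toy simulation: a table holds rows 1..5; the job stops after counting
// rows 1 and 2, then resumes from position beginValue = 2.
public final class ResumeCountSketch {

    public static void main(final String[] args) {
        List<Integer> table = List.of(1, 2, 3, 4, 5);
        long alreadyProcessed = 2L; // rows counted before the interruption
        int resumeBegin = 2;
        // Before this commit the resumed first query was inclusive: row 2 is counted twice.
        long inclusiveTotal = alreadyProcessed + table.stream().filter(v -> v >= resumeBegin).count();
        // With firstDump == false on resume, the boundary row is excluded.
        long exclusiveTotal = alreadyProcessed + table.stream().filter(v -> v > resumeBegin).count();
        // Prints: inclusive resume = 6, exclusive resume = 5, actual rows = 5
        System.out.printf("inclusive resume = %d, exclusive resume = %d, actual rows = %d%n",
                inclusiveTotal, exclusiveTotal, table.size());
    }
}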
