Skip to content

Commit

Permalink
Refactor InventoryDumper (#32691)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Aug 26, 2024
1 parent 75dd7cf commit 6b35735
Showing 1 changed file with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;

import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
Expand Down Expand Up @@ -72,7 +70,6 @@
@Slf4j
public class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper {

@Getter(AccessLevel.PROTECTED)
private final InventoryDumperContext dumperContext;

private final PipelineChannel channel;
Expand All @@ -81,7 +78,7 @@ public class InventoryDumper extends AbstractPipelineLifecycleRunnable implement

private final PipelineTableMetaDataLoader metaDataLoader;

private final PipelineInventoryDumpSQLBuilder inventoryDumpSQLBuilder;
private final PipelineInventoryDumpSQLBuilder sqlBuilder;

private final InventoryColumnValueReaderEngine columnValueReaderEngine;

Expand All @@ -95,7 +92,7 @@ public InventoryDumper(final InventoryDumperContext dumperContext, final Pipelin
this.dataSource = dataSource;
this.metaDataLoader = metaDataLoader;
DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
sqlBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
columnValueReaderEngine = new InventoryColumnValueReaderEngine(databaseType);
}

Expand Down Expand Up @@ -231,7 +228,8 @@ private void setParameters(final PreparedStatement preparedStatement, final Inve

private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData) throws SQLException {
int columnCount = resultSetMetaData.getColumnCount();
DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, dumperContext.getLogicTableName(), newDataRecordPosition(resultSet), columnCount);
String tableName = dumperContext.getLogicTableName();
DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, tableName, newDataRecordPosition(resultSet), columnCount);
List<String> insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == resultSetMetaData.getColumnCount(),
() -> new PipelineInvalidParameterException("Insert column names count not equals ResultSet column count"));
Expand All @@ -256,17 +254,15 @@ private String buildInventoryDumpPageByPageSQL(final InventoryQueryParameter que
PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
List<String> columnNames = dumperContext.getQueryColumnNames();
if (QueryType.POINT_QUERY == queryParam.getQueryType()) {
return inventoryDumpSQLBuilder.buildPointQuerySQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
return sqlBuilder.buildPointQuerySQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
}
QueryRange queryRange = queryParam.getUniqueKeyValueRange();
boolean lowerInclusive = queryRange.isLowerInclusive();
if (null != queryRange.getLower() && null != queryRange.getUpper()) {
return inventoryDumpSQLBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(
schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, true));
return sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, true));
}
if (null != queryRange.getLower()) {
return inventoryDumpSQLBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(
schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, false));
return sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, false));
}
throw new PipelineInternalException("Primary key position is invalid.");
}
Expand Down Expand Up @@ -323,7 +319,7 @@ private String buildInventoryDumpSQLWithStreamingQuery() {
}
String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
List<String> columnNames = dumperContext.getQueryColumnNames();
return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames);
return sqlBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames);
}

@Override
Expand Down

0 comments on commit 6b35735

Please sign in to comment.