From 6b3573552be9d11e896bcbb2dbb4bcd577bae31f Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Mon, 26 Aug 2024 23:25:11 +0800 Subject: [PATCH] Refactor InventoryDumper (#32691) --- .../dumper/inventory/InventoryDumper.java | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java index 030ab4835e272..4c9fdb0ba3356 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java @@ -18,8 +18,6 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory; import com.google.common.base.Strings; -import lombok.AccessLevel; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType; @@ -72,7 +70,6 @@ @Slf4j public class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper { - @Getter(AccessLevel.PROTECTED) private final InventoryDumperContext dumperContext; private final PipelineChannel channel; @@ -81,7 +78,7 @@ public class InventoryDumper extends AbstractPipelineLifecycleRunnable implement private final PipelineTableMetaDataLoader metaDataLoader; - private final PipelineInventoryDumpSQLBuilder inventoryDumpSQLBuilder; + private final PipelineInventoryDumpSQLBuilder sqlBuilder; private final InventoryColumnValueReaderEngine columnValueReaderEngine; @@ -95,7 +92,7 @@ public InventoryDumper(final InventoryDumperContext dumperContext, final Pipelin this.dataSource = dataSource; this.metaDataLoader = metaDataLoader; DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType(); - inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType); + sqlBuilder = new PipelineInventoryDumpSQLBuilder(databaseType); columnValueReaderEngine = new InventoryColumnValueReaderEngine(databaseType); } @@ -231,7 +228,8 @@ private void setParameters(final PreparedStatement preparedStatement, final Inve private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData) throws SQLException { int columnCount = resultSetMetaData.getColumnCount(); - DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, dumperContext.getLogicTableName(), newDataRecordPosition(resultSet), columnCount); + String tableName = dumperContext.getLogicTableName(); + DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, tableName, newDataRecordPosition(resultSet), columnCount); List insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList()); ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == resultSetMetaData.getColumnCount(), () -> new PipelineInvalidParameterException("Insert column names count not equals ResultSet column count")); @@ -256,17 +254,15 @@ private String buildInventoryDumpPageByPageSQL(final InventoryQueryParameter que PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0); List columnNames = dumperContext.getQueryColumnNames(); if (QueryType.POINT_QUERY == queryParam.getQueryType()) { - return inventoryDumpSQLBuilder.buildPointQuerySQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName()); + return sqlBuilder.buildPointQuerySQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName()); } QueryRange queryRange = queryParam.getUniqueKeyValueRange(); boolean lowerInclusive = queryRange.isLowerInclusive(); if (null != queryRange.getLower() && null != queryRange.getUpper()) { - return inventoryDumpSQLBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter( - schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, true)); + return sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, true)); } if (null != queryRange.getLower()) { - return inventoryDumpSQLBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter( - schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, false)); + return sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, false)); } throw new PipelineInternalException("Primary key position is invalid."); } @@ -323,7 +319,7 @@ private String buildInventoryDumpSQLWithStreamingQuery() { } String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()); List columnNames = dumperContext.getQueryColumnNames(); - return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames); + return sqlBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames); } @Override