From 52aa35ca59498fb507d356645eff210c5ca034cd Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 7 Dec 2024 11:00:22 +0800 Subject: [PATCH 1/4] Fix E2EEnvironmentEngine --- .../shardingsphere/test/e2e/env/E2EEnvironmentEngine.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/env/E2EEnvironmentEngine.java b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/env/E2EEnvironmentEngine.java index c15f87c0846a1..b1eb9e3b03c28 100644 --- a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/env/E2EEnvironmentEngine.java +++ b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/env/E2EEnvironmentEngine.java @@ -19,6 +19,7 @@ import lombok.Getter; import lombok.SneakyThrows; +import org.apache.shardingsphere.infra.database.core.DefaultDatabase; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.test.e2e.container.compose.ContainerComposer; import org.apache.shardingsphere.test.e2e.container.compose.ContainerComposerRegistry; @@ -68,7 +69,7 @@ public E2EEnvironmentEngine(final String key, final String scenario, final Datab @SneakyThrows({SQLException.class, IOException.class}) private void executeLogicDatabaseInitSQLFileOnlyOnce(final String key, final String scenario, final DatabaseType databaseType, final DataSource targetDataSource) { - Optional logicDatabaseInitSQLFile = new ScenarioDataPath(scenario).findActualDatabaseInitSQLFile("foo_db", databaseType); + Optional logicDatabaseInitSQLFile = new ScenarioDataPath(scenario).findActualDatabaseInitSQLFile(DefaultDatabase.LOGIC_NAME, databaseType); if (!logicDatabaseInitSQLFile.isPresent()) { return; } From 1a4a727d5d65f79dfd8e2cddd7f6281638303d25 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 7 Dec 2024 11:38:18 +0800 Subject: [PATCH 2/4] Use ShardingSphereIdentifier on ShardingColumnsExtractor --- .../ShardingRuleConfigurationConverter.java | 2 -- .../core/importer/ImporterConfiguration.java | 12 ++++--- .../core/util/ShardingColumnsExtractor.java | 32 +++++++++---------- .../data/pipeline/cdc/CDCJob.java | 6 ++-- .../MigrationJobExecutorCallback.java | 8 ++--- .../core/util/PipelineContextUtils.java | 8 ++--- 6 files changed, 33 insertions(+), 35 deletions(-) diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/yaml/swapper/ShardingRuleConfigurationConverter.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/yaml/swapper/ShardingRuleConfigurationConverter.java index f869417b03385..66e12f94d661f 100644 --- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/yaml/swapper/ShardingRuleConfigurationConverter.java +++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/yaml/swapper/ShardingRuleConfigurationConverter.java @@ -38,7 +38,6 @@ public final class ShardingRuleConfigurationConverter { * * @param yamlRuleConfigs YAML rule configurations * @return sharding rule configuration - * @throws IllegalStateException if there is no available sharding rule */ public static Optional findAndConvertShardingRuleConfiguration(final Collection yamlRuleConfigs) { return findYamlShardingRuleConfiguration(yamlRuleConfigs).map(each -> new YamlShardingRuleConfigurationSwapper().swapToObject(each)); @@ -49,7 +48,6 @@ public static Optional findAndConvertShardingRuleConf * * @param yamlRuleConfigs YAML rule configurations * @return YAML sharding rule configuration - * @throws IllegalStateException if there is no available sharding rule */ public static Optional findYamlShardingRuleConfiguration(final Collection yamlRuleConfigs) { return yamlRuleConfigs.stream().filter(YamlShardingRuleConfiguration.class::isInstance).findFirst().map(YamlShardingRuleConfiguration.class::cast); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java index 58cfd0e39095e..bb0ebca58cdbf 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java @@ -17,16 +17,17 @@ package org.apache.shardingsphere.data.pipeline.core.importer; +import lombok.AccessLevel; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; -import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; +import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable; +import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier; import java.util.Collection; import java.util.Collections; @@ -45,7 +46,8 @@ public final class ImporterConfiguration { private final PipelineDataSourceConfiguration dataSourceConfig; - private final Map> shardingColumnsMap; + @Getter(AccessLevel.NONE) + private final Map> shardingColumnsMap; private final TableAndSchemaNameMapper tableAndSchemaNameMapper; @@ -64,7 +66,7 @@ public final class ImporterConfiguration { * @return sharding columns */ public Set getShardingColumns(final String logicTableName) { - return shardingColumnsMap.getOrDefault(new CaseInsensitiveIdentifier(logicTableName), Collections.emptySet()); + return shardingColumnsMap.getOrDefault(new ShardingSphereIdentifier(logicTableName), Collections.emptySet()); } /** @@ -85,6 +87,6 @@ public Optional findSchemaName(final String logicTableName) { */ public Collection getQualifiedTables() { return shardingColumnsMap.keySet().stream() - .map(CaseInsensitiveIdentifier::toString).map(each -> new CaseInsensitiveQualifiedTable(tableAndSchemaNameMapper.getSchemaName(each), each)).collect(Collectors.toList()); + .map(ShardingSphereIdentifier::getValue).map(each -> new CaseInsensitiveQualifiedTable(tableAndSchemaNameMapper.getSchemaName(each), each)).collect(Collectors.toList()); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ShardingColumnsExtractor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ShardingColumnsExtractor.java index 276b8581c7062..951227516d082 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ShardingColumnsExtractor.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ShardingColumnsExtractor.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.data.pipeline.core.util; -import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; +import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier; import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration; import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration; import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration; @@ -48,39 +48,37 @@ public final class ShardingColumnsExtractor { * @param logicTableNames logic table names * @return sharding columns map */ - public Map> getShardingColumnsMap(final Collection yamlRuleConfigs, final Set logicTableNames) { + public Map> getShardingColumnsMap(final Collection yamlRuleConfigs, final Collection logicTableNames) { Optional shardingRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(yamlRuleConfigs); if (!shardingRuleConfig.isPresent()) { return Collections.emptyMap(); } Set defaultDatabaseShardingColumns = extractShardingColumns(shardingRuleConfig.get().getDefaultDatabaseShardingStrategy()); Set defaultTableShardingColumns = extractShardingColumns(shardingRuleConfig.get().getDefaultTableShardingStrategy()); - Map> result = new ConcurrentHashMap<>(shardingRuleConfig.get().getTables().size(), 1F); + // TODO check is it need to be ConcurrentHashMap? + // TODO check is it need to be ShardingSphereIdentifier with column names? + Map> result = new ConcurrentHashMap<>(shardingRuleConfig.get().getTables().size(), 1F); for (ShardingTableRuleConfiguration each : shardingRuleConfig.get().getTables()) { - CaseInsensitiveIdentifier logicTableName = new CaseInsensitiveIdentifier(each.getLogicTable()); - if (!logicTableNames.contains(logicTableName)) { - continue; + ShardingSphereIdentifier logicTableName = new ShardingSphereIdentifier(each.getLogicTable()); + if (logicTableNames.contains(logicTableName)) { + Set shardingColumns = new HashSet<>(); + shardingColumns.addAll(null == each.getDatabaseShardingStrategy() ? defaultDatabaseShardingColumns : extractShardingColumns(each.getDatabaseShardingStrategy())); + shardingColumns.addAll(null == each.getTableShardingStrategy() ? defaultTableShardingColumns : extractShardingColumns(each.getTableShardingStrategy())); + result.put(logicTableName, shardingColumns); } - Set shardingColumns = new HashSet<>(); - shardingColumns.addAll(null == each.getDatabaseShardingStrategy() ? defaultDatabaseShardingColumns : extractShardingColumns(each.getDatabaseShardingStrategy())); - shardingColumns.addAll(null == each.getTableShardingStrategy() ? defaultTableShardingColumns : extractShardingColumns(each.getTableShardingStrategy())); - result.put(logicTableName, shardingColumns); } for (ShardingAutoTableRuleConfiguration each : shardingRuleConfig.get().getAutoTables()) { - CaseInsensitiveIdentifier logicTableName = new CaseInsensitiveIdentifier(each.getLogicTable()); - if (!logicTableNames.contains(logicTableName)) { - continue; + ShardingSphereIdentifier logicTableName = new ShardingSphereIdentifier(each.getLogicTable()); + if (logicTableNames.contains(logicTableName)) { + result.put(logicTableName, extractShardingColumns(each.getShardingStrategy())); } - ShardingStrategyConfiguration shardingStrategy = each.getShardingStrategy(); - Set shardingColumns = new HashSet<>(extractShardingColumns(shardingStrategy)); - result.put(logicTableName, shardingColumns); } return result; } private Set extractShardingColumns(final ShardingStrategyConfiguration shardingStrategy) { if (shardingStrategy instanceof StandardShardingStrategyConfiguration) { - return new HashSet<>(Collections.singleton(((StandardShardingStrategyConfiguration) shardingStrategy).getShardingColumn())); + return Collections.singleton(((StandardShardingStrategyConfiguration) shardingStrategy).getShardingColumn()); } if (shardingStrategy instanceof ComplexShardingStrategyConfiguration) { return new HashSet<>(Arrays.asList(((ComplexShardingStrategyConfiguration) shardingStrategy).getShardingColumns().split(","))); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java index 7a11194544667..03cab1ef778f6 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java @@ -64,7 +64,7 @@ import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor; import org.apache.shardingsphere.elasticjob.api.ShardingContext; -import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; +import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import java.util.Collection; @@ -149,8 +149,8 @@ private ImporterConfiguration buildImporterConfiguration(final CDCJobConfigurati final Collection schemaTableNames, final TableAndSchemaNameMapper mapper) { PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance( jobConfig.getDataSourceConfig().getType(), jobConfig.getDataSourceConfig().getParameter()); - Map> shardingColumnsMap = new ShardingColumnsExtractor() - .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet())); + Map> shardingColumnsMap = new ShardingColumnsExtractor() + .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(ShardingSphereIdentifier::new).collect(Collectors.toSet())); PipelineWriteConfiguration write = pipelineProcessConfig.getWrite(); JobRateLimitAlgorithm writeRateLimitAlgorithm = null == write.getRateLimiter() ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, write.getRateLimiter().getType(), write.getRateLimiter().getProps()); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java index 2fde01e7632a0..cb1f86c252e7f 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java @@ -40,8 +40,8 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.datanode.DataNode; -import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable; +import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier; import java.sql.SQLException; import java.util.Collection; @@ -65,8 +65,8 @@ public MigrationJobItemContext buildJobItemContext(final MigrationJobConfigurati private MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) { IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobDataNodeLine(jobShardingItem)); Collection createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); - Set targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()); - Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( + Set targetTableNames = jobConfig.getTargetTableNames().stream().map(ShardingSphereIdentifier::new).collect(Collectors.toSet()); + Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames); ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); return new MigrationTaskConfiguration(incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig); @@ -86,7 +86,7 @@ private CreateTableConfiguration getCreateTableConfiguration(final MigrationJobC } private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, - final Map> shardingColumnsMap, final TableAndSchemaNameMapper mapper) { + final Map> shardingColumnsMap, final TableAndSchemaNameMapper mapper) { int batchSize = pipelineProcessConfig.getWrite().getBatchSize(); JobRateLimitAlgorithm writeRateLimitAlgorithm = new TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm(); int retryTimes = jobConfig.getRetryTimes(); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java index eb3b103885463..94ace243fbe84 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java @@ -52,10 +52,10 @@ import org.apache.shardingsphere.infra.datanode.DataNode; import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; -import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn; import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable; +import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration; import org.apache.shardingsphere.infra.yaml.config.swapper.mode.YamlModeConfigurationSwapper; @@ -210,8 +210,8 @@ private static PipelineProcessConfiguration mockPipelineProcessConfiguration() { private static MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) { IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem)); Collection createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); - Set targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()); - Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( + Set targetTableNames = jobConfig.getTargetTableNames().stream().map(ShardingSphereIdentifier::new).collect(Collectors.toSet()); + Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames); ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); return new MigrationTaskConfiguration(incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig); @@ -233,7 +233,7 @@ private static Collection buildCreateTableConfiguratio } private static ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, - final Map> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { + final Map> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { int batchSize = pipelineProcessConfig.getWrite().getBatchSize(); JobRateLimitAlgorithm writeRateLimitAlgorithm = new TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm(); int retryTimes = jobConfig.getRetryTimes(); From a1ccf309b1b5fcb53e289a2454f9fee0d488421e Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 7 Dec 2024 12:21:54 +0800 Subject: [PATCH 3/4] Use ShardingSphereIdentifier on ShardingColumnsExtractor --- .../pipeline/core/importer/ImporterConfigurationTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java index e7725dcd5644f..d0bbb7024aa1a 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java @@ -21,8 +21,8 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; -import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable; +import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.junit.jupiter.api.Test; @@ -39,7 +39,7 @@ class ImporterConfigurationTest { @Test void assertGetShardingColumns() { ImporterConfiguration importerConfig = new ImporterConfiguration( - mock(PipelineDataSourceConfiguration.class), Collections.singletonMap(new CaseInsensitiveIdentifier("foo_tbl"), Collections.singleton("foo_col")), + mock(PipelineDataSourceConfiguration.class), Collections.singletonMap(new ShardingSphereIdentifier("foo_tbl"), Collections.singleton("foo_col")), mock(TableAndSchemaNameMapper.class), 1, mock(JobRateLimitAlgorithm.class), 1, 1); assertThat(importerConfig.getShardingColumns("foo_tbl"), is(Collections.singleton("foo_col"))); } @@ -57,7 +57,7 @@ void assertGetQualifiedTables() { TableAndSchemaNameMapper tableAndSchemaNameMapper = mock(TableAndSchemaNameMapper.class); when(tableAndSchemaNameMapper.getSchemaName("foo_tbl")).thenReturn("foo_schema"); ImporterConfiguration importerConfig = new ImporterConfiguration( - mock(PipelineDataSourceConfiguration.class), Collections.singletonMap(new CaseInsensitiveIdentifier("foo_tbl"), Collections.singleton("foo_col")), + mock(PipelineDataSourceConfiguration.class), Collections.singletonMap(new ShardingSphereIdentifier("foo_tbl"), Collections.singleton("foo_col")), tableAndSchemaNameMapper, 1, mock(JobRateLimitAlgorithm.class), 1, 1); assertThat(importerConfig.getQualifiedTables(), is(Collections.singletonList(new CaseInsensitiveQualifiedTable("foo_schema", "foo_tbl")))); } From 0de7e6d2a5f23d9fd5d5358a1c4e18795d8f1566 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 7 Dec 2024 12:25:08 +0800 Subject: [PATCH 4/4] Use ShardingSphereIdentifier on ShardingColumnsExtractor --- .../core/importer/sink/type/PipelineDataSourceSinkTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java index 53e13552ac995..b15284017af35 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java @@ -32,7 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord; import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn; import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; -import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; +import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier; import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.algorithm.FixtureTransmissionJobItemContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -90,7 +90,7 @@ void setUp() throws SQLException { } private ImporterConfiguration mockImporterConfiguration() { - Map> shardingColumnsMap = Collections.singletonMap(new CaseInsensitiveIdentifier("test_table"), Collections.singleton("user")); + Map> shardingColumnsMap = Collections.singletonMap(new ShardingSphereIdentifier("test_table"), Collections.singleton("user")); return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableAndSchemaNameMapper(Collections.emptyMap()), 1000, null, 3, 3); }