From d69b3f13d0b5424f210d852146cb3680108c734b Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Thu, 14 Dec 2023 23:34:27 +0800 Subject: [PATCH] Move PrepareTargetSchemasParameter and PrepareTargetTablesParameter (#29405) --- .../preparer/PipelineJobPreparerUtils.java | 4 +-- .../AbstractDataSourcePreparer.java | 34 +++++-------------- .../datasource/DataSourcePreparer.java | 2 ++ .../PrepareTargetSchemasParameter.java | 2 +- .../PrepareTargetTablesParameter.java | 2 +- .../AbstractDataSourcePreparerTest.java | 1 + .../datasource/MySQLDataSourcePreparer.java | 4 +-- .../OpenGaussDataSourcePreparer.java | 6 ++-- .../PostgreSQLDataSourcePreparer.java | 4 +-- .../preparer/MigrationJobPreparer.java | 4 +-- .../core/fixture/H2DataSourcePreparer.java | 4 +-- 11 files changed, 26 insertions(+), 41 deletions(-) rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/{ => param}/PrepareTargetSchemasParameter.java (98%) rename kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/{ => param}/PrepareTargetTablesParameter.java (98%) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java index 3d9df076335f4..d17716d851762 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java @@ -31,8 +31,8 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourceCheckEngine; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer; -import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetSchemasParameter; -import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetTablesParameter; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter; import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator; import org.apache.shardingsphere.data.pipeline.core.spi.ingest.position.PositionInitializer; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java index eb8c4bab0281b..e4323adfe8f9b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java @@ -21,8 +21,8 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; -import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter; import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; @@ -55,11 +55,10 @@ public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) thro if (!dialectDatabaseMetaData.isSchemaAvailable()) { return; } - Collection createTableConfigs = param.getCreateTableConfigurations(); String defaultSchema = dialectDatabaseMetaData.getDefaultSchema().orElse(null); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(targetDatabaseType); Collection createdSchemaNames = new HashSet<>(); - for (CreateTableConfiguration each : createTableConfigs) { + for (CreateTableConfiguration each : param.getCreateTableConfigurations()) { String targetSchemaName = each.getTargetName().getSchemaName().toString(); if (null == targetSchemaName || targetSchemaName.equalsIgnoreCase(defaultSchema) || createdSchemaNames.contains(targetSchemaName)) { continue; @@ -73,38 +72,21 @@ public void prepareTargetSchemas(final PrepareTargetSchemasParameter param) thro } private void executeCreateSchema(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration targetDataSourceConfig, final String sql) throws SQLException { - log.info("prepareTargetSchemas, sql={}", sql); - try (Connection connection = getCachedDataSource(dataSourceManager, targetDataSourceConfig).getConnection()) { - try (Statement statement = connection.createStatement()) { - statement.execute(sql); - } + log.info("Prepare target schemas SQL: {}", sql); + try ( + Connection connection = dataSourceManager.getDataSource(targetDataSourceConfig).getConnection(); + Statement statement = connection.createStatement()) { + statement.execute(sql); } } - protected final PipelineDataSourceWrapper getCachedDataSource(final PipelineDataSourceManager dataSourceManager, final PipelineDataSourceConfiguration dataSourceConfig) { - return dataSourceManager.getDataSource(dataSourceConfig); - } - - /** - * Execute target table SQL. - * - * @param targetConnection target connection - * @param sql SQL - * @throws SQLException SQL exception - */ protected void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException { - log.info("execute target table sql: {}", sql); + log.info("Execute target table SQL: {}", sql); try (Statement statement = targetConnection.createStatement()) { statement.execute(sql); } } - /** - * Add if not exists for create table SQL. - * - * @param createTableSQL create table SQL - * @return create table if not existed SQL - */ protected final String addIfNotExistsForCreateTableSQL(final String createTableSQL) { if (PATTERN_CREATE_TABLE_IF_NOT_EXISTS.matcher(createTableSQL).find()) { return createTableSQL; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java index 3851e4babd22f..3becc7f808950 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java @@ -17,6 +17,8 @@ package org.apache.shardingsphere.data.pipeline.core.preparer.datasource; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI; import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PrepareTargetSchemasParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java similarity index 98% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PrepareTargetSchemasParameter.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java index a367d5a2bb044..7b4cf8e8d3621 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PrepareTargetSchemasParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.core.preparer.datasource; +package org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param; import lombok.Getter; import lombok.RequiredArgsConstructor; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PrepareTargetTablesParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java similarity index 98% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PrepareTargetTablesParameter.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java index cdaac34a2f298..348c931eb0ac7 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PrepareTargetTablesParameter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.core.preparer.datasource; +package org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param; import lombok.Getter; import lombok.RequiredArgsConstructor; diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparerTest.java index 05081717dd12f..d736c7286cfcf 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparerTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparerTest.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.data.pipeline.core.preparer.datasource; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter; import org.junit.jupiter.api.Test; import java.sql.Connection; diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java index 239ff6704a299..044054b1311c3 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java @@ -20,7 +20,7 @@ import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer; -import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetTablesParameter; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter; import java.sql.Connection; import java.sql.SQLException; @@ -35,7 +35,7 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws PipelineDataSourceManager dataSourceManager = param.getDataSourceManager(); for (CreateTableConfiguration each : param.getCreateTableConfigurations()) { String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine()); - try (Connection targetConnection = getCachedDataSource(dataSourceManager, each.getTargetDataSourceConfig()).getConnection()) { + try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) { executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(createTargetTableSQL)); } } diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java index 005e13d4b31ea..f1efbf108329f 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java @@ -22,8 +22,8 @@ import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer; -import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetSchemasParameter; -import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetTablesParameter; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter; import java.sql.Connection; import java.sql.SQLException; @@ -52,7 +52,7 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws PipelineDataSourceManager dataSourceManager = param.getDataSourceManager(); for (CreateTableConfiguration each : param.getCreateTableConfigurations()) { String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine()); - try (Connection targetConnection = getCachedDataSource(dataSourceManager, each.getTargetDataSourceConfig()).getConnection()) { + try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) { for (String sql : Splitter.on(";").trimResults().omitEmptyStrings().splitToList(createTargetTableSQL)) { executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(sql)); } diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java index 2f788f7dab0d8..735cf423b84bc 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java @@ -21,7 +21,7 @@ import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer; -import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetTablesParameter; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter; import java.sql.Connection; import java.sql.SQLException; @@ -36,7 +36,7 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws PipelineDataSourceManager dataSourceManager = param.getDataSourceManager(); for (CreateTableConfiguration each : param.getCreateTableConfigurations()) { String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine()); - try (Connection targetConnection = getCachedDataSource(dataSourceManager, each.getTargetDataSourceConfig()).getConnection()) { + try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) { for (String sql : Splitter.on(";").trimResults().omitEmptyStrings().splitToList(createTargetTableSQL)) { executeTargetTableSQL(targetConnection, sql); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java index 6dc4aaf45839d..69c4780ab64cd 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java @@ -49,8 +49,8 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter; import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils; -import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetSchemasParameter; -import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetTablesParameter; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter; import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils; diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePreparer.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePreparer.java index c3c8b5888a269..af256eed7fae5 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePreparer.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/H2DataSourcePreparer.java @@ -20,7 +20,7 @@ import org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.AbstractDataSourcePreparer; -import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetTablesParameter; +import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter; import java.sql.Connection; import java.sql.SQLException; @@ -35,7 +35,7 @@ public void prepareTargetTables(final PrepareTargetTablesParameter param) throws PipelineDataSourceManager dataSourceManager = param.getDataSourceManager(); for (CreateTableConfiguration each : param.getCreateTableConfigurations()) { String createTargetTableSQL = getCreateTargetTableSQL(each, dataSourceManager, param.getSqlParserEngine()); - try (Connection targetConnection = getCachedDataSource(dataSourceManager, each.getTargetDataSourceConfig()).getConnection()) { + try (Connection targetConnection = dataSourceManager.getDataSource(each.getTargetDataSourceConfig()).getConnection()) { executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(createTargetTableSQL)); } }