From 17881c5e0284bd35c25fe8c663f7149f77e94b54 Mon Sep 17 00:00:00 2001 From: dhercher Date: Thu, 1 Aug 2024 20:38:07 +0000 Subject: [PATCH] merging --- .../BigtableResourceManagerUtils.java | 10 ++++++ v1/README_Cloud_Bigtable_to_GCS_Avro.md | 9 +++-- v1/README_Cloud_Bigtable_to_GCS_Json.md | 9 +++-- v1/README_Cloud_Bigtable_to_GCS_Parquet.md | 9 +++-- .../teleport/bigtable/BigtableToAvro.java | 15 ++++++++ .../teleport/bigtable/BigtableToJson.java | 15 ++++++++ .../teleport/bigtable/BigtableToParquet.java | 15 ++++++++ .../teleport/templates/BigtableToAvroIT.java | 8 ++++- .../teleport/templates/BigtableToJsonIT.java | 15 ++++++-- .../templates/BigtableToParquetIT.java | 8 ++++- v2/sourcedb-to-spanner/README.md | 8 ++--- .../README_Sourcedb_to_Spanner.md | 24 ++++++------- .../README_Sourcedb_to_Spanner_Flex.md | 22 ++++++------ .../v2/options/OptionsToConfigBuilder.java | 5 +-- .../v2/options/SourceDbToSpannerOptions.java | 19 ++++++----- .../v2/templates/PipelineController.java | 6 ++-- .../v2/templates/SourceDbToSpanner.java | 4 +-- .../options/OptionsToConfigBuilderTest.java | 4 +-- .../v2/templates/SourceDbToSpannerITBase.java | 2 +- .../v2/templates/SourceDbToSpannerTest.java | 2 +- .../migrations/schema/SessionBasedMapper.java | 34 ++++++++++++++++--- .../schema/SessionBasedMapperTest.java | 16 ++++++++- 22 files changed, 198 insertions(+), 61 deletions(-) diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java index de7d2dc15b..23d2191168 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManagerUtils.java @@ -18,6 +18,7 @@ package org.apache.beam.it.gcp.bigtable; import static org.apache.beam.it.common.utils.ResourceManagerUtils.generateResourceId; +import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric; import com.google.cloud.bigtable.admin.v2.models.StorageType; import java.time.format.DateTimeFormatter; @@ -185,4 +186,13 @@ static void checkValidTableId(String idToCheck) { + " is not a valid ID. Only letters, numbers, hyphens, underscores and exclamation points are allowed."); } } + + /** + * Generates an app profile id. + * + * @return The app profile id string. + */ + public static String generateAppProfileId() { + return "app_profile_" + randomAlphanumeric(8).toLowerCase() + "_" + System.nanoTime(); + } } diff --git a/v1/README_Cloud_Bigtable_to_GCS_Avro.md b/v1/README_Cloud_Bigtable_to_GCS_Avro.md index b0affe88d2..09d93b37b2 100644 --- a/v1/README_Cloud_Bigtable_to_GCS_Avro.md +++ b/v1/README_Cloud_Bigtable_to_GCS_Avro.md @@ -26,6 +26,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat ### Optional parameters +* **bigtableAppProfileId** : The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile. 
@@ -113,6 +114,7 @@ export OUTPUT_DIRECTORY= export FILENAME_PREFIX=part ### Optional +export BIGTABLE_APP_PROFILE_ID=default gcloud dataflow jobs run "cloud-bigtable-to-gcs-avro-job" \ --project "$PROJECT" \ @@ -122,7 +124,8 @@ gcloud dataflow jobs run "cloud-bigtable-to-gcs-avro-job" \ --parameters "bigtableInstanceId=$BIGTABLE_INSTANCE_ID" \ --parameters "bigtableTableId=$BIGTABLE_TABLE_ID" \ --parameters "outputDirectory=$OUTPUT_DIRECTORY" \ - --parameters "filenamePrefix=$FILENAME_PREFIX" + --parameters "filenamePrefix=$FILENAME_PREFIX" \ + --parameters "bigtableAppProfileId=$BIGTABLE_APP_PROFILE_ID" ``` For more information about the command, please check: @@ -148,6 +151,7 @@ export OUTPUT_DIRECTORY= export FILENAME_PREFIX=part ### Optional +export BIGTABLE_APP_PROFILE_ID=default mvn clean package -PtemplatesRun \ -DskipTests \ @@ -156,7 +160,7 @@ mvn clean package -PtemplatesRun \ -Dregion="$REGION" \ -DjobName="cloud-bigtable-to-gcs-avro-job" \ -DtemplateName="Cloud_Bigtable_to_GCS_Avro" \ --Dparameters="bigtableProjectId=$BIGTABLE_PROJECT_ID,bigtableInstanceId=$BIGTABLE_INSTANCE_ID,bigtableTableId=$BIGTABLE_TABLE_ID,outputDirectory=$OUTPUT_DIRECTORY,filenamePrefix=$FILENAME_PREFIX" \ +-Dparameters="bigtableProjectId=$BIGTABLE_PROJECT_ID,bigtableInstanceId=$BIGTABLE_INSTANCE_ID,bigtableTableId=$BIGTABLE_TABLE_ID,outputDirectory=$OUTPUT_DIRECTORY,filenamePrefix=$FILENAME_PREFIX,bigtableAppProfileId=$BIGTABLE_APP_PROFILE_ID" \ -f v1 ``` @@ -207,6 +211,7 @@ resource "google_dataflow_job" "cloud_bigtable_to_gcs_avro" { bigtableTableId = "" outputDirectory = "gs://mybucket/somefolder" filenamePrefix = "part" + # bigtableAppProfileId = "default" } } ``` diff --git a/v1/README_Cloud_Bigtable_to_GCS_Json.md b/v1/README_Cloud_Bigtable_to_GCS_Json.md index a9a330b983..a7a56a7292 100644 --- a/v1/README_Cloud_Bigtable_to_GCS_Json.md +++ b/v1/README_Cloud_Bigtable_to_GCS_Json.md @@ -27,6 +27,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat * **outputDirectory** : The Cloud Storage path where the output JSON files are stored. (Example: gs://your-bucket/your-path/). * **userOption** : Possible values are `FLATTEN` or `NONE`. `FLATTEN` flattens the row to the single level. `NONE` stores the whole row as a JSON string. Defaults to `NONE`. * **columnsAliases** : A comma-separated list of columns that are required for the Vertex AI Vector Search index. The columns `id` and `embedding` are required for Vertex AI Vector Search. You can use the notation `fromfamily:fromcolumn;to`. For example, if the columns are `rowkey` and `cf:my_embedding`, where `rowkey` has a different name than the embedding column, specify `cf:my_embedding;embedding` and, `rowkey;id`. Only use this option when the value for `userOption` is `FLATTEN`. +* **bigtableAppProfileId** : The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile. 
@@ -116,6 +117,7 @@ export FILENAME_PREFIX=part export OUTPUT_DIRECTORY= export USER_OPTION=NONE export COLUMNS_ALIASES= +export BIGTABLE_APP_PROFILE_ID=default gcloud dataflow jobs run "cloud-bigtable-to-gcs-json-job" \ --project "$PROJECT" \ @@ -127,7 +129,8 @@ gcloud dataflow jobs run "cloud-bigtable-to-gcs-json-job" \ --parameters "outputDirectory=$OUTPUT_DIRECTORY" \ --parameters "filenamePrefix=$FILENAME_PREFIX" \ --parameters "userOption=$USER_OPTION" \ - --parameters "columnsAliases=$COLUMNS_ALIASES" + --parameters "columnsAliases=$COLUMNS_ALIASES" \ + --parameters "bigtableAppProfileId=$BIGTABLE_APP_PROFILE_ID" ``` For more information about the command, please check: @@ -155,6 +158,7 @@ export FILENAME_PREFIX=part export OUTPUT_DIRECTORY= export USER_OPTION=NONE export COLUMNS_ALIASES= +export BIGTABLE_APP_PROFILE_ID=default mvn clean package -PtemplatesRun \ -DskipTests \ @@ -163,7 +167,7 @@ mvn clean package -PtemplatesRun \ -Dregion="$REGION" \ -DjobName="cloud-bigtable-to-gcs-json-job" \ -DtemplateName="Cloud_Bigtable_to_GCS_Json" \ --Dparameters="bigtableProjectId=$BIGTABLE_PROJECT_ID,bigtableInstanceId=$BIGTABLE_INSTANCE_ID,bigtableTableId=$BIGTABLE_TABLE_ID,outputDirectory=$OUTPUT_DIRECTORY,filenamePrefix=$FILENAME_PREFIX,userOption=$USER_OPTION,columnsAliases=$COLUMNS_ALIASES" \ +-Dparameters="bigtableProjectId=$BIGTABLE_PROJECT_ID,bigtableInstanceId=$BIGTABLE_INSTANCE_ID,bigtableTableId=$BIGTABLE_TABLE_ID,outputDirectory=$OUTPUT_DIRECTORY,filenamePrefix=$FILENAME_PREFIX,userOption=$USER_OPTION,columnsAliases=$COLUMNS_ALIASES,bigtableAppProfileId=$BIGTABLE_APP_PROFILE_ID" \ -f v1 ``` @@ -216,6 +220,7 @@ resource "google_dataflow_job" "cloud_bigtable_to_gcs_json" { # outputDirectory = "gs://your-bucket/your-path/" # userOption = "NONE" # columnsAliases = "" + # bigtableAppProfileId = "default" } } ``` diff --git a/v1/README_Cloud_Bigtable_to_GCS_Parquet.md b/v1/README_Cloud_Bigtable_to_GCS_Parquet.md index 0729294bce..cd005cd87d 100644 --- a/v1/README_Cloud_Bigtable_to_GCS_Parquet.md +++ b/v1/README_Cloud_Bigtable_to_GCS_Parquet.md @@ -27,6 +27,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat ### Optional parameters * **numShards** : The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. The default value is decided by Dataflow. +* **bigtableAppProfileId** : The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile. 
@@ -115,6 +116,7 @@ export FILENAME_PREFIX=part ### Optional export NUM_SHARDS=0 +export BIGTABLE_APP_PROFILE_ID=default gcloud dataflow jobs run "cloud-bigtable-to-gcs-parquet-job" \ --project "$PROJECT" \ @@ -125,7 +127,8 @@ gcloud dataflow jobs run "cloud-bigtable-to-gcs-parquet-job" \ --parameters "bigtableTableId=$BIGTABLE_TABLE_ID" \ --parameters "outputDirectory=$OUTPUT_DIRECTORY" \ --parameters "filenamePrefix=$FILENAME_PREFIX" \ - --parameters "numShards=$NUM_SHARDS" + --parameters "numShards=$NUM_SHARDS" \ + --parameters "bigtableAppProfileId=$BIGTABLE_APP_PROFILE_ID" ``` For more information about the command, please check: @@ -152,6 +155,7 @@ export FILENAME_PREFIX=part ### Optional export NUM_SHARDS=0 +export BIGTABLE_APP_PROFILE_ID=default mvn clean package -PtemplatesRun \ -DskipTests \ @@ -160,7 +164,7 @@ mvn clean package -PtemplatesRun \ -Dregion="$REGION" \ -DjobName="cloud-bigtable-to-gcs-parquet-job" \ -DtemplateName="Cloud_Bigtable_to_GCS_Parquet" \ --Dparameters="bigtableProjectId=$BIGTABLE_PROJECT_ID,bigtableInstanceId=$BIGTABLE_INSTANCE_ID,bigtableTableId=$BIGTABLE_TABLE_ID,outputDirectory=$OUTPUT_DIRECTORY,filenamePrefix=$FILENAME_PREFIX,numShards=$NUM_SHARDS" \ +-Dparameters="bigtableProjectId=$BIGTABLE_PROJECT_ID,bigtableInstanceId=$BIGTABLE_INSTANCE_ID,bigtableTableId=$BIGTABLE_TABLE_ID,outputDirectory=$OUTPUT_DIRECTORY,filenamePrefix=$FILENAME_PREFIX,numShards=$NUM_SHARDS,bigtableAppProfileId=$BIGTABLE_APP_PROFILE_ID" \ -f v1 ``` @@ -212,6 +216,7 @@ resource "google_dataflow_job" "cloud_bigtable_to_gcs_parquet" { outputDirectory = "" filenamePrefix = "part" # numShards = "0" + # bigtableAppProfileId = "default" } } ``` diff --git a/v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToAvro.java b/v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToAvro.java index 644645ec32..18d0c3cce3 100644 --- a/v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToAvro.java +++ b/v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToAvro.java @@ -128,6 +128,20 @@ public interface Options extends PipelineOptions { @SuppressWarnings("unused") void setFilenamePrefix(ValueProvider filenamePrefix); + + @TemplateParameter.Text( + order = 6, + groupName = "Source", + optional = true, + regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"}, + description = "Application profile ID", + helpText = + "The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.") + @Default.String("default") + ValueProvider getBigtableAppProfileId(); + + @SuppressWarnings("unused") + void setBigtableAppProfileId(ValueProvider appProfileId); } /** @@ -153,6 +167,7 @@ public static PipelineResult run(Options options) { BigtableIO.read() .withProjectId(options.getBigtableProjectId()) .withInstanceId(options.getBigtableInstanceId()) + .withAppProfileId(options.getBigtableAppProfileId()) .withTableId(options.getBigtableTableId()); // Do not validate input fields if it is running as a template. 
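Reviewer note (not part of the patch): a minimal sketch of how the new `bigtableAppProfileId` option is consumed when the Avro exporter is launched programmatically. All argument values below are placeholders; only `bigtableAppProfileId` is introduced by this change.

```java
// Illustrative only: launching BigtableToAvro with the new app profile option.
import com.google.cloud.teleport.bigtable.BigtableToAvro;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class BigtableToAvroAppProfileExample {
  public static void main(String[] args) {
    BigtableToAvro.Options options =
        PipelineOptionsFactory.fromArgs(
                "--bigtableProjectId=my-project",
                "--bigtableInstanceId=my-instance",
                "--bigtableTableId=my-table",
                "--outputDirectory=gs://my-bucket/avro-export/",
                "--filenamePrefix=part",
                // Routes the export reads through a dedicated app profile
                // instead of the instance's default profile.
                "--bigtableAppProfileId=batch-export")
            .withValidation()
            .as(BigtableToAvro.Options.class);
    BigtableToAvro.run(options);
  }
}
```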
diff --git a/v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToJson.java b/v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToJson.java index 47bdf73483..3999e5f20c 100644 --- a/v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToJson.java +++ b/v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToJson.java @@ -160,6 +160,20 @@ public interface Options extends PipelineOptions { @SuppressWarnings("unused") void setColumnsAliases(ValueProvider value); + + @TemplateParameter.Text( + order = 8, + groupName = "Source", + optional = true, + regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"}, + description = "Application profile ID", + helpText = + "The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.") + @Default.String("default") + ValueProvider getBigtableAppProfileId(); + + @SuppressWarnings("unused") + void setBigtableAppProfileId(ValueProvider appProfileId); } /** @@ -186,6 +200,7 @@ public static PipelineResult run(Options options) { BigtableIO.read() .withProjectId(options.getBigtableProjectId()) .withInstanceId(options.getBigtableInstanceId()) + .withAppProfileId(options.getBigtableAppProfileId()) .withTableId(options.getBigtableTableId()); // Do not validate input fields if it is running as a template. diff --git a/v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToParquet.java b/v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToParquet.java index 1336207754..6b93aab2b6 100644 --- a/v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToParquet.java +++ b/v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToParquet.java @@ -141,6 +141,20 @@ public interface Options extends PipelineOptions { @SuppressWarnings("unused") void setNumShards(ValueProvider numShards); + + @TemplateParameter.Text( + order = 7, + groupName = "Source", + optional = true, + regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"}, + description = "Application profile ID", + helpText = + "The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.") + @Default.String("default") + ValueProvider getBigtableAppProfileId(); + + @SuppressWarnings("unused") + void setBigtableAppProfileId(ValueProvider appProfileId); } /** @@ -170,6 +184,7 @@ public static PipelineResult run(Options options) { BigtableIO.read() .withProjectId(options.getBigtableProjectId()) .withInstanceId(options.getBigtableInstanceId()) + .withAppProfileId(options.getBigtableAppProfileId()) .withTableId(options.getBigtableTableId()); // Do not validate input fields if it is running as a template. 
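Reviewer note (not part of the patch): a small sanity-check sketch for the new `generateAppProfileId()` helper. The test class name is hypothetical; the pattern is the one the three Bigtable export templates above declare for `bigtableAppProfileId`.

```java
// Illustrative only: the generated id starts with "app_profile_" and contains only
// lowercase alphanumerics, underscores and a nanosecond suffix, so it matches the
// [_a-zA-Z0-9][-_.a-zA-Z0-9]* regex used by the template parameter.
import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateAppProfileId;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

public class GenerateAppProfileIdSanityTest {
  @Test
  public void generatedIdIsAcceptedByTheTemplates() {
    String appProfileId = generateAppProfileId();
    assertTrue(appProfileId, appProfileId.matches("[_a-zA-Z0-9][-_.a-zA-Z0-9]*"));
  }
}
```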
diff --git a/v1/src/test/java/com/google/cloud/teleport/templates/BigtableToAvroIT.java b/v1/src/test/java/com/google/cloud/teleport/templates/BigtableToAvroIT.java index 694ca51823..7ca3a2c2d0 100644 --- a/v1/src/test/java/com/google/cloud/teleport/templates/BigtableToAvroIT.java +++ b/v1/src/test/java/com/google/cloud/teleport/templates/BigtableToAvroIT.java @@ -17,6 +17,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.apache.beam.it.gcp.artifacts.matchers.ArtifactAsserts.assertThatGenericRecords; +import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateAppProfileId; import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateTableId; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; @@ -116,6 +117,10 @@ public void testBigtableToAvro() throws IOException { String tableId = generateTableId(testName); bigtableResourceManager.createTable(tableId, ImmutableList.of("family1", "family2")); + String appProfileId = generateAppProfileId(); + bigtableResourceManager.createAppProfile( + appProfileId, false, bigtableResourceManager.getClusterNames()); + long timestamp = System.currentTimeMillis() * 1000; bigtableResourceManager.write( ImmutableList.of( @@ -130,7 +135,8 @@ public void testBigtableToAvro() throws IOException { .addParameter("bigtableInstanceId", bigtableResourceManager.getInstanceId()) .addParameter("bigtableTableId", tableId) .addParameter("outputDirectory", getGcsPath("output/")) - .addParameter("filenamePrefix", "bigtable-to-avro-output-"); + .addParameter("filenamePrefix", "bigtable-to-avro-output-") + .addParameter("bigtableAppProfileId", appProfileId); // Act PipelineLauncher.LaunchInfo info = launchTemplate(options); diff --git a/v1/src/test/java/com/google/cloud/teleport/templates/BigtableToJsonIT.java b/v1/src/test/java/com/google/cloud/teleport/templates/BigtableToJsonIT.java index eab625a77e..b382da136b 100644 --- a/v1/src/test/java/com/google/cloud/teleport/templates/BigtableToJsonIT.java +++ b/v1/src/test/java/com/google/cloud/teleport/templates/BigtableToJsonIT.java @@ -17,6 +17,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.apache.beam.it.gcp.artifacts.matchers.ArtifactAsserts.assertThatArtifacts; +import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateAppProfileId; import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateTableId; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; @@ -68,6 +69,10 @@ public void testUnflattenedBigtableToJson() throws IOException { String tableId = generateTableId(testName); bigtableResourceManager.createTable(tableId, ImmutableList.of("col1")); + String appProfileId = generateAppProfileId(); + bigtableResourceManager.createAppProfile( + appProfileId, false, bigtableResourceManager.getClusterNames()); + long timestamp = System.currentTimeMillis() * 1000; bigtableResourceManager.write( ImmutableList.of( @@ -81,7 +86,8 @@ public void testUnflattenedBigtableToJson() throws IOException { .addParameter("bigtableInstanceId", bigtableResourceManager.getInstanceId()) .addParameter("bigtableTableId", tableId) .addParameter("outputDirectory", getGcsPath("output/")) - .addParameter("filenamePrefix", "bigtable-to-json-output-"); + .addParameter("filenamePrefix", 
"bigtable-to-json-output-") + .addParameter("bigtableAppProfileId", appProfileId); // Act PipelineLauncher.LaunchInfo info = launchTemplate(options); @@ -106,6 +112,10 @@ public void testFlattenedBigtableToJson() throws IOException { String tableId = generateTableId(testName); bigtableResourceManager.createTable(tableId, ImmutableList.of("col1")); + String appProfileId = generateAppProfileId(); + bigtableResourceManager.createAppProfile( + appProfileId, false, bigtableResourceManager.getClusterNames()); + long timestamp = System.currentTimeMillis() * 1000; bigtableResourceManager.write( ImmutableList.of( @@ -120,7 +130,8 @@ public void testFlattenedBigtableToJson() throws IOException { .addParameter("bigtableTableId", tableId) .addParameter("outputDirectory", getGcsPath("output/")) .addParameter("filenamePrefix", "bigtable-to-json-output-") - .addParameter("userOption", "FLATTEN"); + .addParameter("userOption", "FLATTEN") + .addParameter("bigtableAppProfileId", appProfileId); // Act PipelineLauncher.LaunchInfo info = launchTemplate(options); diff --git a/v1/src/test/java/com/google/cloud/teleport/templates/BigtableToParquetIT.java b/v1/src/test/java/com/google/cloud/teleport/templates/BigtableToParquetIT.java index 53c5f6571e..a4305bafa8 100644 --- a/v1/src/test/java/com/google/cloud/teleport/templates/BigtableToParquetIT.java +++ b/v1/src/test/java/com/google/cloud/teleport/templates/BigtableToParquetIT.java @@ -17,6 +17,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.apache.beam.it.gcp.artifacts.matchers.ArtifactAsserts.assertThatGenericRecords; +import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateAppProfileId; import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateTableId; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; @@ -71,6 +72,10 @@ public void testBigtableToParquet() throws IOException { String tableId = generateTableId(testName); bigtableResourceManager.createTable(tableId, ImmutableList.of("col1")); + String appProfileId = generateAppProfileId(); + bigtableResourceManager.createAppProfile( + appProfileId, false, bigtableResourceManager.getClusterNames()); + long timestamp = System.currentTimeMillis() * 1000; bigtableResourceManager.write( ImmutableList.of( @@ -85,7 +90,8 @@ public void testBigtableToParquet() throws IOException { .addParameter("bigtableTableId", tableId) .addParameter("outputDirectory", getGcsPath("output/")) .addParameter("filenamePrefix", "bigtable-to-parquet-output-") - .addParameter("numShards", "1"); + .addParameter("numShards", "1") + .addParameter("bigtableAppProfileId", appProfileId); // Act PipelineLauncher.LaunchInfo info = launchTemplate(options); diff --git a/v2/sourcedb-to-spanner/README.md b/v2/sourcedb-to-spanner/README.md index a7060bb028..80f57b03d6 100644 --- a/v2/sourcedb-to-spanner/README.md +++ b/v2/sourcedb-to-spanner/README.md @@ -60,18 +60,18 @@ mvn test ### Executing Template #### Required Parameters -* **sourceDbURL** (JDBC URL of the source database): The URL which can be used to connect to the source database. (Example: jdbc:mysql://10.10.10.10:3306/testdb) +* **sourceConfigURL** (Configuration to connect to the source database): Can be the JDBC URL or the location of the sharding config. 
(Example: jdbc:mysql://10.10.10.10:3306/testdb or gs://test1/shard.conf)
* **username** (username of the source database): The username which can be used to connect to the source database.
* **password** (username of the source database): The username which can be used to connect to the source database.
* **instanceId** (Cloud Spanner Instance Id.): The destination Cloud Spanner instance.
* **databaseId** (Cloud Spanner Database Id.): The destination Cloud Spanner database.
* **projectId** (Cloud Spanner Project Id.): This is the name of the Cloud Spanner project.
-* **DLQDirectory** (GCS path of the dead letter queue directory): The GCS path of the DLQ Directory to be used during migrations
+* **outputDirectory** (GCS path of the output directory): The GCS path of the directory to which all errors and skipped events are written during the migration.
#### Optional Parameters
* **jdbcDriverJars** (Comma-separated Cloud Storage path(s) of the JDBC driver(s)): The comma-separated list of driver JAR files. (Example: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar).
* **jdbcDriverClassName** (JDBC driver class name): The JDBC driver class name. (Example: com.mysql.jdbc.Driver).
-* **tables** (Comma seperated list of tables to migrate): Tables that will be migrated to Spanner. Leave this empty if all tables are to be migrated. (Example: table1,table2).
+* **tables** (Colon-separated list of tables to migrate): Tables that will be migrated to Spanner. Leave this empty if all tables are to be migrated. (Example: table1:table2).
* **numPartitions** (Number of partitions to create per table): A table is split into partitions and loaded independently. Use higher number of partitions for larger tables. (Example: 1000).
* **spannerHost** (Cloud Spanner Endpoint): Use this endpoint to connect to Spanner. (Example: https://batch-spanner.googleapis.com)
* **maxConnections** (Number of connections to create per source database): The max number of connections that can be used at any given time at source. (Example: 100)
@@ -84,6 +84,6 @@ export JOB_NAME="${IMAGE_NAME}-`date +%Y%m%d-%H%M%S-%N`"
gcloud dataflow flex-template run ${JOB_NAME} \
 --project=${PROJECT} --region=us-central1 \
 --template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
- --parameters sourceDbURL="jdbc:mysql://:3306/",username=,password=,instanceId="",databaseId="",projectId="$PROJECT",DLQDirectory=gs:// \
+ --parameters sourceConfigURL="jdbc:mysql://:3306/",username=,password=,instanceId="",databaseId="",projectId="$PROJECT",outputDirectory=gs:// \
 --additional-experiments=disable_runner_v2
```
diff --git a/v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner.md b/v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner.md
index cca967a861..f28b830c0c 100644
--- a/v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner.md
+++ b/v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner.md
@@ -25,18 +25,18 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
## Parameters
#### Required Parameters
-* **sourceDbURL** (JDBC URL of the source database): The URL which can be used to connect to the source database. (Example: jdbc:mysql://10.10.10.10:3306/testdb)
+* **sourceConfigURL** (Configuration to connect to the source database): Can be the JDBC URL or the location of the sharding config. (Example: jdbc:mysql://10.10.10.10:3306/testdb or gs://test1/shard.conf)
* **username** (username of the source database): The username which can be used to connect to the source database.
* **password** (username of the source database): The username which can be used to connect to the source database.
* **instanceId** (Cloud Spanner Instance Id.): The destination Cloud Spanner instance.
* **databaseId** (Cloud Spanner Database Id.): The destination Cloud Spanner database.
* **projectId** (Cloud Spanner Project Id.): This is the name of the Cloud Spanner project.
-* **DLQDirectory** (GCS path of the dead letter queue directory): The GCS path of the DLQ Directory to be used during migrations
+* **outputDirectory** (GCS path of the output directory): The GCS path of the directory to which all errors and skipped events are written during the migration.
#### Optional Parameters
* **jdbcDriverJars** (Comma-separated Cloud Storage path(s) of the JDBC driver(s)): The comma-separated list of driver JAR files. (Example: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar).
* **jdbcDriverClassName** (JDBC driver class name): The JDBC driver class name. (Example: com.mysql.jdbc.Driver).
-* **tables** (Comma seperated list of tables to migrate): Tables that will be migrated to Spanner. Leave this empty if all tables are to be migrated. (Example: table1,table2).
+* **tables** (Colon-separated list of tables to migrate): Tables that will be migrated to Spanner. Leave this empty if all tables are to be migrated. (Example: table1:table2).
* **numPartitions** (Number of partitions to create per table): A table is split into partitions and loaded independently. Use higher number of partitions for larger tables. (Example: 1000).
* **spannerHost** (Cloud Spanner Endpoint): Use this endpoint to connect to Spanner. (Example: https://batch-spanner.googleapis.com)
* **maxConnections** (Number of connections to create per source database): The max number of connections that can be used at any given time at source.
(Example: 100) @@ -124,11 +124,11 @@ export REGION=us-central1 export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/flex/Sourcedb_to_Spanner_Flex" ### Required -export SOURCE_DB_URL= +export SOURCE_CONFIG_URL= export INSTANCE_ID= export DATABASE_ID= export PROJECT_ID= -export DLQDIRECTORY= +export OUTPUT_DIRECTORY= ### Optional export JDBC_DRIVER_JARS="" @@ -150,7 +150,7 @@ gcloud dataflow flex-template run "sourcedb-to-spanner-flex-job" \ --template-file-gcs-location "$TEMPLATE_SPEC_GCSPATH" \ --parameters "jdbcDriverJars=$JDBC_DRIVER_JARS" \ --parameters "jdbcDriverClassName=$JDBC_DRIVER_CLASS_NAME" \ - --parameters "sourceDbURL=$SOURCE_DB_URL" \ + --parameters "sourceConfigURL=$SOURCE_CONFIG_URL" \ --parameters "username=$USERNAME" \ --parameters "password=$PASSWORD" \ --parameters "tables=$TABLES" \ @@ -161,7 +161,7 @@ gcloud dataflow flex-template run "sourcedb-to-spanner-flex-job" \ --parameters "spannerHost=$SPANNER_HOST" \ --parameters "maxConnections=$MAX_CONNECTIONS" \ --parameters "sessionFilePath=$SESSION_FILE_PATH" \ - --parameters "DLQDirectory=$DLQDIRECTORY" \ + --parameters "outputDirectory=$OUTPUT_DIRECTORY" \ --parameters "disabledAlgorithms=$DISABLED_ALGORITHMS" \ --parameters "extraFilesToStage=$EXTRA_FILES_TO_STAGE" \ --parameters "defaultLogLevel=$DEFAULT_LOG_LEVEL" @@ -183,11 +183,11 @@ export BUCKET_NAME= export REGION=us-central1 ### Required -export SOURCE_DB_URL= +export SOURCE_CONFIG_URL= export INSTANCE_ID= export DATABASE_ID= export PROJECT_ID= -export DLQDIRECTORY= +export OUTPUT_DIRECTORY= ### Optional export JDBC_DRIVER_JARS="" @@ -210,7 +210,7 @@ mvn clean package -PtemplatesRun \ -Dregion="$REGION" \ -DjobName="sourcedb-to-spanner-flex-job" \ -DtemplateName="Sourcedb_to_Spanner_Flex" \ --Dparameters="jdbcDriverJars=$JDBC_DRIVER_JARS,jdbcDriverClassName=$JDBC_DRIVER_CLASS_NAME,sourceDbURL=$SOURCE_DB_URL,username=$USERNAME,password=$PASSWORD,tables=$TABLES,numPartitions=$NUM_PARTITIONS,instanceId=$INSTANCE_ID,databaseId=$DATABASE_ID,projectId=$PROJECT_ID,spannerHost=$SPANNER_HOST,maxConnections=$MAX_CONNECTIONS,sessionFilePath=$SESSION_FILE_PATH,DLQDirectory=$DLQDIRECTORY,disabledAlgorithms=$DISABLED_ALGORITHMS,extraFilesToStage=$EXTRA_FILES_TO_STAGE,defaultLogLevel=$DEFAULT_LOG_LEVEL" \ +-Dparameters="jdbcDriverJars=$JDBC_DRIVER_JARS,jdbcDriverClassName=$JDBC_DRIVER_CLASS_NAME,sourceConfigURL=$SOURCE_CONFIG_URL,username=$USERNAME,password=$PASSWORD,tables=$TABLES,numPartitions=$NUM_PARTITIONS,instanceId=$INSTANCE_ID,databaseId=$DATABASE_ID,projectId=$PROJECT_ID,spannerHost=$SPANNER_HOST,maxConnections=$MAX_CONNECTIONS,sessionFilePath=$SESSION_FILE_PATH,outputDirectory=$OUTPUT_DIRECTORY,disabledAlgorithms=$DISABLED_ALGORITHMS,extraFilesToStage=$EXTRA_FILES_TO_STAGE,defaultLogLevel=$DEFAULT_LOG_LEVEL" \ -f v2/sourcedb-to-spanner ``` @@ -243,10 +243,10 @@ resource "google_dataflow_flex_template_job" "sourcedb_to_spanner_flex" { instanceId = "" databaseId = "" projectId = "" - sourceDbURL = "jdbc:mysql://some-host:3306/sampledb" + sourceConfigURL = "jdbc:mysql://some-host:3306/sampledb" username = "" password = "" - DLQDirectory = "gs://your-bucket/dir" + outputDirectory = "gs://your-bucket/dir" # jdbcDriverJars = "gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar" # jdbcDriverClassName = "com.mysql.jdbc.Driver" diff --git a/v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner_Flex.md b/v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner_Flex.md index 3f9d2be9f3..cc46029f22 100644 --- a/v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner_Flex.md 
+++ b/v2/sourcedb-to-spanner/README_Sourcedb_to_Spanner_Flex.md @@ -27,11 +27,11 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat ### Required parameters -* **sourceDbURL** : The JDBC connection URL string. For example, `jdbc:mysql://127.4.5.30:3306/my-db?autoReconnect=true&maxReconnects=10&unicode=true&characterEncoding=UTF-8`. +* **sourceConfigURL** (Configuration to connect to the source database): Can be the JDBC URL or the location of the sharding config. (Example: jdbc:mysql://10.10.10.10:3306/testdb or gs://test1/shard.conf) * **instanceId** : The destination Cloud Spanner instance. * **databaseId** : The destination Cloud Spanner database. * **projectId** : This is the name of the Cloud Spanner project. -* **DLQDirectory** : This directory is used to dump the failed records in a migration. +* **outputDirectory** : This directory is used to dump the failed and skipped records in a migration. ### Optional parameters @@ -125,11 +125,11 @@ export REGION=us-central1 export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/flex/Sourcedb_to_Spanner_Flex" ### Required -export SOURCE_DB_URL= +export SOURCE_CONFIG_URL= export INSTANCE_ID= export DATABASE_ID= export PROJECT_ID= -export DLQDIRECTORY= +export OUTPUT_DIRECTORY= ### Optional export JDBC_DRIVER_JARS="" @@ -151,7 +151,7 @@ gcloud dataflow flex-template run "sourcedb-to-spanner-flex-job" \ --template-file-gcs-location "$TEMPLATE_SPEC_GCSPATH" \ --parameters "jdbcDriverJars=$JDBC_DRIVER_JARS" \ --parameters "jdbcDriverClassName=$JDBC_DRIVER_CLASS_NAME" \ - --parameters "sourceDbURL=$SOURCE_DB_URL" \ + --parameters "sourceConfigURL=$SOURCE_CONFIG_URL" \ --parameters "username=$USERNAME" \ --parameters "password=$PASSWORD" \ --parameters "tables=$TABLES" \ @@ -162,7 +162,7 @@ gcloud dataflow flex-template run "sourcedb-to-spanner-flex-job" \ --parameters "spannerHost=$SPANNER_HOST" \ --parameters "maxConnections=$MAX_CONNECTIONS" \ --parameters "sessionFilePath=$SESSION_FILE_PATH" \ - --parameters "DLQDirectory=$DLQDIRECTORY" \ + --parameters "outputDirectory=$OUTPUT_DIRECTORY" \ --parameters "disabledAlgorithms=$DISABLED_ALGORITHMS" \ --parameters "extraFilesToStage=$EXTRA_FILES_TO_STAGE" \ --parameters "defaultLogLevel=$DEFAULT_LOG_LEVEL" @@ -184,11 +184,11 @@ export BUCKET_NAME= export REGION=us-central1 ### Required -export SOURCE_DB_URL= +export SOURCE_CONFIG_URL= export INSTANCE_ID= export DATABASE_ID= export PROJECT_ID= -export DLQDIRECTORY= +export OUTPUT_DIRECTORY= ### Optional export JDBC_DRIVER_JARS="" @@ -211,7 +211,7 @@ mvn clean package -PtemplatesRun \ -Dregion="$REGION" \ -DjobName="sourcedb-to-spanner-flex-job" \ -DtemplateName="Sourcedb_to_Spanner_Flex" \ --Dparameters="jdbcDriverJars=$JDBC_DRIVER_JARS,jdbcDriverClassName=$JDBC_DRIVER_CLASS_NAME,sourceDbURL=$SOURCE_DB_URL,username=$USERNAME,password=$PASSWORD,tables=$TABLES,numPartitions=$NUM_PARTITIONS,instanceId=$INSTANCE_ID,databaseId=$DATABASE_ID,projectId=$PROJECT_ID,spannerHost=$SPANNER_HOST,maxConnections=$MAX_CONNECTIONS,sessionFilePath=$SESSION_FILE_PATH,DLQDirectory=$DLQDIRECTORY,disabledAlgorithms=$DISABLED_ALGORITHMS,extraFilesToStage=$EXTRA_FILES_TO_STAGE,defaultLogLevel=$DEFAULT_LOG_LEVEL" \ 
+-Dparameters="jdbcDriverJars=$JDBC_DRIVER_JARS,jdbcDriverClassName=$JDBC_DRIVER_CLASS_NAME,sourceConfigURL=$SOURCE_CONFIG_URL,username=$USERNAME,password=$PASSWORD,tables=$TABLES,numPartitions=$NUM_PARTITIONS,instanceId=$INSTANCE_ID,databaseId=$DATABASE_ID,projectId=$PROJECT_ID,spannerHost=$SPANNER_HOST,maxConnections=$MAX_CONNECTIONS,sessionFilePath=$SESSION_FILE_PATH,outputDirectory=$OUTPUT_DIRECTORY,disabledAlgorithms=$DISABLED_ALGORITHMS,extraFilesToStage=$EXTRA_FILES_TO_STAGE,defaultLogLevel=$DEFAULT_LOG_LEVEL" \ -f v2/sourcedb-to-spanner ``` @@ -256,11 +256,11 @@ resource "google_dataflow_flex_template_job" "sourcedb_to_spanner_flex" { name = "sourcedb-to-spanner-flex" region = var.region parameters = { - sourceDbURL = "" + sourceConfigURL = "" instanceId = "" databaseId = "" projectId = "" - DLQDirectory = "" + outputDirectory = "" # jdbcDriverJars = "gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar" # jdbcDriverClassName = "com.mysql.jdbc.Driver" # username = "" diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java index 15b148d43f..be0a4fcf2c 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java @@ -56,7 +56,7 @@ public static JdbcIOWrapperConfig configWithMySqlDefaultsFromOptions( List tables, String shardId, Wait.OnSignal waitOn) { - String sourceDbURL = options.getSourceDbURL(); + String sourceDbURL = options.getSourceConfigURL(); String dbName = extractDbFromURL(sourceDbURL); String username = options.getUsername(); String password = options.getPassword(); @@ -104,7 +104,8 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig( .setSourceSchemaReference(SourceSchemaReference.builder().setDbName(dbName).build()) .setDbAuth( LocalCredentialsProvider.builder() - .setUserName(username) + .setUserName( + username) // TODO - support taking username and password from url as well .setPassword(password) .build()) .setJdbcDriverClassName(jdbcDriverClassName) diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java index 69ed6fe59e..90601bd57c 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java @@ -45,18 +45,19 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { void setJdbcDriverClassName(String driverClassName); - // TODO - reset the regex matches here @TemplateParameter.Text( order = 3, regexes = {"(^jdbc:mysql://.*|^gs://.*)"}, groupName = "Source", description = - "Connection URL to connect to the source database host. Must contain the host, port and source db name. Can optionally contain properties like autoReconnect, maxReconnects etc. Format: `jdbc:mysql://{host}:{port}/{dbName}?{parameters}`", + "URL to connect to the source database host. It can be either of " + + "1. The JDBC connection URL - which must contain the host, port and source db name and can optionally contain properties like autoReconnect, maxReconnects etc. 
Format: `jdbc:mysql://{host}:{port}/{dbName}?{parameters}`" + + "2. The shard config path", helpText = - "The JDBC connection URL string. For example, `jdbc:mysql://127.4.5.30:3306/my-db?autoReconnect=true&maxReconnects=10&unicode=true&characterEncoding=UTF-8`.") - String getSourceDbURL(); + "The JDBC connection URL string. For example, `jdbc:mysql://127.4.5.30:3306/my-db?autoReconnect=true&maxReconnects=10&unicode=true&characterEncoding=UTF-8` or the shard config") + String getSourceConfigURL(); - void setSourceDbURL(String url); + void setSourceConfigURL(String url); @TemplateParameter.Text( order = 4, @@ -65,7 +66,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { description = "JDBC connection username.", helpText = "The username to be used for the JDBC connection.") @Default.String("") - String getUsername(); + String getUsername(); // Make optional void setUsername(String username); @@ -75,15 +76,15 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { description = "JDBC connection password.", helpText = "The password to be used for the JDBC connection.") @Default.String("") - String getPassword(); + String getPassword(); // make optional void setPassword(String password); @TemplateParameter.Text( order = 6, optional = true, - description = "Comma-separated names of the tables in the source database.", - helpText = "Tables to read from using partitions.") + description = "colon-separated names of the tables in the source database.", + helpText = "Tables to migrate from source.") @Default.String("") String getTables(); diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java index ed2c77d15e..2c94049152 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java @@ -15,6 +15,7 @@ */ package com.google.cloud.teleport.v2.templates; +import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.teleport.v2.options.OptionsToConfigBuilder; import com.google.cloud.teleport.v2.options.SourceDbToSpannerOptions; import com.google.cloud.teleport.v2.source.reader.ReaderImpl; @@ -244,7 +245,8 @@ static SpannerConfig createSpannerConfig(SourceDbToSpannerOptions options) { .withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId())) .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost())) .withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId())) - .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId())); + .withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId())) + .withRpcPriority(RpcPriority.HIGH); } @VisibleForTesting @@ -265,7 +267,7 @@ static ISchemaMapper getSchemaMapper(SourceDbToSpannerOptions options, Ddl ddl) static List listTablesToMigrate(String tableList, ISchemaMapper mapper, Ddl ddl) { List tablesFromOptions = StringUtils.isNotBlank(tableList) - ? Arrays.stream(tableList.split(",")).collect(Collectors.toList()) + ? 
Arrays.stream(tableList.split("\\:|,")).collect(Collectors.toList()) : new ArrayList(); List sourceTablesConfigured = null; diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SourceDbToSpanner.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SourceDbToSpanner.java index 882f4fe3cf..bfde56e03a 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SourceDbToSpanner.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SourceDbToSpanner.java @@ -102,10 +102,10 @@ static PipelineResult run(SourceDbToSpannerOptions options) { SpannerConfig spannerConfig = createSpannerConfig(options); // Decide type and source of migration - if (options.getSourceDbURL().startsWith("gs://")) { + if (options.getSourceConfigURL().startsWith("gs://")) { List shards = new ShardFileReader(new SecretManagerAccessorImpl()) - .readForwardMigrationShardingConfig(options.getSourceDbURL()); + .readForwardMigrationShardingConfig(options.getSourceConfigURL()); return PipelineController.executeShardedMigration(options, pipeline, shards, spannerConfig); } else { return PipelineController.executeSingleInstanceMigration(options, pipeline, spannerConfig); diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilderTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilderTest.java index d1828d6a7c..b9f045bdda 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilderTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilderTest.java @@ -45,7 +45,7 @@ public void testConfigWithMySqlDefaultsFromOptions() { final String testpassword = "password"; SourceDbToSpannerOptions sourceDbToSpannerOptions = PipelineOptionsFactory.as(SourceDbToSpannerOptions.class); - sourceDbToSpannerOptions.setSourceDbURL(testUrl); + sourceDbToSpannerOptions.setSourceConfigURL(testUrl); sourceDbToSpannerOptions.setJdbcDriverClassName(testdriverClassName); sourceDbToSpannerOptions.setMaxConnections(150); sourceDbToSpannerOptions.setNumPartitions(4000); @@ -71,7 +71,7 @@ public void testURIParsingException() { final String testUrl = "jd#bc://localhost"; SourceDbToSpannerOptions sourceDbToSpannerOptions = PipelineOptionsFactory.as(SourceDbToSpannerOptions.class); - sourceDbToSpannerOptions.setSourceDbURL(testUrl); + sourceDbToSpannerOptions.setSourceConfigURL(testUrl); assertThrows( RuntimeException.class, () -> diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerITBase.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerITBase.java index cfe60bb457..50342c748c 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerITBase.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerITBase.java @@ -145,7 +145,7 @@ protected PipelineLauncher.LaunchInfo launchDataflowJob( put("projectId", PROJECT); put("instanceId", spannerResourceManager.getInstanceId()); put("databaseId", spannerResourceManager.getDatabaseId()); - put("sourceDbURL", jdbcResourceManager.getUri()); + put("sourceConfigURL", jdbcResourceManager.getUri()); put("username", jdbcResourceManager.getUsername()); put("password", jdbcResourceManager.getPassword()); 
put("outputDirectory", "gs://" + artifactBucketName); diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerTest.java index e95d6aee15..35b7accd6b 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerTest.java @@ -149,7 +149,7 @@ public void listTablesToMigrateSession() { createOptionsHelper( Paths.get(Resources.getResource("session-file-with-dropped-column.json").getPath()) .toString(), - ""); + "cart,people"); ISchemaMapper schemaMapper = PipelineController.getSchemaMapper(mockOptions, spannerDdl); List tables = PipelineController.listTablesToMigrate(mockOptions.getTables(), schemaMapper, spannerDdl); diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/SessionBasedMapper.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/SessionBasedMapper.java index 0398424a29..44e418b3d1 100644 --- a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/SessionBasedMapper.java +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/SessionBasedMapper.java @@ -32,6 +32,8 @@ import java.util.stream.Collectors; import org.apache.curator.shaded.com.google.common.collect.ImmutableList; import org.apache.parquet.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This mapper uses an SMT session file to map table and column names. For fetching destination data @@ -39,20 +41,44 @@ */ public class SessionBasedMapper implements ISchemaMapper, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(SessionBasedMapper.class); + private final Ddl ddl; private final Schema schema; + /*If enabled, throw error on mismatches between spanner schema and session file. Defaults to false. 
+   */
+  private boolean strictCheckSchema = false;
+
   public SessionBasedMapper(String sessionFilePath, Ddl ddl) throws InputMismatchException {
-    this.schema = SessionFileReader.read(sessionFilePath);
-    this.ddl = ddl;
-    validateSchemaAndDdl(schema, ddl);
+    this(sessionFilePath, ddl, false);
+  }
+
+  public SessionBasedMapper(String sessionFilePath, Ddl ddl, boolean strictCheckSchema)
+      throws InputMismatchException {
+    this(SessionFileReader.read(sessionFilePath), ddl, strictCheckSchema);
   }
 
   public SessionBasedMapper(Schema schema, Ddl ddl) throws InputMismatchException {
+    this(schema, ddl, false);
+  }
+
+  public SessionBasedMapper(Schema schema, Ddl ddl, boolean strictCheckSchema)
+      throws InputMismatchException {
     this.schema = schema;
     this.ddl = ddl;
-    validateSchemaAndDdl(schema, ddl);
+    try {
+      validateSchemaAndDdl(schema, ddl);
+      LOG.info("schema matches between session file and spanner");
+    } catch (InputMismatchException e) {
+      if (strictCheckSchema) {
+        LOG.warn("schema does not match between session and spanner: {}", e.getMessage());
+        throw e;
+      } else {
+        LOG.warn("proceeding without schema match between session and spanner");
+      }
+    }
   }
 
   static void validateSchemaAndDdl(Schema schema, Ddl ddl) throws InputMismatchException {
diff --git a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/schema/SessionBasedMapperTest.java b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/schema/SessionBasedMapperTest.java
index ec06706be6..7501592b97 100644
--- a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/schema/SessionBasedMapperTest.java
+++ b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/schema/SessionBasedMapperTest.java
@@ -324,12 +324,26 @@ public void testSourceTablesToMigrate() {
     assertEquals(2, sourceTablesToMigrate.size());
   }
 
+  @Test
+  public void testIgnoreStrictCheck() {
+    // Succeeds even though the spanner tables mentioned in the session file do not exist,
+    // because strict schema checking is disabled.
+    SessionBasedMapper emptymapper =
+        new SessionBasedMapper(
+            Paths.get(Resources.getResource("session-file-empty.json").getPath()).toString(),
+            ddl,
+            false);
+    List sourceTablesToMigrate = emptymapper.getSourceTablesToMigrate("");
+    assertTrue(sourceTablesToMigrate.isEmpty());
+  }
+
   @Test(expected = InputMismatchException.class)
   public void testSourceTablesToMigrateEmpty() {
     // Expected to fail as spanner tables mentioned in session file do not exist
     SessionBasedMapper emptymapper =
         new SessionBasedMapper(
-            Paths.get(Resources.getResource("session-file-empty.json").getPath()).toString(), ddl);
+            Paths.get(Resources.getResource("session-file-empty.json").getPath()).toString(),
+            ddl,
+            true);
     List sourceTablesToMigrate = emptymapper.getSourceTablesToMigrate("");
   }
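Reviewer note (not part of the patch): a short sketch of how callers can choose between the lenient default and the new strict schema validation. The helper method and its parameters are illustrative only; import paths assume the spanner-common module layout.

```java
// Illustrative only: choosing between lenient (default) and strict schema validation.
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SessionBasedMapper;

public class SessionBasedMapperModes {

  /** Builds a mapper; `ddl` is assumed to have been read from the target Spanner database. */
  static SessionBasedMapper buildMapper(String sessionFilePath, Ddl ddl, boolean failFast) {
    if (failFast) {
      // Strict: a mismatch between the session file and the Spanner DDL is rethrown
      // as InputMismatchException at construction time.
      return new SessionBasedMapper(sessionFilePath, ddl, true);
    }
    // Lenient (the pre-existing behaviour, still the default): mismatches are only logged.
    return new SessionBasedMapper(sessionFilePath, ddl);
  }
}
```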