From a11e39330a283f3a88bbc2ffe2e0b4dbf8e912be Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Tue, 8 Oct 2024 13:55:30 +1100 Subject: [PATCH 01/10] fix: generate random index name for change streams Generates index names for change stream partition metadata table using a random UUID. This prevents issues if the job is being redeployed in an existing database. --- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 18 ++- .../spanner/changestreams/NameGenerator.java | 52 ------- .../spanner/changestreams/dao/DaoFactory.java | 12 +- .../dao/PartitionMetadataAdminDao.java | 46 +++--- .../dao/PartitionMetadataTableNames.java | 144 ++++++++++++++++++ .../changestreams/NameGeneratorTest.java | 41 ----- .../dao/PartitionMetadataAdminDaoTest.java | 8 +- .../dao/PartitionMetadataTableNamesTest.java | 43 ++++++ 8 files changed, 231 insertions(+), 133 deletions(-) delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java delete mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGeneratorTest.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 435bbba9ae8e3..b6a86a7e03e34 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -25,7 +25,6 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS; -import static org.apache.beam.sdk.io.gcp.spanner.changestreams.NameGenerator.generatePartitionMetadataTableName; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; @@ -61,6 +60,7 @@ import java.util.HashMap; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -77,6 +77,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.InitializeDoFn; @@ -1772,9 +1773,12 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta + fullPartitionMetadataDatabaseId + " has dialect " + metadataDatabaseDialect); - final String partitionMetadataTableName = - MoreObjects.firstNonNull( - getMetadataTable(), generatePartitionMetadataTableName(partitionMetadataDatabaseId)); + PartitionMetadataTableNames partitionMetadataTableNames = + Optional.ofNullable(getMetadataTable()) + .map( + tableName -> + PartitionMetadataTableNames.from(partitionMetadataDatabaseId, tableName)) + .orElse(PartitionMetadataTableNames.from(partitionMetadataDatabaseId)); final String changeStreamName = getChangeStreamName(); final Timestamp startTimestamp = getInclusiveStartAt(); // Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 1ns to transform the @@ -1791,7 +1795,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta changeStreamSpannerConfig, changeStreamName, partitionMetadataSpannerConfig, - partitionMetadataTableName, + partitionMetadataTableNames, rpcPriority, input.getPipeline().getOptions().getJobName(), changeStreamDatabaseDialect, @@ -1807,7 +1811,9 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta final PostProcessingMetricsDoFn postProcessingMetricsDoFn = new PostProcessingMetricsDoFn(metrics); - LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName); + LOG.info( + "Partition metadata table that will be used is " + + partitionMetadataTableNames.getTableName()); final PCollection impulseOut = input.apply(Impulse.create()); final PCollection partitionsOut = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java deleted file mode 100644 index 322e85cb07a25..0000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner.changestreams; - -import java.util.UUID; - -/** - * This class generates a unique name for the partition metadata table, which is created when the - * Connector is initialized. - */ -public class NameGenerator { - - private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s"; - private static final int MAX_TABLE_NAME_LENGTH = 63; - - /** - * Generates an unique name for the partition metadata table in the form of {@code - * "Metadata__"}. - * - * @param databaseId The database id where the table will be created - * @return the unique generated name of the partition metadata table - */ - public static String generatePartitionMetadataTableName(String databaseId) { - // There are 11 characters in the name format. - // Maximum Spanner database ID length is 30 characters. - // UUID always generates a String with 36 characters. - // Since the Postgres table name length is 63, we may need to truncate the table name depending - // on the database length. - String fullString = - String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, UUID.randomUUID()) - .replaceAll("-", "_"); - if (fullString.length() < MAX_TABLE_NAME_LENGTH) { - return fullString; - } - return fullString.substring(0, MAX_TABLE_NAME_LENGTH); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java index b9718fdb675e1..787abad02e026 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java @@ -44,7 +44,7 @@ public class DaoFactory implements Serializable { private final SpannerConfig metadataSpannerConfig; private final String changeStreamName; - private final String partitionMetadataTableName; + private final PartitionMetadataTableNames partitionMetadataTableNames; private final RpcPriority rpcPriority; private final String jobName; private final Dialect spannerChangeStreamDatabaseDialect; @@ -56,7 +56,7 @@ public class DaoFactory implements Serializable { * @param changeStreamSpannerConfig the configuration for the change streams DAO * @param changeStreamName the name of the change stream for the change streams DAO * @param metadataSpannerConfig the metadata tables configuration - * @param partitionMetadataTableName the name of the created partition metadata table + * @param partitionMetadataTableNames the names of the partition metadata ddl objects * @param rpcPriority the priority of the requests made by the DAO queries * @param jobName the name of the running job */ @@ -64,7 +64,7 @@ public DaoFactory( SpannerConfig changeStreamSpannerConfig, String changeStreamName, SpannerConfig metadataSpannerConfig, - String partitionMetadataTableName, + PartitionMetadataTableNames partitionMetadataTableNames, RpcPriority rpcPriority, String jobName, Dialect spannerChangeStreamDatabaseDialect, @@ -78,7 +78,7 @@ public DaoFactory( this.changeStreamSpannerConfig = changeStreamSpannerConfig; this.changeStreamName = changeStreamName; this.metadataSpannerConfig = metadataSpannerConfig; - this.partitionMetadataTableName = partitionMetadataTableName; + this.partitionMetadataTableNames = partitionMetadataTableNames; this.rpcPriority = rpcPriority; this.jobName = jobName; this.spannerChangeStreamDatabaseDialect = spannerChangeStreamDatabaseDialect; @@ -102,7 +102,7 @@ public synchronized PartitionMetadataAdminDao getPartitionMetadataAdminDao() { databaseAdminClient, metadataSpannerConfig.getInstanceId().get(), metadataSpannerConfig.getDatabaseId().get(), - partitionMetadataTableName, + partitionMetadataTableNames, this.metadataDatabaseDialect); } return partitionMetadataAdminDao; @@ -120,7 +120,7 @@ public synchronized PartitionMetadataDao getPartitionMetadataDao() { if (partitionMetadataDaoInstance == null) { partitionMetadataDaoInstance = new PartitionMetadataDao( - this.partitionMetadataTableName, + this.partitionMetadataTableNames.getTableName(), spannerAccessor.getDatabaseClient(), this.metadataDatabaseDialect); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java index 368cab7022b3d..7e68f8e280d1f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java @@ -79,19 +79,13 @@ public class PartitionMetadataAdminDao { */ public static final String COLUMN_FINISHED_AT = "FinishedAt"; - /** Metadata table index for queries over the watermark column. */ - public static final String WATERMARK_INDEX = "WatermarkIndex"; - - /** Metadata table index for queries over the created at / start timestamp columns. */ - public static final String CREATED_AT_START_TIMESTAMP_INDEX = "CreatedAtStartTimestampIndex"; - private static final int TIMEOUT_MINUTES = 10; private static final int TTL_AFTER_PARTITION_FINISHED_DAYS = 1; private final DatabaseAdminClient databaseAdminClient; private final String instanceId; private final String databaseId; - private final String tableName; + private final PartitionMetadataTableNames config; private final Dialect dialect; /** @@ -101,18 +95,18 @@ public class PartitionMetadataAdminDao { * table * @param instanceId the instance where the metadata table will reside * @param databaseId the database where the metadata table will reside - * @param tableName the name of the metadata table + * @param names the names of the metadata table ddl objects */ PartitionMetadataAdminDao( DatabaseAdminClient databaseAdminClient, String instanceId, String databaseId, - String tableName, + PartitionMetadataTableNames names, Dialect dialect) { this.databaseAdminClient = databaseAdminClient; this.instanceId = instanceId; this.databaseId = databaseId; - this.tableName = tableName; + this.config = names; this.dialect = dialect; } @@ -129,7 +123,7 @@ public void createPartitionMetadataTable() { // Literals need be added around literals to preserve casing. ddl.add( "CREATE TABLE \"" - + tableName + + config.getTableName() + "\"(\"" + COLUMN_PARTITION_TOKEN + "\" text NOT NULL,\"" @@ -164,9 +158,9 @@ public void createPartitionMetadataTable() { + "\""); ddl.add( "CREATE INDEX \"" - + WATERMARK_INDEX + + config.getWatermarkIndexName() + "\" on \"" - + tableName + + config.getTableName() + "\" (\"" + COLUMN_WATERMARK + "\") INCLUDE (\"" @@ -174,9 +168,9 @@ public void createPartitionMetadataTable() { + "\")"); ddl.add( "CREATE INDEX \"" - + CREATED_AT_START_TIMESTAMP_INDEX + + config.getCreatedAtIndexName() + "\" ON \"" - + tableName + + config.getTableName() + "\" (\"" + COLUMN_CREATED_AT + "\",\"" @@ -185,7 +179,7 @@ public void createPartitionMetadataTable() { } else { ddl.add( "CREATE TABLE " - + tableName + + config.getTableName() + " (" + COLUMN_PARTITION_TOKEN + " STRING(MAX) NOT NULL," @@ -219,9 +213,9 @@ public void createPartitionMetadataTable() { + " DAY))"); ddl.add( "CREATE INDEX " - + WATERMARK_INDEX + + config.getWatermarkIndexName() + " on " - + tableName + + config.getTableName() + " (" + COLUMN_WATERMARK + ") STORING (" @@ -229,9 +223,9 @@ public void createPartitionMetadataTable() { + ")"); ddl.add( "CREATE INDEX " - + CREATED_AT_START_TIMESTAMP_INDEX + + config.getCreatedAtIndexName() + " ON " - + tableName + + config.getTableName() + " (" + COLUMN_CREATED_AT + "," @@ -264,13 +258,13 @@ public void createPartitionMetadataTable() { public void deletePartitionMetadataTable() { List ddl = new ArrayList<>(); if (this.isPostgres()) { - ddl.add("DROP INDEX \"" + CREATED_AT_START_TIMESTAMP_INDEX + "\""); - ddl.add("DROP INDEX \"" + WATERMARK_INDEX + "\""); - ddl.add("DROP TABLE \"" + tableName + "\""); + ddl.add("DROP INDEX \"" + config.getCreatedAtIndexName() + "\""); + ddl.add("DROP INDEX \"" + config.getWatermarkIndexName() + "\""); + ddl.add("DROP TABLE \"" + config.getTableName() + "\""); } else { - ddl.add("DROP INDEX " + CREATED_AT_START_TIMESTAMP_INDEX); - ddl.add("DROP INDEX " + WATERMARK_INDEX); - ddl.add("DROP TABLE " + tableName); + ddl.add("DROP INDEX " + config.getCreatedAtIndexName()); + ddl.add("DROP INDEX " + config.getWatermarkIndexName()); + ddl.add("DROP TABLE " + config.getTableName()); } OperationFuture op = databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java new file mode 100644 index 0000000000000..fe6b57812e34b --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao; + +import java.util.Objects; +import java.util.UUID; +import javax.annotation.Nullable; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; + +/** + * Configuration for a partition metadata table. It encapsulates the name of the metadata table and + * indexes. + */ +public class PartitionMetadataTableNames { + + /** PostgreSQL max table and index length is 63 bytes */ + @VisibleForTesting static final int MAX_NAME_LENGTH = 63; + + private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s"; + private static final String WATERMARK_INDEX_NAME_FORMAT = "WatermarkIdx_%s_%s"; + private static final String CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT = "CreatedAtIdx_%s_%s"; + + /** + * Generates a unique name for the partition metadata table and its indexes. The table name will + * be in the form of {@code "Metadata__"}. The watermark index will be in the + * form of {@code "WatermarkIdx__}. The createdAt / start timestamp index will + * be in the form of {@code "CreatedAtIdx__}. + * + * @param databaseId The database id where the table will be created + * @return the unique generated names of the partition metadata ddl + */ + public static PartitionMetadataTableNames from(String databaseId) { + UUID uuid = UUID.randomUUID(); + String table = + String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, uuid) + .replaceAll("-", "_") + .substring(0, MAX_NAME_LENGTH); + String watermarkIndex = + String.format(WATERMARK_INDEX_NAME_FORMAT, databaseId, uuid) + .replaceAll("-", "_") + .substring(0, MAX_NAME_LENGTH); + String createdAtIndex = + String.format(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId, uuid) + .replaceAll("-", "_") + .substring(0, MAX_NAME_LENGTH); + + return new PartitionMetadataTableNames(table, watermarkIndex, createdAtIndex); + } + + /** + * Generates a unique name for the partition metadata indexes. Uses the given table name. The + * watermark index will be in the form of {@code "WatermarkIdx__}. The createdAt + * / start timestamp index will be in the form of {@code "CreatedAtIdx__}. + * + * @param databaseId The database id where the table will be created + * @param table The table name to be used + * @return the unique generated names of the partition metadata ddl + */ + public static PartitionMetadataTableNames from(String databaseId, String table) { + UUID uuid = UUID.randomUUID(); + String watermarkIndex = + String.format(WATERMARK_INDEX_NAME_FORMAT, databaseId, uuid) + .replaceAll("-", "_") + .substring(0, MAX_NAME_LENGTH); + String createdAtIndex = + String.format(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId, uuid) + .replaceAll("-", "_") + .substring(0, MAX_NAME_LENGTH); + + return new PartitionMetadataTableNames(table, watermarkIndex, createdAtIndex); + } + + private final String tableName; + private final String watermarkIndexName; + private final String createdAtIndexName; + + public PartitionMetadataTableNames( + String tableName, String watermarkIndexName, String createdAtIndexName) { + this.tableName = tableName; + this.watermarkIndexName = watermarkIndexName; + this.createdAtIndexName = createdAtIndexName; + } + + public String getTableName() { + return tableName; + } + + public String getWatermarkIndexName() { + return watermarkIndexName; + } + + public String getCreatedAtIndexName() { + return createdAtIndexName; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PartitionMetadataTableNames)) { + return false; + } + PartitionMetadataTableNames that = (PartitionMetadataTableNames) o; + return Objects.equals(tableName, that.tableName) + && Objects.equals(watermarkIndexName, that.watermarkIndexName) + && Objects.equals(createdAtIndexName, that.createdAtIndexName); + } + + @Override + public int hashCode() { + return Objects.hash(tableName, watermarkIndexName, createdAtIndexName); + } + + @Override + public String toString() { + return "PartitionMetadataTableNames{" + + "tableName='" + + tableName + + '\'' + + ", watermarkIndexName='" + + watermarkIndexName + + '\'' + + ", createdAtIndexName='" + + createdAtIndexName + + '\'' + + '}'; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGeneratorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGeneratorTest.java deleted file mode 100644 index f15fc5307374f..0000000000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGeneratorTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner.changestreams; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.junit.Test; - -public class NameGeneratorTest { - private static final int MAXIMUM_POSTGRES_TABLE_NAME_LENGTH = 63; - - @Test - public void testGenerateMetadataTableNameRemovesHyphens() { - final String tableName = - NameGenerator.generatePartitionMetadataTableName("my-database-id-12345"); - assertFalse(tableName.contains("-")); - } - - @Test - public void testGenerateMetadataTableNameIsShorterThan64Characters() { - final String tableName = - NameGenerator.generatePartitionMetadataTableName("my-database-id1-maximum-length"); - assertTrue(tableName.length() <= MAXIMUM_POSTGRES_TABLE_NAME_LENGTH); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java index 3752c2fb3afc1..c85a58008a088 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java @@ -58,6 +58,8 @@ public class PartitionMetadataAdminDaoTest { private static final String DATABASE_ID = "SPANNER_DATABASE"; private static final String TABLE_NAME = "SPANNER_TABLE"; + private static final String WATERMARK_INDEX_NAME = "WATERMARK_INDEX"; + private static final String CREATED_AT_INDEX_NAME = "CREATED_AT_INDEX"; private static final int TIMEOUT_MINUTES = 10; @@ -68,12 +70,14 @@ public class PartitionMetadataAdminDaoTest { @Before public void setUp() { databaseAdminClient = mock(DatabaseAdminClient.class); + PartitionMetadataTableNames names = + new PartitionMetadataTableNames(TABLE_NAME, WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME); partitionMetadataAdminDao = new PartitionMetadataAdminDao( - databaseAdminClient, INSTANCE_ID, DATABASE_ID, TABLE_NAME, Dialect.GOOGLE_STANDARD_SQL); + databaseAdminClient, INSTANCE_ID, DATABASE_ID, names, Dialect.GOOGLE_STANDARD_SQL); partitionMetadataAdminDaoPostgres = new PartitionMetadataAdminDao( - databaseAdminClient, INSTANCE_ID, DATABASE_ID, TABLE_NAME, Dialect.POSTGRESQL); + databaseAdminClient, INSTANCE_ID, DATABASE_ID, names, Dialect.POSTGRESQL); op = (OperationFuture) mock(OperationFuture.class); statements = ArgumentCaptor.forClass(Iterable.class); when(databaseAdminClient.updateDatabaseDdl( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java new file mode 100644 index 0000000000000..dcfc4953469bc --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao; + +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames.MAX_NAME_LENGTH; +import static org.junit.Assert.*; + +import org.junit.Test; + +public class PartitionMetadataTableNamesTest { + + @Test + public void testGenerateMetadataConfigRemovesHyphens() { + PartitionMetadataTableNames names = PartitionMetadataTableNames.from("my-database-id-12345"); + assertFalse(names.getTableName().contains("-")); + assertFalse(names.getWatermarkIndexName().contains("-")); + assertFalse(names.getCreatedAtIndexName().contains("-")); + } + + @Test + public void testGenerateMetadataConfigIsShorterThan64Characters() { + PartitionMetadataTableNames names = + PartitionMetadataTableNames.from("my-database-id1-maximum-length"); + assertTrue(names.getTableName().length() <= MAX_NAME_LENGTH); + assertTrue(names.getWatermarkIndexName().length() <= MAX_NAME_LENGTH); + assertTrue(names.getCreatedAtIndexName().length() <= MAX_NAME_LENGTH); + } +} From 6cc13cab6334177f31ad0b78e84627ba24091af6 Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Tue, 8 Oct 2024 14:16:16 +1100 Subject: [PATCH 02/10] refactor: config -> names --- .../dao/PartitionMetadataAdminDao.java | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java index 7e68f8e280d1f..3fada9fb8083a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java @@ -85,7 +85,7 @@ public class PartitionMetadataAdminDao { private final DatabaseAdminClient databaseAdminClient; private final String instanceId; private final String databaseId; - private final PartitionMetadataTableNames config; + private final PartitionMetadataTableNames names; private final Dialect dialect; /** @@ -106,7 +106,7 @@ public class PartitionMetadataAdminDao { this.databaseAdminClient = databaseAdminClient; this.instanceId = instanceId; this.databaseId = databaseId; - this.config = names; + this.names = names; this.dialect = dialect; } @@ -123,7 +123,7 @@ public void createPartitionMetadataTable() { // Literals need be added around literals to preserve casing. ddl.add( "CREATE TABLE \"" - + config.getTableName() + + names.getTableName() + "\"(\"" + COLUMN_PARTITION_TOKEN + "\" text NOT NULL,\"" @@ -158,9 +158,9 @@ public void createPartitionMetadataTable() { + "\""); ddl.add( "CREATE INDEX \"" - + config.getWatermarkIndexName() + + names.getWatermarkIndexName() + "\" on \"" - + config.getTableName() + + names.getTableName() + "\" (\"" + COLUMN_WATERMARK + "\") INCLUDE (\"" @@ -168,9 +168,9 @@ public void createPartitionMetadataTable() { + "\")"); ddl.add( "CREATE INDEX \"" - + config.getCreatedAtIndexName() + + names.getCreatedAtIndexName() + "\" ON \"" - + config.getTableName() + + names.getTableName() + "\" (\"" + COLUMN_CREATED_AT + "\",\"" @@ -179,7 +179,7 @@ public void createPartitionMetadataTable() { } else { ddl.add( "CREATE TABLE " - + config.getTableName() + + names.getTableName() + " (" + COLUMN_PARTITION_TOKEN + " STRING(MAX) NOT NULL," @@ -213,9 +213,9 @@ public void createPartitionMetadataTable() { + " DAY))"); ddl.add( "CREATE INDEX " - + config.getWatermarkIndexName() + + names.getWatermarkIndexName() + " on " - + config.getTableName() + + names.getTableName() + " (" + COLUMN_WATERMARK + ") STORING (" @@ -223,9 +223,9 @@ public void createPartitionMetadataTable() { + ")"); ddl.add( "CREATE INDEX " - + config.getCreatedAtIndexName() + + names.getCreatedAtIndexName() + " ON " - + config.getTableName() + + names.getTableName() + " (" + COLUMN_CREATED_AT + "," @@ -258,13 +258,13 @@ public void createPartitionMetadataTable() { public void deletePartitionMetadataTable() { List ddl = new ArrayList<>(); if (this.isPostgres()) { - ddl.add("DROP INDEX \"" + config.getCreatedAtIndexName() + "\""); - ddl.add("DROP INDEX \"" + config.getWatermarkIndexName() + "\""); - ddl.add("DROP TABLE \"" + config.getTableName() + "\""); + ddl.add("DROP INDEX \"" + names.getCreatedAtIndexName() + "\""); + ddl.add("DROP INDEX \"" + names.getWatermarkIndexName() + "\""); + ddl.add("DROP TABLE \"" + names.getTableName() + "\""); } else { - ddl.add("DROP INDEX " + config.getCreatedAtIndexName()); - ddl.add("DROP INDEX " + config.getWatermarkIndexName()); - ddl.add("DROP TABLE " + config.getTableName()); + ddl.add("DROP INDEX " + names.getCreatedAtIndexName()); + ddl.add("DROP INDEX " + names.getWatermarkIndexName()); + ddl.add("DROP TABLE " + names.getTableName()); } OperationFuture op = databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null); From e8533cb6bd685bd37a67381d2f192767d8768cf0 Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Tue, 8 Oct 2024 14:39:48 +1100 Subject: [PATCH 03/10] fix: checkstyle --- .../spanner/changestreams/dao/PartitionMetadataTableNames.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java index fe6b57812e34b..080d0661debe6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java @@ -28,7 +28,7 @@ */ public class PartitionMetadataTableNames { - /** PostgreSQL max table and index length is 63 bytes */ + /** PostgreSQL max table and index length is 63 bytes. */ @VisibleForTesting static final int MAX_NAME_LENGTH = 63; private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s"; From 20d3dd3400b7b522b6cfaf736273d1ec0f941970 Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Tue, 8 Oct 2024 15:39:04 +1100 Subject: [PATCH 04/10] fix: checkstyle test --- .../changestreams/dao/PartitionMetadataTableNamesTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java index dcfc4953469bc..6e2e85553102d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java @@ -18,7 +18,8 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames.MAX_NAME_LENGTH; -import static org.junit.Assert.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import org.junit.Test; From e8188a66d3f318ed36acaaf3722379bdbc2eec81 Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Tue, 8 Oct 2024 17:01:40 +1100 Subject: [PATCH 05/10] fix: makes new class serializable --- .../changestreams/dao/PartitionMetadataTableNames.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java index 080d0661debe6..09987e4b30fd2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao; +import java.io.Serializable; import java.util.Objects; import java.util.UUID; import javax.annotation.Nullable; @@ -26,7 +27,9 @@ * Configuration for a partition metadata table. It encapsulates the name of the metadata table and * indexes. */ -public class PartitionMetadataTableNames { +public class PartitionMetadataTableNames implements Serializable { + + private static final long serialVersionUID = 8848098877671834584L; /** PostgreSQL max table and index length is 63 bytes. */ @VisibleForTesting static final int MAX_NAME_LENGTH = 63; From 48b5ea6cb2ec37f8d15dd5ba3af1cfee5ec99c6d Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Wed, 9 Oct 2024 11:50:32 +1100 Subject: [PATCH 06/10] fix: name generation --- .../dao/PartitionMetadataTableNames.java | 32 ++++++++----------- .../dao/PartitionMetadataTableNamesTest.java | 13 +++++--- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java index 09987e4b30fd2..0bba9aa93aa77 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java @@ -49,18 +49,11 @@ public class PartitionMetadataTableNames implements Serializable { */ public static PartitionMetadataTableNames from(String databaseId) { UUID uuid = UUID.randomUUID(); - String table = - String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, uuid) - .replaceAll("-", "_") - .substring(0, MAX_NAME_LENGTH); - String watermarkIndex = - String.format(WATERMARK_INDEX_NAME_FORMAT, databaseId, uuid) - .replaceAll("-", "_") - .substring(0, MAX_NAME_LENGTH); + + String table = generateName(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, uuid); + String watermarkIndex = generateName(WATERMARK_INDEX_NAME_FORMAT, databaseId, uuid); String createdAtIndex = - String.format(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId, uuid) - .replaceAll("-", "_") - .substring(0, MAX_NAME_LENGTH); + generateName(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId, uuid); return new PartitionMetadataTableNames(table, watermarkIndex, createdAtIndex); } @@ -76,18 +69,21 @@ public static PartitionMetadataTableNames from(String databaseId) { */ public static PartitionMetadataTableNames from(String databaseId, String table) { UUID uuid = UUID.randomUUID(); - String watermarkIndex = - String.format(WATERMARK_INDEX_NAME_FORMAT, databaseId, uuid) - .replaceAll("-", "_") - .substring(0, MAX_NAME_LENGTH); + String watermarkIndex = generateName(WATERMARK_INDEX_NAME_FORMAT, databaseId, uuid); String createdAtIndex = - String.format(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId, uuid) - .replaceAll("-", "_") - .substring(0, MAX_NAME_LENGTH); + generateName(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId, uuid); return new PartitionMetadataTableNames(table, watermarkIndex, createdAtIndex); } + private static String generateName(String template, String databaseId, UUID uuid) { + String name = String.format(template, databaseId, uuid).replaceAll("-", "_"); + if (name.length() > MAX_NAME_LENGTH) { + return name.substring(0, MAX_NAME_LENGTH); + } + return name; + } + private final String tableName; private final String watermarkIndexName; private final String createdAtIndexName; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java index 6e2e85553102d..bd84669365b82 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java @@ -24,9 +24,8 @@ import org.junit.Test; public class PartitionMetadataTableNamesTest { - @Test - public void testGenerateMetadataConfigRemovesHyphens() { + public void testPartitionMetadataNamesRemovesHyphens() { PartitionMetadataTableNames names = PartitionMetadataTableNames.from("my-database-id-12345"); assertFalse(names.getTableName().contains("-")); assertFalse(names.getWatermarkIndexName().contains("-")); @@ -34,9 +33,15 @@ public void testGenerateMetadataConfigRemovesHyphens() { } @Test - public void testGenerateMetadataConfigIsShorterThan64Characters() { + public void testPartitionMetadataNamesIsShorterThan64Characters() { PartitionMetadataTableNames names = - PartitionMetadataTableNames.from("my-database-id1-maximum-length"); + PartitionMetadataTableNames.from( + "my-database-id-larger-than-maximum-length-1234567890-1234567890-1234567890"); + assertTrue(names.getTableName().length() <= MAX_NAME_LENGTH); + assertTrue(names.getWatermarkIndexName().length() <= MAX_NAME_LENGTH); + assertTrue(names.getCreatedAtIndexName().length() <= MAX_NAME_LENGTH); + + names = PartitionMetadataTableNames.from("d"); assertTrue(names.getTableName().length() <= MAX_NAME_LENGTH); assertTrue(names.getWatermarkIndexName().length() <= MAX_NAME_LENGTH); assertTrue(names.getCreatedAtIndexName().length() <= MAX_NAME_LENGTH); From 6fcc3c20b2f8c7b03d6bd5255fe0d8288b32e016 Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Wed, 9 Oct 2024 14:35:49 +1100 Subject: [PATCH 07/10] fix: drops indexes from metadata table Finds and drops all indexes on the metadata table before attempting to drop it. --- .../dao/PartitionMetadataAdminDao.java | 8 ++--- .../dao/PartitionMetadataDao.java | 24 +++++++++++++ .../dofn/CleanUpReadChangeStreamDoFn.java | 4 ++- .../dao/PartitionMetadataAdminDaoTest.java | 36 ++++++++++++++++--- 4 files changed, 62 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java index 3fada9fb8083a..60a35fbbc102d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java @@ -255,15 +255,13 @@ public void createPartitionMetadataTable() { * Drops the metadata table. This operation should complete in {@link * PartitionMetadataAdminDao#TIMEOUT_MINUTES} minutes. */ - public void deletePartitionMetadataTable() { + public void deletePartitionMetadataTable(List indexes) { List ddl = new ArrayList<>(); if (this.isPostgres()) { - ddl.add("DROP INDEX \"" + names.getCreatedAtIndexName() + "\""); - ddl.add("DROP INDEX \"" + names.getWatermarkIndexName() + "\""); + indexes.forEach(index -> ddl.add("DROP INDEX \"" + index + "\"")); ddl.add("DROP TABLE \"" + names.getTableName() + "\""); } else { - ddl.add("DROP INDEX " + names.getCreatedAtIndexName()); - ddl.add("DROP INDEX " + names.getWatermarkIndexName()); + indexes.forEach(index -> ddl.add("DROP INDEX " + index)); ddl.add("DROP TABLE " + names.getTableName()); } OperationFuture op = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java index 7867932cd1adc..196b13ddcc0cc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java @@ -96,6 +96,30 @@ public boolean tableExists() { } } + /** + * Finds all indexes for the metadata table. + * + * @return a list of index names for the metadata table. + */ + public List findAllTableIndexes() { + final String indexesStmt = + "SELECT index_name FROM information_schema.indexes " + + "WHERE table_name = '" + + metadataTableName + + "' AND index_type != 'PRIMARY_KEY'"; + + List result = new ArrayList<>(); + try (ResultSet queryResultSet = + databaseClient + .singleUseReadOnlyTransaction() + .executeQuery(Statement.of(indexesStmt), Options.tag("query=findAllTableIndexes"))) { + while (queryResultSet.next()) { + result.add(queryResultSet.getString("index_name")); + } + } + return result; + } + /** * Fetches the partition metadata row data for the given partition token. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java index a048c885a001d..f8aa497292bf6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn; import java.io.Serializable; +import java.util.List; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; import org.apache.beam.sdk.transforms.DoFn; @@ -33,6 +34,7 @@ public CleanUpReadChangeStreamDoFn(DaoFactory daoFactory) { @ProcessElement public void processElement(OutputReceiver receiver) { - daoFactory.getPartitionMetadataAdminDao().deletePartitionMetadataTable(); + List indexes = daoFactory.getPartitionMetadataDao().findAllTableIndexes(); + daoFactory.getPartitionMetadataAdminDao().deletePartitionMetadataTable(indexes); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java index c85a58008a088..fb19a84121fd7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java @@ -33,7 +33,9 @@ import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.SpannerException; import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -137,7 +139,8 @@ public void testCreatePartitionMetadataTableWithInterruptedException() throws Ex @Test public void testDeletePartitionMetadataTable() throws Exception { when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null); - partitionMetadataAdminDao.deletePartitionMetadataTable(); + partitionMetadataAdminDao.deletePartitionMetadataTable( + Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME)); verify(databaseAdminClient, times(1)) .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull()); assertEquals(3, ((Collection) statements.getValue()).size()); @@ -147,10 +150,22 @@ public void testDeletePartitionMetadataTable() throws Exception { assertTrue(it.next().contains("DROP TABLE")); } + @Test + public void testDeletePartitionMetadataTableWithNoIndexes() throws Exception { + when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null); + partitionMetadataAdminDao.deletePartitionMetadataTable(Collections.emptyList()); + verify(databaseAdminClient, times(1)) + .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull()); + assertEquals(1, ((Collection) statements.getValue()).size()); + Iterator it = statements.getValue().iterator(); + assertTrue(it.next().contains("DROP TABLE")); + } + @Test public void testDeletePartitionMetadataTablePostgres() throws Exception { when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null); - partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable(); + partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable( + Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME)); verify(databaseAdminClient, times(1)) .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull()); assertEquals(3, ((Collection) statements.getValue()).size()); @@ -160,11 +175,23 @@ public void testDeletePartitionMetadataTablePostgres() throws Exception { assertTrue(it.next().contains("DROP TABLE \"")); } + @Test + public void testDeletePartitionMetadataTablePostgresWithNoIndexes() throws Exception { + when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null); + partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable(Collections.emptyList()); + verify(databaseAdminClient, times(1)) + .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull()); + assertEquals(1, ((Collection) statements.getValue()).size()); + Iterator it = statements.getValue().iterator(); + assertTrue(it.next().contains("DROP TABLE \"")); + } + @Test public void testDeletePartitionMetadataTableWithTimeoutException() throws Exception { when(op.get(10, TimeUnit.MINUTES)).thenThrow(new TimeoutException(TIMED_OUT)); try { - partitionMetadataAdminDao.deletePartitionMetadataTable(); + partitionMetadataAdminDao.deletePartitionMetadataTable( + Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME)); fail(); } catch (SpannerException e) { assertTrue(e.getMessage().contains(TIMED_OUT)); @@ -175,7 +202,8 @@ public void testDeletePartitionMetadataTableWithTimeoutException() throws Except public void testDeletePartitionMetadataTableWithInterruptedException() throws Exception { when(op.get(10, TimeUnit.MINUTES)).thenThrow(new InterruptedException(INTERRUPTED)); try { - partitionMetadataAdminDao.deletePartitionMetadataTable(); + partitionMetadataAdminDao.deletePartitionMetadataTable( + Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME)); fail(); } catch (SpannerException e) { assertEquals(ErrorCode.CANCELLED, e.getErrorCode()); From ba4e3d1778254bada430e4a48ad410b0ae3861a4 Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Wed, 9 Oct 2024 15:00:02 +1100 Subject: [PATCH 08/10] refactor: refactor name generator --- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 6 ++-- .../dao/PartitionMetadataTableNames.java | 30 +++++++++---------- .../dao/PartitionMetadataTableNamesTest.java | 22 ++++++++++---- 3 files changed, 34 insertions(+), 24 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index b6a86a7e03e34..84f74b5286363 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -1775,10 +1775,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta + metadataDatabaseDialect); PartitionMetadataTableNames partitionMetadataTableNames = Optional.ofNullable(getMetadataTable()) - .map( - tableName -> - PartitionMetadataTableNames.from(partitionMetadataDatabaseId, tableName)) - .orElse(PartitionMetadataTableNames.from(partitionMetadataDatabaseId)); + .map(PartitionMetadataTableNames::fromExistingTable) + .orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId)); final String changeStreamName = getChangeStreamName(); final Timestamp startTimestamp = getInclusiveStartAt(); // Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 1ns to transform the diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java index 0bba9aa93aa77..961732754e887 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java @@ -47,7 +47,7 @@ public class PartitionMetadataTableNames implements Serializable { * @param databaseId The database id where the table will be created * @return the unique generated names of the partition metadata ddl */ - public static PartitionMetadataTableNames from(String databaseId) { + public static PartitionMetadataTableNames generateRandom(String databaseId) { UUID uuid = UUID.randomUUID(); String table = generateName(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, uuid); @@ -59,21 +59,13 @@ public static PartitionMetadataTableNames from(String databaseId) { } /** - * Generates a unique name for the partition metadata indexes. Uses the given table name. The - * watermark index will be in the form of {@code "WatermarkIdx__}. The createdAt - * / start timestamp index will be in the form of {@code "CreatedAtIdx__}. + * Encapsulates an existing table name. Index names are not generated here, and will be null. * - * @param databaseId The database id where the table will be created * @param table The table name to be used - * @return the unique generated names of the partition metadata ddl + * @return an instance with the table name only */ - public static PartitionMetadataTableNames from(String databaseId, String table) { - UUID uuid = UUID.randomUUID(); - String watermarkIndex = generateName(WATERMARK_INDEX_NAME_FORMAT, databaseId, uuid); - String createdAtIndex = - generateName(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId, uuid); - - return new PartitionMetadataTableNames(table, watermarkIndex, createdAtIndex); + public static PartitionMetadataTableNames fromExistingTable(String table) { + return new PartitionMetadataTableNames(table); } private static String generateName(String template, String databaseId, UUID uuid) { @@ -85,8 +77,14 @@ private static String generateName(String template, String databaseId, UUID uuid } private final String tableName; - private final String watermarkIndexName; - private final String createdAtIndexName; + @Nullable private final String watermarkIndexName; + @Nullable private final String createdAtIndexName; + + public PartitionMetadataTableNames(String tableName) { + this.tableName = tableName; + this.watermarkIndexName = null; + this.createdAtIndexName = null; + } public PartitionMetadataTableNames( String tableName, String watermarkIndexName, String createdAtIndexName) { @@ -99,10 +97,12 @@ public String getTableName() { return tableName; } + @Nullable public String getWatermarkIndexName() { return watermarkIndexName; } + @Nullable public String getCreatedAtIndexName() { return createdAtIndexName; } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java index bd84669365b82..0c9d18d455ff5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java @@ -18,32 +18,44 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames.MAX_NAME_LENGTH; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import org.junit.Test; public class PartitionMetadataTableNamesTest { @Test - public void testPartitionMetadataNamesRemovesHyphens() { - PartitionMetadataTableNames names = PartitionMetadataTableNames.from("my-database-id-12345"); + public void testGeneratePartitionMetadataNamesRemovesHyphens() { + PartitionMetadataTableNames names = + PartitionMetadataTableNames.generateRandom("my-database-id-12345"); assertFalse(names.getTableName().contains("-")); assertFalse(names.getWatermarkIndexName().contains("-")); assertFalse(names.getCreatedAtIndexName().contains("-")); } @Test - public void testPartitionMetadataNamesIsShorterThan64Characters() { + public void testGeneratePartitionMetadataNamesIsShorterThan64Characters() { PartitionMetadataTableNames names = - PartitionMetadataTableNames.from( + PartitionMetadataTableNames.generateRandom( "my-database-id-larger-than-maximum-length-1234567890-1234567890-1234567890"); assertTrue(names.getTableName().length() <= MAX_NAME_LENGTH); assertTrue(names.getWatermarkIndexName().length() <= MAX_NAME_LENGTH); assertTrue(names.getCreatedAtIndexName().length() <= MAX_NAME_LENGTH); - names = PartitionMetadataTableNames.from("d"); + names = PartitionMetadataTableNames.generateRandom("d"); assertTrue(names.getTableName().length() <= MAX_NAME_LENGTH); assertTrue(names.getWatermarkIndexName().length() <= MAX_NAME_LENGTH); assertTrue(names.getCreatedAtIndexName().length() <= MAX_NAME_LENGTH); } + + @Test + public void testPartitionMetadataNamesFromExistingTable() { + PartitionMetadataTableNames names = PartitionMetadataTableNames.fromExistingTable("mytable"); + + assertEquals("mytable", names.getTableName()); + assertNull(names.getWatermarkIndexName()); + assertNull(names.getCreatedAtIndexName()); + } } From 3df479ea83161f797e97247edda7f2675ef64fd7 Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Wed, 9 Oct 2024 16:29:37 +1100 Subject: [PATCH 09/10] fix: always generates index names If a table is given, we still need to generate the index names in case the given table does not exist and needs to be created. --- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 5 +++- .../dao/PartitionMetadataTableNames.java | 29 ++++++++++--------- .../changestreams/dofn/InitializeDoFn.java | 1 + .../dao/PartitionMetadataTableNamesTest.java | 8 ++--- 4 files changed, 24 insertions(+), 19 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 84f74b5286363..d9dde11a3081a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -1775,7 +1775,10 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta + metadataDatabaseDialect); PartitionMetadataTableNames partitionMetadataTableNames = Optional.ofNullable(getMetadataTable()) - .map(PartitionMetadataTableNames::fromExistingTable) + .map( + table -> + PartitionMetadataTableNames.fromExistingTable( + partitionMetadataDatabaseId, table)) .orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId)); final String changeStreamName = getChangeStreamName(); final Timestamp startTimestamp = getInclusiveStartAt(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java index 961732754e887..90637126e84f8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java @@ -59,13 +59,22 @@ public static PartitionMetadataTableNames generateRandom(String databaseId) { } /** - * Encapsulates an existing table name. Index names are not generated here, and will be null. + * Encapsulates an existing table name. Index names are generated, but will only be used if the + * given table does not exist. The watermark index will be in the form of {@code + * "WatermarkIdx__}. The createdAt / start timestamp index will be in the form + * of {@code "CreatedAtIdx__}. * + * @param databaseId The database id for the table * @param table The table name to be used - * @return an instance with the table name only + * @return an instance with the table name and generated index names */ - public static PartitionMetadataTableNames fromExistingTable(String table) { - return new PartitionMetadataTableNames(table); + public static PartitionMetadataTableNames fromExistingTable(String databaseId, String table) { + UUID uuid = UUID.randomUUID(); + + String watermarkIndex = generateName(WATERMARK_INDEX_NAME_FORMAT, databaseId, uuid); + String createdAtIndex = + generateName(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId, uuid); + return new PartitionMetadataTableNames(table, watermarkIndex, createdAtIndex); } private static String generateName(String template, String databaseId, UUID uuid) { @@ -77,14 +86,8 @@ private static String generateName(String template, String databaseId, UUID uuid } private final String tableName; - @Nullable private final String watermarkIndexName; - @Nullable private final String createdAtIndexName; - - public PartitionMetadataTableNames(String tableName) { - this.tableName = tableName; - this.watermarkIndexName = null; - this.createdAtIndexName = null; - } + private final String watermarkIndexName; + private final String createdAtIndexName; public PartitionMetadataTableNames( String tableName, String watermarkIndexName, String createdAtIndexName) { @@ -97,12 +100,10 @@ public String getTableName() { return tableName; } - @Nullable public String getWatermarkIndexName() { return watermarkIndexName; } - @Nullable public String getCreatedAtIndexName() { return createdAtIndexName; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java index 387ffd603b14d..ca93f34bf1ba8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java @@ -64,6 +64,7 @@ public InitializeDoFn( public void processElement(OutputReceiver receiver) { PartitionMetadataDao partitionMetadataDao = daoFactory.getPartitionMetadataDao(); if (!partitionMetadataDao.tableExists()) { + // Creates partition metadata table and associated indexes daoFactory.getPartitionMetadataAdminDao().createPartitionMetadataTable(); createFakeParentPartition(); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java index 0c9d18d455ff5..e36b31c479885 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java @@ -20,7 +20,6 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames.MAX_NAME_LENGTH; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import org.junit.Test; @@ -52,10 +51,11 @@ public void testGeneratePartitionMetadataNamesIsShorterThan64Characters() { @Test public void testPartitionMetadataNamesFromExistingTable() { - PartitionMetadataTableNames names = PartitionMetadataTableNames.fromExistingTable("mytable"); + PartitionMetadataTableNames names = + PartitionMetadataTableNames.fromExistingTable("databaseid", "mytable"); assertEquals("mytable", names.getTableName()); - assertNull(names.getWatermarkIndexName()); - assertNull(names.getCreatedAtIndexName()); + assertFalse(names.getWatermarkIndexName().contains("-")); + assertFalse(names.getCreatedAtIndexName().contains("-")); } } From 50495888a2430bc10715086561ad0cd7ac1930a2 Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Thu, 10 Oct 2024 08:59:47 +1100 Subject: [PATCH 10/10] feat: considers schema in index discovery query --- .../dao/PartitionMetadataDao.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java index 196b13ddcc0cc..654fd946663cc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java @@ -102,11 +102,22 @@ public boolean tableExists() { * @return a list of index names for the metadata table. */ public List findAllTableIndexes() { - final String indexesStmt = - "SELECT index_name FROM information_schema.indexes " - + "WHERE table_name = '" - + metadataTableName - + "' AND index_type != 'PRIMARY_KEY'"; + String indexesStmt; + if (this.isPostgres()) { + indexesStmt = + "SELECT index_name FROM information_schema.indexes" + + " WHERE table_schema = 'public'" + + " AND table_name = '" + + metadataTableName + + "' AND index_type != 'PRIMARY_KEY'"; + } else { + indexesStmt = + "SELECT index_name FROM information_schema.indexes" + + " WHERE table_schema = ''" + + " AND table_name = '" + + metadataTableName + + "' AND index_type != 'PRIMARY_KEY'"; + } List result = new ArrayList<>(); try (ResultSet queryResultSet =