Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: generate random index name for change streams #32689

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.NameGenerator.generatePartitionMetadataTableName;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

Expand Down Expand Up @@ -61,6 +60,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand All @@ -77,6 +77,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.InitializeDoFn;
Expand Down Expand Up @@ -1772,9 +1773,13 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
+ fullPartitionMetadataDatabaseId
+ " has dialect "
+ metadataDatabaseDialect);
final String partitionMetadataTableName =
MoreObjects.firstNonNull(
getMetadataTable(), generatePartitionMetadataTableName(partitionMetadataDatabaseId));
PartitionMetadataTableNames partitionMetadataTableNames =
Optional.ofNullable(getMetadataTable())
.map(
table ->
PartitionMetadataTableNames.fromExistingTable(
partitionMetadataDatabaseId, table))
.orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId));
final String changeStreamName = getChangeStreamName();
final Timestamp startTimestamp = getInclusiveStartAt();
// Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 1ns to transform the
Expand All @@ -1791,7 +1796,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
changeStreamSpannerConfig,
changeStreamName,
partitionMetadataSpannerConfig,
partitionMetadataTableName,
partitionMetadataTableNames,
rpcPriority,
input.getPipeline().getOptions().getJobName(),
changeStreamDatabaseDialect,
Expand All @@ -1807,7 +1812,9 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);

LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName);
LOG.info(
"Partition metadata table that will be used is "
+ partitionMetadataTableNames.getTableName());

final PCollection<byte[]> impulseOut = input.apply(Impulse.create());
final PCollection<PartitionMetadata> partitionsOut =
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class DaoFactory implements Serializable {
private final SpannerConfig metadataSpannerConfig;

private final String changeStreamName;
private final String partitionMetadataTableName;
private final PartitionMetadataTableNames partitionMetadataTableNames;
private final RpcPriority rpcPriority;
private final String jobName;
private final Dialect spannerChangeStreamDatabaseDialect;
Expand All @@ -56,15 +56,15 @@ public class DaoFactory implements Serializable {
* @param changeStreamSpannerConfig the configuration for the change streams DAO
* @param changeStreamName the name of the change stream for the change streams DAO
* @param metadataSpannerConfig the metadata tables configuration
* @param partitionMetadataTableName the name of the created partition metadata table
* @param partitionMetadataTableNames the names of the partition metadata ddl objects
* @param rpcPriority the priority of the requests made by the DAO queries
* @param jobName the name of the running job
*/
public DaoFactory(
SpannerConfig changeStreamSpannerConfig,
String changeStreamName,
SpannerConfig metadataSpannerConfig,
String partitionMetadataTableName,
PartitionMetadataTableNames partitionMetadataTableNames,
RpcPriority rpcPriority,
String jobName,
Dialect spannerChangeStreamDatabaseDialect,
Expand All @@ -78,7 +78,7 @@ public DaoFactory(
this.changeStreamSpannerConfig = changeStreamSpannerConfig;
this.changeStreamName = changeStreamName;
this.metadataSpannerConfig = metadataSpannerConfig;
this.partitionMetadataTableName = partitionMetadataTableName;
this.partitionMetadataTableNames = partitionMetadataTableNames;
this.rpcPriority = rpcPriority;
this.jobName = jobName;
this.spannerChangeStreamDatabaseDialect = spannerChangeStreamDatabaseDialect;
Expand All @@ -102,7 +102,7 @@ public synchronized PartitionMetadataAdminDao getPartitionMetadataAdminDao() {
databaseAdminClient,
metadataSpannerConfig.getInstanceId().get(),
metadataSpannerConfig.getDatabaseId().get(),
partitionMetadataTableName,
partitionMetadataTableNames,
this.metadataDatabaseDialect);
}
return partitionMetadataAdminDao;
Expand All @@ -120,7 +120,7 @@ public synchronized PartitionMetadataDao getPartitionMetadataDao() {
if (partitionMetadataDaoInstance == null) {
partitionMetadataDaoInstance =
new PartitionMetadataDao(
this.partitionMetadataTableName,
this.partitionMetadataTableNames.getTableName(),
spannerAccessor.getDatabaseClient(),
this.metadataDatabaseDialect);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,13 @@ public class PartitionMetadataAdminDao {
*/
public static final String COLUMN_FINISHED_AT = "FinishedAt";

/** Metadata table index for queries over the watermark column. */
public static final String WATERMARK_INDEX = "WatermarkIndex";

/** Metadata table index for queries over the created at / start timestamp columns. */
public static final String CREATED_AT_START_TIMESTAMP_INDEX = "CreatedAtStartTimestampIndex";

private static final int TIMEOUT_MINUTES = 10;
private static final int TTL_AFTER_PARTITION_FINISHED_DAYS = 1;

private final DatabaseAdminClient databaseAdminClient;
private final String instanceId;
private final String databaseId;
private final String tableName;
private final PartitionMetadataTableNames names;
private final Dialect dialect;

/**
Expand All @@ -101,18 +95,18 @@ public class PartitionMetadataAdminDao {
* table
* @param instanceId the instance where the metadata table will reside
* @param databaseId the database where the metadata table will reside
* @param tableName the name of the metadata table
* @param names the names of the metadata table ddl objects
*/
PartitionMetadataAdminDao(
DatabaseAdminClient databaseAdminClient,
String instanceId,
String databaseId,
String tableName,
PartitionMetadataTableNames names,
Dialect dialect) {
this.databaseAdminClient = databaseAdminClient;
this.instanceId = instanceId;
this.databaseId = databaseId;
this.tableName = tableName;
this.names = names;
this.dialect = dialect;
}

Expand All @@ -129,7 +123,7 @@ public void createPartitionMetadataTable() {
// Quotes need to be added around identifiers to preserve casing.
ddl.add(
"CREATE TABLE \""
+ tableName
+ names.getTableName()
+ "\"(\""
+ COLUMN_PARTITION_TOKEN
+ "\" text NOT NULL,\""
Expand Down Expand Up @@ -164,19 +158,19 @@ public void createPartitionMetadataTable() {
+ "\"");
ddl.add(
"CREATE INDEX \""
+ WATERMARK_INDEX
+ names.getWatermarkIndexName()
+ "\" on \""
+ tableName
+ names.getTableName()
+ "\" (\""
+ COLUMN_WATERMARK
+ "\") INCLUDE (\""
+ COLUMN_STATE
+ "\")");
ddl.add(
"CREATE INDEX \""
+ CREATED_AT_START_TIMESTAMP_INDEX
+ names.getCreatedAtIndexName()
+ "\" ON \""
+ tableName
+ names.getTableName()
+ "\" (\""
+ COLUMN_CREATED_AT
+ "\",\""
Expand All @@ -185,7 +179,7 @@ public void createPartitionMetadataTable() {
} else {
ddl.add(
"CREATE TABLE "
+ tableName
+ names.getTableName()
+ " ("
+ COLUMN_PARTITION_TOKEN
+ " STRING(MAX) NOT NULL,"
Expand Down Expand Up @@ -219,19 +213,19 @@ public void createPartitionMetadataTable() {
+ " DAY))");
ddl.add(
"CREATE INDEX "
+ WATERMARK_INDEX
+ names.getWatermarkIndexName()
+ " on "
+ tableName
+ names.getTableName()
+ " ("
+ COLUMN_WATERMARK
+ ") STORING ("
+ COLUMN_STATE
+ ")");
ddl.add(
"CREATE INDEX "
+ CREATED_AT_START_TIMESTAMP_INDEX
+ names.getCreatedAtIndexName()
+ " ON "
+ tableName
+ names.getTableName()
+ " ("
+ COLUMN_CREATED_AT
+ ","
Expand Down Expand Up @@ -261,16 +255,14 @@ public void createPartitionMetadataTable() {
* Drops the metadata table. This operation should complete in {@link
* PartitionMetadataAdminDao#TIMEOUT_MINUTES} minutes.
*/
public void deletePartitionMetadataTable() {
public void deletePartitionMetadataTable(List<String> indexes) {
List<String> ddl = new ArrayList<>();
if (this.isPostgres()) {
ddl.add("DROP INDEX \"" + CREATED_AT_START_TIMESTAMP_INDEX + "\"");
ddl.add("DROP INDEX \"" + WATERMARK_INDEX + "\"");
ddl.add("DROP TABLE \"" + tableName + "\"");
indexes.forEach(index -> ddl.add("DROP INDEX \"" + index + "\""));
ddl.add("DROP TABLE \"" + names.getTableName() + "\"");
} else {
ddl.add("DROP INDEX " + CREATED_AT_START_TIMESTAMP_INDEX);
ddl.add("DROP INDEX " + WATERMARK_INDEX);
ddl.add("DROP TABLE " + tableName);
indexes.forEach(index -> ddl.add("DROP INDEX " + index));
ddl.add("DROP TABLE " + names.getTableName());
}
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,41 @@ public boolean tableExists() {
}
}

/**
* Finds all indexes for the metadata table.
*
* @return a list of index names for the metadata table.
*/
public List<String> findAllTableIndexes() {
String indexesStmt;
if (this.isPostgres()) {
indexesStmt =
"SELECT index_name FROM information_schema.indexes"
+ " WHERE table_schema = 'public'"
+ " AND table_name = '"
+ metadataTableName
+ "' AND index_type != 'PRIMARY_KEY'";
} else {
indexesStmt =
"SELECT index_name FROM information_schema.indexes"
+ " WHERE table_schema = ''"
+ " AND table_name = '"
+ metadataTableName
+ "' AND index_type != 'PRIMARY_KEY'";
}

List<String> result = new ArrayList<>();
try (ResultSet queryResultSet =
databaseClient
.singleUseReadOnlyTransaction()
.executeQuery(Statement.of(indexesStmt), Options.tag("query=findAllTableIndexes"))) {
while (queryResultSet.next()) {
result.add(queryResultSet.getString("index_name"));
}
}
return result;
}

/**
* Fetches the partition metadata row data for the given partition token.
*
Expand Down
Loading
Loading