Skip to content

Commit

Permalink
feat: enable transactional connection state as an opt-in
Browse files Browse the repository at this point in the history
This change enables transactional connection state for the Connection API.
It can be enabled by default for PostgreSQL-dialect databases using a system
property, and is always an opt-in for GoogleSQL-dialect databases.

Transactional connection state can be enabled for any database connection
with the `connection_state_type` connection STARTUP property.
  • Loading branch information
olavloite committed Sep 10, 2024
1 parent d15c079 commit b74abca
Show file tree
Hide file tree
Showing 19 changed files with 545 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner.connection;

import static com.google.cloud.spanner.SpannerApiFutures.get;
import static com.google.cloud.spanner.connection.ConnectionOptions.isEnableTransactionalConnectionStateForPostgreSQL;
import static com.google.cloud.spanner.connection.ConnectionPreconditions.checkValidIdentifier;
import static com.google.cloud.spanner.connection.ConnectionProperties.AUTOCOMMIT;
import static com.google.cloud.spanner.connection.ConnectionProperties.AUTOCOMMIT_DML_MODE;
Expand Down Expand Up @@ -71,9 +72,11 @@
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.AbstractStatementParser.StatementType;
import com.google.cloud.spanner.connection.ConnectionProperty.Context;
import com.google.cloud.spanner.connection.ConnectionState.Type;
import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout;
import com.google.cloud.spanner.connection.StatementResult.ResultType;
import com.google.cloud.spanner.connection.UnitOfWork.CallType;
import com.google.cloud.spanner.connection.UnitOfWork.EndTransactionCallback;
import com.google.cloud.spanner.connection.UnitOfWork.UnitOfWorkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -287,10 +290,18 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
this.dbClient = spanner.getDatabaseClient(options.getDatabaseId());
this.batchClient = spanner.getBatchClient(options.getDatabaseId());
this.ddlClient = createDdlClient();
this.connectionState = new ConnectionState(options.getInitialConnectionPropertyValues());
this.connectionState =
new ConnectionState(
options.getInitialConnectionPropertyValues(),
Suppliers.memoize(
() ->
isEnableTransactionalConnectionStateForPostgreSQL()
&& getDialect() == Dialect.POSTGRESQL
? Type.TRANSACTIONAL
: Type.NON_TRANSACTIONAL));

// (Re)set the state of the connection to the default.
reset(Context.STARTUP);
setDefaultTransactionOptions();
}

/** Constructor only for test purposes. */
Expand All @@ -313,7 +324,10 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
this.ddlClient = Preconditions.checkNotNull(ddlClient);
this.dbClient = Preconditions.checkNotNull(dbClient);
this.batchClient = Preconditions.checkNotNull(batchClient);
this.connectionState = new ConnectionState(options.getInitialConnectionPropertyValues());
this.connectionState =
new ConnectionState(
options.getInitialConnectionPropertyValues(),
Suppliers.ofInstance(Type.NON_TRANSACTIONAL));
setReadOnly(options.isReadOnly());
setAutocommit(options.isAutocommit());
setReturnCommitStats(options.isReturnCommitStats());
Expand Down Expand Up @@ -355,6 +369,11 @@ static Attributes createOpenTelemetryAttributes(DatabaseId databaseId) {
return attributesBuilder.build();
}

@VisibleForTesting
ConnectionState.Type getConnectionStateType() {
return this.connectionState.getType();
}

@Override
public void close() {
try {
Expand Down Expand Up @@ -412,38 +431,36 @@ private Context getCurrentContext() {
* this connection.
*/
public void reset() {
reset(getCurrentContext());
reset(getCurrentContext(), isInTransaction());
}

private void reset(Context context) {
private void reset(Context context, boolean inTransaction) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);

// TODO: Replace all of these with a resetAll in ConnectionState.
this.connectionState.resetValue(RETRY_ABORTS_INTERNALLY, context, /* inTransaction= */ false);
this.connectionState.resetValue(AUTOCOMMIT, context, /* inTransaction= */ false);
this.connectionState.resetValue(READONLY, context, /* inTransaction= */ false);
this.connectionState.resetValue(READ_ONLY_STALENESS, context, /* inTransaction= */ false);
this.connectionState.resetValue(OPTIMIZER_VERSION, context, /* inTransaction= */ false);
this.connectionState.resetValue(RETRY_ABORTS_INTERNALLY, context, inTransaction);
this.connectionState.resetValue(AUTOCOMMIT, context, inTransaction);
this.connectionState.resetValue(READONLY, context, inTransaction);
this.connectionState.resetValue(READ_ONLY_STALENESS, context, inTransaction);
this.connectionState.resetValue(OPTIMIZER_VERSION, context, inTransaction);
this.connectionState.resetValue(OPTIMIZER_STATISTICS_PACKAGE, context, inTransaction);
this.connectionState.resetValue(RPC_PRIORITY, context, inTransaction);
this.connectionState.resetValue(DDL_IN_TRANSACTION_MODE, context, inTransaction);
this.connectionState.resetValue(RETURN_COMMIT_STATS, context, inTransaction);
this.connectionState.resetValue(
OPTIMIZER_STATISTICS_PACKAGE, context, /* inTransaction= */ false);
this.connectionState.resetValue(RPC_PRIORITY, context, /* inTransaction= */ false);
this.connectionState.resetValue(DDL_IN_TRANSACTION_MODE, context, /* inTransaction= */ false);
this.connectionState.resetValue(RETURN_COMMIT_STATS, context, /* inTransaction= */ false);
this.connectionState.resetValue(
DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE, context, /* inTransaction= */ false);
this.connectionState.resetValue(KEEP_TRANSACTION_ALIVE, context, /* inTransaction= */ false);
this.connectionState.resetValue(AUTO_PARTITION_MODE, context, /* inTransaction= */ false);
this.connectionState.resetValue(DATA_BOOST_ENABLED, context, /* inTransaction= */ false);
this.connectionState.resetValue(MAX_PARTITIONS, context, /* inTransaction= */ false);
this.connectionState.resetValue(
MAX_PARTITIONED_PARALLELISM, context, /* inTransaction= */ false);
this.connectionState.resetValue(MAX_COMMIT_DELAY, context, /* inTransaction= */ false);

this.connectionState.resetValue(AUTOCOMMIT_DML_MODE, context, /* inTransaction= */ false);
DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE, context, inTransaction);
this.connectionState.resetValue(KEEP_TRANSACTION_ALIVE, context, inTransaction);
this.connectionState.resetValue(AUTO_PARTITION_MODE, context, inTransaction);
this.connectionState.resetValue(DATA_BOOST_ENABLED, context, inTransaction);
this.connectionState.resetValue(MAX_PARTITIONS, context, inTransaction);
this.connectionState.resetValue(MAX_PARTITIONED_PARALLELISM, context, inTransaction);
this.connectionState.resetValue(MAX_COMMIT_DELAY, context, inTransaction);

this.connectionState.resetValue(AUTOCOMMIT_DML_MODE, context, inTransaction);
this.statementTag = null;
this.statementTimeout = new StatementExecutor.StatementTimeout();
this.connectionState.resetValue(DIRECTED_READ, context, /* inTransaction= */ false);
this.connectionState.resetValue(SAVEPOINT_SUPPORT, context, /* inTransaction= */ false);
this.connectionState.resetValue(DIRECTED_READ, context, inTransaction);
this.connectionState.resetValue(SAVEPOINT_SUPPORT, context, inTransaction);
this.protoDescriptors = null;
this.protoDescriptorsFilePath = null;

Expand Down Expand Up @@ -493,7 +510,31 @@ private <T> T getConnectionPropertyValue(
}

private <T> void setConnectionPropertyValue(ConnectionProperty<T> property, T value) {
this.connectionState.setValue(property, value, getCurrentContext(), /* inTransaction= */ false);
setConnectionPropertyValue(property, value, /* local = */ false);
}

private <T> void setConnectionPropertyValue(
ConnectionProperty<T> property, T value, boolean local) {
if (local) {
setLocalConnectionPropertyValue(property, value);
} else {
this.connectionState.setValue(property, value, getCurrentContext(), isInTransaction());
}
}

/**
* Sets a connection property value only for the duration of the current transaction. The effects
* of this will be undone once the transaction ends, regardless whether the transaction is
* committed or rolled back. 'Local' properties are supported for both {@link
* com.google.cloud.spanner.connection.ConnectionState.Type#TRANSACTIONAL} and {@link
* com.google.cloud.spanner.connection.ConnectionState.Type#NON_TRANSACTIONAL} connection states.
*
* <p>NOTE: This feature is not yet exposed in the public API.
*/
private <T> void setLocalConnectionPropertyValue(ConnectionProperty<T> property, T value) {
ConnectionPreconditions.checkState(
isInTransaction(), "SET LOCAL statements are only supported in transactions");
this.connectionState.setLocalValue(property, value);
}

@Override
Expand All @@ -511,6 +552,14 @@ public void setAutocommit(boolean autocommit) {
ConnectionPreconditions.checkState(
!transactionBeginMarked, "Cannot set autocommit when a transaction has begun");
setConnectionPropertyValue(AUTOCOMMIT, autocommit);
if (autocommit) {
// Commit the current transaction state if we went from autocommit=false to autocommit=true.
// Otherwise, we get the strange situation that autocommit=true cannot be committed, as we no
// longer have a transaction. Note that all the above state checks essentially mean that
// autocommit can only be set before a transaction has actually started, and not in the
// middle of a transaction.
this.connectionState.commit();
}
clearLastTransactionAndSetDefaultTransactionOptions();
// Reset the readOnlyStaleness value if it is no longer compatible with the new autocommit
// value.
Expand Down Expand Up @@ -917,6 +966,10 @@ private boolean internalIsTransactionStarted() {
&& this.currentUnitOfWork.getState() == UnitOfWorkState.STARTED;
}

private boolean hasTransactionalChanges() {
return internalIsTransactionStarted() || this.connectionState.hasTransactionalChanges();
}

@Override
public Timestamp getReadTimestamp() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
Expand Down Expand Up @@ -960,8 +1013,13 @@ CommitResponse getCommitResponseOrNull() {

@Override
public void setReturnCommitStats(boolean returnCommitStats) {
setReturnCommitStats(returnCommitStats, /* local = */ false);
}

@VisibleForTesting
void setReturnCommitStats(boolean returnCommitStats, boolean local) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
setConnectionPropertyValue(RETURN_COMMIT_STATS, returnCommitStats);
setConnectionPropertyValue(RETURN_COMMIT_STATS, returnCommitStats, local);
}

@Override
Expand Down Expand Up @@ -1056,10 +1114,22 @@ private interface EndTransactionMethod {
ApiFuture<Void> endAsync(CallType callType, UnitOfWork t);
}

private static final class Commit implements EndTransactionMethod {
private final class Commit implements EndTransactionMethod {
@Override
public ApiFuture<Void> endAsync(CallType callType, UnitOfWork t) {
return t.commitAsync(callType);
return t.commitAsync(
callType,
new EndTransactionCallback() {
@Override
public void onSuccess() {
ConnectionImpl.this.connectionState.commit();
}

@Override
public void onFailure() {
ConnectionImpl.this.connectionState.rollback();
}
});
}
}

Expand All @@ -1080,10 +1150,22 @@ private ApiFuture<Void> commitAsync(CallType callType) {
return endCurrentTransactionAsync(callType, commit);
}

private static final class Rollback implements EndTransactionMethod {
private final class Rollback implements EndTransactionMethod {
@Override
public ApiFuture<Void> endAsync(CallType callType, UnitOfWork t) {
return t.rollbackAsync(callType);
return t.rollbackAsync(
callType,
new EndTransactionCallback() {
@Override
public void onSuccess() {
ConnectionImpl.this.connectionState.rollback();
}

@Override
public void onFailure() {
ConnectionImpl.this.connectionState.rollback();
}
});
}
}

Expand Down Expand Up @@ -1112,7 +1194,7 @@ private ApiFuture<Void> endCurrentTransactionAsync(
statementTag == null, "Statement tags are not supported for COMMIT or ROLLBACK");
ApiFuture<Void> res;
try {
if (isTransactionStarted()) {
if (hasTransactionalChanges()) {
res = endTransactionMethod.endAsync(callType, getCurrentUnitOfWorkOrStartNewUnitOfWork());
} else {
this.currentUnitOfWork = null;
Expand Down Expand Up @@ -1880,7 +1962,8 @@ && getDdlInTransactionMode() != DdlInTransactionMode.FAIL
private Span createSpanForUnitOfWork(String name) {
return tracer
.spanBuilder(
Suppliers.memoize(() -> connectionState.getValue(TRACING_PREFIX).getValue()).get()
// We can memoize this, as it is a STARTUP property.
Suppliers.memoize(() -> this.connectionState.getValue(TRACING_PREFIX).getValue()).get()
+ "."
+ name)
.setAllAttributes(getOpenTelemetryAttributes())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ public String[] getValidValues() {
}
}

/**
* Set this system property to true to enable transactional connection state by default for
* PostgreSQL-dialect databases. The default is currently false.
*/
public static String ENABLE_TRANSACTIONAL_CONNECTION_STATE_FOR_POSTGRESQL_PROPERTY =
"spanner.enable_transactional_connection_state_for_postgresql";

private static final LocalConnectionChecker LOCAL_CONNECTION_CHECKER =
new LocalConnectionChecker();
static final boolean DEFAULT_USE_PLAIN_TEXT = false;
Expand Down Expand Up @@ -337,6 +344,11 @@ private static String generateGuardedConnectionPropertyError(
systemPropertyName);
}

static boolean isEnableTransactionalConnectionStateForPostgreSQL() {
return Boolean.parseBoolean(
System.getProperty(ENABLE_TRANSACTIONAL_CONNECTION_STATE_FOR_POSTGRESQL_PROPERTY, "false"));
}

/**
* All valid connection properties.
*
Expand Down Expand Up @@ -810,12 +822,12 @@ private ConnectionOptions(Builder builder) {
// OUAuth token has been specified in the connection URI.
Preconditions.checkArgument(
Stream.of(
getInitialConnectionPropertyValue(CREDENTIALS_URL),
getInitialConnectionPropertyValue(ENCODED_CREDENTIALS),
getInitialConnectionPropertyValue(CREDENTIALS_PROVIDER),
getInitialConnectionPropertyValue(OAUTH_TOKEN))
.filter(Objects::nonNull)
.count()
getInitialConnectionPropertyValue(CREDENTIALS_URL),
getInitialConnectionPropertyValue(ENCODED_CREDENTIALS),
getInitialConnectionPropertyValue(CREDENTIALS_PROVIDER),
getInitialConnectionPropertyValue(OAUTH_TOKEN))
.filter(Objects::nonNull)
.count()
<= 1,
"Specify only one of credentialsUrl, encodedCredentials, credentialsProvider and OAuth token");
checkGuardedProperty(
Expand Down Expand Up @@ -1293,4 +1305,4 @@ List<StatementExecutionInterceptor> getStatementExecutionInterceptors() {
public String toString() {
return getUri();
}
}
}
Loading

0 comments on commit b74abca

Please sign in to comment.