Skip to content

Commit

Permalink
feat: use transactional connection state for PostgreSQL
Browse files Browse the repository at this point in the history
This change enables transactional connection state for the Connection API.
It can be enabled by default for PostgreSQL-dialect databases, and is always
an opt-in for GoogleSQL-dialect databases.

Transactional connection state can be enabled for any database connection
with the `connection_state_type` connection STARTUP property.
  • Loading branch information
olavloite committed Sep 10, 2024
1 parent 41e79df commit 0c2b944
Show file tree
Hide file tree
Showing 19 changed files with 544 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout;
import com.google.cloud.spanner.connection.StatementResult.ResultType;
import com.google.cloud.spanner.connection.UnitOfWork.CallType;
import com.google.cloud.spanner.connection.UnitOfWork.EndTransactionCallback;
import com.google.cloud.spanner.connection.UnitOfWork.UnitOfWorkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -287,10 +288,12 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
this.dbClient = spanner.getDatabaseClient(options.getDatabaseId());
this.batchClient = spanner.getBatchClient(options.getDatabaseId());
this.ddlClient = createDdlClient();
this.connectionState = new ConnectionState(options.getInitialConnectionPropertyValues());
this.connectionState =
new ConnectionState(
options.getInitialConnectionPropertyValues(), this.dbClient::getDialect);

// (Re)set the state of the connection to the default.
reset(Context.STARTUP);
setDefaultTransactionOptions();
}

/** Constructor only for test purposes. */
Expand All @@ -313,7 +316,8 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
this.ddlClient = Preconditions.checkNotNull(ddlClient);
this.dbClient = Preconditions.checkNotNull(dbClient);
this.batchClient = Preconditions.checkNotNull(batchClient);
this.connectionState = new ConnectionState(options.getInitialConnectionPropertyValues());
this.connectionState =
new ConnectionState(options.getInitialConnectionPropertyValues(), dbClient::getDialect);
setReadOnly(options.isReadOnly());
setAutocommit(options.isAutocommit());
setReturnCommitStats(options.isReturnCommitStats());
Expand Down Expand Up @@ -355,6 +359,11 @@ static Attributes createOpenTelemetryAttributes(DatabaseId databaseId) {
return attributesBuilder.build();
}

@VisibleForTesting
ConnectionState.Type getConnectionStateType() {
return this.connectionState.getType();
}

@Override
public void close() {
try {
Expand Down Expand Up @@ -412,35 +421,36 @@ private Context getCurrentContext() {
* this connection.
*/
public void reset() {
reset(getCurrentContext());
reset(getCurrentContext(), isInTransaction());
}

private void reset(Context context) {
private void reset(Context context, boolean inTransaction) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);

// TODO: Replace all of these with a resetAll in ConnectionState.
this.connectionState.resetValue(RETRY_ABORTS_INTERNALLY, context, /* inTransaction= */ false);
this.connectionState.resetValue(AUTOCOMMIT, context, /* inTransaction= */ false);
this.connectionState.resetValue(READONLY, context, /* inTransaction= */ false);
this.connectionState.resetValue(READ_ONLY_STALENESS, context, /* inTransaction= */ false);
this.connectionState.resetValue(OPTIMIZER_VERSION, context, /* inTransaction= */ false);
this.connectionState.resetValue(OPTIMIZER_STATISTICS_PACKAGE, context, /* inTransaction= */ false);
this.connectionState.resetValue(RPC_PRIORITY, context, /* inTransaction= */ false);
this.connectionState.resetValue(DDL_IN_TRANSACTION_MODE, context, /* inTransaction= */ false);
this.connectionState.resetValue(RETURN_COMMIT_STATS, context, /* inTransaction= */ false);
this.connectionState.resetValue(DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE, context, /* inTransaction= */ false);
this.connectionState.resetValue(KEEP_TRANSACTION_ALIVE, context, /* inTransaction= */ false);
this.connectionState.resetValue(AUTO_PARTITION_MODE, context, /* inTransaction= */ false);
this.connectionState.resetValue(DATA_BOOST_ENABLED, context, /* inTransaction= */ false);
this.connectionState.resetValue(MAX_PARTITIONS, context, /* inTransaction= */ false);
this.connectionState.resetValue(MAX_PARTITIONED_PARALLELISM, context, /* inTransaction= */ false);
this.connectionState.resetValue(MAX_COMMIT_DELAY, context, /* inTransaction= */ false);

this.connectionState.resetValue(AUTOCOMMIT_DML_MODE, context, /* inTransaction= */ false);
this.connectionState.resetValue(RETRY_ABORTS_INTERNALLY, context, inTransaction);
this.connectionState.resetValue(AUTOCOMMIT, context, inTransaction);
this.connectionState.resetValue(READONLY, context, inTransaction);
this.connectionState.resetValue(READ_ONLY_STALENESS, context, inTransaction);
this.connectionState.resetValue(OPTIMIZER_VERSION, context, inTransaction);
this.connectionState.resetValue(OPTIMIZER_STATISTICS_PACKAGE, context, inTransaction);
this.connectionState.resetValue(RPC_PRIORITY, context, inTransaction);
this.connectionState.resetValue(DDL_IN_TRANSACTION_MODE, context, inTransaction);
this.connectionState.resetValue(RETURN_COMMIT_STATS, context, inTransaction);
this.connectionState.resetValue(
DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE, context, inTransaction);
this.connectionState.resetValue(KEEP_TRANSACTION_ALIVE, context, inTransaction);
this.connectionState.resetValue(AUTO_PARTITION_MODE, context, inTransaction);
this.connectionState.resetValue(DATA_BOOST_ENABLED, context, inTransaction);
this.connectionState.resetValue(MAX_PARTITIONS, context, inTransaction);
this.connectionState.resetValue(MAX_PARTITIONED_PARALLELISM, context, inTransaction);
this.connectionState.resetValue(MAX_COMMIT_DELAY, context, inTransaction);

this.connectionState.resetValue(AUTOCOMMIT_DML_MODE, context, inTransaction);
this.statementTag = null;
this.statementTimeout = new StatementExecutor.StatementTimeout();
this.connectionState.resetValue(DIRECTED_READ, context, /* inTransaction= */ false);
this.connectionState.resetValue(SAVEPOINT_SUPPORT, context, /* inTransaction= */ false);
this.connectionState.resetValue(DIRECTED_READ, context, inTransaction);
this.connectionState.resetValue(SAVEPOINT_SUPPORT, context, inTransaction);
this.protoDescriptors = null;
this.protoDescriptorsFilePath = null;

Expand Down Expand Up @@ -490,7 +500,31 @@ private <T> T getConnectionPropertyValue(
}

private <T> void setConnectionPropertyValue(ConnectionProperty<T> property, T value) {
this.connectionState.setValue(property, value, getCurrentContext(), /* inTransaction= */ false);
setConnectionPropertyValue(property, value, /* local = */ false);
}

private <T> void setConnectionPropertyValue(
ConnectionProperty<T> property, T value, boolean local) {
if (local) {
setLocalConnectionPropertyValue(property, value);
} else {
this.connectionState.setValue(property, value, getCurrentContext(), isInTransaction());
}
}

/**
* Sets a connection property value only for the duration of the current transaction. The effects
* of this will be undone once the transaction ends, regardless whether the transaction is
* committed or rolled back. 'Local' properties are supported for both {@link
* com.google.cloud.spanner.connection.ConnectionState.Type#TRANSACTIONAL} and {@link
* com.google.cloud.spanner.connection.ConnectionState.Type#NON_TRANSACTIONAL} connection states.
*
* <p>NOTE: This feature is not yet exposed in the public API.
*/
private <T> void setLocalConnectionPropertyValue(ConnectionProperty<T> property, T value) {
ConnectionPreconditions.checkState(
isInTransaction(), "SET LOCAL statements are only supported in transactions");
this.connectionState.setLocalValue(property, value);
}

@Override
Expand All @@ -508,6 +542,14 @@ public void setAutocommit(boolean autocommit) {
ConnectionPreconditions.checkState(
!transactionBeginMarked, "Cannot set autocommit when a transaction has begun");
setConnectionPropertyValue(AUTOCOMMIT, autocommit);
if (autocommit) {
// Commit the current transaction state if we went from autocommit=false to autocommit=true.
// Otherwise, we get the strange situation that autocommit=true cannot be committed, as we no
// longer have a transaction. Note that all the above state checks essentially mean that
// autocommit can only be set before a transaction has actually started, and not in the
// middle of a transaction.
this.connectionState.commit();
}
clearLastTransactionAndSetDefaultTransactionOptions();
// Reset the readOnlyStaleness value if it is no longer compatible with the new autocommit
// value.
Expand Down Expand Up @@ -914,6 +956,10 @@ private boolean internalIsTransactionStarted() {
&& this.currentUnitOfWork.getState() == UnitOfWorkState.STARTED;
}

private boolean hasTransactionalChanges() {
return internalIsTransactionStarted() || this.connectionState.hasTransactionalChanges();
}

@Override
public Timestamp getReadTimestamp() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
Expand Down Expand Up @@ -957,8 +1003,13 @@ CommitResponse getCommitResponseOrNull() {

@Override
public void setReturnCommitStats(boolean returnCommitStats) {
setReturnCommitStats(returnCommitStats, /* local = */ false);
}

@VisibleForTesting
void setReturnCommitStats(boolean returnCommitStats, boolean local) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
setConnectionPropertyValue(RETURN_COMMIT_STATS, returnCommitStats);
setConnectionPropertyValue(RETURN_COMMIT_STATS, returnCommitStats, local);
}

@Override
Expand Down Expand Up @@ -1053,10 +1104,22 @@ private interface EndTransactionMethod {
ApiFuture<Void> endAsync(CallType callType, UnitOfWork t);
}

private static final class Commit implements EndTransactionMethod {
private final class Commit implements EndTransactionMethod {
@Override
public ApiFuture<Void> endAsync(CallType callType, UnitOfWork t) {
return t.commitAsync(callType);
return t.commitAsync(
callType,
new EndTransactionCallback() {
@Override
public void onSuccess() {
ConnectionImpl.this.connectionState.commit();
}

@Override
public void onFailure() {
ConnectionImpl.this.connectionState.rollback();
}
});
}
}

Expand All @@ -1077,10 +1140,22 @@ private ApiFuture<Void> commitAsync(CallType callType) {
return endCurrentTransactionAsync(callType, commit);
}

private static final class Rollback implements EndTransactionMethod {
private final class Rollback implements EndTransactionMethod {
@Override
public ApiFuture<Void> endAsync(CallType callType, UnitOfWork t) {
return t.rollbackAsync(callType);
return t.rollbackAsync(
callType,
new EndTransactionCallback() {
@Override
public void onSuccess() {
ConnectionImpl.this.connectionState.rollback();
}

@Override
public void onFailure() {
ConnectionImpl.this.connectionState.rollback();
}
});
}
}

Expand Down Expand Up @@ -1109,7 +1184,7 @@ private ApiFuture<Void> endCurrentTransactionAsync(
statementTag == null, "Statement tags are not supported for COMMIT or ROLLBACK");
ApiFuture<Void> res;
try {
if (isTransactionStarted()) {
if (hasTransactionalChanges()) {
res = endTransactionMethod.endAsync(callType, getCurrentUnitOfWorkOrStartNewUnitOfWork());
} else {
this.currentUnitOfWork = null;
Expand Down Expand Up @@ -1877,7 +1952,8 @@ && getDdlInTransactionMode() != DdlInTransactionMode.FAIL
private Span createSpanForUnitOfWork(String name) {
return tracer
.spanBuilder(
Suppliers.memoize(() -> connectionState.getValue(TRACING_PREFIX).getValue()).get()
// We can memoize this, as it is a STARTUP property.
Suppliers.memoize(() -> this.connectionState.getValue(TRACING_PREFIX).getValue()).get()
+ "."
+ name)
.setAllAttributes(getOpenTelemetryAttributes())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,12 +810,12 @@ private ConnectionOptions(Builder builder) {
// OUAuth token has been specified in the connection URI.
Preconditions.checkArgument(
Stream.of(
getInitialConnectionPropertyValue(CREDENTIALS_URL),
getInitialConnectionPropertyValue(ENCODED_CREDENTIALS),
getInitialConnectionPropertyValue(CREDENTIALS_PROVIDER),
getInitialConnectionPropertyValue(OAUTH_TOKEN))
.filter(Objects::nonNull)
.count()
getInitialConnectionPropertyValue(CREDENTIALS_URL),
getInitialConnectionPropertyValue(ENCODED_CREDENTIALS),
getInitialConnectionPropertyValue(CREDENTIALS_PROVIDER),
getInitialConnectionPropertyValue(OAUTH_TOKEN))
.filter(Objects::nonNull)
.count()
<= 1,
"Specify only one of credentialsUrl, encodedCredentials, credentialsProvider and OAuth token");
checkGuardedProperty(
Expand Down Expand Up @@ -1293,4 +1293,4 @@ List<StatementExecutionInterceptor> getStatementExecutionInterceptors() {
public String toString() {
return getUri();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,27 @@
import static com.google.cloud.spanner.connection.ConnectionProperty.castProperty;
import static com.google.cloud.spanner.connection.ConnectionPropertyValue.cast;

import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.connection.ConnectionProperty.Context;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import java.util.function.Supplier;
import javax.annotation.Nullable;

class ConnectionState {
// TODO: Remove when transactional connection state is fully implemented.
static final AtomicBoolean TRANSACTIONAL_CONNECTION_STATE_ENABLED = new AtomicBoolean(false);
/**
* Set this system property to true to enable transactional connection state by default for
* PostgreSQL-dialect databases. The default is currently false.
*/
public static String ENABLE_TRANSACTIONAL_CONNECTION_STATE_FOR_POSTGRESQL_PROPERTY =
"spanner.enable_transactional_connection_state_for_postgresql";

/** The type of connection state that is used. */
enum Type {
Expand All @@ -55,7 +61,7 @@ enum Type {

private final Object lock = new Object();

@Nonnull private final Type type;
private final Supplier<Type> type;

/** properties contain the current connection properties of a connection. */
private final Map<String, ConnectionPropertyValue<?>> properties;
Expand All @@ -70,6 +76,23 @@ enum Type {

/** Constructs a {@link ConnectionState} with the given initial values. */
ConnectionState(Map<String, ConnectionPropertyValue<?>> initialValues) {
this(initialValues, Suppliers.ofInstance(null));
}

/**
* Constructs a {@link ConnectionState} with the given initial values. The dialect is used to
* determine the default {@link ConnectionState.Type}:
*
* <ul>
* <li>{@link Dialect#POSTGRESQL} uses {@link Type#TRANSACTIONAL} by default if the System
* property 'spanner.enable_transactional_connection_state_for_postgresql' has been set to
* true.
* <li>{@link Dialect#GOOGLE_STANDARD_SQL} uses {@link Type#NON_TRANSACTIONAL} by default.
* <li>Other/unknown/null also default to {@link Type#NON_TRANSACTIONAL}.
* </ul>
*/
ConnectionState(
Map<String, ConnectionPropertyValue<?>> initialValues, Supplier<Dialect> dialect) {
this.properties = new HashMap<>(CONNECTION_PROPERTIES.size());
for (Entry<String, ConnectionProperty<?>> entry : CONNECTION_PROPERTIES.entrySet()) {
this.properties.put(
Expand All @@ -87,13 +110,32 @@ enum Type {
}
}
Type configuredType = getValue(CONNECTION_STATE_TYPE).getValue();
if (configuredType == null || !TRANSACTIONAL_CONNECTION_STATE_ENABLED.get()) {
// TODO: Make this dialect-dependent.
// GoogleSQL should by default be non-transactional.
// PostgreSQL should by default be transactional.
this.type = Type.NON_TRANSACTIONAL;
if (configuredType == null) {
this.type =
Suppliers.memoize(
() ->
dialect.get() == Dialect.POSTGRESQL
&& isEnableTransactionalConnectionStateForPostgreSQL()
? Type.TRANSACTIONAL
: Type.NON_TRANSACTIONAL);
} else {
this.type = configuredType;
this.type = Suppliers.ofInstance(configuredType);
}
}

private boolean isEnableTransactionalConnectionStateForPostgreSQL() {
return Boolean.parseBoolean(
System.getProperty(ENABLE_TRANSACTIONAL_CONNECTION_STATE_FOR_POSTGRESQL_PROPERTY, "false"));
}

@VisibleForTesting
Type getType() {
return this.type.get();
}

boolean hasTransactionalChanges() {
synchronized (lock) {
return this.transactionProperties != null || this.localProperties != null;
}
}

Expand Down Expand Up @@ -155,7 +197,7 @@ <T> void setValue(
+ context);
synchronized (lock) {
if (!inTransaction
|| type == Type.NON_TRANSACTIONAL
|| getType() == Type.NON_TRANSACTIONAL
|| context.ordinal() < Context.USER.ordinal()) {
internalSetValue(property, value, properties, context);
return;
Expand All @@ -182,7 +224,7 @@ <T> void setValue(
<T> void setLocalValue(ConnectionProperty<T> property, T value) {
ConnectionPreconditions.checkState(
property.getContext().ordinal() >= Context.USER.ordinal(),
"setLocalValue is only supported for properties with context IN_TRANSACTION or higher.");
"setLocalValue is only supported for properties with context USER or higher.");
synchronized (lock) {
if (localProperties == null) {
localProperties = new HashMap<>();
Expand Down
Loading

0 comments on commit 0c2b944

Please sign in to comment.