Skip to content

feat: support begin with AbortedException for manager interface #3835

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -964,4 +964,16 @@
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.Statement$StatementFactory getStatementFactory()</method>
</difference>

<!-- Add method begin() with AbortedException in TransactionManager and AsyncTransactionManager -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/AsyncTransactionManager</className>
<method>com.google.cloud.spanner.AsyncTransactionManager$TransactionContextFuture beginAsync(com.google.cloud.spanner.AbortedException)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionManager</className>
<method>com.google.cloud.spanner.TransactionContext begin(com.google.cloud.spanner.AbortedException)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import com.google.api.gax.rpc.ApiException;
import com.google.protobuf.ByteString;
import javax.annotation.Nullable;

/**
Expand All @@ -32,6 +33,8 @@ public class AbortedException extends SpannerException {
*/
private static final boolean IS_RETRYABLE = false;

private ByteString transactionID;

/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
AbortedException(
DoNotConstructDirectly token, @Nullable String message, @Nullable Throwable cause) {
Expand All @@ -46,6 +49,9 @@ public class AbortedException extends SpannerException {
@Nullable ApiException apiException,
@Nullable XGoogSpannerRequestId reqId) {
super(token, ErrorCode.ABORTED, IS_RETRYABLE, message, cause, apiException, reqId);
if (cause instanceof AbortedException) {
this.transactionID = ((AbortedException) cause).getTransactionID();
}
}

/**
Expand All @@ -55,4 +61,12 @@ public class AbortedException extends SpannerException {
public boolean isEmulatorOnlySupportsOneTransactionException() {
return getMessage().endsWith("The emulator only supports one transaction at a time.");
}

void setTransactionID(ByteString transactionID) {
this.transactionID = transactionID;
}

ByteString getTransactionID() {
return this.transactionID;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ interface AsyncTransactionFunction<I, O> {
*/
TransactionContextFuture beginAsync();

/**
* Initializes a new read-write transaction. This method must be called before performing any
* operations, and it can only be invoked once per transaction lifecycle.
Comment on lines +174 to +175
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Initializes a new read-write transaction. This method must be called before performing any
* operations, and it can only be invoked once per transaction lifecycle.
* Initializes a new read-write transaction that is a retry of a previously aborted transaction.
* This method must be called before performing any
* operations, and it can only be invoked once per transaction lifecycle.

*
* <p>This is especially useful in scenarios involving multiplexed sessions and when creating a
* new transaction for retry attempts. If {@link #resetForRetryAsync()} is not used, you can pass
* the {@link AbortedException} from a previous attempt here to preserve the transaction's
* priority.
Comment on lines +177 to +180
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* <p>This is especially useful in scenarios involving multiplexed sessions and when creating a
* new transaction for retry attempts. If {@link #resetForRetryAsync()} is not used, you can pass
* the {@link AbortedException} from a previous attempt here to preserve the transaction's
* priority.
* <p>This method should only be used when multiplexed sessions are enabled to create a retry
* for a previously aborted transaction. This method can be used instead of {@link #resetForRetryAsync()}
* to create a retry. Using this method or {@link #resetForRetryAsync()} will have the same effect.
* You must pass in the {@link AbortedException} from the previous attempt to preserve the transaction's
* priority.

*
* <p>For regular sessions, this behaves the same as {@link #beginAsync()}.
*/
TransactionContextFuture beginAsync(AbortedException exception);

/**
* Rolls back the currently active transaction. In most cases there should be no need to call this
* explicitly since {@link #close()} would automatically roll back any active transaction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,26 @@ public ApiFuture<Void> closeAsync() {
@Override
public TransactionContextFutureImpl beginAsync() {
Preconditions.checkState(txn == null, "begin can only be called once");
return new TransactionContextFutureImpl(this, internalBeginAsync(true));
return new TransactionContextFutureImpl(this, internalBeginAsync(true, ByteString.EMPTY));
}

private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
@Override
public TransactionContextFutureImpl beginAsync(AbortedException exception) {
Preconditions.checkState(txn == null, "begin can only be called once");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Preconditions.checkState(txn == null, "begin can only be called once");
Preconditions.checkState(txn == null, "begin can only be called once");
Preconditions.checkNotNull(exception, "AbortedException from the previous attempt is required");

ByteString abortedTransactionId =
exception.getTransactionID() != null ? exception.getTransactionID() : ByteString.EMPTY;
return new TransactionContextFutureImpl(this, internalBeginAsync(true, abortedTransactionId));
}

private ApiFuture<TransactionContext> internalBeginAsync(
boolean firstAttempt, ByteString abortedTransactionID) {
txnState = TransactionState.STARTED;

// Determine the latest transactionId when using a multiplexed session.
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
if (firstAttempt && session.getIsMultiplexed()) {
multiplexedSessionPreviousTransactionId = abortedTransactionID;
}
if (txn != null && session.getIsMultiplexed() && !firstAttempt) {
// Use the current transactionId if available, otherwise fallback to the previous aborted
// transactionId.
Expand Down Expand Up @@ -187,7 +199,7 @@ public TransactionContextFuture resetForRetryAsync() {
throw new IllegalStateException(
"resetForRetry can only be called if the previous attempt aborted");
}
return new TransactionContextFutureImpl(this, internalBeginAsync(false));
return new TransactionContextFutureImpl(this, internalBeginAsync(false, ByteString.EMPTY));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public TransactionContextFuture beginAsync() {
return getAsyncTransactionManager().beginAsync();
}

@Override
public TransactionContextFuture beginAsync(AbortedException exception) {
return getAsyncTransactionManager().beginAsync(exception);
}

@Override
public ApiFuture<Void> rollbackAsync() {
return getAsyncTransactionManager().rollbackAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public TransactionContext begin() {
return getTransactionManager().begin();
}

@Override
public TransactionContext begin(AbortedException exception) {
return getTransactionManager().begin(exception);
}

@Override
public void commit() {
getTransactionManager().commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,13 @@ public TransactionContext begin() {
return internalBegin();
}

@Override
public TransactionContext begin(AbortedException exception) {
// For regular sessions, the input exception is ignored and the behavior is equivalent to
// calling {@link #begin()}.
return begin();
}

private TransactionContext internalBegin() {
TransactionContext res = new SessionPoolTransactionContext(this, delegate.begin());
session.get().markUsed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ public void onSuccess(TransactionContext result) {
return new TransactionContextFutureImpl(this, delegateTxnFuture);
}

@Override
public TransactionContextFuture beginAsync(AbortedException exception) {
// For regular sessions, the input exception is ignored and the behavior is equivalent to
// calling {@link #beginAsync()}.
return beginAsync();
}

@Override
public void onError(Throwable t) {
if (t instanceof AbortedException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ enum TransactionState {
*/
TransactionContext begin();

/**
* Initializes a new read-write transaction. This method must be called before performing any
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment on beginAsync. I think that we should be more explicit in when this method should be used, and not describe it as 'useful in scenarios involving multiplexed sessions'. I think that could lead to wrong usage of this method.

* operations, and it can only be invoked once per transaction lifecycle.
*
* <p>This is especially useful in scenarios involving multiplexed sessions and when creating a
* new transaction for retry attempts. If {@link #resetForRetry()} is not used, you can pass the
* {@link AbortedException} from a previous attempt here to preserve the transaction's priority.
*
* <p>For regular sessions, this behaves the same as {@link #begin()}.
*/
TransactionContext begin(AbortedException exception);

/**
* Commits the currently active transaction. If the transaction was already aborted, then this
* would throw an {@link AbortedException}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,20 @@ public void setSpan(ISpan span) {
@Override
public TransactionContext begin() {
Preconditions.checkState(txn == null, "begin can only be called once");
return begin(ByteString.EMPTY);
}

@Override
public TransactionContext begin(AbortedException exception) {
Preconditions.checkState(txn == null, "begin can only be called once");
ByteString previousAbortedTransactionID =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here also: Add a not-null check for the AbortedException.

exception.getTransactionID() != null ? exception.getTransactionID() : ByteString.EMPTY;
return begin(previousAbortedTransactionID);
}

TransactionContext begin(ByteString previousTransactionId) {
try (IScope s = tracer.withSpan(span)) {
txn = session.newTransaction(options, /* previousTransactionId = */ ByteString.EMPTY);
txn = session.newTransaction(options, /* previousTransactionId = */ previousTransactionId);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can remove the comment for previousTransactionId, as that is now clear from the variable name

session.setActive(this);
txnState = TransactionState.STARTED;
return txn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction
long delay = -1L;
if (exceptionToThrow instanceof AbortedException) {
delay = exceptionToThrow.getRetryDelayInMillis();
((AbortedException) exceptionToThrow).setTransactionID(this.transactionId);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be
this.transactionId or
this.transactionId != null ? this.transactionId : this.getPreviousTransactionId()

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this.transactionId != null ? this.transactionId : this.getPreviousTransactionId() would cause the transaction priority to be preserved also when the begin call of a transaction fails, right? In that case, I think we should do that.

}
if (delay == -1L) {
txnLogger.log(
Expand Down
Loading
Loading