-
Notifications
You must be signed in to change notification settings - Fork 131
feat: support begin with AbortedException for manager interface #3835
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a11b756
7270c9b
d0de490
b2f22da
28704e3
00360f0
e5ef71e
c11f3a2
fa31c51
444d1e1
22de193
234c732
d399cbd
a149a4f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -170,6 +170,19 @@ interface AsyncTransactionFunction<I, O> { | |||||||||||||||||||
*/ | ||||||||||||||||||||
TransactionContextFuture beginAsync(); | ||||||||||||||||||||
|
||||||||||||||||||||
/** | ||||||||||||||||||||
* Initializes a new read-write transaction. This method must be called before performing any | ||||||||||||||||||||
* operations, and it can only be invoked once per transaction lifecycle. | ||||||||||||||||||||
* | ||||||||||||||||||||
* <p>This is especially useful in scenarios involving multiplexed sessions and when creating a | ||||||||||||||||||||
* new transaction for retry attempts. If {@link #resetForRetryAsync()} is not used, you can pass | ||||||||||||||||||||
* the {@link AbortedException} from a previous attempt here to preserve the transaction's | ||||||||||||||||||||
* priority. | ||||||||||||||||||||
Comment on lines
+177
to
+180
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||
* | ||||||||||||||||||||
* <p>For regular sessions, this behaves the same as {@link #beginAsync()}. | ||||||||||||||||||||
*/ | ||||||||||||||||||||
TransactionContextFuture beginAsync(AbortedException exception); | ||||||||||||||||||||
|
||||||||||||||||||||
/** | ||||||||||||||||||||
* Rolls back the currently active transaction. In most cases there should be no need to call this | ||||||||||||||||||||
* explicitly since {@link #close()} would automatically roll back any active transaction. | ||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -76,14 +76,26 @@ public ApiFuture<Void> closeAsync() { | |||||||
@Override | ||||||||
public TransactionContextFutureImpl beginAsync() { | ||||||||
Preconditions.checkState(txn == null, "begin can only be called once"); | ||||||||
return new TransactionContextFutureImpl(this, internalBeginAsync(true)); | ||||||||
return new TransactionContextFutureImpl(this, internalBeginAsync(true, ByteString.EMPTY)); | ||||||||
} | ||||||||
|
||||||||
private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) { | ||||||||
@Override | ||||||||
public TransactionContextFutureImpl beginAsync(AbortedException exception) { | ||||||||
Preconditions.checkState(txn == null, "begin can only be called once"); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
ByteString abortedTransactionId = | ||||||||
exception.getTransactionID() != null ? exception.getTransactionID() : ByteString.EMPTY; | ||||||||
return new TransactionContextFutureImpl(this, internalBeginAsync(true, abortedTransactionId)); | ||||||||
} | ||||||||
|
||||||||
private ApiFuture<TransactionContext> internalBeginAsync( | ||||||||
boolean firstAttempt, ByteString abortedTransactionID) { | ||||||||
txnState = TransactionState.STARTED; | ||||||||
|
||||||||
// Determine the latest transactionId when using a multiplexed session. | ||||||||
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY; | ||||||||
if (firstAttempt && session.getIsMultiplexed()) { | ||||||||
multiplexedSessionPreviousTransactionId = abortedTransactionID; | ||||||||
} | ||||||||
if (txn != null && session.getIsMultiplexed() && !firstAttempt) { | ||||||||
// Use the current transactionId if available, otherwise fallback to the previous aborted | ||||||||
// transactionId. | ||||||||
|
@@ -187,7 +199,7 @@ public TransactionContextFuture resetForRetryAsync() { | |||||||
throw new IllegalStateException( | ||||||||
"resetForRetry can only be called if the previous attempt aborted"); | ||||||||
} | ||||||||
return new TransactionContextFutureImpl(this, internalBeginAsync(false)); | ||||||||
return new TransactionContextFutureImpl(this, internalBeginAsync(false, ByteString.EMPTY)); | ||||||||
} | ||||||||
|
||||||||
@Override | ||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,6 +61,18 @@ enum TransactionState { | |
*/ | ||
TransactionContext begin(); | ||
|
||
/** | ||
* Initializes a new read-write transaction. This method must be called before performing any | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my comment on |
||
* operations, and it can only be invoked once per transaction lifecycle. | ||
* | ||
* <p>This is especially useful in scenarios involving multiplexed sessions and when creating a | ||
* new transaction for retry attempts. If {@link #resetForRetry()} is not used, you can pass the | ||
* {@link AbortedException} from a previous attempt here to preserve the transaction's priority. | ||
* | ||
* <p>For regular sessions, this behaves the same as {@link #begin()}. | ||
*/ | ||
TransactionContext begin(AbortedException exception); | ||
|
||
/** | ||
* Commits the currently active transaction. If the transaction was already aborted, then this | ||
* would throw an {@link AbortedException}. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,8 +53,20 @@ public void setSpan(ISpan span) { | |
@Override | ||
public TransactionContext begin() { | ||
Preconditions.checkState(txn == null, "begin can only be called once"); | ||
return begin(ByteString.EMPTY); | ||
} | ||
|
||
@Override | ||
public TransactionContext begin(AbortedException exception) { | ||
Preconditions.checkState(txn == null, "begin can only be called once"); | ||
ByteString previousAbortedTransactionID = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here also: Add a not-null check for the AbortedException. |
||
exception.getTransactionID() != null ? exception.getTransactionID() : ByteString.EMPTY; | ||
return begin(previousAbortedTransactionID); | ||
} | ||
|
||
TransactionContext begin(ByteString previousTransactionId) { | ||
try (IScope s = tracer.withSpan(span)) { | ||
txn = session.newTransaction(options, /* previousTransactionId = */ ByteString.EMPTY); | ||
txn = session.newTransaction(options, /* previousTransactionId = */ previousTransactionId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: you can remove the comment for previousTransactionId, as that is now clear from the variable name |
||
session.setActive(this); | ||
txnState = TransactionState.STARTED; | ||
return txn; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -793,6 +793,7 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction | |
long delay = -1L; | ||
if (exceptionToThrow instanceof AbortedException) { | ||
delay = exceptionToThrow.getRetryDelayInMillis(); | ||
((AbortedException) exceptionToThrow).setTransactionID(this.transactionId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess |
||
} | ||
if (delay == -1L) { | ||
txnLogger.log( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.