Skip to content

Commit 9940b66

Browse files
chore: fallback for Partitioned Operations with multiplexed session (#3710)
* feat(spanner): Implement fallback for Partitioned Operations with multiplexed session. If PartitionQueryRequest or PartitionReadRequest with multiplexed session return unimplemented error, then an implicit fallback to regular session will occur.
1 parent efb1680 commit 9940b66

File tree

5 files changed

+198
-67
lines changed

5 files changed

+198
-67
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 61 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,32 @@ ByteString getTransactionId() {
404404
}
405405
}
406406

407+
/**
408+
* Initializes the transaction with the timestamp specified within MultiUseReadOnlyTransaction.
409+
* This is used only for fallback of PartitionQueryRequest and PartitionReadRequest with
410+
* Multiplexed Session.
411+
*/
412+
void initFallbackTransaction() {
413+
synchronized (txnLock) {
414+
span.addAnnotation("Creating Transaction");
415+
TransactionOptions.Builder options = TransactionOptions.newBuilder();
416+
if (timestamp != null) {
417+
options
418+
.getReadOnlyBuilder()
419+
.setReadTimestamp(timestamp.toProto())
420+
.setReturnReadTimestamp(true);
421+
} else {
422+
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
423+
}
424+
final BeginTransactionRequest request =
425+
BeginTransactionRequest.newBuilder()
426+
.setSession(session.getName())
427+
.setOptions(options)
428+
.build();
429+
initTransactionInternal(request);
430+
}
431+
}
432+
407433
void initTransaction() {
408434
SessionImpl.throwIfTransactionsPending();
409435

@@ -419,40 +445,43 @@ void initTransaction() {
419445
return;
420446
}
421447
span.addAnnotation("Creating Transaction");
448+
TransactionOptions.Builder options = TransactionOptions.newBuilder();
449+
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
450+
final BeginTransactionRequest request =
451+
BeginTransactionRequest.newBuilder()
452+
.setSession(session.getName())
453+
.setOptions(options)
454+
.build();
455+
initTransactionInternal(request);
456+
}
457+
}
458+
459+
private void initTransactionInternal(BeginTransactionRequest request) {
460+
try {
461+
Transaction transaction =
462+
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
463+
if (!transaction.hasReadTimestamp()) {
464+
throw SpannerExceptionFactory.newSpannerException(
465+
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
466+
}
467+
if (transaction.getId().isEmpty()) {
468+
throw SpannerExceptionFactory.newSpannerException(
469+
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field");
470+
}
422471
try {
423-
TransactionOptions.Builder options = TransactionOptions.newBuilder();
424-
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
425-
final BeginTransactionRequest request =
426-
BeginTransactionRequest.newBuilder()
427-
.setSession(session.getName())
428-
.setOptions(options)
429-
.build();
430-
Transaction transaction =
431-
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
432-
if (!transaction.hasReadTimestamp()) {
433-
throw SpannerExceptionFactory.newSpannerException(
434-
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
435-
}
436-
if (transaction.getId().isEmpty()) {
437-
throw SpannerExceptionFactory.newSpannerException(
438-
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field");
439-
}
440-
try {
441-
timestamp = Timestamp.fromProto(transaction.getReadTimestamp());
442-
} catch (IllegalArgumentException e) {
443-
throw SpannerExceptionFactory.newSpannerException(
444-
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
445-
}
446-
transactionId = transaction.getId();
447-
span.addAnnotation(
448-
"Transaction Creation Done",
449-
ImmutableMap.of(
450-
"Id", transaction.getId().toStringUtf8(), "Timestamp", timestamp.toString()));
451-
452-
} catch (SpannerException e) {
453-
span.addAnnotation("Transaction Creation Failed", e);
454-
throw e;
472+
timestamp = Timestamp.fromProto(transaction.getReadTimestamp());
473+
} catch (IllegalArgumentException e) {
474+
throw SpannerExceptionFactory.newSpannerException(
475+
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
455476
}
477+
transactionId = transaction.getId();
478+
span.addAnnotation(
479+
"Transaction Creation Done",
480+
ImmutableMap.of(
481+
"Id", transaction.getId().toStringUtf8(), "Timestamp", timestamp.toString()));
482+
} catch (SpannerException e) {
483+
span.addAnnotation("Transaction Creation Failed", e);
484+
throw e;
456485
}
457486
}
458487
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
114114
sessionClient.getSpanner().getOptions().getDirectedReadOptions())
115115
.setSpan(sessionClient.getSpanner().getTracer().getCurrentSpan())
116116
.setTracer(sessionClient.getSpanner().getTracer()),
117-
checkNotNull(bound));
117+
checkNotNull(bound),
118+
sessionClient);
118119
}
119120

120121
@Override
@@ -137,7 +138,8 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
137138
sessionClient.getSpanner().getOptions().getDirectedReadOptions())
138139
.setSpan(sessionClient.getSpanner().getTracer().getCurrentSpan())
139140
.setTracer(sessionClient.getSpanner().getTracer()),
140-
batchTransactionId);
141+
batchTransactionId,
142+
sessionClient);
141143
}
142144

143145
private boolean canUseMultiplexedSession() {
@@ -160,20 +162,28 @@ private SessionImpl getMultiplexedSession() {
160162

161163
private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction
162164
implements BatchReadOnlyTransaction {
163-
private final String sessionName;
165+
private String sessionName;
164166
private final Map<SpannerRpc.Option, ?> options;
167+
private final SessionClient sessionClient;
168+
private final AtomicBoolean fallbackInitiated = new AtomicBoolean(false);
165169

166170
BatchReadOnlyTransactionImpl(
167-
MultiUseReadOnlyTransaction.Builder builder, TimestampBound bound) {
171+
MultiUseReadOnlyTransaction.Builder builder,
172+
TimestampBound bound,
173+
SessionClient sessionClient) {
168174
super(builder.setTimestampBound(bound));
175+
this.sessionClient = sessionClient;
169176
this.sessionName = session.getName();
170177
this.options = session.getOptions();
171178
initTransaction();
172179
}
173180

174181
BatchReadOnlyTransactionImpl(
175-
MultiUseReadOnlyTransaction.Builder builder, BatchTransactionId batchTransactionId) {
182+
MultiUseReadOnlyTransaction.Builder builder,
183+
BatchTransactionId batchTransactionId,
184+
SessionClient sessionClient) {
176185
super(builder.setTransactionId(batchTransactionId.getTransactionId()));
186+
this.sessionClient = sessionClient;
177187
this.sessionName = session.getName();
178188
this.options = session.getOptions();
179189
}
@@ -204,6 +214,18 @@ public List<Partition> partitionReadUsingIndex(
204214
Iterable<String> columns,
205215
ReadOption... option)
206216
throws SpannerException {
217+
return partitionReadUsingIndex(partitionOptions, table, index, keys, columns, false, option);
218+
}
219+
220+
private List<Partition> partitionReadUsingIndex(
221+
PartitionOptions partitionOptions,
222+
String table,
223+
String index,
224+
KeySet keys,
225+
Iterable<String> columns,
226+
boolean isFallback,
227+
ReadOption... option)
228+
throws SpannerException {
207229
Options readOptions = Options.fromReadOptions(option);
208230
Preconditions.checkArgument(
209231
!readOptions.hasLimit(),
@@ -246,7 +268,10 @@ public List<Partition> partitionReadUsingIndex(
246268
}
247269
return partitions.build();
248270
} catch (SpannerException e) {
249-
maybeMarkUnimplementedForPartitionedOps(e);
271+
if (!isFallback && maybeMarkUnimplementedForPartitionedOps(e)) {
272+
return partitionReadUsingIndex(
273+
partitionOptions, table, index, keys, columns, true, option);
274+
}
250275
throw e;
251276
}
252277
}
@@ -255,6 +280,15 @@ public List<Partition> partitionReadUsingIndex(
255280
public List<Partition> partitionQuery(
256281
PartitionOptions partitionOptions, Statement statement, QueryOption... option)
257282
throws SpannerException {
283+
return partitionQuery(partitionOptions, statement, false, option);
284+
}
285+
286+
private List<Partition> partitionQuery(
287+
PartitionOptions partitionOptions,
288+
Statement statement,
289+
boolean isFallback,
290+
QueryOption... option)
291+
throws SpannerException {
258292
Options queryOptions = Options.fromQueryOptions(option);
259293
final PartitionQueryRequest.Builder builder =
260294
PartitionQueryRequest.newBuilder().setSession(sessionName).setSql(statement.getSql());
@@ -291,16 +325,29 @@ public List<Partition> partitionQuery(
291325
}
292326
return partitions.build();
293327
} catch (SpannerException e) {
294-
maybeMarkUnimplementedForPartitionedOps(e);
328+
if (!isFallback && maybeMarkUnimplementedForPartitionedOps(e)) {
329+
return partitionQuery(partitionOptions, statement, true, option);
330+
}
295331
throw e;
296332
}
297333
}
298334

299-
void maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
335+
boolean maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
300336
if (MultiplexedSessionDatabaseClient.verifyErrorMessage(
301337
spannerException, "Partitioned operations are not supported with multiplexed sessions")) {
302-
unimplementedForPartitionedOps.set(true);
338+
synchronized (fallbackInitiated) {
339+
if (!fallbackInitiated.get()) {
340+
session.setFallbackSessionReference(
341+
sessionClient.createSession().getSessionReference());
342+
sessionName = session.getName();
343+
initFallbackTransaction();
344+
unimplementedForPartitionedOps.set(true);
345+
fallbackInitiated.set(true);
346+
}
347+
return true;
348+
}
303349
}
350+
return false;
304351
}
305352

306353
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.cloud.spanner;
1818

1919
import com.google.cloud.Timestamp;
20+
import com.google.common.annotations.VisibleForTesting;
2021
import com.google.common.base.Preconditions;
2122
import com.google.protobuf.ByteString;
2223
import java.io.Serializable;
@@ -34,6 +35,7 @@ public class BatchTransactionId implements Serializable {
3435
private final Timestamp timestamp;
3536
private static final long serialVersionUID = 8067099123096783939L;
3637

38+
@VisibleForTesting
3739
BatchTransactionId(String sessionId, ByteString transactionId, Timestamp timestamp) {
3840
this.transactionId = Preconditions.checkNotNull(transactionId);
3941
this.sessionId = Preconditions.checkNotNull(sessionId);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ interface SessionTransaction {
120120
static final int NO_CHANNEL_HINT = -1;
121121

122122
private final SpannerImpl spanner;
123-
private final SessionReference sessionReference;
123+
private SessionReference sessionReference;
124124
private SessionTransaction activeTransaction;
125125
private ISpan currentSpan;
126126
private final Clock clock;
@@ -160,6 +160,14 @@ public String getName() {
160160
return sessionReference.getName();
161161
}
162162

163+
/**
164+
* Updates the session reference with the fallback session. This should only be used for updating
165+
* session reference with regular session in case of unimplemented error in multiplexed session.
166+
*/
167+
void setFallbackSessionReference(SessionReference sessionReference) {
168+
this.sessionReference = sessionReference;
169+
}
170+
163171
Map<SpannerRpc.Option, ?> getOptions() {
164172
return options;
165173
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 70 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,7 +1554,11 @@ public void testInitialBeginTransactionWithRW_receivesUnimplemented_fallsBackToR
15541554
assertTrue(client.multiplexedSessionDatabaseClient.unimplementedForPartitionedOps.get());
15551555
}
15561556

1557-
// Tests the behavior of the server-side kill switch for read-write multiplexed sessions.
1557+
/**
1558+
* Tests the behavior of the server-side kill switch for partitioned query multiplexed sessions. 2
1559+
* PartitionQueryRequest should be received. First with Multiplexed session and second with
1560+
* regular session.
1561+
*/
15581562
@Test
15591563
public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession() {
15601564
try {
@@ -1569,40 +1573,81 @@ public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession
15691573

15701574
try (BatchReadOnlyTransaction transaction =
15711575
client.batchReadOnlyTransaction(TimestampBound.strong())) {
1572-
// Partitioned Query should fail
1573-
SpannerException spannerException =
1574-
assertThrows(
1575-
SpannerException.class,
1576-
() -> {
1577-
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);
1578-
});
1579-
assertEquals(ErrorCode.INVALID_ARGUMENT, spannerException.getErrorCode());
1576+
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);
15801577

15811578
// Verify that we received one PartitionQueryRequest.
15821579
List<PartitionQueryRequest> partitionQueryRequests =
15831580
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
1584-
assertEquals(1, partitionQueryRequests.size());
1581+
assertEquals(2, partitionQueryRequests.size());
15851582
// Verify the requests were executed using multiplexed sessions
1586-
Session session2 = mockSpanner.getSession(partitionQueryRequests.get(0).getSession());
1587-
assertNotNull(session2);
1588-
assertTrue(session2.getMultiplexed());
1583+
Session session = mockSpanner.getSession(partitionQueryRequests.get(0).getSession());
1584+
assertNotNull(session);
1585+
assertTrue(session.getMultiplexed());
15891586
assertTrue(BatchClientImpl.unimplementedForPartitionedOps.get());
1587+
1588+
session = mockSpanner.getSession(partitionQueryRequests.get(1).getSession());
1589+
assertNotNull(session);
1590+
assertFalse(session.getMultiplexed());
15901591
}
1592+
} finally {
1593+
BatchClientImpl.unimplementedForPartitionedOps.set(false);
1594+
}
1595+
}
1596+
1597+
/**
1598+
* Tests the behavior of the server-side kill switch for partitioned query multiplexed sessions.
1599+
* The BatchReadOnlyTransaction is initiated using BatchTransactionId. 2 PartitionQueryRequest
1600+
* should be received. First with Multiplexed session and second with regular session.
1601+
*/
1602+
@Test
1603+
public void
1604+
testPartitionedQueryWithTransactionId_receivesUnimplemented_fallsBackToRegularSession() {
1605+
try {
1606+
mockSpanner.setPartitionQueryExecutionTime(
1607+
SimulatedExecutionTime.ofException(
1608+
Status.INVALID_ARGUMENT
1609+
.withDescription(
1610+
"Partitioned operations are not supported with multiplexed sessions")
1611+
.asRuntimeException()));
1612+
BatchClientImpl client =
1613+
(BatchClientImpl) spanner.getBatchClient(DatabaseId.of("p", "i", "d"));
1614+
15911615
try (BatchReadOnlyTransaction transaction =
15921616
client.batchReadOnlyTransaction(TimestampBound.strong())) {
1593-
// Partitioned Query should fail
1594-
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);
15951617

1596-
// // Verify that we received two PartitionQueryRequest. and it uses a regular session due
1597-
// to
1598-
// fallback.
1599-
List<PartitionQueryRequest> partitionQueryRequests =
1600-
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
1601-
assertEquals(2, partitionQueryRequests.size());
1602-
// Verify the requests are not executed using multiplexed sessions
1603-
Session session2 = mockSpanner.getSession(partitionQueryRequests.get(1).getSession());
1604-
assertNotNull(session2);
1605-
assertFalse(session2.getMultiplexed());
1618+
try (BatchReadOnlyTransaction transaction1 =
1619+
client.batchReadOnlyTransaction(transaction.getBatchTransactionId())) {
1620+
transaction1.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);
1621+
1622+
// Verify that we received one PartitionQueryRequest.
1623+
List<PartitionQueryRequest> partitionQueryRequests =
1624+
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
1625+
assertEquals(2, partitionQueryRequests.size());
1626+
// Verify the requests were executed using multiplexed sessions
1627+
Session session = mockSpanner.getSession(partitionQueryRequests.get(0).getSession());
1628+
assertNotNull(session);
1629+
assertTrue(session.getMultiplexed());
1630+
assertTrue(BatchClientImpl.unimplementedForPartitionedOps.get());
1631+
1632+
session = mockSpanner.getSession(partitionQueryRequests.get(1).getSession());
1633+
assertNotNull(session);
1634+
assertFalse(session.getMultiplexed());
1635+
1636+
List<BeginTransactionRequest> beginTransactionRequests =
1637+
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
1638+
assertEquals(2, beginTransactionRequests.size());
1639+
1640+
session = mockSpanner.getSession(beginTransactionRequests.get(0).getSession());
1641+
assertNotNull(session);
1642+
assertTrue(session.getMultiplexed());
1643+
1644+
session = mockSpanner.getSession(beginTransactionRequests.get(1).getSession());
1645+
assertNotNull(session);
1646+
assertFalse(session.getMultiplexed());
1647+
assertEquals(
1648+
transaction.getBatchTransactionId().getTimestamp(),
1649+
transaction1.getBatchTransactionId().getTimestamp());
1650+
}
16061651
}
16071652
} finally {
16081653
BatchClientImpl.unimplementedForPartitionedOps.set(false);

0 commit comments

Comments
 (0)