Skip to content

chore: wire up x-goog-spanner-request-id to all (REQUESTED_BEFORE_CLEANUP_FOR_PR) #3712

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,11 @@ void initTransaction() {

private void initTransactionInternal(BeginTransactionRequest request) {
try {
XGoogSpannerRequestId reqId =
session.getRequestIdCreator().nextRequestId(1 /*TODO: retrieve channelId*/, 1);
Transaction transaction =
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
rpc.beginTransaction(
request, reqId.withOptions(getTransactionChannelHint()), isRouteToLeader());
if (!transaction.hasReadTimestamp()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
Expand Down Expand Up @@ -803,7 +806,8 @@ ResultSet executeQueryInternalWithOptions(
tracer.createStatementAttributes(statement, options),
session.getErrorHandler(),
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
rpc.getExecuteQueryRetryableCodes(),
session.getRequestIdCreator()) {
@Override
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
Expand All @@ -826,11 +830,13 @@ CloseableIterator<PartialResultSet> startStream(
if (selector != null) {
request.setTransaction(selector);
}

this.incrementXGoogRequestIdAttempt();
SpannerRpc.StreamingCall call =
rpc.executeQuery(
request.build(),
stream.consumer(),
getTransactionChannelHint(),
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, request.getTransaction().hasBegin());
Expand Down Expand Up @@ -1008,7 +1014,8 @@ ResultSet readInternalWithOptions(
tracer.createTableAttributes(table, readOptions),
session.getErrorHandler(),
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
rpc.getReadRetryableCodes(),
session.getRequestIdCreator()) {
@Override
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
Expand All @@ -1029,11 +1036,12 @@ CloseableIterator<PartialResultSet> startStream(
builder.setTransaction(selector);
}
builder.setRequestOptions(buildRequestOptions(readOptions));
this.incrementXGoogRequestIdAttempt();
SpannerRpc.StreamingCall call =
rpc.read(
builder.build(),
stream.consumer(),
getTransactionChannelHint(),
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ private List<Partition> partitionReadUsingIndex(

final PartitionReadRequest request = builder.build();
try {
PartitionResponse response = rpc.partitionRead(request, options);
XGoogSpannerRequestId reqId =
this.session.getRequestIdCreator().nextRequestId(1 /* channelId */, 0);
PartitionResponse response = rpc.partitionRead(request, reqId.withOptions(options));
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Expand Down Expand Up @@ -315,7 +317,9 @@ private List<Partition> partitionQuery(

final PartitionQueryRequest request = builder.build();
try {
PartitionResponse response = rpc.partitionQuery(request, options);
XGoogSpannerRequestId reqId =
this.session.getRequestIdCreator().nextRequestId(1 /* channelId */, 0);
PartitionResponse response = rpc.partitionQuery(request, reqId.withOptions(options));
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.spanner.v1.BatchWriteResponse;
import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import javax.annotation.Nullable;

class DatabaseClientImpl implements DatabaseClient {
Expand All @@ -40,6 +44,8 @@ class DatabaseClientImpl implements DatabaseClient {
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
@VisibleForTesting final boolean useMultiplexedSessionForRW;
private final int dbId;
private final AtomicInteger nthRequest;

final boolean useMultiplexedSessionBlindWrite;

Expand Down Expand Up @@ -86,6 +92,18 @@ class DatabaseClientImpl implements DatabaseClient {
this.tracer = tracer;
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
this.commonAttributes = commonAttributes;

this.dbId = this.dbIdFromClientId(this.clientId);
this.nthRequest = new AtomicInteger(0);
}

private int dbIdFromClientId(String clientId) {
int i = clientId.indexOf("-");
String strWithValue = clientId.substring(i + 1);
if (Objects.equals(strWithValue, "")) {
strWithValue = "0";
}
return Integer.parseInt(strWithValue);
}

@VisibleForTesting
Expand Down Expand Up @@ -159,7 +177,11 @@ public CommitResponse writeWithOptions(
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
}
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));

return runWithSessionRetry(
(session, reqId) -> {
return session.writeWithOptions(mutations, withReqId(reqId, options));
});
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -177,14 +199,27 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
public CommitResponse writeAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
CommitResponse res = doWriteAtLeastOnceWithOptions(mutations, options);
System.out.println("\033[33mCommitResponse: " + res + "\033[00m");
return res;
}

private CommitResponse doWriteAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient()
.writeAtLeastOnceWithOptions(mutations, options);
}

return runWithSessionRetry(
session -> session.writeAtLeastOnceWithOptions(mutations, options));
(session, reqId) -> {
CommitResponse in = session.writeAtLeastOnceWithOptions(mutations, withReqId(reqId, options));
System.out.println("\033[35minternalDo: " + in + "\033[00m");
return in;
});
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -193,6 +228,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

private int nextNthRequest() {
return this.nthRequest.incrementAndGet();
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
Expand All @@ -202,7 +241,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
}
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
return runWithSessionRetry(
(session, reqId) ->
session.batchWriteAtLeastOnce(mutationGroups, withReqId(reqId, options)));
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand Down Expand Up @@ -346,27 +387,57 @@ public long executePartitionedUpdate(final Statement stmt, final UpdateOption...
return executePartitionedUpdateWithPooledSession(stmt, options);
}

private UpdateOption[] withReqId(
final XGoogSpannerRequestId reqId, final UpdateOption... options) {
if (reqId == null) {
return options;
}
ArrayList<UpdateOption> allOptions = new ArrayList(Arrays.asList(options));
allOptions.add(new Options.RequestIdOption(reqId));
return allOptions.toArray(new UpdateOption[0]);
}

private TransactionOption[] withReqId(
final XGoogSpannerRequestId reqId, final TransactionOption... options) {
if (reqId == null) {
return options;
}
ArrayList<TransactionOption> allOptions = new ArrayList(Arrays.asList(options));
allOptions.add(new Options.RequestIdOption(reqId));
return allOptions.toArray(new TransactionOption[0]);
}

private long executePartitionedUpdateWithPooledSession(
final Statement stmt, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
return runWithSessionRetry(
(session, reqId) -> {
return session.executePartitionedUpdate(stmt, withReqId(reqId, options));
});
} catch (RuntimeException e) {
span.setStatus(e);
span.end();
throw e;
}
}

private <T> T runWithSessionRetry(Function<Session, T> callable) {
private <T> T runWithSessionRetry(BiFunction<Session, XGoogSpannerRequestId, T> callable) {
PooledSessionFuture session = getSession();
XGoogSpannerRequestId reqId =
XGoogSpannerRequestId.of(
this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 0);
while (true) {
try {
return callable.apply(session);
reqId.incrementAttempt();
return callable.apply(session, reqId);
} catch (SessionNotFoundException e) {
session =
(PooledSessionFuture)
pool.getPooledSessionReplacementHandler().replaceSession(e, session);
reqId =
XGoogSpannerRequestId.of(
this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 0);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ void appendToOptions(Options options) {
private RpcLockHint lockHint;
private Boolean lastStatement;
private IsolationLevel isolationLevel;
private XGoogSpannerRequestId reqId;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -591,6 +592,14 @@ String pageToken() {
return pageToken;
}

boolean hasReqId() {
return reqId != null;
}

XGoogSpannerRequestId reqId() {
return reqId;
}

boolean hasFilter() {
return filter != null;
}
Expand Down Expand Up @@ -756,6 +765,9 @@ public String toString() {
if (isolationLevel != null) {
b.append("isolationLevel: ").append(isolationLevel).append(' ');
}
if (reqId != null) {
b.append("requestId: ").append(reqId.toString());
}
return b.toString();
}

Expand Down Expand Up @@ -798,7 +810,8 @@ public boolean equals(Object o) {
&& Objects.equals(orderBy(), that.orderBy())
&& Objects.equals(isLastStatement(), that.isLastStatement())
&& Objects.equals(lockHint(), that.lockHint())
&& Objects.equals(isolationLevel(), that.isolationLevel());
&& Objects.equals(isolationLevel(), that.isolationLevel())
&& Objects.equals(reqId(), that.reqId());
}

@Override
Expand Down Expand Up @@ -867,6 +880,9 @@ public int hashCode() {
if (isolationLevel != null) {
result = 31 * result + isolationLevel.hashCode();
}
if (reqId != null) {
result = 31 * result + reqId.hashCode();
}
return result;
}

Expand Down Expand Up @@ -1052,4 +1068,30 @@ public boolean equals(Object o) {
return o instanceof LastStatementUpdateOption;
}
}

static final class RequestIdOption extends InternalOption
implements TransactionOption, UpdateOption {
private final XGoogSpannerRequestId reqId;

RequestIdOption(XGoogSpannerRequestId reqId) {
this.reqId = reqId;
}

@Override
void appendToOptions(Options options) {
options.reqId = this.reqId;
}

@Override
public int hashCode() {
return RequestIdOption.class.hashCode();
}

@Override
public boolean equals(Object o) {
// TODO: Examine why the precedent for LastStatementUpdateOption
// does not check against the actual value.
return o instanceof RequestIdOption;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,22 @@ long executeStreamingPartitionedUpdate(
long updateCount = 0L;
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
Options options = Options.fromUpdateOptions(updateOptions);
XGoogSpannerRequestId reqId = options.reqId();
if (reqId == null) {
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
}

try {
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);

while (true) {
reqId.incrementAttempt();
final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch);

try {
ServerStream<PartialResultSet> stream =
rpc.executeStreamingPartitionedDml(request, session.getOptions(), remainingTimeout);
rpc.executeStreamingPartitionedDml(
request, reqId.withOptions(session.getOptions()), remainingTimeout);

for (PartialResultSet rs : stream) {
if (rs.getResumeToken() != null && !rs.getResumeToken().isEmpty()) {
Expand All @@ -113,12 +119,17 @@ long executeStreamingPartitionedUpdate(
LOGGER.log(
Level.FINER, "Retrying PartitionedDml transaction after InternalException - EOS", e);
request = resumeOrRestartRequest(resumeToken, statement, request, options);
if (resumeToken.isEmpty()) {
// Create a new xGoogSpannerRequestId.
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
}
} catch (AbortedException e) {
LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
resumeToken = ByteString.EMPTY;
foundStats = false;
updateCount = 0L;
request = newTransactionRequestFrom(statement, options);
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
}
}
if (!foundStats) {
Expand Down Expand Up @@ -209,7 +220,11 @@ private ByteString initTransaction(final Options options) {
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
.build();
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
XGoogSpannerRequestId reqId = options.reqId();
if (reqId == null) {
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 1);
}
Transaction tx = rpc.beginTransaction(request, reqId.withOptions(session.getOptions()), true);
if (tx.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
Expand Down
Loading