Skip to content

chore(x-goog-spanner-request-id): plumb for BatchCreateSessions #3815

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.cloud.spanner.Statement.StatementFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.spanner.v1.BatchWriteResponse;
import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import javax.annotation.Nullable;

class DatabaseClientImpl implements DatabaseClient {
Expand All @@ -45,6 +49,8 @@ class DatabaseClientImpl implements DatabaseClient {
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
@VisibleForTesting final boolean useMultiplexedSessionForRW;
private final int dbId;
private final AtomicInteger nthRequest;

final boolean useMultiplexedSessionBlindWrite;

Expand Down Expand Up @@ -91,6 +97,18 @@ class DatabaseClientImpl implements DatabaseClient {
this.tracer = tracer;
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
this.commonAttributes = commonAttributes;

this.dbId = this.dbIdFromClientId(this.clientId);
this.nthRequest = new AtomicInteger(0);
}

private int dbIdFromClientId(String clientId) {
int i = clientId.indexOf("-");
String strWithValue = clientId.substring(i + 1);
if (Objects.equals(strWithValue, "")) {
strWithValue = "0";
}
return Integer.parseInt(strWithValue);
}

@VisibleForTesting
Expand Down Expand Up @@ -188,7 +206,11 @@ public CommitResponse writeWithOptions(
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
}
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));

return runWithSessionRetry(
(session, reqId) -> {
return session.writeWithOptions(mutations, withReqId(reqId, options));
});
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -213,7 +235,8 @@ public CommitResponse writeAtLeastOnceWithOptions(
.writeAtLeastOnceWithOptions(mutations, options);
}
return runWithSessionRetry(
session -> session.writeAtLeastOnceWithOptions(mutations, options));
(session, reqId) ->
session.writeAtLeastOnceWithOptions(mutations, withReqId(reqId, options)));
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -222,6 +245,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

private int nextNthRequest() {
return this.nthRequest.incrementAndGet();
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
Expand All @@ -231,7 +258,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
}
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
return runWithSessionRetry(
(session, reqId) ->
session.batchWriteAtLeastOnce(mutationGroups, withReqId(reqId, options)));
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand Down Expand Up @@ -383,27 +412,57 @@ private Future<Dialect> getDialectAsync() {
return pool.getDialectAsync();
}

private UpdateOption[] withReqId(
final XGoogSpannerRequestId reqId, final UpdateOption... options) {
if (reqId == null) {
return options;
}
ArrayList<UpdateOption> allOptions = new ArrayList(Arrays.asList(options));
allOptions.add(new Options.RequestIdOption(reqId));
return allOptions.toArray(new UpdateOption[0]);
}

private TransactionOption[] withReqId(
final XGoogSpannerRequestId reqId, final TransactionOption... options) {
if (reqId == null) {
return options;
}
ArrayList<TransactionOption> allOptions = new ArrayList(Arrays.asList(options));
allOptions.add(new Options.RequestIdOption(reqId));
return allOptions.toArray(new TransactionOption[0]);
}

private long executePartitionedUpdateWithPooledSession(
final Statement stmt, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
return runWithSessionRetry(
(session, reqId) -> {
return session.executePartitionedUpdate(stmt, withReqId(reqId, options));
});
} catch (RuntimeException e) {
span.setStatus(e);
span.end();
throw e;
}
}

private <T> T runWithSessionRetry(Function<Session, T> callable) {
private <T> T runWithSessionRetry(BiFunction<Session, XGoogSpannerRequestId, T> callable) {
PooledSessionFuture session = getSession();
XGoogSpannerRequestId reqId =
XGoogSpannerRequestId.of(
this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 0);
while (true) {
try {
return callable.apply(session);
reqId.incrementAttempt();
return callable.apply(session, reqId);
} catch (SessionNotFoundException e) {
session =
(PooledSessionFuture)
pool.getPooledSessionReplacementHandler().replaceSession(e, session);
reqId =
XGoogSpannerRequestId.of(
this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 0);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ public static UpdateTransactionOption excludeTxnFromChangeStreams() {
return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION;
}

public static RequestIdOption requestId(XGoogSpannerRequestId reqId) {
return new RequestIdOption(reqId);
}

/**
* Specifying this will cause the read to yield at most this many rows. This should be greater
* than 0.
Expand Down Expand Up @@ -535,6 +539,7 @@ void appendToOptions(Options options) {
private RpcLockHint lockHint;
private Boolean lastStatement;
private IsolationLevel isolationLevel;
private XGoogSpannerRequestId reqId;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -599,6 +604,14 @@ String filter() {
return filter;
}

boolean hasReqId() {
return reqId != null;
}

XGoogSpannerRequestId reqId() {
return reqId;
}

boolean hasPriority() {
return priority != null;
}
Expand Down Expand Up @@ -756,6 +769,9 @@ public String toString() {
if (isolationLevel != null) {
b.append("isolationLevel: ").append(isolationLevel).append(' ');
}
if (reqId != null) {
b.append("requestId: ").append(reqId.toString());
}
return b.toString();
}

Expand Down Expand Up @@ -798,7 +814,8 @@ public boolean equals(Object o) {
&& Objects.equals(orderBy(), that.orderBy())
&& Objects.equals(isLastStatement(), that.isLastStatement())
&& Objects.equals(lockHint(), that.lockHint())
&& Objects.equals(isolationLevel(), that.isolationLevel());
&& Objects.equals(isolationLevel(), that.isolationLevel())
&& Objects.equals(reqId(), that.reqId());
}

@Override
Expand Down Expand Up @@ -867,6 +884,9 @@ public int hashCode() {
if (isolationLevel != null) {
result = 31 * result + isolationLevel.hashCode();
}
if (reqId != null) {
result = 31 * result + reqId.hashCode();
}
return result;
}

Expand Down Expand Up @@ -1052,4 +1072,30 @@ public boolean equals(Object o) {
return o instanceof LastStatementUpdateOption;
}
}

static final class RequestIdOption extends InternalOption
implements ReadOption, TransactionOption, UpdateOption {
private final XGoogSpannerRequestId reqId;

RequestIdOption(XGoogSpannerRequestId reqId) {
this.reqId = reqId;
}

@Override
void appendToOptions(Options options) {
options.reqId = this.reqId;
}

@Override
public int hashCode() {
return RequestIdOption.class.hashCode();
}

@Override
public boolean equals(Object o) {
// TODO: Examine why the precedent for LastStatementUpdateOption
// does not check against the actual value.
return o instanceof RequestIdOption;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;

/** Client for creating single sessions and batches of sessions. */
class SessionClient implements AutoCloseable {
class SessionClient implements AutoCloseable, XGoogSpannerRequestId.RequestIdCreator {
static class SessionId {
private static final PathTemplate NAME_TEMPLATE =
PathTemplate.create(
Expand Down Expand Up @@ -174,6 +175,12 @@ interface SessionConsumer {
private final DatabaseId db;
private final Attributes commonAttributes;

// SessionClient is created long before a DatabaseClientImpl is created,
// as batch sessions are firstly created then later attached to each Client.
private static AtomicInteger NTH_ID = new AtomicInteger(0);
private final int nthId;
private final AtomicInteger nthRequest;

@GuardedBy("this")
private volatile long sessionChannelCounter;

Expand All @@ -186,6 +193,8 @@ interface SessionConsumer {
this.executorFactory = executorFactory;
this.executor = executorFactory.get();
this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
this.nthId = SessionClient.NTH_ID.incrementAndGet();
this.nthRequest = new AtomicInteger(0);
}

@Override
Expand All @@ -201,28 +210,38 @@ DatabaseId getDatabaseId() {
return db;
}

@Override
public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) {
return XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelId, 1);
}

/** Create a single session. */
SessionImpl createSession() {
// The sessionChannelCounter could overflow, but that will just flip it to Integer.MIN_VALUE,
// which is also a valid channel hint.
final Map<SpannerRpc.Option, ?> options;
final long channelId;
synchronized (this) {
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
channelId = sessionChannelCounter;
}
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
XGoogSpannerRequestId reqId = this.nextRequestId(channelId, 1);
com.google.spanner.v1.Session session =
spanner
.getRpc()
.createSession(
db.getName(),
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
options);
reqId.withOptions(options));
SessionReference sessionReference =
new SessionReference(
session.getName(), session.getCreateTime(), session.getMultiplexed(), options);
return new SessionImpl(spanner, sessionReference);
SessionImpl sessionImpl = new SessionImpl(spanner, sessionReference);
sessionImpl.setRequestIdCreator(this);
return sessionImpl;
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand Down Expand Up @@ -273,6 +292,7 @@ SessionImpl createMultiplexedSession() {
spanner,
new SessionReference(
session.getName(), session.getCreateTime(), session.getMultiplexed(), null));
sessionImpl.setRequestIdCreator(this);
span.addAnnotation(
String.format("Request for %d multiplexed session returned %d session", 1, 1));
return sessionImpl;
Expand Down Expand Up @@ -387,6 +407,8 @@ private List<SessionImpl> internalBatchCreateSessions(
.spanBuilderWithExplicitParent(SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent);
span.addAnnotation(String.format("Requesting %d sessions", sessionCount));
try (IScope s = spanner.getTracer().withSpan(span)) {
XGoogSpannerRequestId reqId =
XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelHint, 1);
List<com.google.spanner.v1.Session> sessions =
spanner
.getRpc()
Expand All @@ -395,21 +417,20 @@ private List<SessionImpl> internalBatchCreateSessions(
sessionCount,
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
options);
reqId.withOptions(options));
span.addAnnotation(
String.format(
"Request for %d sessions returned %d sessions", sessionCount, sessions.size()));
span.end();
List<SessionImpl> res = new ArrayList<>(sessionCount);
for (com.google.spanner.v1.Session session : sessions) {
res.add(
SessionImpl sessionImpl =
new SessionImpl(
spanner,
new SessionReference(
session.getName(),
session.getCreateTime(),
session.getMultiplexed(),
options)));
session.getName(), session.getCreateTime(), session.getMultiplexed(), options));
sessionImpl.setRequestIdCreator(this);
res.add(sessionImpl);
}
return res;
} catch (RuntimeException e) {
Expand All @@ -425,6 +446,8 @@ SessionImpl sessionWithId(String name) {
synchronized (this) {
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
}
return new SessionImpl(spanner, new SessionReference(name, options));
SessionImpl sessionImpl = new SessionImpl(spanner, new SessionReference(name, options));
sessionImpl.setRequestIdCreator(this);
return sessionImpl;
}
}
Loading
Loading