Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: send priming requests on the channel directly #2435

Merged
merged 15 commits into from
Dec 3, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.BetaApi;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.ChannelPrimer;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.auth.Credentials;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.InstanceName;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.common.base.Preconditions;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.common.net.PercentEscaper;
import io.grpc.CallCredentials;
import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
Expand All @@ -41,27 +45,46 @@
class BigtableChannelPrimer implements ChannelPrimer {
private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString());

private final EnhancedBigtableStubSettings settingsTemplate;
private final Metadata.Key<String> requestParams =
Metadata.Key.of("x-goog-request-params", Metadata.ASCII_STRING_MARSHALLER);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
private final PingAndWarmRequest request;
private final Metadata metadata = new Metadata();
mutianf marked this conversation as resolved.
Show resolved Hide resolved
private final CallCredentials credentials;

static BigtableChannelPrimer create(
Credentials credentials, String projectId, String instanceId, String appProfileId) {
EnhancedBigtableStubSettings.Builder builder =
EnhancedBigtableStubSettings.newBuilder()
.setProjectId(projectId)
.setInstanceId(instanceId)
static BigtableChannelPrimer create(EnhancedBigtableStubSettings settings) throws IOException {
return new BigtableChannelPrimer(settings);
}

BigtableChannelPrimer(EnhancedBigtableStubSettings settings) throws IOException {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
String projectId = settings.getProjectId();
String instanceId = settings.getInstanceId();
String appProfileId = settings.getAppProfileId();

if (settings.getCredentialsProvider().getCredentials() != null) {
credentials = MoreCallCredentials.from(settings.getCredentialsProvider().getCredentials());
} else {
credentials = null;
}

request =
PingAndWarmRequest.newBuilder()
.setName(InstanceName.format(projectId, instanceId))
.setAppProfileId(appProfileId)
.setCredentialsProvider(FixedCredentialsProvider.create(credentials))
// Disable refreshing channel here to avoid creating settings in a loop
.setRefreshingChannel(false)
.setExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build());
.build();

return new BigtableChannelPrimer(builder.build());
}
PercentEscaper escaper = new PercentEscaper("._-~", false);

private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate) {
Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null");
this.settingsTemplate = settingsTemplate;
Metadata metadata = new Metadata();
mutianf marked this conversation as resolved.
Show resolved Hide resolved
settings
.getHeaderProvider()
.getHeaders()
.forEach((k, v) -> metadata.put(Metadata.Key.of(k, Metadata.ASCII_STRING_MARSHALLER), v));

metadata.put(
requestParams,
escaper.escape(
mutianf marked this conversation as resolved.
Show resolved Hide resolved
String.format(
"name=%s&app_profile_id=%s", request.getName(), request.getAppProfileId())));
}

@Override
Expand All @@ -78,35 +101,43 @@ private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOExceptio
sendPrimeRequests(managedChannel);
}

private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException {
// Wrap the channel in a temporary stub
EnhancedBigtableStubSettings primingSettings =
settingsTemplate
.toBuilder()
.setTransportChannelProvider(
FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel)))
.build();
private void sendPrimeRequests(ManagedChannel managedChannel) {
try {
ClientCall<PingAndWarmRequest, PingAndWarmResponse> clientCall =
managedChannel.newCall(
BigtableGrpc.getPingAndWarmMethod(),
GrpcCallContext.createDefault().getCallOptions().withCallCredentials(credentials));
mutianf marked this conversation as resolved.
Show resolved Hide resolved

SettableApiFuture<PingAndWarmResponse> future = SettableApiFuture.create();
clientCall.start(
new ClientCall.Listener<PingAndWarmResponse>() {
PingAndWarmResponse response;

@Override
public void onMessage(PingAndWarmResponse message) {
response = message;
}

@Override
public void onClose(Status status, Metadata trailers) {
if (status.isOk()) {
future.set(response);
} else {
future.setException(status.asException());
}
}
},
metadata);
mutianf marked this conversation as resolved.
Show resolved Hide resolved
clientCall.sendMessage(request);
clientCall.halfClose();
clientCall.request(1);
mutianf marked this conversation as resolved.
Show resolved Hide resolved

try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) {
PingAndWarmRequest request =
PingAndWarmRequest.newBuilder()
.setName(
NameUtil.formatInstanceName(
primingSettings.getProjectId(), primingSettings.getInstanceId()))
.setAppProfileId(primingSettings.getAppProfileId())
.build();

try {
stub.pingAndWarmCallable().call(request);
} catch (Throwable e) {
// TODO: Not sure if we should swallow the error here. We are pre-emptively swapping
// channels if the new
// channel is bad.
if (e instanceof ExecutionException) {
e = e.getCause();
}
LOG.warning(String.format("Failed to prime channel: %s", e));
}
future.get(1, TimeUnit.MINUTES);
} catch (Throwable e) {
// TODO: Not sure if we should swallow the error here. We are pre-emptively swapping
// channels if the new
// channel is bad.
LOG.warning(String.format("Failed to prime channel: %s", e));
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,7 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings
}
// Inject channel priming if enabled
if (builder.isRefreshingChannel()) {
transportProvider.setChannelPrimer(
BigtableChannelPrimer.create(
credentials,
settings.getProjectId(),
settings.getInstanceId(),
settings.getAppProfileId()));
transportProvider.setChannelPrimer(BigtableChannelPrimer.create(builder.build()));
}

builder.setTransportChannelProvider(transportProvider.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@
import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.PingAndWarmResponse;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.ReadRowsRequest;
Expand Down Expand Up @@ -188,7 +186,6 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<BulkMutation, Void> externalBulkMutateRowsCallable;
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;

private final ServerStreamingCallable<String, ByteStringRange>
generateInitialChangeStreamPartitionsCallable;
Expand Down Expand Up @@ -321,7 +318,6 @@ public EnhancedBigtableStub(
createGenerateInitialChangeStreamPartitionsCallable();
readChangeStreamCallable =
createReadChangeStreamCallable(new DefaultChangeStreamRecordAdapter());
pingAndWarmCallable = createPingAndWarmCallable();
executeQueryCallable = createExecuteQueryCallable();
}

Expand Down Expand Up @@ -1252,28 +1248,6 @@ ServerStreamingCallSettings<ReqT, RespT> convertUnaryToServerStreamingSettings(
.build();
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarm =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<PingAndWarmRequest, PingAndWarmResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod())
.setParamsExtractor(
new RequestParamsExtractor<PingAndWarmRequest>() {
@Override
public Map<String, String> extract(PingAndWarmRequest request) {
return ImmutableMap.of(
"name", request.getName(),
"app_profile_id", request.getAppProfileId());
}
})
.build(),
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withRetrySettings(settings.pingAndWarmSettings().getRetrySettings()));
}

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
UnaryCallable<RequestT, ResponseT> innerCallable, UnaryCallSettings<?, ?> unaryCallSettings) {
UnaryCallable<RequestT, ResponseT> retrying;
Expand Down Expand Up @@ -1381,10 +1355,6 @@ public ExecuteQueryCallable executeQueryCallable() {
return executeQueryCallable;
}

UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable() {
return pingAndWarmCallable;
}

// </editor-fold>

private SpanName getSpanName(String methodName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static com.google.common.truth.Truth.assertThat;

import com.google.api.core.ApiFunction;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.bigtable.v2.BigtableGrpc.BigtableImplBase;
Expand Down Expand Up @@ -67,12 +68,16 @@ public void setup() throws IOException {

server = FakeServiceBuilder.create(fakeService).intercept(metadataInterceptor).start();

primer =
BigtableChannelPrimer.create(
OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null)),
"fake-project",
"fake-instance",
"fake-app-profile");
EnhancedBigtableStubSettings settings =
EnhancedBigtableStubSettings.newBuilder()
.setProjectId("fake-project")
.setInstanceId("fake-instance")
.setAppProfileId("fake-app-profile")
.setCredentialsProvider(
FixedCredentialsProvider.create(
OAuth2Credentials.create(new AccessToken(TOKEN_VALUE, null))))
.build();
primer = BigtableChannelPrimer.create(settings);

channel =
ManagedChannelBuilder.forAddress("localhost", server.getPort()).usePlaintext().build();
Expand Down
Loading