Skip to content

Commit

Permalink
use netty mock server for metrics test
Browse files Browse the repository at this point in the history
  • Loading branch information
surbhigarg92 committed Jan 7, 2025
1 parent 59459ae commit 6fbf28c
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public class BuiltInMetricsConstant {
OPERATION_LATENCIES_NAME,
ATTEMPT_LATENCIES_NAME,
OPERATION_COUNT_NAME,
ATTEMPT_COUNT_NAME)
ATTEMPT_COUNT_NAME,
GFE_LATENCIES_NAME)
.stream()
.map(m -> METER_NAME + '/' + m)
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ void initialize(
}
}

@VisibleForTesting
void initialize(
OpenTelemetry openTelemetry,
String projectId,
String client_name,
@Nullable Credentials credentials,
@Nullable String monitoringHost) {
initialize(projectId, client_name, credentials, monitoringHost);
this.builtInOpenTelemetryMetricsRecorder =
new BuiltInOpenTelemetryMetricsRecorder(openTelemetry, clientAttributes);
}

OpenTelemetry getOpenTelemetry() {
return this.openTelemetry;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.cloud.NoCredentials;
import io.grpc.ForwardingServerCall;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

abstract class AbstractNettyMockServerTest {
protected static MockSpannerServiceImpl mockSpanner;

protected static Server server;
protected static InetSocketAddress address;
static ExecutorService executor;
protected static LocalChannelProvider channelProvider;
protected static AtomicInteger fakeServerTiming =
new AtomicInteger(new Random().nextInt(1000) + 1);

protected Spanner spanner;

@BeforeClass
public static void startMockServer() throws IOException {
mockSpanner = new MockSpannerServiceImpl();
mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions.

address = new InetSocketAddress("localhost", 0);
server =
NettyServerBuilder.forAddress(address)
.addService(mockSpanner)
.intercept(
new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> serverCall,
Metadata headers,
ServerCallHandler<ReqT, RespT> serverCallHandler) {
return serverCallHandler.startCall(
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(
serverCall) {
@Override
public void sendHeaders(Metadata headers) {
headers.put(
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER),
String.format("gfet4t7; dur=%d", fakeServerTiming.get()));
super.sendHeaders(headers);
}
},
headers);
}
})
.build()
.start();
executor = Executors.newSingleThreadExecutor();
}

@AfterClass
public static void stopMockServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
executor.shutdown();
}

@Before
public void createSpannerInstance() {
String endpoint = address.getHostString() + ":" + server.getPort();
spanner =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
.setHost("http://" + endpoint)
.setCredentials(NoCredentials.getInstance())
.setSessionPoolOption(SessionPoolOptions.newBuilder().setFailOnSessionLeak().build())
.build()
.getService();
}

@After
public void cleanup() {
spanner.close();
mockSpanner.reset();
mockSpanner.removeAllExecutionTimes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Range;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
Expand All @@ -58,7 +59,7 @@
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class OpenTelemetryBuiltInMetricsTracerTest extends AbstractMockServerTest {
public class OpenTelemetryBuiltInMetricsTracerTest extends AbstractNettyMockServerTest {

private static final Statement SELECT_RANDOM = Statement.of("SELECT * FROM random");

Expand All @@ -69,7 +70,8 @@ public class OpenTelemetryBuiltInMetricsTracerTest extends AbstractMockServerTes

private static Map<String, String> attributes;

private static Attributes expectedBaseAttributes;
private static Attributes expectedCommonBaseAttributes;
private static Attributes expectedCommonRequestAttributes;

private static final long MIN_LATENCY = 0;

Expand All @@ -89,10 +91,11 @@ public static void setup() {
String client_name = "spanner-java/";
openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(meterProvider.build()).build();
provider.reset();
provider.initialize("test-project", client_name, null, null);
// provider.getOpenTelemetry().getMeterProvider().
provider.initialize(openTelemetry, "test-project", client_name, null, null);
attributes = provider.getClientAttributes();

expectedBaseAttributes =
expectedCommonBaseAttributes =
Attributes.builder()
.put(BuiltInMetricsConstant.PROJECT_ID_KEY, "test-project")
.put(BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY, "unknown")
Expand All @@ -103,6 +106,14 @@ public static void setup() {
.put(BuiltInMetricsConstant.CLIENT_UID_KEY, attributes.get("client_uid"))
.put(BuiltInMetricsConstant.CLIENT_HASH_KEY, attributes.get("client_hash"))
.build();

expectedCommonRequestAttributes =
Attributes.builder()
.put(BuiltInMetricsConstant.INSTANCE_ID_KEY, "i")
.put(BuiltInMetricsConstant.DATABASE_KEY, "d")
.put(BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY, "false")
.put(BuiltInMetricsConstant.DIRECT_PATH_USED_KEY, "false")
.build();
}

@BeforeClass
Expand Down Expand Up @@ -137,18 +148,18 @@ public void createSpannerInstance() {
.setRetryDelayMultiplier(1.0)
.setTotalTimeoutDuration(Duration.ofMinutes(10L))
.build()));
String endpoint = address.getHostString() + ":" + server.getPort();
spanner =
builder
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setChannelProvider(channelProvider)
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
.setHost("http://" + endpoint)
.setCredentials(NoCredentials.getInstance())
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setWaitForMinSessionsDuration(Duration.ofSeconds(5L))
.setFailOnSessionLeak()
.build())
// Setting this to false so that Spanner Options does not register Metrics Tracer
// factory again.
.setBuiltInMetricsEnabled(false)
.setApiTracerFactory(metricsTracerFactory)
.build()
Expand All @@ -166,8 +177,9 @@ public void testMetricsSingleUseQuery() {

long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
Attributes expectedAttributes =
expectedBaseAttributes
expectedCommonBaseAttributes
.toBuilder()
.putAll(expectedCommonRequestAttributes)
.put(BuiltInMetricsConstant.STATUS_KEY, "OK")
.put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.ExecuteStreamingSql")
.build();
Expand All @@ -189,6 +201,11 @@ public void testMetricsSingleUseQuery() {
MetricData attemptCountMetricData =
getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_COUNT_NAME);
assertThat(getAggregatedValue(attemptCountMetricData, expectedAttributes)).isEqualTo(1);

MetricData gfeLatencyMetricData =
getMetricData(metricReader, BuiltInMetricsConstant.GFE_LATENCIES_NAME);
long gfeLatencyValue = getAggregatedValue(attemptLatencyMetricData, expectedAttributes);
assertThat(gfeLatencyValue).isEqualTo(gfeLatencyValue);
}

@Test
Expand All @@ -205,14 +222,15 @@ public void testMetricsWithGaxRetryUnaryRpc() {
stopwatch.elapsed(TimeUnit.MILLISECONDS);

Attributes expectedAttributesBeginTransactionOK =
expectedBaseAttributes
expectedCommonBaseAttributes
.toBuilder()
.putAll(expectedCommonRequestAttributes)
.put(BuiltInMetricsConstant.STATUS_KEY, "OK")
.put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.BeginTransaction")
.build();

Attributes expectedAttributesBeginTransactionFailed =
expectedBaseAttributes
expectedCommonBaseAttributes
.toBuilder()
.put(BuiltInMetricsConstant.STATUS_KEY, "UNAVAILABLE")
.put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.BeginTransaction")
Expand Down

0 comments on commit 6fbf28c

Please sign in to comment.