Skip to content

Commit

Permalink
perf: use multiplexed sessions for read-only operations (#2043)
Browse files Browse the repository at this point in the history
* perf: use multiplexed sessions for read-only operations

Enable multiplexed sessions for all read-only operations. Multiplexed
sessions can handle any number of concurrent read-only operations,
which means that PGAdapter does not need to reserve a session
exclusively for a single read-only operation. This increases the
maximum possible concurrency without the need to increase the
number of sessions in the pool.

* fix: add database name for pgx tests

* fix: add default database for gorm tests

* chore: cleanup
  • Loading branch information
olavloite authored Jul 5, 2024
1 parent 635367a commit 40a2177
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.cloud.spanner;

import com.google.api.core.InternalApi;

/**
* This class is only here to access a package-private method in the Spanner client library and will
* be removed in the future.
*/
@InternalApi
public class SessionPoolOptionsHelper {
private SessionPoolOptionsHelper() {}

@InternalApi
public static SessionPoolOptions.Builder useMultiplexedSessions(
SessionPoolOptions.Builder builder) {
return builder.setUseMultiplexedSession(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.cloud.spanner.Instance;
import com.google.cloud.spanner.InstanceAdminClient;
import com.google.cloud.spanner.InstanceNotFoundException;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.SessionPoolOptionsHelper;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
Expand Down Expand Up @@ -205,10 +207,14 @@ public void connectToSpanner(String database, @Nullable Credentials credentials)
ConnectionOptionsHelper.setCredentials(
connectionOptionsBuilder, options.getCredentials());
}
if (options.getSessionPoolOptions() != null) {
connectionOptionsBuilder =
connectionOptionsBuilder.setSessionPoolOptions(options.getSessionPoolOptions());
}
SessionPoolOptions sessionPoolOptions =
options.getSessionPoolOptions() == null
? SessionPoolOptionsHelper.useMultiplexedSessions(SessionPoolOptions.newBuilder())
.build()
: SessionPoolOptionsHelper.useMultiplexedSessions(
options.getSessionPoolOptions().toBuilder())
.build();
connectionOptionsBuilder.setSessionPoolOptions(sessionPoolOptions);
if (options.isEnableOpenTelemetryMetrics()) {
SpannerOptions.enableOpenTelemetryMetrics();
connectionOptionsBuilder =
Expand Down
30 changes: 30 additions & 0 deletions src/test/java/com/google/cloud/spanner/MockServerHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.cloud.spanner;

import com.google.api.core.InternalApi;
import com.google.spanner.v1.Session;

/** This file will be removed in the future. */
@InternalApi
public class MockServerHelper {

private MockServerHelper() {}

@InternalApi
public static Session getSession(MockSpannerServiceImpl server, String sessionName) {
return server.getSession(sessionName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ private String getExpectedInitialApplicationName() {
* mode for queries and DML statements.
*/
private String createUrl(String extraOptions) {
return String.format("jdbc:postgresql://localhost:%d/" + extraOptions, pgServer.getLocalPort());
return String.format(
"jdbc:postgresql://localhost:%d/d" + extraOptions, pgServer.getLocalPort());
}

private Connection createConnection() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ public void partitionQuery(
public static void startMockSpannerAndPgAdapterServers() throws Exception {
doStartMockSpannerAndPgAdapterServers(
createMockSpannerThatReturnsOneQueryPartition(),
"d",
null,
configurator -> {},
OpenTelemetry.noop());
}
Expand Down Expand Up @@ -1291,7 +1291,7 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(
TestOptionsMetadataBuilder builder = new TestOptionsMetadataBuilder();
builder.setProject("p").setInstance("i");
if (defaultDatabase != null) {
builder.setDatabase("d");
builder.setDatabase(defaultDatabase);
}
builder
.enableDebugMode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,29 @@
import com.google.cloud.spanner.pgadapter.wireprotocol.WireMessage;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import io.opentelemetry.api.OpenTelemetry;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class InvalidMessagesTest extends AbstractMockServerTest {
@BeforeClass
public static void startMockSpannerAndPgAdapterServers() throws Exception {
doStartMockSpannerAndPgAdapterServers(
createMockSpannerThatReturnsOneQueryPartition(),
"d",
configurator -> {},
OpenTelemetry.noop());
}

@Test
public void testConnectionWithoutMessages() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.MockServerHelper;
import com.google.cloud.spanner.MockSpannerServiceImpl;
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
Expand Down Expand Up @@ -60,12 +61,14 @@
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlRequest;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CreateSessionRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.ResultSetStats;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Session;
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.StructType.Field;
import com.google.spanner.v1.Type;
Expand Down Expand Up @@ -386,9 +389,13 @@ static void setupJsonbResults(MockSpannerServiceImpl mockSpanner) {
* mode for queries and DML statements.
*/
private String createUrl() {
return createUrl("d");
}

private String createUrl(String database) {
return String.format(
"jdbc:postgresql://localhost:%d/?options=-c%%20server_version=%s",
pgServer.getLocalPort(), pgVersion);
"jdbc:postgresql://localhost:%d/%s?options=-c%%20server_version=%s",
pgServer.getLocalPort(), database, pgVersion);
}

private String getExpectedInitialApplicationName() {
Expand Down Expand Up @@ -5416,6 +5423,116 @@ public void testTransactionAbortedByCloudSpanner() throws SQLException {
}
}

@Test
public void testUsesMultiplexedSessionForQueryInAutoCommit() throws SQLException {
try (Connection connection =
DriverManager.getConnection(createUrl(UUID.randomUUID().toString()))) {
assertTrue(connection.getAutoCommit());
try (ResultSet resultSet = connection.createStatement().executeQuery("SELECT 1")) {
//noinspection StatementWithEmptyBody
while (resultSet.next()) {
// Just consume the results
}
}
}
// Verify that one multiplexed session was created and used.
assertEquals(1, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
CreateSessionRequest request = mockSpanner.getRequestsOfType(CreateSessionRequest.class).get(0);
assertTrue(request.getSession().getMultiplexed());
assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
String sessionId = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0).getSession();
Session session = MockServerHelper.getSession(mockSpanner, sessionId);
assertNotNull(session);
assertTrue(session.getMultiplexed());
}

@Test
public void testUsesMultiplexedSessionForQueryInReadOnlyTransaction() throws SQLException {
int numQueries = 2;
try (Connection connection =
DriverManager.getConnection(createUrl(UUID.randomUUID().toString()))) {
connection.setReadOnly(true);
connection.setAutoCommit(false);

for (int ignore = 0; ignore < numQueries; ignore++) {
try (ResultSet resultSet = connection.createStatement().executeQuery("SELECT 1")) {
//noinspection StatementWithEmptyBody
while (resultSet.next()) {
// Just consume the results
}
}
}
}
// Verify that one multiplexed session was created and used.
assertEquals(1, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
CreateSessionRequest request = mockSpanner.getRequestsOfType(CreateSessionRequest.class).get(0);
assertTrue(request.getSession().getMultiplexed());

// Verify that both queries used the multiplexed session.
assertEquals(numQueries, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
for (int index = 0; index < numQueries; index++) {
String sessionId =
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(index).getSession();
Session session = MockServerHelper.getSession(mockSpanner, sessionId);
assertNotNull(session);
assertTrue(session.getMultiplexed());
}
}

@Test
public void testUsesRegularSessionForDmlInAutoCommit() throws SQLException {
String sql = "insert into foo (id) values (1)";
mockSpanner.putStatementResult(StatementResult.update(Statement.of(sql), 1L));

try (Connection connection =
DriverManager.getConnection(createUrl(UUID.randomUUID().toString()))) {
assertTrue(connection.getAutoCommit());
assertEquals(1, connection.createStatement().executeUpdate(sql));
}
// The JDBC connection creates a multiplexed session by default, because it executes a query to
// check what dialect the database uses. This query is executed using a multiplexed session.
assertEquals(1, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
CreateSessionRequest request = mockSpanner.getRequestsOfType(CreateSessionRequest.class).get(0);
assertTrue(request.getSession().getMultiplexed());
// Verify that a regular session was used for the insert statement.
assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
assertEquals(sql, mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0).getSql());
String sessionId = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0).getSession();
Session session = MockServerHelper.getSession(mockSpanner, sessionId);
assertNotNull(session);
assertFalse(session.getMultiplexed());
}

@Test
public void testUsesRegularSessionForQueryInTransaction() throws SQLException {
String sql = "SELECT 1";
try (Connection connection =
DriverManager.getConnection(createUrl(UUID.randomUUID().toString()))) {
connection.setAutoCommit(false);
assertFalse(connection.getAutoCommit());

try (ResultSet resultSet = connection.createStatement().executeQuery(sql)) {
//noinspection StatementWithEmptyBody
while (resultSet.next()) {
// Just consume the results
}
}
connection.commit();
}
// The JDBC connection creates a multiplexed session by default, because it executes a query to
// check what dialect the database uses. This query is executed using a multiplexed session.
assertEquals(1, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
CreateSessionRequest request = mockSpanner.getRequestsOfType(CreateSessionRequest.class).get(0);
assertTrue(request.getSession().getMultiplexed());
// Verify that a regular session was used for the select statement.
assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
assertEquals(sql, mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0).getSql());
String sessionId = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).get(0).getSession();
Session session = MockServerHelper.getSession(mockSpanner, sessionId);
assertNotNull(session);
assertFalse(session.getMultiplexed());
}

@Ignore("Only used for manual performance testing")
@Test
public void testBasePerformance() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@ public static void compile() throws IOException, InterruptedException {

private GoString createConnString() {
if (useDomainSocket) {
return new GoString(String.format("host=/tmp port=%d", pgServer.getLocalPort()));
return new GoString(String.format("host=/tmp port=%d database=d", pgServer.getLocalPort()));
}
return new GoString(
String.format("postgres://uid:pwd@localhost:%d/?sslmode=disable", pgServer.getLocalPort()));
String.format(
"postgres://uid:pwd@localhost:%d/d?sslmode=disable", pgServer.getLocalPort()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ public static void compile() throws IOException, InterruptedException {

private GoString createConnString() {
if (useDomainSocket) {
return new GoString(String.format("host=/tmp port=%d", pgServer.getLocalPort()));
return new GoString(String.format("host=/tmp port=%d database=d", pgServer.getLocalPort()));
}
return new GoString(
String.format("postgres://uid:pwd@localhost:%d/?sslmode=disable", pgServer.getLocalPort()));
String.format(
"postgres://uid:pwd@localhost:%d/d?sslmode=disable", pgServer.getLocalPort()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ public static void compile() throws IOException, InterruptedException {

private GoString createConnString() {
if (useDomainSocket) {
return new GoString(String.format("host=/tmp port=%d", pgServer.getLocalPort()));
return new GoString(String.format("host=/tmp port=%d database=d", pgServer.getLocalPort()));
}
return new GoString(
String.format("postgres://uid:pwd@localhost:%d/?sslmode=disable", pgServer.getLocalPort()));
String.format(
"postgres://uid:pwd@localhost:%d/d?sslmode=disable", pgServer.getLocalPort()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static void compile() throws IOException, InterruptedException {
private GoString createConnString() {
return new GoString(
String.format(
"postgres://uid:pwd@localhost:%d/?prefer_simple_protocol=true&sslmode=disable",
"postgres://uid:pwd@localhost:%d/d?prefer_simple_protocol=true&sslmode=disable",
pgServer.getLocalPort()));
}

Expand Down

0 comments on commit 40a2177

Please sign in to comment.