Skip to content

Commit

Permalink
Addressed PR comments: Added an option to enable the new behavior, wh…
Browse files Browse the repository at this point in the history
…ile still ignoring timeout for legacy clients
  • Loading branch information
pablosaavedra-rappi committed May 6, 2024
1 parent 635cc01 commit 58dc3cd
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 2 deletions.
84 changes: 84 additions & 0 deletions PgTimeoutTester.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.vertx.sqlclient.templates.impl;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.PoolOptions;

import java.util.ArrayList;
import java.util.List;

public class PgTimeoutTester {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();

PgConnectOptions dbConfig = new PgConnectOptions()
.setPort(5432)
.setConnectTimeout(2000)
.setHost("localhost")
.setDatabase("postgres")
.setUser("postgres")
.setPassword("postgres");

PoolOptions poolConfig = new PoolOptions()
.setMaxSize(1) // One connection in Pool
.setConnectionTimeout(2); // 2 seconds

PgPool pool = PgPool.pool(vertx, dbConfig, poolConfig);

//connectionTimeOut(pool, vertx);
poolTimeOut(pool, vertx);
}

private static void connectionTimeOut(PgPool pool, Vertx vertx) {
//First query
pool.getConnection()
.onFailure(err -> {
err.printStackTrace();
vertx.close();
})
.compose(conn0 ->
conn0.query("SELECT 1").execute()
.onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0)))
/*.eventually(ign -> conn0.close())*/); // Don't close connection to trigger timeout while getting one below

//Second query
pool.getConnection()
.onFailure(err -> {
err.printStackTrace();
vertx.close();
})
.compose(conn0 ->
conn0.query("SELECT 2").execute()
.onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0)))
.eventually(ign -> conn0.close()));
}

private static void poolTimeOut(PgPool pool, Vertx vertx) {
//First query
pool.getConnection()
.onFailure(err -> {
err.printStackTrace();
vertx.close();
})
.compose(conn0 ->
conn0.query("SELECT 1").execute()
.onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0)))
.eventually(ign -> conn0.close()));// Don't close connection to trigger timeout while getting one below

List<Future<?>> futures = new ArrayList<>();
//N queries
for (int i = 2; i < 10; i++) {
Future<?> f = pool.query("SELECT " + i).execute()
.onSuccess(rows -> System.out.println(rows.iterator().next().getInteger(0)))
.onFailure(err -> {
err.printStackTrace();
vertx.close();
});
futures.add(f);
}

Future.all(futures).onComplete(c -> vertx.close());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ private void testConnectionClosedInProvider(TestContext ctx, boolean immediately

@Test
public void testConnectionTimeoutWhenExecutingDirectly(TestContext ctx) {
PgPool pool = createPool(options, new PoolOptions().setConnectionTimeout(2).setMaxSize(2));
PgPool pool = createPool(options, new PoolOptions().setConnectionTimeout(2).setMaxSize(2).setAlwaysUseTimeout(true));
final Async latch = ctx.async(2);
pool.getConnection(ctx.asyncAssertSuccess(conn -> {
conn
Expand Down
28 changes: 28 additions & 0 deletions vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public class PoolOptions {
*/
public static final int DEFAULT_EVENT_LOOP_SIZE = 0;

/**
* Default honor timeout when scheduling commands is false
*/
public static final boolean DEFAULT_ALWAYS_USE_TIMEOUT = false;

private int maxSize = DEFAULT_MAX_SIZE;
private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE;
private int idleTimeout = DEFAULT_IDLE_TIMEOUT;
Expand All @@ -106,6 +111,7 @@ public class PoolOptions {
private boolean shared = DEFAULT_SHARED_POOL;
private String name = DEFAULT_NAME;
private int eventLoopSize = DEFAULT_EVENT_LOOP_SIZE;
private boolean alwaysUseTimeout = DEFAULT_ALWAYS_USE_TIMEOUT;

public PoolOptions() {
}
Expand All @@ -122,6 +128,7 @@ public PoolOptions(PoolOptions other) {
shared= other.shared;
name = other.name;
eventLoopSize = other.eventLoopSize;
alwaysUseTimeout = other.alwaysUseTimeout;
}

/**
Expand Down Expand Up @@ -360,6 +367,27 @@ public PoolOptions setEventLoopSize(int eventLoopSize) {
return this;
}

/**
* @return Whether the pool will always use timeout, even when sending commands directly to execute.
*/
public boolean isAlwaysUseTimeout() { return alwaysUseTimeout; }

/**
* Sets whether always honor the pool's timeout.
* <p>
* This basically affects the pool's schedule method, which will submit the command regardless of whether there's
* an available connection or not. This settings allows the caller to have a consistent max wait time across every
* method.
* </p>
* The default is {@code false}.
* @param alwaysUseTimeout Whether to use the configured connection timeout when scheduling commands
* @return a reference to this, so the API can be used fluently
*/
public PoolOptions setAlwaysUseTimeout(boolean alwaysUseTimeout) {
this.alwaysUseTimeout = alwaysUseTimeout;
return this;
}

public JsonObject toJson() {
JsonObject json = new JsonObject();
PoolOptionsConverter.toJson(this, json);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
private volatile Handler<SqlConnectionPool.PooledConnection> connectionInitializer;
private long timerID;
private volatile Function<Context, Future<SqlConnection>> connectionProvider;
private final boolean alwaysUseTimeout;

public static final String PROPAGATABLE_CONNECTION = "propagatable_connection";

Expand All @@ -65,6 +66,7 @@ public PoolImpl(VertxInternal vertx,
this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit());
this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit());
this.cleanerPeriod = poolOptions.getPoolCleanerPeriod();
this.alwaysUseTimeout = poolOptions.isAlwaysUseTimeout();
this.timerID = -1L;
this.pipelined = pipelined;
this.vertx = vertx;
Expand Down Expand Up @@ -169,14 +171,17 @@ public Future<SqlConnection> getConnection() {

@Override
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
if (alwaysUseTimeout) {
return pool.execute(context, cmd);
}
PromiseInternal<SqlConnectionPool.PooledConnection> promise = context.promise();
//Acquires the connection honoring the pool's connection timeout
acquire(context, connectionTimeout, promise);
return promise.future().compose(pooled -> {
//We need to 'init' the connection or close will fail.
pooled.init(pooled);
return pooled.schedule(context, cmd)
.eventually(v -> {
.eventually(() -> {
Promise<Void> p = Promise.promise();
pooled.close(pooled, p);
return p.future();
Expand Down

0 comments on commit 58dc3cd

Please sign in to comment.