diff --git a/src/main/java/io/lettuce/core/ConnectionBuilder.java b/src/main/java/io/lettuce/core/ConnectionBuilder.java
index 4bb3127b88..e123502a2b 100644
--- a/src/main/java/io/lettuce/core/ConnectionBuilder.java
+++ b/src/main/java/io/lettuce/core/ConnectionBuilder.java
@@ -155,7 +155,7 @@ protected ConnectionWatchdog createConnectionWatchdog() {
ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap,
clientResources.timer(), clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener,
- connection, clientResources.eventBus(), endpoint);
+ connection, clientResources.eventBus(), endpoint, clientResources.connectionMonitor());
endpoint.registerConnectionWatchdog(watchdog);
diff --git a/src/main/java/io/lettuce/core/TimeoutOptions.java b/src/main/java/io/lettuce/core/TimeoutOptions.java
index a5754f1819..9502de5b3e 100644
--- a/src/main/java/io/lettuce/core/TimeoutOptions.java
+++ b/src/main/java/io/lettuce/core/TimeoutOptions.java
@@ -24,18 +24,26 @@
@SuppressWarnings("serial")
public class TimeoutOptions implements Serializable {
+ public static final Duration DISABLED_TIMEOUT = Duration.ZERO.minusSeconds(1);
+
public static final boolean DEFAULT_TIMEOUT_COMMANDS = false;
+ public static final Duration DEFAULT_RELAXED_TIMEOUT = DISABLED_TIMEOUT;
+
private final boolean timeoutCommands;
private final boolean applyConnectionTimeout;
+ private final Duration relaxedTimeout;
+
private final TimeoutSource source;
- private TimeoutOptions(boolean timeoutCommands, boolean applyConnectionTimeout, TimeoutSource source) {
+ private TimeoutOptions(boolean timeoutCommands, boolean applyConnectionTimeout, TimeoutSource source,
+ Duration relaxedTimeout) {
this.timeoutCommands = timeoutCommands;
this.applyConnectionTimeout = applyConnectionTimeout;
+ this.relaxedTimeout = relaxedTimeout;
this.source = source;
}
@@ -84,6 +92,8 @@ public static class Builder {
private boolean applyConnectionTimeout = false;
+ private Duration relaxedTimeout = DEFAULT_RELAXED_TIMEOUT;
+
private TimeoutSource source;
/**
@@ -107,6 +117,24 @@ public Builder timeoutCommands(boolean enabled) {
return this;
}
+        /**
+         * Configure the relaxed timeout applied while the server signals upcoming maintenance of the current endpoint.
+         * Defaults to {@link #DEFAULT_RELAXED_TIMEOUT}, which leaves the feature disabled.
+         * <p>
+         * When the Redis server announces that the endpoint is about to go down (for example as part of a maintenance
+         * activity), commands that reach their regular timeout are given this additional duration before they fail, so
+         * that commands in the offline buffer or awaiting a reply do not time out during the switch-over.
+         *
+         * @param duration additional {@link Duration} granted to commands, must not be {@code null}. A negative value
+         *        keeps relaxing disabled.
+         * @return {@code this}
+         */
+        public Builder proactiveTimeoutsRelaxing(Duration duration) {
+            LettuceAssert.notNull(duration, "Duration must not be null");
+            this.relaxedTimeout = duration;
+            return this;
+        }
+
/**
* Set a fixed timeout for all commands.
*
@@ -158,7 +186,7 @@ public TimeoutOptions build() {
}
}
- return new TimeoutOptions(timeoutCommands, applyConnectionTimeout, source);
+ return new TimeoutOptions(timeoutCommands, applyConnectionTimeout, source, relaxedTimeout);
}
}
@@ -177,6 +205,13 @@ public boolean isApplyConnectionTimeout() {
return applyConnectionTimeout;
}
+    /**
+     * @return the relaxed-timeout {@link Duration}; {@link #DISABLED_TIMEOUT} (the default) means relaxing is off.
+     */
+    public Duration getRelaxedTimeout() {
+        return this.relaxedTimeout;
+    }
+
/**
* @return the timeout source to determine the timeout for a {@link RedisCommand}. Can be {@code null} if
* {@link #isTimeoutCommands()} is {@code false}.
diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
index b5fa5cf196..4df92fd4db 100644
--- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
+++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
@@ -1091,6 +1091,7 @@ private CompletionStage fetchPartitions(Iterable topologyR
getClusterClientOptions().getSocketOptions().getConnectTimeout(), useDynamicRefreshSources());
return topology.thenApply(partitions -> {
+ logger.debug("Topology Refresh Views: {} ", partitions);
if (partitions.isEmpty()) {
throw new RedisException(String.format("Cannot retrieve initial cluster partitions from initial URIs %s",
@@ -1108,7 +1109,7 @@ private CompletionStage fetchPartitions(Iterable topologyR
}
topologyRefreshScheduler.activateTopologyRefreshIfNeeded();
-
+ logger.debug("Topology Refresh loadedPartitions: {}", loadedPartitions);
return loadedPartitions;
});
}
diff --git a/src/main/java/io/lettuce/core/metrics/ConnectionMonitor.java b/src/main/java/io/lettuce/core/metrics/ConnectionMonitor.java
new file mode 100644
index 0000000000..fdf4782f99
--- /dev/null
+++ b/src/main/java/io/lettuce/core/metrics/ConnectionMonitor.java
@@ -0,0 +1,33 @@
+package io.lettuce.core.metrics;
+
+public interface ConnectionMonitor {
+
+    /** Reports how long endpoint {@code epid} stayed disconnected; {@code time} is the elapsed duration. */
+    void recordDisconnectedTime(String epid, long time);
+
+    /** Reports one more reconnection attempt for endpoint {@code epid}. */
+    void incrementReconnectionAttempts(String epid);
+
+    /** @return whether this monitor is enabled. */
+    boolean isEnabled();
+
+    /** @return a monitor that ignores every report and whose {@link #isEnabled()} is {@code false}. */
+    static ConnectionMonitor disabled() {
+        return new ConnectionMonitor() {
+            @Override
+            public void recordDisconnectedTime(String epid, long time) {
+                // intentionally empty
+            }
+
+            @Override
+            public void incrementReconnectionAttempts(String epid) {
+                // intentionally empty
+            }
+
+            @Override
+            public boolean isEnabled() {
+                return false;
+            }
+        };
+    }
+}
diff --git a/src/main/java/io/lettuce/core/metrics/EndpointQueueMonitor.java b/src/main/java/io/lettuce/core/metrics/EndpointQueueMonitor.java
new file mode 100644
index 0000000000..ec5ea74484
--- /dev/null
+++ b/src/main/java/io/lettuce/core/metrics/EndpointQueueMonitor.java
@@ -0,0 +1,76 @@
+package io.lettuce.core.metrics;
+
+import io.lettuce.core.internal.LettuceAssert;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+public interface EndpointQueueMonitor {
+
+    /** Identifies a monitored queue by its name and the id of the endpoint it belongs to. */
+    class QueueId implements Serializable {
+        private final String queueName;
+        private final String epId;
+
+        protected QueueId(String name, String epId) {
+            LettuceAssert.notNull(name, "name must not be null");
+            this.queueName = name;
+            this.epId = epId;
+        }
+
+        public static QueueId create(String queueName, String epId) {
+            return new QueueId(queueName, epId);
+        }
+
+        public String getEpId() {
+            return epId;
+        }
+
+        public String getQueueName() {
+            return queueName;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (!(o instanceof QueueId))
+                return false;
+            QueueId queueId = (QueueId) o;
+            return Objects.equals(queueName, queueId.queueName) && Objects.equals(epId, queueId.epId);
+        }
+
+        // Must accompany equals(): equal ids have to produce the same hash code to work as map/set keys.
+        @Override
+        public int hashCode() {
+            return Objects.hash(queueName, epId);
+        }
+
+        @Override
+        public String toString() {
+            return "[epId=" + epId + ']';
+        }
+    }
+
+    /** @return a monitor that ignores every request and whose {@link #isEnabled()} is {@code false}. */
+    static EndpointQueueMonitor disabled() {
+        return new EndpointQueueMonitor() {
+            @Override
+            public void observeQueueSize(QueueId queueId, Supplier<Number> queueSizeSupplier) {
+                // intentionally empty
+            }
+
+            @Override
+            public boolean isEnabled() {
+                return false;
+            }
+        };
+    }
+
+    /** Asks the monitor to observe the queue identified by {@code queueId}; sizes come from {@code queueSizeSupplier}. */
+    void observeQueueSize(QueueId queueId, Supplier<Number> queueSizeSupplier);
+
+    /** @return whether this monitor is enabled. */
+    boolean isEnabled();
+}
diff --git a/src/main/java/io/lettuce/core/metrics/MicrometerConnectionMonitor.java b/src/main/java/io/lettuce/core/metrics/MicrometerConnectionMonitor.java
new file mode 100644
index 0000000000..e0462007e6
--- /dev/null
+++ b/src/main/java/io/lettuce/core/metrics/MicrometerConnectionMonitor.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2011-Present, Redis Ltd. and Contributors
+ * All rights reserved.
+ *
+ * Licensed under the MIT License.
+ *
+ * This file contains contributions from third-party contributors
+ * licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lettuce.core.metrics;
+
+import io.lettuce.core.internal.LettuceAssert;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Timer;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Micrometer implementation for tracking connection metrics.
+ * <ul>
+ * <li>Time from having connection disconnected till successfully reconnected:
+ * <ul>
+ * <li>{@code lettuce.reconnection.inactive.duration}</li>
+ * <li>Description: Measures the time between a connection being disconnected and successfully reconnected.</li>
+ * </ul>
+ * </li>
+ * <li>Number of reconnection attempts:
+ * <ul>
+ * <li>{@code lettuce.reconnection.attempts}</li>
+ * <li>Description: Tracks the number of reconnection attempts made during a disconnection.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * Meters are created lazily, one per endpoint id, and tagged with {@code epid} plus the configured option tags.
+ *
+ * @author Ivo Gaydajiev
+ * @since 6.7
+ */
+public class MicrometerConnectionMonitor implements ConnectionMonitor {
+
+    static final String LABEL_EPID = "epid";
+
+    // Track the time between a connection being disconnected and successfully reconnected or closed
+    public static final String METRIC_RECONNECTION_INACTIVE_TIME = "lettuce.reconnection.inactive.duration";
+
+    public static final String METRIC_RECONNECTION_ATTEMPTS = "lettuce.reconnection.attempts";
+
+    private final MeterRegistry meterRegistry;
+
+    private final MicrometerOptions options;
+
+    private final Map<MonitoredConnectionId, Timer> disconnectedTimers = new ConcurrentHashMap<>();
+
+    private final Map<MonitoredConnectionId, Counter> reconnectionAttempts = new ConcurrentHashMap<>();
+
+    /**
+     * Create a new {@link MicrometerConnectionMonitor} instance given {@link MeterRegistry} and {@link MicrometerOptions}.
+     *
+     * @param meterRegistry registry the meters are registered with, must not be {@code null}.
+     * @param options options supplying the enabled flag, tags and histogram settings, must not be {@code null}.
+     */
+    public MicrometerConnectionMonitor(MeterRegistry meterRegistry, MicrometerOptions options) {
+        LettuceAssert.notNull(meterRegistry, "MeterRegistry must not be null");
+        LettuceAssert.notNull(options, "MicrometerOptions must not be null");
+
+        this.meterRegistry = meterRegistry;
+        this.options = options;
+    }
+
+    /** Records {@code time} (nanoseconds) on the per-endpoint inactive-duration timer; no-op when disabled. */
+    @Override
+    public void recordDisconnectedTime(String epid, long time) {
+        if (!isEnabled()) {
+            return;
+        }
+
+        MonitoredConnectionId connectionId = createId(epid);
+        Timer inactiveConnectionTimer = disconnectedTimers.computeIfAbsent(connectionId, this::inactiveConnectionTimer);
+        inactiveConnectionTimer.record(time, TimeUnit.NANOSECONDS);
+    }
+
+    /** Increments the per-endpoint reconnection-attempts counter; no-op when disabled. */
+    @Override
+    public void incrementReconnectionAttempts(String epid) {
+        if (!isEnabled()) {
+            return;
+        }
+
+        MonitoredConnectionId connectionId = createId(epid);
+        Counter reconnectionAttemptsCounter = reconnectionAttempts.computeIfAbsent(connectionId, this::reconnectAttempts);
+        reconnectionAttemptsCounter.increment();
+    }
+
+    @Override
+    public boolean isEnabled() {
+        return options.isEnabled();
+    }
+
+    private MonitoredConnectionId createId(String epId) {
+        return MonitoredConnectionId.create(epId);
+    }
+
+    /** Builds and registers the inactive-duration {@link Timer} for {@code connectionId}. */
+    protected Timer inactiveConnectionTimer(MonitoredConnectionId connectionId) {
+        Timer.Builder timer = Timer.builder(METRIC_RECONNECTION_INACTIVE_TIME)
+                .description("Time taken for successful reconnection").tag(LABEL_EPID, connectionId.epId())
+                .tags(options.tags());
+
+        if (options.isHistogram()) {
+            timer.publishPercentileHistogram().publishPercentiles(options.targetPercentiles())
+                    .minimumExpectedValue(options.minLatency()).maximumExpectedValue(options.maxLatency());
+        }
+
+        return timer.register(meterRegistry);
+    }
+
+    /** Builds and registers the reconnection-attempts {@link Counter} for {@code connectionId}. */
+    protected Counter reconnectAttempts(MonitoredConnectionId connectionId) {
+        Counter.Builder counter = Counter.builder(METRIC_RECONNECTION_ATTEMPTS)
+                .description("Number of reconnection attempts").tag(LABEL_EPID, connectionId.epId()).tags(options.tags());
+
+        return counter.register(meterRegistry);
+    }
+
+}
diff --git a/src/main/java/io/lettuce/core/metrics/MicrometerQueueMonitor.java b/src/main/java/io/lettuce/core/metrics/MicrometerQueueMonitor.java
new file mode 100644
index 0000000000..1f02395c80
--- /dev/null
+++ b/src/main/java/io/lettuce/core/metrics/MicrometerQueueMonitor.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2011-Present, Redis Ltd. and Contributors
+ * All rights reserved.
+ *
+ * Licensed under the MIT License.
+ *
+ * This file contains contributions from third-party contributors
+ * licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lettuce.core.metrics;
+
+import io.lettuce.core.internal.LettuceAssert;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+
+import java.util.function.Supplier;
+
+/** Micrometer-backed {@link EndpointQueueMonitor} that exposes queue sizes as gauges. */
+public class MicrometerQueueMonitor implements EndpointQueueMonitor {
+
+    public static final String LABEL_EPID = "epId";
+
+    private final MeterRegistry meterRegistry;
+
+    private final MicrometerOptions options;
+
+    /**
+     * Build a queue monitor that registers its gauges with the given {@link MeterRegistry}.
+     *
+     * @param meterRegistry registry that receives the gauges, must not be {@code null}.
+     * @param options source of the enabled flag and common tags, must not be {@code null}.
+     */
+    public MicrometerQueueMonitor(MeterRegistry meterRegistry, MicrometerOptions options) {
+        LettuceAssert.notNull(meterRegistry, "MeterRegistry must not be null");
+        LettuceAssert.notNull(options, "MicrometerOptions must not be null");
+
+        this.options = options;
+        this.meterRegistry = meterRegistry;
+    }
+
+    /** @return the enabled flag of the configured {@link MicrometerOptions}. */
+    @Override
+    public boolean isEnabled() {
+        return options.isEnabled();
+    }
+
+    /** Registers a "Queue size" gauge named after the queue and tagged with its endpoint id; no-op when disabled. */
+    @Override
+    public void observeQueueSize(QueueId queueId, Supplier queueSizeSupplier) {
+        if (isEnabled()) {
+            Gauge.builder(queueId.getQueueName(), queueSizeSupplier).description("Queue size")
+                    .tag(LABEL_EPID, queueId.getEpId()).tags(options.tags()).register(meterRegistry);
+        }
+    }
+
+}
diff --git a/src/main/java/io/lettuce/core/metrics/MonitoredConnectionId.java b/src/main/java/io/lettuce/core/metrics/MonitoredConnectionId.java
new file mode 100644
index 0000000000..a813868ee2
--- /dev/null
+++ b/src/main/java/io/lettuce/core/metrics/MonitoredConnectionId.java
@@ -0,0 +1,77 @@
+package io.lettuce.core.metrics;
+
+import io.lettuce.core.internal.LettuceAssert;
+
+import java.io.Serializable;
+import java.net.SocketAddress;
+
+/**
+ * Identifier for a monitored connection, keyed by its endpoint id.
+ *
+ * @author Ivo Gaydajiev
+ */
+@SuppressWarnings("serial")
+public class MonitoredConnectionId implements Serializable, Comparable<MonitoredConnectionId> {
+
+    private final String epId;
+
+    protected MonitoredConnectionId(String epId) {
+        LettuceAssert.notNull(epId, "EndpointId must not be null");
+
+        this.epId = epId;
+    }
+
+    /**
+     * Create a new instance of {@link MonitoredConnectionId}.
+     *
+     * @param epId the endpoint id, must not be {@code null}
+     * @return a new instance of {@link MonitoredConnectionId}
+     */
+    public static MonitoredConnectionId create(String epId) {
+        return new MonitoredConnectionId(epId);
+    }
+
+    /**
+     * Returns the endpoint id.
+     *
+     * @return the endpoint id
+     */
+    public String epId() {
+        return epId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (!(o instanceof MonitoredConnectionId))
+            return false;
+
+        MonitoredConnectionId that = (MonitoredConnectionId) o;
+
+        return epId.equals(that.epId);
+    }
+
+    @Override
+    public int hashCode() {
+        return epId.hashCode();
+    }
+
+    @Override
+    public int compareTo(MonitoredConnectionId o) {
+        // A null argument sorts after this id instead of raising a NullPointerException.
+        if (o == null) {
+            return -1;
+        }
+
+        return epId.compareTo(o.epId);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[").append("epId=").append(epId).append(']');
+        return sb.toString();
+    }
+
+}
diff --git a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java
index 160deed187..33489fafcf 100644
--- a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java
+++ b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java
@@ -32,9 +32,12 @@
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.internal.ExceptionFactory;
import io.lettuce.core.internal.LettuceAssert;
+import io.lettuce.core.rebind.RebindCompletedEvent;
+import io.lettuce.core.rebind.RebindInitiatedEvent;
import io.lettuce.core.resource.ClientResources;
import io.netty.util.Timeout;
import io.netty.util.Timer;
+import reactor.core.Disposable;
/**
* Extension to {@link RedisChannelWriter} that expires commands. Command timeout starts at the time the command is written
@@ -59,8 +62,16 @@ public class CommandExpiryWriter implements RedisChannelWriter {
private final boolean applyConnectionTimeout;
+ private final Duration relaxedTimeout;
+
+ private final Disposable rebindStatedListener;
+
+ private final Disposable rebindEndedListener;
+
private volatile long timeout = -1;
+ private volatile boolean relaxTimeoutsGlobally = false;
+
/**
* Create a new {@link CommandExpiryWriter}.
*
@@ -78,9 +89,26 @@ public CommandExpiryWriter(RedisChannelWriter delegate, ClientOptions clientOpti
this.delegate = delegate;
this.source = timeoutOptions.getSource();
this.applyConnectionTimeout = timeoutOptions.isApplyConnectionTimeout();
+ this.relaxedTimeout = timeoutOptions.getRelaxedTimeout();
this.timeUnit = source.getTimeUnit();
this.executorService = clientResources.eventExecutorGroup();
this.timer = clientResources.timer();
+
+ this.rebindStatedListener = clientResources.eventBus().get().filter(e -> e instanceof RebindInitiatedEvent)
+ .subscribe(e -> {
+ this.relaxTimeoutsGlobally = true;
+ });
+
+ this.rebindEndedListener = clientResources.eventBus().get().filter(e -> e instanceof RebindCompletedEvent)
+ .subscribe(e -> {
+ // Consider the rebind complete after another relaxed timeout cycle.
+ //
+ // The reasoning behind that is we can't really be sure when all the enqueued commands have
+ // successfully been written to the wire and then the reply was received
+ timer.newTimeout(t -> getExecutorService().submit(() -> this.relaxTimeoutsGlobally = false),
+ relaxedTimeout.toMillis(), TimeUnit.MILLISECONDS);
+
+ });
}
/**
@@ -144,6 +172,8 @@ public void flushCommands() {
@Override
public void close() {
+ this.rebindStatedListener.dispose();
+ this.rebindEndedListener.dispose();
delegate.close();
}
@@ -154,6 +184,7 @@ public CompletableFuture closeAsync() {
@Override
public void reset() {
+ this.relaxTimeoutsGlobally = false;
delegate.reset();
}
@@ -179,8 +210,14 @@ private void potentiallyExpire(RedisCommand, ?, ?> command, ScheduledExecutorS
Timeout commandTimeout = timer.newTimeout(t -> {
if (!command.isDone()) {
- executors.submit(() -> command.completeExceptionally(ExceptionFactory
- .createTimeoutException(command.getType().toString(), Duration.ofNanos(timeUnit.toNanos(timeout)))));
+ executors.submit(() -> {
+ if (shouldRelaxTimeoutsGlobally()) {
+ relaxedAttempt(command, executors);
+ } else {
+ command.completeExceptionally(ExceptionFactory.createTimeoutException(command.getType().toString(),
+ Duration.ofNanos(timeUnit.toNanos(timeout))));
+ }
+ });
}
}, timeout, timeUnit);
@@ -191,4 +228,22 @@ private void potentiallyExpire(RedisCommand, ?, ?> command, ScheduledExecutorS
}
+    public boolean shouldRelaxTimeoutsGlobally() {
+        return !relaxedTimeout.isNegative() && relaxTimeoutsGlobally; // a negative relaxed timeout means "disabled"
+    }
+
+    // When relaxing the timeouts, a command that hit its regular timeout is not expired immediately: a second timer
+    // with the configured relaxedTimeout is started and the command fails only if it is still pending afterwards.
+    private void relaxedAttempt(RedisCommand<?, ?, ?> command, ScheduledExecutorService executors) {
+        Timeout commandTimeout = timer.newTimeout(t -> {
+            if (!command.isDone()) {
+                executors.submit(() -> command.completeExceptionally(ExceptionFactory.createTimeoutException(relaxedTimeout)));
+            }
+        }, relaxedTimeout.toMillis(), TimeUnit.MILLISECONDS);
+        // Cancel the relaxed timer as soon as the command completes on its own.
+        if (command instanceof CompleteableCommand) {
+            ((CompleteableCommand<?>) command).onComplete((o, o2) -> commandTimeout.cancel());
+        }
+    }
+
}
diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java
index 59aee61e0c..9e5ba9bd99 100644
--- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java
+++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java
@@ -20,11 +20,13 @@
package io.lettuce.core.protocol;
import static io.lettuce.core.ConnectionEvents.*;
+import static io.lettuce.core.protocol.ConnectionWatchdog.REBIND_ATTRIBUTE;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
+import java.time.LocalTime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -47,8 +49,11 @@
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceSets;
import io.lettuce.core.metrics.CommandLatencyRecorder;
+import io.lettuce.core.metrics.EndpointQueueMonitor;
+import io.lettuce.core.metrics.EndpointQueueMonitor.QueueId;
import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.output.PushOutput;
+import io.lettuce.core.rebind.RebindState;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.tracing.TraceContext;
import io.lettuce.core.tracing.TraceContextProvider;
@@ -165,6 +170,12 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc
this.tracingEnabled = tracing.isEnabled();
this.decodeBufferPolicy = clientOptions.getDecodeBufferPolicy();
+
+ EndpointQueueMonitor endpointQueueMonitor = clientResources.endpointQueueMonitor();
+ if (endpointQueueMonitor != null) {
+ endpointQueueMonitor.observeQueueSize(QueueId.create("lettuce.command.handler.queue", endpoint.getId()),
+ stack::size);
+ }
}
public Endpoint getEndpoint() {
@@ -625,6 +636,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
+ final boolean rebindInProgress = ctx.channel().hasAttr(REBIND_ATTRIBUTE)
+ && ctx.channel().attr(REBIND_ATTRIBUTE).get() != null
+ && ctx.channel().attr(REBIND_ATTRIBUTE).get().equals(RebindState.STARTED);
+ if (debugEnabled && rebindInProgress) {
+ logger.debug("{} Processing command while rebind is in progress, stack has {} more to process", logPrefix(),
+ stack.size());
+ }
if (pristine) {
@@ -711,6 +729,11 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
}
}
+ if (rebindInProgress && stack.isEmpty()) {
+ logger.info("{} Rebind completed at {}", logPrefix(), LocalTime.now());
+ ctx.channel().attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED);
+ }
+
decodeBufferPolicy.afterDecoding(buffer);
}
diff --git a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java
index 84bcb41f1f..f4b62cba44 100644
--- a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java
+++ b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java
@@ -19,12 +19,29 @@
*/
package io.lettuce.core.protocol;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
import java.time.Duration;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import io.lettuce.core.metrics.ConnectionMonitor;
+import io.lettuce.core.api.push.PushListener;
+import io.lettuce.core.api.push.PushMessage;
+import io.lettuce.core.codec.StringCodec;
+import io.lettuce.core.pubsub.PubSubCommandHandler;
+import io.lettuce.core.rebind.RebindCompletedEvent;
+import io.lettuce.core.rebind.RebindInitiatedEvent;
+import io.lettuce.core.rebind.RebindState;
+import io.netty.util.AttributeKey;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import io.lettuce.core.ClientOptions;
@@ -59,7 +76,9 @@
* @author Koji Lin
*/
@ChannelHandler.Sharable
-public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
+public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements PushListener {
+
+ public static final AttributeKey REBIND_ATTRIBUTE = AttributeKey.newInstance("rebindAddress");
private static final long LOGGING_QUIET_TIME_MS = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);
@@ -93,7 +112,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
private final AtomicBoolean reconnectSchedulerSync;
- private volatile int attempts;
+ private volatile Attempts attempts = new Attempts();
private volatile boolean armed;
@@ -101,6 +120,75 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
private volatile Timeout reconnectScheduleTimeout;
+ private SocketAddress rebindAddress;
+
+ private final ConnectionMonitor connectionMonitor;
+
+    static class Attempts {
+
+        private int attempts = 0;
+
+        private long disconnectedNs = -1;
+
+        private long completedNs = -1;
+
+        /**
+         * Marks the transition to the disconnected state. Only the first call stores {@code time}; every call clears a
+         * previously recorded completion time.
+         *
+         * @param time the time at which the connection became disconnected.
+         */
+        void disconnected(long time) {
+            completedNs = -1;
+            if (disconnectedNs != -1) {
+                return;
+            }
+            disconnectedNs = time;
+        }
+
+        /**
+         * Stores the completion time unless one is already set, in which case a warning is logged and {@code time} is
+         * ignored.
+         *
+         * @param time the time of completion.
+         */
+        void completed(long time) {
+            if (completedNs != -1) {
+                logger.warn("Attempt Completed time already set, ignoring");
+                return;
+            }
+            completedNs = time;
+        }
+
+        /** @return the time at which the connection became inactive, {@code -1} if not recorded. */
+        long getDisconnected() {
+            return disconnectedNs;
+        }
+
+        /** @return the time of completion, {@code -1} if not recorded. */
+        long getCompleted() {
+            return completedNs;
+        }
+
+        /** @return the number of attempts counted so far. */
+        public int getAttempts() {
+            return attempts;
+        }
+
+        /** @return the attempt count after adding one. */
+        public int incrementAndGet() {
+            attempts += 1;
+            return attempts;
+        }
+
+        @Override
+        public String toString() {
+            String times = ", disconnectedNs=" + disconnectedNs + ", completedNs=" + completedNs;
+            return "Attempts{" + "attempts=" + attempts + times + '}';
+        }
+
+    }
+
/**
* Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} and establishes a new
* {@link Channel} when disconnected, while reconnect is true. The socketAddressSupplier can supply the reconnect address.
@@ -118,8 +206,8 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
*/
public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Bootstrap bootstrap, Timer timer,
EventExecutorGroup reconnectWorkers, Mono socketAddressSupplier,
- ReconnectionListener reconnectionListener, ConnectionFacade connectionFacade, EventBus eventBus,
- Endpoint endpoint) {
+ ReconnectionListener reconnectionListener, ConnectionFacade connectionFacade, EventBus eventBus, Endpoint endpoint,
+ ConnectionMonitor connectionMonitor) {
LettuceAssert.notNull(reconnectDelay, "Delay must not be null");
LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");
@@ -141,6 +229,7 @@ public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Boo
this.eventBus = eventBus;
this.redisUri = (String) bootstrap.config().attrs().get(ConnectionBuilder.REDIS_URI);
this.epid = endpoint.getId();
+ this.connectionMonitor = connectionMonitor;
Mono wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> remoteAddress = addr)
.onErrorResume(t -> {
@@ -172,24 +261,51 @@ void prepareClose() {
reconnectScheduleTimeout.cancel();
}
+ // connection is closing:
+ // reset attempts and record the time of completion if in the middle of a reconnect
+ Attempts currentAttempts = this.attempts;
+ this.attempts = new Attempts();
+ if (currentAttempts.getDisconnected() > 0) {
+ // closing while still disconnected: record how long the connection was inactive
+ currentAttempts.completed(System.nanoTime());
+
+ connectionMonitor.recordDisconnectedTime(epid, currentAttempts.getCompleted() - currentAttempts.getDisconnected());
+ }
+
reconnectionHandler.prepareClose();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
- CommandHandler commandHandler = ctx.pipeline().get(CommandHandler.class);
-
reconnectSchedulerSync.set(false);
channel = ctx.channel();
reconnectScheduleTimeout = null;
logPrefix = null;
remoteAddress = channel.remoteAddress();
- attempts = 0;
+
+ // attempts = 0;
+ // reconnected successfully
+ // reset attempts and record the time of completion
+ final Attempts currentAttempts = this.attempts;
+ this.attempts = new Attempts();
+ if (currentAttempts.getAttempts() > 0) {
+ // reconnected after disconnect
+ currentAttempts.completed(System.nanoTime());
+ connectionMonitor.recordDisconnectedTime(epid, currentAttempts.getCompleted() - currentAttempts.getDisconnected());
+ }
+
resetReconnectDelay();
logPrefix = null;
logger.debug("{} channelActive()", logPrefix());
+ // todo : Configurable enable disable proactive reconnect
+ ChannelPipeline pipeline = ctx.channel().pipeline();
+ PubSubCommandHandler, ?> commandHandler = pipeline.get(PubSubCommandHandler.class);
+ if (commandHandler != null) {
+ commandHandler.getEndpoint().addListener(this);
+ }
+
super.channelActive(ctx);
}
@@ -205,6 +321,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
channel = null;
if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {
+ attempts.disconnected(System.nanoTime());
scheduleReconnect();
} else {
logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);
@@ -239,9 +356,11 @@ public void scheduleReconnect() {
}
if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {
+ // attempts++;
+ // final int attempt = attempts;
+ final int attempt = attempts.incrementAndGet();
+ connectionMonitor.incrementReconnectionAttempts(epid);
- attempts++;
- final int attempt = attempts;
Duration delay = reconnectDelay.createDelay(attempt);
int timeout = (int) delay.toMillis();
logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);
@@ -275,7 +394,6 @@ public void scheduleReconnect() {
* the same handler instances contained in the old channel's pipeline.
*
* @param attempt attempt counter
- *
* @throws Exception when reconnection fails.
*/
public void run(int attempt) throws Exception {
@@ -288,7 +406,6 @@ public void run(int attempt) throws Exception {
*
* @param attempt attempt counter.
* @param delay retry delay.
- *
* @throws Exception when reconnection fails.
*/
private void run(int attempt, Duration delay) throws Exception {
@@ -330,12 +447,15 @@ private void run(int attempt, Duration delay) throws Exception {
eventBus.publish(new ReconnectAttemptEvent(redisUri, epid, LocalAddress.ANY, remoteAddress, attempt, delay));
logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress);
- Tuple2, CompletableFuture> tuple = reconnectionHandler.reconnect();
+ Tuple2, CompletableFuture> tuple = rebindAddress == null
+ ? reconnectionHandler.reconnect()
+ : reconnectionHandler.reconnect(rebindAddress);
CompletableFuture future = tuple.getT1();
future.whenComplete((c, t) -> {
if (c != null && t == null) {
+ this.channel.attr(REBIND_ATTRIBUTE).set(null);
return;
}
@@ -436,4 +556,61 @@ private String logPrefix() {
return logPrefix = buffer;
}
+    /** Once a rebind is flagged COMPLETED on a still-active channel, closes it and publishes {@link RebindCompletedEvent}. */
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        final Channel ch = ctx.channel();
+        if (ch != null && ch.isActive() && ch.hasAttr(REBIND_ATTRIBUTE)
+                && ch.attr(REBIND_ATTRIBUTE).get() == RebindState.COMPLETED) {
+            logger.debug("Disconnecting at {}", LocalTime.now());
+            ch.close().awaitUninterruptibly();
+            // FIXME this is currently only used to notify in an awkward way the ChannelExpiryWriter
+            eventBus.publish(new RebindCompletedEvent());
+        }
+        super.channelReadComplete(ctx);
+    }
+
+    /** Handles pub/sub pushes containing "type=rebind": stores the target endpoint and starts, or directly completes, the rebind. */
+    @Override
+    public void onPushMessage(PushMessage message) {
+        if (!"message".equals(message.getType())) {
+            return;
+        }
+        List<String> content = message.getContent().stream()
+                .map(ez -> ez instanceof ByteBuffer ? StringCodec.UTF8.decodeKey((ByteBuffer) ez) : String.valueOf(ez))
+                .collect(Collectors.toList());
+        final Channel current = this.channel; // snapshot: channelInactive() resets the field to null
+        if (current != null && content.stream().anyMatch(c -> c.contains("type=rebind"))) {
+            final SocketAddress target = getRemoteAddress(content); // parsed once, used for logging and reconnect
+            logger.info("Attempting to rebind to new endpoint '{}'", target);
+            current.attr(REBIND_ATTRIBUTE).set(RebindState.STARTED);
+            this.rebindAddress = target;
+            // NOTE(review): pipeline.get(..) yields null without a PubSubCommandHandler - confirm pub/sub-only usage.
+            ChannelPipeline pipeline = current.pipeline();
+            PubSubCommandHandler<?, ?> commandHandler = pipeline.get(PubSubCommandHandler.class);
+            if (commandHandler.getStack().isEmpty()) {
+                current.close().awaitUninterruptibly();
+                current.attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED);
+            } else {
+                // FIXME this is currently only used to notify in an awkward way the ChannelExpiryWriter
+                eventBus.publish(new RebindInitiatedEvent());
+            }
+        }
+    }
+
+    /** Extracts the {@code to_ep=host:port} target from the {@code ;}-separated rebind payload among the message contents. */
+    private SocketAddress getRemoteAddress(List<String> messageContents) {
+        // NOTE(review): the localhost fallback looks like demo scaffolding; confirm it is wanted outside of demos.
+        final String toEndpoint = messageContents.stream().flatMap(c -> Arrays.stream(c.split(";")))
+                .filter(segment -> segment.contains("to_ep")).findFirst().orElse("to_ep=localhost:6379");
+        final int valueStart = toEndpoint.indexOf('=') + 1;
+        final int portSeparator = toEndpoint.lastIndexOf(':'); // last colon, so the host part may contain colons itself
+        if (valueStart == 0 || portSeparator < valueStart) {
+            throw new IllegalArgumentException("Malformed rebind endpoint '" + toEndpoint + "', expected to_ep=host:port");
+        }
+
+        return new InetSocketAddress(toEndpoint.substring(valueStart, portSeparator),
+                Integer.parseInt(toEndpoint.substring(portSeparator + 1)));
+    }
+
}
diff --git a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
index 79f2f05f16..6adafac71b 100644
--- a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
+++ b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
@@ -20,6 +20,7 @@
package io.lettuce.core.protocol;
import static io.lettuce.core.protocol.CommandHandler.*;
+import static io.lettuce.core.protocol.ConnectionWatchdog.REBIND_ATTRIBUTE;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
@@ -45,6 +46,9 @@
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceFactories;
+import io.lettuce.core.rebind.RebindState;
+import io.lettuce.core.metrics.EndpointQueueMonitor;
+import io.lettuce.core.metrics.EndpointQueueMonitor.QueueId;
import io.lettuce.core.resource.ClientResources;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -148,6 +152,17 @@ public DefaultEndpoint(ClientOptions clientOptions, ClientResources clientResour
this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE;
this.rejectCommandsWhileDisconnected = isRejectCommand(clientOptions);
this.cachedEndpointId = "0x" + Long.toHexString(endpointId);
+
+ // Commands submitted netty queue but still not written to the channel
+ EndpointQueueMonitor endpointQueueMonitor = clientResources.endpointQueueMonitor();
+ if (endpointQueueMonitor != null) {
+ endpointQueueMonitor.observeQueueSize(QueueId.create("lettuce.endpoint.command.queue", getId()),
+ () -> QUEUE_SIZE.get(this));
+ endpointQueueMonitor.observeQueueSize(QueueId.create("lettuce.endpoint.disconnected.buffer", getId()),
+ disconnectedBuffer::size);
+ endpointQueueMonitor.observeQueueSize(QueueId.create("lettuce.endpoint.command.buffer", getId()),
+ commandBuffer::size);
+ }
}
@Override
@@ -812,7 +827,16 @@ private void cancelCommands(String message, Iterable extends RedisCommand, ?
}
private boolean isConnected(Channel channel) {
- return channel != null && channel.isActive();
+ // A missing or inactive channel is never considered connected.
+ if (channel == null || !channel.isActive()) {
+ return false;
+ }
+ // A channel whose rebind state is STARTED is reported as not connected.
+ if (channel.hasAttr(REBIND_ATTRIBUTE)) {
+ return channel.attr(REBIND_ATTRIBUTE).get() != RebindState.STARTED;
+ }
+ // No rebind attribute on the channel: an active channel counts as connected.
+ return true;
}
protected String logPrefix() {
@@ -1054,20 +1078,8 @@ private void potentiallyRequeueCommands(Channel channel, RedisCommand, ?, ?> s
return;
}
- if (sentCommands != null) {
-
- boolean foundToSend = false;
-
- for (RedisCommand, ?, ?> command : sentCommands) {
- if (!command.isDone()) {
- foundToSend = true;
- break;
- }
- }
-
- if (!foundToSend) {
- return;
- }
+ if (sentCommands != null && sentCommands.stream().allMatch(RedisCommand::isDone)) {
+ return;
}
if (channel != null) {
diff --git a/src/main/java/io/lettuce/core/protocol/Endpoint.java b/src/main/java/io/lettuce/core/protocol/Endpoint.java
index aa55ac43b7..201ee0dd9b 100644
--- a/src/main/java/io/lettuce/core/protocol/Endpoint.java
+++ b/src/main/java/io/lettuce/core/protocol/Endpoint.java
@@ -1,6 +1,7 @@
package io.lettuce.core.protocol;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
/**
* Wraps a stateful {@link Endpoint} that abstracts the underlying channel. Endpoints may be connected, disconnected and in
diff --git a/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java b/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java
index 8bf183b426..04ab026c42 100644
--- a/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java
+++ b/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java
@@ -114,6 +114,16 @@ protected Tuple2, CompletableFuture> r
return Tuples.of(future, address);
}
+    /** Reconnects to the given {@code remoteAddress} as-is; returns the channel future and the (already completed) address future. */
+    protected Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> reconnect(SocketAddress remoteAddress) {
+        CompletableFuture<Channel> future = new CompletableFuture<>();
+        // The destination is known up-front, so complete the address future instead of handing out one that never resolves.
+        CompletableFuture<SocketAddress> address = CompletableFuture.completedFuture(remoteAddress);
+        reconnect0(future, remoteAddress);
+        this.currentFuture = future;
+        return Tuples.of(future, address);
+    }
+
private void reconnect0(CompletableFuture result, SocketAddress remoteAddress) {
ChannelHandler handler = bootstrap.config().handler();
diff --git a/src/main/java/io/lettuce/core/rebind/RebindCompletedEvent.java b/src/main/java/io/lettuce/core/rebind/RebindCompletedEvent.java
new file mode 100644
index 0000000000..0ce22b44aa
--- /dev/null
+++ b/src/main/java/io/lettuce/core/rebind/RebindCompletedEvent.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2025, Redis Ltd. and Contributors
+ * All rights reserved.
+ *
+ * Licensed under the MIT License.
+ *
+ * This file contains contributions from third-party contributors
+ * licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.lettuce.core.rebind;
+
+import io.lettuce.core.event.Event;
+
+/** Event published from channelReadComplete after an active channel whose rebind state was COMPLETED has been closed. */
+public class RebindCompletedEvent implements Event {
+}
diff --git a/src/main/java/io/lettuce/core/rebind/RebindInitiatedEvent.java b/src/main/java/io/lettuce/core/rebind/RebindInitiatedEvent.java
new file mode 100644
index 0000000000..77e7c07f8f
--- /dev/null
+++ b/src/main/java/io/lettuce/core/rebind/RebindInitiatedEvent.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2025, Redis Ltd. and Contributors
+ * All rights reserved.
+ *
+ * Licensed under the MIT License.
+ *
+ * This file contains contributions from third-party contributors
+ * licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.lettuce.core.rebind;
+
+import io.lettuce.core.event.Event;
+
+import java.net.SocketAddress;
+
+/** Event published when a "type=rebind" push arrives while the command handler's stack is not yet empty. */
+public class RebindInitiatedEvent implements Event {
+}
diff --git a/src/main/java/io/lettuce/core/rebind/RebindState.java b/src/main/java/io/lettuce/core/rebind/RebindState.java
new file mode 100644
index 0000000000..3c6530f7e0
--- /dev/null
+++ b/src/main/java/io/lettuce/core/rebind/RebindState.java
@@ -0,0 +1,12 @@
+/*
+ * Copyright 2025, Redis Ltd. and Contributors
+ * All rights reserved.
+ *
+ * Licensed under the MIT License.
+ */
+
+package io.lettuce.core.rebind;
+/** Rebind progress kept in a channel attribute: STARTED is set when a rebind push is handled, COMPLETED when the channel is closed for it. */
+public enum RebindState {
+ STARTED, COMPLETED
+}
diff --git a/src/main/java/io/lettuce/core/resource/ClientResources.java b/src/main/java/io/lettuce/core/resource/ClientResources.java
index 4d82d4ac08..ddf4e850d0 100644
--- a/src/main/java/io/lettuce/core/resource/ClientResources.java
+++ b/src/main/java/io/lettuce/core/resource/ClientResources.java
@@ -27,6 +27,8 @@
import io.lettuce.core.metrics.CommandLatencyCollector;
import io.lettuce.core.metrics.CommandLatencyCollectorOptions;
import io.lettuce.core.metrics.CommandLatencyRecorder;
+import io.lettuce.core.metrics.ConnectionMonitor;
+import io.lettuce.core.metrics.EndpointQueueMonitor;
import io.lettuce.core.tracing.Tracing;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.Timer;
@@ -130,6 +132,10 @@ default Builder commandLatencyCollector(CommandLatencyCollector commandLatencyCo
*/
Builder commandLatencyRecorder(CommandLatencyRecorder latencyRecorder);
+ Builder connectionMonitor(ConnectionMonitor connectionMonitor);
+
+ Builder endpointQueueMonitor(EndpointQueueMonitor queueMonitor);
+
/**
* Sets the {@link CommandLatencyCollectorOptions} that can be used across different instances of the RedisClient. The
* options are only effective if no {@code commandLatencyCollector} is provided.
@@ -349,6 +355,10 @@ default Builder commandLatencyCollector(CommandLatencyCollector commandLatencyCo
*/
CommandLatencyRecorder commandLatencyRecorder();
+ ConnectionMonitor connectionMonitor();
+
+ EndpointQueueMonitor endpointQueueMonitor();
+
/**
* Return the pool size (number of threads) for all computation tasks.
*
diff --git a/src/main/java/io/lettuce/core/resource/DefaultClientResources.java b/src/main/java/io/lettuce/core/resource/DefaultClientResources.java
index a165978df6..4fd6539af2 100644
--- a/src/main/java/io/lettuce/core/resource/DefaultClientResources.java
+++ b/src/main/java/io/lettuce/core/resource/DefaultClientResources.java
@@ -33,9 +33,11 @@
import io.lettuce.core.metrics.CommandLatencyCollector;
import io.lettuce.core.metrics.CommandLatencyCollectorOptions;
import io.lettuce.core.metrics.CommandLatencyRecorder;
+import io.lettuce.core.metrics.ConnectionMonitor;
import io.lettuce.core.metrics.DefaultCommandLatencyCollector;
import io.lettuce.core.metrics.DefaultCommandLatencyCollectorOptions;
import io.lettuce.core.metrics.MetricCollector;
+import io.lettuce.core.metrics.EndpointQueueMonitor;
import io.lettuce.core.resource.Delay.StatefulDelay;
import io.lettuce.core.tracing.Tracing;
import io.netty.resolver.AddressResolverGroup;
@@ -131,6 +133,10 @@ public class DefaultClientResources implements ClientResources {
private final CommandLatencyRecorder commandLatencyRecorder;
+ private final ConnectionMonitor connectionMonitor;
+
+ private final EndpointQueueMonitor endpointQueueMonitor;
+
private final boolean sharedCommandLatencyRecorder;
private final EventPublisherOptions commandLatencyPublisherOptions;
@@ -220,6 +226,9 @@ protected DefaultClientResources(Builder builder) {
eventBus = builder.eventBus;
}
+ connectionMonitor = builder.connectionMonitor;
+ endpointQueueMonitor = builder.queueMonitor;
+
if (builder.commandLatencyRecorder == null) {
if (DefaultCommandLatencyCollector.isAvailable()) {
if (builder.commandLatencyCollectorOptions != null) {
@@ -335,6 +344,10 @@ public static class Builder implements ClientResources.Builder {
private Runnable afterBuild;
+ private ConnectionMonitor connectionMonitor = ConnectionMonitor.disabled();
+
+ private EndpointQueueMonitor queueMonitor = EndpointQueueMonitor.disabled();
+
private Builder() {
}
@@ -408,6 +421,20 @@ public Builder commandLatencyCollectorOptions(CommandLatencyCollectorOptions com
return this;
}
+ @Override
+ public Builder connectionMonitor(ConnectionMonitor connectionMonitor) {
+ LettuceAssert.notNull(connectionMonitor, "ConnectionMonitor must not be null");
+ this.connectionMonitor = connectionMonitor;
+ return this;
+ }
+
+ @Override
+ public Builder endpointQueueMonitor(EndpointQueueMonitor queueMonitor) {
+ LettuceAssert.notNull(queueMonitor, "QueueMonitor must not be null");
+ this.queueMonitor = queueMonitor;
+ return this;
+ }
+
/**
* Sets the {@link CommandLatencyRecorder} that can be used across different instances of the RedisClient.
*
@@ -771,6 +798,16 @@ public Future shutdown(long quietPeriod, long timeout, TimeUnit timeUni
return PromiseAdapter.toBooleanPromise(voidPromise);
}
+ @Override
+ public ConnectionMonitor connectionMonitor() {
+ return connectionMonitor;
+ }
+
+ @Override
+ public EndpointQueueMonitor endpointQueueMonitor() {
+ return endpointQueueMonitor;
+ }
+
@Override
public CommandLatencyRecorder commandLatencyRecorder() {
return commandLatencyRecorder;
diff --git a/src/test/java/biz/paluch/redis/extensibility/LettuceRebindDemo.java b/src/test/java/biz/paluch/redis/extensibility/LettuceRebindDemo.java
new file mode 100644
index 0000000000..2898dd4795
--- /dev/null
+++ b/src/test/java/biz/paluch/redis/extensibility/LettuceRebindDemo.java
@@ -0,0 +1,125 @@
+package biz.paluch.redis.extensibility;
+
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisURI;
+import io.lettuce.core.TimeoutOptions;
+import io.lettuce.core.api.push.PushListener;
+import io.lettuce.core.api.push.PushMessage;
+import io.lettuce.core.codec.StringCodec;
+import io.lettuce.core.event.EventBus;
+import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
+import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/** Manual demo: keeps issuing INCR while subscribed to "__rebind" so a proactive rebind can be triggered via PUBLISH. */
+public class LettuceRebindDemo {
+
+    public static final Logger logger = Logger.getLogger(LettuceRebindDemo.class.getName());
+
+    public static final String KEY = "rebind:" + UUID.randomUUID().getLeastSignificantBits();
+
+    /** Runs until {@code publish __rebind "type=stop_demo"} is received, then drains the workers and releases all resources. */
+    public static void main(String[] args) throws ExecutionException, InterruptedException {
+        // NEW! No need for a custom handler
+        TimeoutOptions timeoutOpts = TimeoutOptions.builder().timeoutCommands().fixedTimeout(Duration.ofMillis(50))
+                // NEW! control that during timeouts we need to relax the timeouts
+                .proactiveTimeoutsRelaxing(Duration.ofMillis(500)).build();
+        ClientOptions options = ClientOptions.builder().timeoutOptions(timeoutOpts).build();
+
+        RedisClient redisClient = RedisClient.create(RedisURI.Builder.redis("localhost", 6379).build());
+        redisClient.setOptions(options);
+
+        // Monitor connection events (java.util.logging has no "{}" placeholders, so the event is concatenated)
+        EventBus eventBus = redisClient.getResources().eventBus();
+        eventBus.get().subscribe(e -> logger.info(">>> Event bus received: " + e));
+
+        // Subscribe to __rebind channel (REMOVE ONCE WE START RECEIVING THESE WITHOUT SUBSCRIPTION)
+        StatefulRedisPubSubConnection<String, String> redis = redisClient.connectPubSub();
+        RedisPubSubAsyncCommands<String, String> commands = redis.async();
+        commands.subscribe("__rebind").get();
+
+        // Used to stop the demo by sending the following command:
+        // publish __rebind "type=stop_demo"
+        Control control = new Control();
+        redis.addListener(control);
+
+        // Used to initiate the proactive rebind by sending the following command
+        // publish __rebind "type=rebind;from_ep=localhost:6379;to_ep=localhost:6479;until_s=10"
+
+        ExecutorService executorService = new ThreadPoolExecutor(5, // core pool size
+                10, // maximum pool size
+                60, TimeUnit.SECONDS, // idle thread keep-alive time
+                new ArrayBlockingQueue<>(20), // work queue size
+                new ThreadPoolExecutor.DiscardPolicy()); // rejection policy
+
+        try {
+            while (control.shouldContinue) {
+                executorService.execute(new DemoWorker(commands));
+                Thread.sleep(1);
+            }
+            // awaitTermination can only report termination after shutdown() has been requested
+            executorService.shutdown();
+            if (executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+                logger.info("Executor service terminated");
+            } else {
+                logger.warning("Executor service did not terminate in the specified time");
+            }
+        } finally {
+            // release everything, also when the loop above is interrupted
+            executorService.shutdownNow();
+            redis.close();
+            redisClient.shutdown();
+        }
+    }
+
+    /** One unit of load: a blocking INCR on {@link #KEY}. */
+    static class DemoWorker implements Runnable {
+
+        private final RedisPubSubAsyncCommands<String, String> commands;
+
+        public DemoWorker(RedisPubSubAsyncCommands<String, String> commands) {
+            this.commands = commands;
+        }
+
+        @Override
+        public void run() {
+            try {
+                commands.incr(KEY).get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interrupt (e.g. from shutdownNow) visible to the pool
+            } catch (ExecutionException e) {
+                logger.severe("ExecutionException: " + e.getMessage());
+            }
+        }
+    }
+
+    /** Push listener that clears {@link #shouldContinue} when a "type=stop_demo" message arrives. */
+    static class Control implements PushListener {
+
+        // set from the push-listener callback and polled by main(): volatile so the write is reliably observed
+        public volatile boolean shouldContinue = true;
+
+        @Override
+        public void onPushMessage(PushMessage message) {
+            List<String> content = message.getContent().stream().filter(ez -> ez instanceof ByteBuffer)
+                    .map(ez -> StringCodec.UTF8.decodeKey((ByteBuffer) ez)).collect(Collectors.toList());
+
+            if (content.stream().anyMatch(c -> c.contains("type=stop_demo"))) {
+                logger.info("Control received message to stop the demo");
+                shouldContinue = false;
+            }
+        }
+    }
+}
diff --git a/src/test/java/io/lettuce/core/ConnectionMonitorIntegrationTests.java b/src/test/java/io/lettuce/core/ConnectionMonitorIntegrationTests.java
new file mode 100644
index 0000000000..712f799a4b
--- /dev/null
+++ b/src/test/java/io/lettuce/core/ConnectionMonitorIntegrationTests.java
@@ -0,0 +1,95 @@
+package io.lettuce.core;
+
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.metrics.MicrometerConnectionMonitor;
+import io.lettuce.core.metrics.MicrometerOptions;
+import io.lettuce.core.resource.ClientResources;
+import io.lettuce.test.LettuceExtension;
+import io.lettuce.test.Wait;
+import io.lettuce.test.resource.TestClientResources;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static io.lettuce.TestTags.INTEGRATION_TEST;
+import static io.lettuce.core.metrics.MicrometerConnectionMonitor.METRIC_RECONNECTION_INACTIVE_TIME;
+import static io.lettuce.core.metrics.MicrometerConnectionMonitor.METRIC_RECONNECTION_ATTEMPTS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for {@link MicrometerConnectionMonitor} plugged in through {@link ClientResources}.
+ *
+ * @author Mark Paluch
+ */
+@Tag(INTEGRATION_TEST)
+@ExtendWith(LettuceExtension.class)
+class ConnectionMonitorIntegrationTests extends TestSupport {
+
+    private final MeterRegistry meterRegistry = new SimpleMeterRegistry();
+
+    private final ClientResources clientResources = TestClientResources.get();
+
+    @Test
+    void connectionInactiveTime() {
+        MicrometerOptions options = MicrometerOptions.create();
+        MicrometerConnectionMonitor monitor = new MicrometerConnectionMonitor(meterRegistry, options);
+        ClientResources resources = clientResources.mutate().connectionMonitor(monitor).build();
+        RedisClient client = RedisClient.create(resources, RedisURI.Builder.redis(host, port).build());
+
+        try { // shut the client down even when an assertion fails
+            StatefulRedisConnection<String, String> connection = client.connect();
+            // Force disconnection
+            connection.sync().quit();
+            Wait.untilTrue(() -> !connection.isOpen()).during(Duration.ofSeconds(1)).waitOrTimeout();
+
+            // Wait for successful reconnection
+            Wait.untilTrue(connection::isOpen).during(Duration.ofSeconds(1)).waitOrTimeout();
+
+            // At least one reconnect attempt
+            assertThat(meterRegistry.find(METRIC_RECONNECTION_ATTEMPTS).counter().count()).isGreaterThanOrEqualTo(1);
+            assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timers()).isNotEmpty();
+            assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timer().count()).isEqualTo(1);
+            double totalTime = meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timer().totalTime(TimeUnit.NANOSECONDS);
+            assertThat(totalTime).isGreaterThan(0);
+
+            connection.close();
+
+            // An explicit close must not be recorded as another inactive period
+            assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timer().count()).isEqualTo(1);
+            assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timer().totalTime(TimeUnit.NANOSECONDS))
+                    .isEqualTo(totalTime);
+        } finally {
+            client.shutdown();
+        }
+    }
+
+    @Test
+    void connectionInactiveTimeAutoReconnectDisabled() {
+        MicrometerOptions options = MicrometerOptions.create();
+        MicrometerConnectionMonitor monitor = new MicrometerConnectionMonitor(meterRegistry, options);
+        ClientResources resources = clientResources.mutate().connectionMonitor(monitor).build();
+        ClientOptions clientOptions = ClientOptions.builder().autoReconnect(false).build();
+
+        RedisClient client = RedisClient.create(resources, RedisURI.Builder.redis(host, port).build());
+        client.setOptions(clientOptions);
+
+        try (StatefulRedisConnection<String, String> connection = client.connect()) {
+            RedisCommands<String, String> redis = connection.sync();
+            // Force disconnection
+            redis.quit();
+            Wait.untilTrue(() -> !connection.isOpen()).during(Duration.ofSeconds(1)).waitOrTimeout();
+        } finally {
+            client.shutdown(); // runs after try-with-resources has closed the connection
+        }
+
+        // Connection is closed, with auto-reconnect disabled, no metrics are recorded
+        assertThat(meterRegistry.find(METRIC_RECONNECTION_ATTEMPTS).counters()).isEmpty();
+        assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timers()).isEmpty();
+    }
+}
diff --git a/src/test/java/io/lettuce/core/EndpointQueueMonitorIntegrationTests.java b/src/test/java/io/lettuce/core/EndpointQueueMonitorIntegrationTests.java
new file mode 100644
index 0000000000..a8a6933992
--- /dev/null
+++ b/src/test/java/io/lettuce/core/EndpointQueueMonitorIntegrationTests.java
@@ -0,0 +1,111 @@
+package io.lettuce.core;
+
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.async.RedisAsyncCommands;
+import io.lettuce.core.metrics.EndpointQueueMonitor;
+import io.lettuce.core.metrics.MicrometerOptions;
+import io.lettuce.core.metrics.MicrometerQueueMonitor;
+import io.lettuce.core.resource.ClientResources;
+import io.lettuce.test.LettuceExtension;
+import io.lettuce.test.Wait;
+import io.lettuce.test.resource.TestClientResources;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+
+import static io.lettuce.TestTags.INTEGRATION_TEST;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for the endpoint queue gauges registered through {@link MicrometerQueueMonitor}.
+ *
+ * @author Mark Paluch
+ */
+@Tag(INTEGRATION_TEST)
+@ExtendWith(LettuceExtension.class)
+class EndpointQueueMonitorIntegrationTests extends TestSupport {
+
+    private final static MeterRegistry meterRegistry = new SimpleMeterRegistry();
+
+    private static RedisClient client;
+
+    private static ClientResources resources;
+
+    @BeforeAll
+    static void beforeAll() {
+        MicrometerOptions options = MicrometerOptions.create();
+        EndpointQueueMonitor monitor = new MicrometerQueueMonitor(meterRegistry, options);
+        resources = TestClientResources.get().mutate().endpointQueueMonitor(monitor).build();
+        client = RedisClient.create(resources, RedisURI.Builder.redis(host, port).build());
+    }
+
+    @org.junit.jupiter.api.AfterAll // fully qualified so the import block stays untouched
+    static void afterAll() {
+        client.shutdown();
+    }
+
+    @BeforeEach
+    void setUp() {
+        client.setOptions(ClientOptions.builder().build());
+    }
+
+    @AfterEach
+    void tearDown() {
+        meterRegistry.clear();
+    }
+
+    @Test
+    void queueMonitorDisconnectedBuffer() {
+        ClientOptions clientOptions = ClientOptions.builder().autoReconnect(false)
+                .disconnectedBehavior(ClientOptions.DisconnectedBehavior.ACCEPT_COMMANDS).build();
+        client.setOptions(clientOptions);
+
+        try (StatefulRedisConnection<String, String> connection = client.connect()) {
+            RedisAsyncCommands<String, String> redis = connection.async();
+            // Force disconnection
+            redis.quit();
+            Wait.untilTrue(() -> !connection.isOpen()).during(Duration.ofSeconds(1)).waitOrTimeout();
+
+            redis.set("key", "value");
+            assertThat(meterRegistry.find("lettuce.endpoint.disconnected.buffer").gauge().value()).isEqualTo(1);
+        }
+    }
+
+    @Test
+    void queueMonitorCommandsBuffer() {
+        // uses the shared client (reset to default options in setUp) instead of creating one that is never shut down
+        try (StatefulRedisConnection<String, String> connection = client.connect()) {
+            RedisAsyncCommands<String, String> redis = connection.async();
+            redis.setAutoFlushCommands(false);
+            redis.set("key", "value");
+
+            assertThat(meterRegistry.find("lettuce.endpoint.command.buffer").gauge().value()).isEqualTo(1);
+            redis.flushCommands();
+            assertThat(meterRegistry.find("lettuce.endpoint.command.buffer").gauge().value()).isEqualTo(0);
+        }
+    }
+
+    @Test
+    void queueMonitorCommandHandlerStackSize() throws ExecutionException, InterruptedException {
+        try (StatefulRedisConnection<String, String> connection = client.connect();
+                StatefulRedisConnection<String, String> connection2 = client.connect()) {
+            RedisAsyncCommands<String, String> redis = connection.async();
+            RedisFuture<KeyValue<String, String>> blpop = redis.blpop(1, "blpop:key");
+            assertThat(meterRegistry.find("lettuce.command.handler.queue").gauge().value()).isEqualTo(1);
+
+            Long lpush = connection2.sync().lpush("blpop:key", "value");
+            assertThat(lpush).isEqualTo(1);
+            assertThat(blpop.get()).isNotNull();
+
+            assertThat(meterRegistry.find("lettuce.command.handler.queue").gauge().value()).isEqualTo(0);
+        }
+    }
+}
diff --git a/src/test/java/io/lettuce/core/metrics/MicrometerConnectionMonitorUnitTest.java b/src/test/java/io/lettuce/core/metrics/MicrometerConnectionMonitorUnitTest.java
new file mode 100644
index 0000000000..a78e96b9cf
--- /dev/null
+++ b/src/test/java/io/lettuce/core/metrics/MicrometerConnectionMonitorUnitTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2011-Present, Redis Ltd. and Contributors
+ * All rights reserved.
+ *
+ * Licensed under the MIT License.
+ *
+ * This file contains contributions from third-party contributors
+ * licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lettuce.core.metrics;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tags;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import static io.lettuce.TestTags.UNIT_TEST;
+import static io.lettuce.core.metrics.MicrometerConnectionMonitor.LABEL_EPID;
+import static io.lettuce.core.metrics.MicrometerConnectionMonitor.METRIC_RECONNECTION_INACTIVE_TIME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link MicrometerConnectionMonitor}: disabled options record nothing, configured tags are applied. */
+@Tag(UNIT_TEST)
+class MicrometerConnectionMonitorUnitTest {
+
+    private final MeterRegistry meterRegistry = new SimpleMeterRegistry();
+
+    @Test
+    void disabled() {
+        MicrometerOptions options = MicrometerOptions.disabled();
+        MicrometerConnectionMonitor monitor = new MicrometerConnectionMonitor(meterRegistry, options);
+
+        monitor.recordDisconnectedTime("1", 1);
+        monitor.incrementReconnectionAttempts("1");
+        // neither the inactive-time timer nor the attempts counter may show up while metrics are disabled
+        assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timers()).isEmpty();
+        assertThat(meterRegistry.find(MicrometerConnectionMonitor.METRIC_RECONNECTION_ATTEMPTS).counters()).isEmpty();
+    }
+
+    @Test
+    void tags() {
+        Tags tags = Tags.of("app", "foo");
+        MicrometerOptions options = MicrometerOptions.builder().tags(tags).build();
+        MicrometerConnectionMonitor monitor = new MicrometerConnectionMonitor(meterRegistry, options);
+
+        monitor.recordDisconnectedTime("1", 1);
+
+        assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).tags(tags).timers()).hasSize(1);
+        assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).tag(LABEL_EPID, "1").timers()).hasSize(1);
+    }
+}
diff --git a/src/test/resources/log4j2-test.xml b/src/test/resources/log4j2-test.xml
index 981918b55e..40d2e514e2 100644
--- a/src/test/resources/log4j2-test.xml
+++ b/src/test/resources/log4j2-test.xml
@@ -11,7 +11,7 @@
-
+