diff --git a/src/main/java/io/lettuce/core/ConnectionBuilder.java b/src/main/java/io/lettuce/core/ConnectionBuilder.java index 4bb3127b88..e123502a2b 100644 --- a/src/main/java/io/lettuce/core/ConnectionBuilder.java +++ b/src/main/java/io/lettuce/core/ConnectionBuilder.java @@ -155,7 +155,7 @@ protected ConnectionWatchdog createConnectionWatchdog() { ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap, clientResources.timer(), clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener, - connection, clientResources.eventBus(), endpoint); + connection, clientResources.eventBus(), endpoint, clientResources.connectionMonitor()); endpoint.registerConnectionWatchdog(watchdog); diff --git a/src/main/java/io/lettuce/core/TimeoutOptions.java b/src/main/java/io/lettuce/core/TimeoutOptions.java index a5754f1819..9502de5b3e 100644 --- a/src/main/java/io/lettuce/core/TimeoutOptions.java +++ b/src/main/java/io/lettuce/core/TimeoutOptions.java @@ -24,18 +24,26 @@ @SuppressWarnings("serial") public class TimeoutOptions implements Serializable { + public static final Duration DISABLED_TIMEOUT = Duration.ZERO.minusSeconds(1); + public static final boolean DEFAULT_TIMEOUT_COMMANDS = false; + public static final Duration DEFAULT_RELAXED_TIMEOUT = DISABLED_TIMEOUT; + private final boolean timeoutCommands; private final boolean applyConnectionTimeout; + private final Duration relaxedTimeout; + private final TimeoutSource source; - private TimeoutOptions(boolean timeoutCommands, boolean applyConnectionTimeout, TimeoutSource source) { + private TimeoutOptions(boolean timeoutCommands, boolean applyConnectionTimeout, TimeoutSource source, + Duration relaxedTimeout) { this.timeoutCommands = timeoutCommands; this.applyConnectionTimeout = applyConnectionTimeout; + this.relaxedTimeout = relaxedTimeout; this.source = source; } @@ -84,6 +92,8 @@ public static class Builder { private boolean applyConnectionTimeout = false; + private Duration relaxedTimeout = DEFAULT_RELAXED_TIMEOUT; + private TimeoutSource source; /** @@ -107,6 +117,24 @@ public Builder timeoutCommands(boolean enabled) { return this; } + /** + * Enable proactive timeout relaxing. Disabled by default, see {@link #DEFAULT_RELAXED_TIMEOUT}. + *

+ * If the Redis server supports this, the client could listen to notifications that the current endpoint is about to go + * down as part of some maintenance activity, for example. In such cases, the driver could extend the existing timeout + * settings for existing commands to make sure they do not time out during this process either as part of the offline + * buffer or while waiting for a reply. + * + * @param duration {@link Duration} to relax timeouts proactively, must not be {@code null}. + * @return {@code this} + */ + public Builder proactiveTimeoutsRelaxing(Duration duration) { + LettuceAssert.notNull(duration, "Duration must not be null"); + + this.relaxedTimeout = duration; + return this; + } + /** * Set a fixed timeout for all commands. * @@ -158,7 +186,7 @@ public TimeoutOptions build() { } } - return new TimeoutOptions(timeoutCommands, applyConnectionTimeout, source); + return new TimeoutOptions(timeoutCommands, applyConnectionTimeout, source, relaxedTimeout); } } @@ -177,6 +205,13 @@ public boolean isApplyConnectionTimeout() { return applyConnectionTimeout; } + /** + * @return the {@link Duration} to relax timeouts proactively, {@link #DISABLED_TIMEOUT} if disabled. + */ + public Duration getRelaxedTimeout() { + return relaxedTimeout; + } + /** * @return the timeout source to determine the timeout for a {@link RedisCommand}. Can be {@code null} if * {@link #isTimeoutCommands()} is {@code false}. diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index b5fa5cf196..4df92fd4db 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -1091,6 +1091,7 @@ private CompletionStage fetchPartitions(Iterable topologyR getClusterClientOptions().getSocketOptions().getConnectTimeout(), useDynamicRefreshSources()); return topology.thenApply(partitions -> { + logger.debug("Topology Refresh Views: {} ", partitions); if (partitions.isEmpty()) { throw new RedisException(String.format("Cannot retrieve initial cluster partitions from initial URIs %s", @@ -1108,7 +1109,7 @@ private CompletionStage fetchPartitions(Iterable topologyR } topologyRefreshScheduler.activateTopologyRefreshIfNeeded(); - + logger.debug("Topology Refresh loadedPartitions: {}", loadedPartitions); return loadedPartitions; }); } diff --git a/src/main/java/io/lettuce/core/metrics/ConnectionMonitor.java b/src/main/java/io/lettuce/core/metrics/ConnectionMonitor.java new file mode 100644 index 0000000000..fdf4782f99 --- /dev/null +++ b/src/main/java/io/lettuce/core/metrics/ConnectionMonitor.java @@ -0,0 +1,33 @@ +package io.lettuce.core.metrics; + +public interface ConnectionMonitor { + + static ConnectionMonitor disabled() { + + return new ConnectionMonitor() { + + @Override + public void recordDisconnectedTime(String epid, long time) { + + } + + @Override + public void incrementReconnectionAttempts(String epid) { + + } + + @Override + public boolean isEnabled() { + return false; + } + + }; + } + + void recordDisconnectedTime(String epid, long time); + + void incrementReconnectionAttempts(String epid); + + boolean isEnabled(); + +} diff --git a/src/main/java/io/lettuce/core/metrics/EndpointQueueMonitor.java b/src/main/java/io/lettuce/core/metrics/EndpointQueueMonitor.java new file mode 100644 index 0000000000..ec5ea74484 --- /dev/null +++ b/src/main/java/io/lettuce/core/metrics/EndpointQueueMonitor.java @@ -0,0 +1,76 @@ +package io.lettuce.core.metrics; + +import io.lettuce.core.internal.LettuceAssert; + +import java.io.Serializable; +import java.util.Objects; +import java.util.function.Supplier; + +public interface EndpointQueueMonitor { + + class QueueId implements Serializable { + + private final String queueName; + + private final String epId; + + protected QueueId(String name, String epId) { + LettuceAssert.notNull(name, "name must not be null"); + + this.queueName = name; + this.epId = epId; + } + + public static QueueId create(String queueName, String epId) { + return new QueueId(queueName, epId); + } + + public String getEpId() { + return epId; + } + + public String getQueueName() { + return queueName; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof QueueId)) + return false; + QueueId queueId = (QueueId) o; + return Objects.equals(queueName, queueId.queueName) && Objects.equals(epId, queueId.epId); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("[").append("epId=").append(epId).append(']'); + return sb.toString(); + } + + } + + static EndpointQueueMonitor disabled() { + + return new EndpointQueueMonitor() { + + @Override + public void observeQueueSize(QueueId queueId, Supplier queueSizeSupplier) { + + } + + @Override + public boolean isEnabled() { + return false; + } + + }; + } + + void observeQueueSize(QueueId queueId, Supplier queueSizeSupplier); + + boolean isEnabled(); + +} diff --git a/src/main/java/io/lettuce/core/metrics/MicrometerConnectionMonitor.java b/src/main/java/io/lettuce/core/metrics/MicrometerConnectionMonitor.java new file mode 100644 index 0000000000..e0462007e6 --- /dev/null +++ b/src/main/java/io/lettuce/core/metrics/MicrometerConnectionMonitor.java @@ -0,0 +1,138 @@ +/* + * Copyright 2011-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.metrics; + +import io.lettuce.core.internal.LettuceAssert; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * Micrometer implementation for tracking connection metrics. + * + *

+ * + * @author Ivo Gaydajiev + * @since 6.7 + */ +public class MicrometerConnectionMonitor implements ConnectionMonitor { + + static final String LABEL_EPID = "epid"; + + // Track the time between a connection being disconnected and successfully reconnected or closed + public static final String METRIC_RECONNECTION_INACTIVE_TIME = "lettuce.reconnection.inactive.duration"; + + public static final String METRIC_RECONNECTION_ATTEMPTS = "lettuce.reconnection.attempts"; + + private final MeterRegistry meterRegistry; + + private final MicrometerOptions options; + + private final Map disconnectedTimers = new ConcurrentHashMap<>(); + + private final Map reconnectionAttempts = new ConcurrentHashMap<>(); + + /** + * Create a new {@link MicrometerConnectionMonitor} instance given {@link MeterRegistry} and {@link MicrometerOptions}. + * + * @param meterRegistry + * @param options + */ + public MicrometerConnectionMonitor(MeterRegistry meterRegistry, MicrometerOptions options) { + + LettuceAssert.notNull(meterRegistry, "MeterRegistry must not be null"); + LettuceAssert.notNull(options, "MicrometerOptions must not be null"); + + this.meterRegistry = meterRegistry; + this.options = options; + } + + @Override + public void recordDisconnectedTime(String epid, long time) { + + if (!isEnabled()) { + return; + } + + MonitoredConnectionId connectionId = createId(epid); + Timer inavtiveConnectionTimer = disconnectedTimers.computeIfAbsent(connectionId, this::inactiveConnectionTimer); + inavtiveConnectionTimer.record(time, TimeUnit.NANOSECONDS); + } + + @Override + public void incrementReconnectionAttempts(String epid) { + + if (!isEnabled()) { + return; + } + + MonitoredConnectionId connectionId = createId(epid); + + Counter recconectionAttemptsCounter = reconnectionAttempts.computeIfAbsent(connectionId, this::reconnectAttempts); + recconectionAttemptsCounter.increment(); + } + + @Override + public boolean isEnabled() { + return options.isEnabled(); + } + + private MonitoredConnectionId createId(String epId) { + return MonitoredConnectionId.create(epId); + } + + protected Timer inactiveConnectionTimer(MonitoredConnectionId connectionId) { + Timer.Builder timer = Timer.builder(METRIC_RECONNECTION_INACTIVE_TIME) + .description("Time taken for successful reconnection").tag(LABEL_EPID, connectionId.epId()) + .tags(options.tags()); + + if (options.isHistogram()) { + timer.publishPercentileHistogram().publishPercentiles(options.targetPercentiles()) + .minimumExpectedValue(options.minLatency()).maximumExpectedValue(options.maxLatency()); + } + + return timer.register(meterRegistry); + } + + protected Counter reconnectAttempts(MonitoredConnectionId connectionId) { + Counter.Builder timer = Counter.builder(METRIC_RECONNECTION_ATTEMPTS).description("Number of reconnection attempts") + .tag(LABEL_EPID, connectionId.epId()).tags(options.tags()); + + return timer.register(meterRegistry); + } + +} diff --git a/src/main/java/io/lettuce/core/metrics/MicrometerQueueMonitor.java b/src/main/java/io/lettuce/core/metrics/MicrometerQueueMonitor.java new file mode 100644 index 0000000000..1f02395c80 --- /dev/null +++ b/src/main/java/io/lettuce/core/metrics/MicrometerQueueMonitor.java @@ -0,0 +1,66 @@ +/* + * Copyright 2011-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.metrics; + +import io.lettuce.core.internal.LettuceAssert; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; + +import java.util.function.Supplier; + +public class MicrometerQueueMonitor implements EndpointQueueMonitor { + + public static final String LABEL_EPID = "epId"; + + private final MeterRegistry meterRegistry; + + private final MicrometerOptions options; + + /** + * Create a new {@link MicrometerQueueMonitor} instance given {@link MeterRegistry} and {@link MicrometerOptions}. + * + * @param meterRegistry + * @param options + */ + public MicrometerQueueMonitor(MeterRegistry meterRegistry, MicrometerOptions options) { + + LettuceAssert.notNull(meterRegistry, "MeterRegistry must not be null"); + LettuceAssert.notNull(options, "MicrometerOptions must not be null"); + + this.meterRegistry = meterRegistry; + this.options = options; + } + + @Override + public boolean isEnabled() { + return options.isEnabled(); + } + + @Override + public void observeQueueSize(QueueId queueId, Supplier queueSizeSupplier) { + if (!isEnabled()) { + return; + } + + Gauge.builder(queueId.getQueueName(), queueSizeSupplier).description("Queue size").tag(LABEL_EPID, queueId.getEpId()) + .tags(options.tags()).register(meterRegistry); + } + +} diff --git a/src/main/java/io/lettuce/core/metrics/MonitoredConnectionId.java b/src/main/java/io/lettuce/core/metrics/MonitoredConnectionId.java new file mode 100644 index 0000000000..a813868ee2 --- /dev/null +++ b/src/main/java/io/lettuce/core/metrics/MonitoredConnectionId.java @@ -0,0 +1,77 @@ +package io.lettuce.core.metrics; + +import io.lettuce.core.internal.LettuceAssert; + +import java.io.Serializable; +import java.net.SocketAddress; + +/** + * Identifier for a connection + * + * @author Ivo Gaydajiev + */ +@SuppressWarnings("serial") +public class MonitoredConnectionId implements Serializable, Comparable { + + private final String epId; + + protected MonitoredConnectionId(String epId) { + LettuceAssert.notNull(epId, "EndpointId must not be null"); + + this.epId = epId; + } + + /** + * Create a new instance of {@link MonitoredConnectionId}. + * + * @param epId the endpoint id + * @return a new instance of {@link MonitoredConnectionId} + */ + public static MonitoredConnectionId create(String epId) { + return new MonitoredConnectionId(epId); + } + + /** + * Returns the command type. + * + * @return the command type + */ + public String epId() { + return epId; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof MonitoredConnectionId)) + return false; + + MonitoredConnectionId that = (MonitoredConnectionId) o; + + return epId.equals(that.epId); + } + + @Override + public int hashCode() { + return epId.hashCode(); + } + + @Override + public int compareTo(MonitoredConnectionId o) { + + if (o == null) { + return -1; + } + + return epId.compareTo(o.epId); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("[").append("epId=").append(epId).append(']'); + return sb.toString(); + } + +} diff --git a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java index 160deed187..33489fafcf 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java +++ b/src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java @@ -32,9 +32,12 @@ import io.lettuce.core.TimeoutOptions; import io.lettuce.core.internal.ExceptionFactory; import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.rebind.RebindCompletedEvent; +import io.lettuce.core.rebind.RebindInitiatedEvent; import io.lettuce.core.resource.ClientResources; import io.netty.util.Timeout; import io.netty.util.Timer; +import reactor.core.Disposable; /** * Extension to {@link RedisChannelWriter} that expires commands. Command timeout starts at the time the command is written @@ -59,8 +62,16 @@ public class CommandExpiryWriter implements RedisChannelWriter { private final boolean applyConnectionTimeout; + private final Duration relaxedTimeout; + + private final Disposable rebindStatedListener; + + private final Disposable rebindEndedListener; + private volatile long timeout = -1; + private volatile boolean relaxTimeoutsGlobally = false; + /** * Create a new {@link CommandExpiryWriter}. * @@ -78,9 +89,26 @@ public CommandExpiryWriter(RedisChannelWriter delegate, ClientOptions clientOpti this.delegate = delegate; this.source = timeoutOptions.getSource(); this.applyConnectionTimeout = timeoutOptions.isApplyConnectionTimeout(); + this.relaxedTimeout = timeoutOptions.getRelaxedTimeout(); this.timeUnit = source.getTimeUnit(); this.executorService = clientResources.eventExecutorGroup(); this.timer = clientResources.timer(); + + this.rebindStatedListener = clientResources.eventBus().get().filter(e -> e instanceof RebindInitiatedEvent) + .subscribe(e -> { + this.relaxTimeoutsGlobally = true; + }); + + this.rebindEndedListener = clientResources.eventBus().get().filter(e -> e instanceof RebindCompletedEvent) + .subscribe(e -> { + // Consider the rebind complete after another relaxed timeout cycle. + // + // The reasoning behind that is we can't really be sure when all the enqueued commands have + // successfully been written to the wire and then the reply was received + timer.newTimeout(t -> getExecutorService().submit(() -> this.relaxTimeoutsGlobally = false), + relaxedTimeout.toMillis(), TimeUnit.MILLISECONDS); + + }); } /** @@ -144,6 +172,8 @@ public void flushCommands() { @Override public void close() { + this.rebindStatedListener.dispose(); + this.rebindEndedListener.dispose(); delegate.close(); } @@ -154,6 +184,7 @@ public CompletableFuture closeAsync() { @Override public void reset() { + this.relaxTimeoutsGlobally = false; delegate.reset(); } @@ -179,8 +210,14 @@ private void potentiallyExpire(RedisCommand command, ScheduledExecutorS Timeout commandTimeout = timer.newTimeout(t -> { if (!command.isDone()) { - executors.submit(() -> command.completeExceptionally(ExceptionFactory - .createTimeoutException(command.getType().toString(), Duration.ofNanos(timeUnit.toNanos(timeout))))); + executors.submit(() -> { + if (shouldRelaxTimeoutsGlobally()) { + relaxedAttempt(command, executors); + } else { + command.completeExceptionally(ExceptionFactory.createTimeoutException(command.getType().toString(), + Duration.ofNanos(timeUnit.toNanos(timeout)))); + } + }); } }, timeout, timeUnit); @@ -191,4 +228,22 @@ private void potentiallyExpire(RedisCommand command, ScheduledExecutorS } + public boolean shouldRelaxTimeoutsGlobally() { + return relaxTimeoutsGlobally && !relaxedTimeout.isNegative(); + } + + // when relaxing the timeouts - instead of expiring immediately, we will start a new timer with 10 seconds + private void relaxedAttempt(RedisCommand command, ScheduledExecutorService executors) { + + Timeout commandTimeout = timer.newTimeout(t -> { + if (!command.isDone()) { + executors.submit(() -> command.completeExceptionally(ExceptionFactory.createTimeoutException(relaxedTimeout))); + } + }, relaxedTimeout.toMillis(), TimeUnit.MILLISECONDS); + + if (command instanceof CompleteableCommand) { + ((CompleteableCommand) command).onComplete((o, o2) -> commandTimeout.cancel()); + } + } + } diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java index 59aee61e0c..9e5ba9bd99 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java +++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java @@ -20,11 +20,13 @@ package io.lettuce.core.protocol; import static io.lettuce.core.ConnectionEvents.*; +import static io.lettuce.core.protocol.ConnectionWatchdog.REBIND_ATTRIBUTE; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.time.LocalTime; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -47,8 +49,11 @@ import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.internal.LettuceSets; import io.lettuce.core.metrics.CommandLatencyRecorder; +import io.lettuce.core.metrics.EndpointQueueMonitor; +import io.lettuce.core.metrics.EndpointQueueMonitor.QueueId; import io.lettuce.core.output.CommandOutput; import io.lettuce.core.output.PushOutput; +import io.lettuce.core.rebind.RebindState; import io.lettuce.core.resource.ClientResources; import io.lettuce.core.tracing.TraceContext; import io.lettuce.core.tracing.TraceContextProvider; @@ -165,6 +170,12 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc this.tracingEnabled = tracing.isEnabled(); this.decodeBufferPolicy = clientOptions.getDecodeBufferPolicy(); + + EndpointQueueMonitor endpointQueueMonitor = clientResources.endpointQueueMonitor(); + if (endpointQueueMonitor != null) { + endpointQueueMonitor.observeQueueSize(QueueId.create("lettuce.command.handler.queue", endpoint.getId()), + stack::size); + } } public Endpoint getEndpoint() { @@ -625,6 +636,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException { + final boolean rebindInProgress = ctx.channel().hasAttr(REBIND_ATTRIBUTE) + && ctx.channel().attr(REBIND_ATTRIBUTE).get() != null + && ctx.channel().attr(REBIND_ATTRIBUTE).get().equals(RebindState.STARTED); + if (debugEnabled && rebindInProgress) { + logger.debug("{} Processing command while rebind is in progress, stack has {} more to process", logPrefix(), + stack.size()); + } if (pristine) { @@ -711,6 +729,11 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup } } + if (rebindInProgress && stack.isEmpty()) { + logger.info("{} Rebind completed at {}", logPrefix(), LocalTime.now()); + ctx.channel().attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED); + } + decodeBufferPolicy.afterDecoding(buffer); } diff --git a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java index 84bcb41f1f..f4b62cba44 100644 --- a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java +++ b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java @@ -19,12 +19,29 @@ */ package io.lettuce.core.protocol; +import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.time.Duration; +import java.time.LocalTime; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import io.lettuce.core.metrics.ConnectionMonitor; +import io.lettuce.core.api.push.PushListener; +import io.lettuce.core.api.push.PushMessage; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.pubsub.PubSubCommandHandler; +import io.lettuce.core.rebind.RebindCompletedEvent; +import io.lettuce.core.rebind.RebindInitiatedEvent; +import io.lettuce.core.rebind.RebindState; +import io.netty.util.AttributeKey; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import io.lettuce.core.ClientOptions; @@ -59,7 +76,9 @@ * @author Koji Lin */ @ChannelHandler.Sharable -public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { +public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements PushListener { + + public static final AttributeKey REBIND_ATTRIBUTE = AttributeKey.newInstance("rebindAddress"); private static final long LOGGING_QUIET_TIME_MS = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS); @@ -93,7 +112,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { private final AtomicBoolean reconnectSchedulerSync; - private volatile int attempts; + private volatile Attempts attempts = new Attempts(); private volatile boolean armed; @@ -101,6 +120,75 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { private volatile Timeout reconnectScheduleTimeout; + private SocketAddress rebindAddress; + + private final ConnectionMonitor connectionMonitor; + + static class Attempts { + + private int attempts = 0; + + private long disconnectedNs = -1; + + private long completedNs = -1; + + /** + * Sets the time of when connection transitioned to disconnected state. + * + * @param time the time of when the item was sent. + */ + void disconnected(long time) { + if (this.disconnectedNs == -1) { + this.disconnectedNs = time; + } + + this.completedNs = -1; + } + + /** + * Set the time of completion + * + * @param time the time of completion. + */ + void completed(long time) { + if (this.completedNs == -1) { + this.completedNs = time; + } else { + logger.warn("Attempt Completed time already set, ignoring"); + } + } + + /** + * @return the time of when the connection became inactive. + */ + long getDisconnected() { + return disconnectedNs; + } + + /** + * + * @return the time of completion. + */ + long getCompleted() { + return completedNs; + } + + public int getAttempts() { + return attempts; + } + + public int incrementAndGet() { + return ++attempts; + } + + @Override + public String toString() { + return "Attempts{" + "attempts=" + attempts + ", disconnectedNs=" + disconnectedNs + ", completedNs=" + completedNs + + '}'; + } + + } + /** * Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} and establishes a new * {@link Channel} when disconnected, while reconnect is true. The socketAddressSupplier can supply the reconnect address. @@ -118,8 +206,8 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { */ public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Bootstrap bootstrap, Timer timer, EventExecutorGroup reconnectWorkers, Mono socketAddressSupplier, - ReconnectionListener reconnectionListener, ConnectionFacade connectionFacade, EventBus eventBus, - Endpoint endpoint) { + ReconnectionListener reconnectionListener, ConnectionFacade connectionFacade, EventBus eventBus, Endpoint endpoint, + ConnectionMonitor connectionMonitor) { LettuceAssert.notNull(reconnectDelay, "Delay must not be null"); LettuceAssert.notNull(clientOptions, "ClientOptions must not be null"); @@ -141,6 +229,7 @@ public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Boo this.eventBus = eventBus; this.redisUri = (String) bootstrap.config().attrs().get(ConnectionBuilder.REDIS_URI); this.epid = endpoint.getId(); + this.connectionMonitor = connectionMonitor; Mono wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> remoteAddress = addr) .onErrorResume(t -> { @@ -172,24 +261,51 @@ void prepareClose() { reconnectScheduleTimeout.cancel(); } + // connection is closing, + // reset attempts & record record the time of completion if in middle of reconnect + Attempts currentAttempts = this.attempts; + this.attempts = new Attempts(); + if (currentAttempts.getDisconnected() > 0) { + // reconnected after disconnect + currentAttempts.completed(System.nanoTime()); + + connectionMonitor.recordDisconnectedTime(epid, currentAttempts.getCompleted() - currentAttempts.getDisconnected()); + } + reconnectionHandler.prepareClose(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - CommandHandler commandHandler = ctx.pipeline().get(CommandHandler.class); - reconnectSchedulerSync.set(false); channel = ctx.channel(); reconnectScheduleTimeout = null; logPrefix = null; remoteAddress = channel.remoteAddress(); - attempts = 0; + + // attempts = 0; + // reconnected successfully + // reset attempts & record record the time of completion + final Attempts currentAttempts = this.attempts; + this.attempts = new Attempts(); + if (currentAttempts.getAttempts() > 0) { + // reconnected after disconnect + currentAttempts.completed(System.nanoTime()); + connectionMonitor.recordDisconnectedTime(epid, currentAttempts.getCompleted() - currentAttempts.getDisconnected()); + } + resetReconnectDelay(); logPrefix = null; logger.debug("{} channelActive()", logPrefix()); + // todo : Configurable enable disable proactive reconnect + ChannelPipeline pipeline = ctx.channel().pipeline(); + PubSubCommandHandler commandHandler = pipeline.get(PubSubCommandHandler.class); + if (commandHandler != null) { + commandHandler.getEndpoint().addListener(this); + } + super.channelActive(ctx); } @@ -205,6 +321,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { channel = null; if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) { + attempts.disconnected(System.nanoTime()); scheduleReconnect(); } else { logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx); @@ -239,9 +356,11 @@ public void scheduleReconnect() { } if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) { + // attempts++; + // final int attempt = attempts; + final int attempt = attempts.incrementAndGet(); + connectionMonitor.incrementReconnectionAttempts(epid); - attempts++; - final int attempt = attempts; Duration delay = reconnectDelay.createDelay(attempt); int timeout = (int) delay.toMillis(); logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout); @@ -275,7 +394,6 @@ public void scheduleReconnect() { * the same handler instances contained in the old channel's pipeline. * * @param attempt attempt counter - * * @throws Exception when reconnection fails. */ public void run(int attempt) throws Exception { @@ -288,7 +406,6 @@ public void run(int attempt) throws Exception { * * @param attempt attempt counter. * @param delay retry delay. - * * @throws Exception when reconnection fails. */ private void run(int attempt, Duration delay) throws Exception { @@ -330,12 +447,15 @@ private void run(int attempt, Duration delay) throws Exception { eventBus.publish(new ReconnectAttemptEvent(redisUri, epid, LocalAddress.ANY, remoteAddress, attempt, delay)); logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress); - Tuple2, CompletableFuture> tuple = reconnectionHandler.reconnect(); + Tuple2, CompletableFuture> tuple = rebindAddress == null + ? reconnectionHandler.reconnect() + : reconnectionHandler.reconnect(rebindAddress); CompletableFuture future = tuple.getT1(); future.whenComplete((c, t) -> { if (c != null && t == null) { + this.channel.attr(REBIND_ATTRIBUTE).set(null); return; } @@ -436,4 +556,61 @@ private String logPrefix() { return logPrefix = buffer; } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + + if (ctx.channel() != null && ctx.channel().isActive() && ctx.channel().hasAttr(REBIND_ATTRIBUTE) + && ctx.channel().attr(REBIND_ATTRIBUTE).get() == RebindState.COMPLETED) { + logger.debug("Disconnecting at {}", LocalTime.now()); + ctx.channel().close().awaitUninterruptibly(); + // FIXME this is currently only used to notify in an awkward way the ChannelExpiryWriter + eventBus.publish(new RebindCompletedEvent()); + } + + super.channelReadComplete(ctx); + } + + @Override + public void onPushMessage(PushMessage message) { + if (!message.getType().equals("message")) { + return; + } + + List content = message.getContent().stream() + .map(ez -> ez instanceof ByteBuffer ? StringCodec.UTF8.decodeKey((ByteBuffer) ez) : ez.toString()) + .collect(Collectors.toList()); + + if (content.stream().anyMatch(c -> c.contains("type=rebind"))) { + logger.info("Attempting to rebind to new endpoint '{}'", getRemoteAddress(content)); + + channel.attr(REBIND_ATTRIBUTE).set(RebindState.STARTED); + this.rebindAddress = getRemoteAddress(content); + + ChannelPipeline pipeline = channel.pipeline(); + PubSubCommandHandler commandHandler = pipeline.get(PubSubCommandHandler.class); + if (commandHandler.getStack().isEmpty()) { + channel.close().awaitUninterruptibly(); + channel.attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED); + } else { + // FIXME this is currently only used to notify in an awkward way the ChannelExpiryWriter + eventBus.publish(new RebindInitiatedEvent()); + } + } + } + + private SocketAddress getRemoteAddress(List messageContents) { + + final String payload = messageContents.stream().filter(c -> c.contains("to_ep")).findFirst() + .orElse("type=rebind;from_ep=localhost:6479;to_ep=localhost:6379;until_s=10"); + + final String toEndpoint = Arrays.stream(payload.split(";")).filter(c -> c.contains("to_ep")).findFirst() + .orElse("to_ep=localhost:6479"); + + final String addressAndPort = toEndpoint.split("=")[1]; + final String address = addressAndPort.split(":")[0]; + final int port = Integer.parseInt(addressAndPort.split(":")[1]); + + return new InetSocketAddress(address, port); + } + } diff --git a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java index 79f2f05f16..6adafac71b 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java @@ -20,6 +20,7 @@ package io.lettuce.core.protocol; import static io.lettuce.core.protocol.CommandHandler.*; +import static io.lettuce.core.protocol.ConnectionWatchdog.REBIND_ATTRIBUTE; import java.io.IOException; import java.nio.channels.ClosedChannelException; @@ -45,6 +46,9 @@ import io.lettuce.core.internal.Futures; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.internal.LettuceFactories; +import io.lettuce.core.rebind.RebindState; +import io.lettuce.core.metrics.EndpointQueueMonitor; +import io.lettuce.core.metrics.EndpointQueueMonitor.QueueId; import io.lettuce.core.resource.ClientResources; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -148,6 +152,17 @@ public DefaultEndpoint(ClientOptions clientOptions, ClientResources clientResour this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE; this.rejectCommandsWhileDisconnected = isRejectCommand(clientOptions); this.cachedEndpointId = "0x" + Long.toHexString(endpointId); + + // Commands submitted netty queue but still not written to the channel + EndpointQueueMonitor endpointQueueMonitor = clientResources.endpointQueueMonitor(); + if (endpointQueueMonitor != null) { + endpointQueueMonitor.observeQueueSize(QueueId.create("lettuce.endpoint.command.queue", getId()), + () -> QUEUE_SIZE.get(this)); + endpointQueueMonitor.observeQueueSize(QueueId.create("lettuce.endpoint.disconnected.buffer", getId()), + disconnectedBuffer::size); + endpointQueueMonitor.observeQueueSize(QueueId.create("lettuce.endpoint.command.buffer", getId()), + commandBuffer::size); + } } @Override @@ -812,7 +827,16 @@ private void cancelCommands(String message, Iterable s return; } - if (sentCommands != null) { - - boolean foundToSend = false; - - for (RedisCommand command : sentCommands) { - if (!command.isDone()) { - foundToSend = true; - break; - } - } - - if (!foundToSend) { - return; - } + if (sentCommands != null && sentCommands.stream().allMatch(RedisCommand::isDone)) { + return; } if (channel != null) { diff --git a/src/main/java/io/lettuce/core/protocol/Endpoint.java b/src/main/java/io/lettuce/core/protocol/Endpoint.java index aa55ac43b7..201ee0dd9b 100644 --- a/src/main/java/io/lettuce/core/protocol/Endpoint.java +++ b/src/main/java/io/lettuce/core/protocol/Endpoint.java @@ -1,6 +1,7 @@ package io.lettuce.core.protocol; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; /** * Wraps a stateful {@link Endpoint} that abstracts the underlying channel. Endpoints may be connected, disconnected and in diff --git a/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java b/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java index 8bf183b426..04ab026c42 100644 --- a/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java +++ b/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java @@ -114,6 +114,16 @@ protected Tuple2, CompletableFuture> r return Tuples.of(future, address); } + protected Tuple2, CompletableFuture> reconnect(SocketAddress remoteAddress) { + CompletableFuture future = new CompletableFuture<>(); + CompletableFuture address = new CompletableFuture<>(); + + reconnect0(future, remoteAddress); + + this.currentFuture = future; + return Tuples.of(future, address); + } + private void reconnect0(CompletableFuture result, SocketAddress remoteAddress) { ChannelHandler handler = bootstrap.config().handler(); diff --git a/src/main/java/io/lettuce/core/rebind/RebindCompletedEvent.java b/src/main/java/io/lettuce/core/rebind/RebindCompletedEvent.java new file mode 100644 index 0000000000..0ce22b44aa --- /dev/null +++ b/src/main/java/io/lettuce/core/rebind/RebindCompletedEvent.java @@ -0,0 +1,27 @@ +/* + * Copyright 2025, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.rebind; + +import io.lettuce.core.event.Event; + +public class RebindCompletedEvent implements Event { + +} diff --git a/src/main/java/io/lettuce/core/rebind/RebindInitiatedEvent.java b/src/main/java/io/lettuce/core/rebind/RebindInitiatedEvent.java new file mode 100644 index 0000000000..77e7c07f8f --- /dev/null +++ b/src/main/java/io/lettuce/core/rebind/RebindInitiatedEvent.java @@ -0,0 +1,29 @@ +/* + * Copyright 2025, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.lettuce.core.rebind; + +import io.lettuce.core.event.Event; + +import java.net.SocketAddress; + +public class RebindInitiatedEvent implements Event { + +} diff --git a/src/main/java/io/lettuce/core/rebind/RebindState.java b/src/main/java/io/lettuce/core/rebind/RebindState.java new file mode 100644 index 0000000000..3c6530f7e0 --- /dev/null +++ b/src/main/java/io/lettuce/core/rebind/RebindState.java @@ -0,0 +1,12 @@ +/* + * Copyright 2025, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + */ + +package io.lettuce.core.rebind; + +public enum RebindState { + STARTED, COMPLETED +} diff --git a/src/main/java/io/lettuce/core/resource/ClientResources.java b/src/main/java/io/lettuce/core/resource/ClientResources.java index 4d82d4ac08..ddf4e850d0 100644 --- a/src/main/java/io/lettuce/core/resource/ClientResources.java +++ b/src/main/java/io/lettuce/core/resource/ClientResources.java @@ -27,6 +27,8 @@ import io.lettuce.core.metrics.CommandLatencyCollector; import io.lettuce.core.metrics.CommandLatencyCollectorOptions; import io.lettuce.core.metrics.CommandLatencyRecorder; +import io.lettuce.core.metrics.ConnectionMonitor; +import io.lettuce.core.metrics.EndpointQueueMonitor; import io.lettuce.core.tracing.Tracing; import io.netty.resolver.AddressResolverGroup; import io.netty.util.Timer; @@ -130,6 +132,10 @@ default Builder commandLatencyCollector(CommandLatencyCollector commandLatencyCo */ Builder commandLatencyRecorder(CommandLatencyRecorder latencyRecorder); + Builder connectionMonitor(ConnectionMonitor connectionMonitor); + + Builder endpointQueueMonitor(EndpointQueueMonitor queueMonitor); + /** * Sets the {@link CommandLatencyCollectorOptions} that can be used across different instances of the RedisClient. The * options are only effective if no {@code commandLatencyCollector} is provided. @@ -349,6 +355,10 @@ default Builder commandLatencyCollector(CommandLatencyCollector commandLatencyCo */ CommandLatencyRecorder commandLatencyRecorder(); + ConnectionMonitor connectionMonitor(); + + EndpointQueueMonitor endpointQueueMonitor(); + /** * Return the pool size (number of threads) for all computation tasks. * diff --git a/src/main/java/io/lettuce/core/resource/DefaultClientResources.java b/src/main/java/io/lettuce/core/resource/DefaultClientResources.java index a165978df6..4fd6539af2 100644 --- a/src/main/java/io/lettuce/core/resource/DefaultClientResources.java +++ b/src/main/java/io/lettuce/core/resource/DefaultClientResources.java @@ -33,9 +33,11 @@ import io.lettuce.core.metrics.CommandLatencyCollector; import io.lettuce.core.metrics.CommandLatencyCollectorOptions; import io.lettuce.core.metrics.CommandLatencyRecorder; +import io.lettuce.core.metrics.ConnectionMonitor; import io.lettuce.core.metrics.DefaultCommandLatencyCollector; import io.lettuce.core.metrics.DefaultCommandLatencyCollectorOptions; import io.lettuce.core.metrics.MetricCollector; +import io.lettuce.core.metrics.EndpointQueueMonitor; import io.lettuce.core.resource.Delay.StatefulDelay; import io.lettuce.core.tracing.Tracing; import io.netty.resolver.AddressResolverGroup; @@ -131,6 +133,10 @@ public class DefaultClientResources implements ClientResources { private final CommandLatencyRecorder commandLatencyRecorder; + private final ConnectionMonitor connectionMonitor; + + private final EndpointQueueMonitor endpointQueueMonitor; + private final boolean sharedCommandLatencyRecorder; private final EventPublisherOptions commandLatencyPublisherOptions; @@ -220,6 +226,9 @@ protected DefaultClientResources(Builder builder) { eventBus = builder.eventBus; } + connectionMonitor = builder.connectionMonitor; + endpointQueueMonitor = builder.queueMonitor; + if (builder.commandLatencyRecorder == null) { if (DefaultCommandLatencyCollector.isAvailable()) { if (builder.commandLatencyCollectorOptions != null) { @@ -335,6 +344,10 @@ public static class Builder implements ClientResources.Builder { private Runnable afterBuild; + private ConnectionMonitor connectionMonitor = ConnectionMonitor.disabled(); + + private EndpointQueueMonitor queueMonitor = EndpointQueueMonitor.disabled(); + private Builder() { } @@ -408,6 +421,20 @@ public Builder commandLatencyCollectorOptions(CommandLatencyCollectorOptions com return this; } + @Override + public Builder connectionMonitor(ConnectionMonitor connectionMonitor) { + LettuceAssert.notNull(connectionMonitor, "ConnectionMonitor must not be null"); + this.connectionMonitor = connectionMonitor; + return this; + } + + @Override + public Builder endpointQueueMonitor(EndpointQueueMonitor queueMonitor) { + LettuceAssert.notNull(queueMonitor, "QueueMonitor must not be null"); + this.queueMonitor = queueMonitor; + return this; + } + /** * Sets the {@link CommandLatencyRecorder} that can be used across different instances of the RedisClient. * @@ -771,6 +798,16 @@ public Future shutdown(long quietPeriod, long timeout, TimeUnit timeUni return PromiseAdapter.toBooleanPromise(voidPromise); } + @Override + public ConnectionMonitor connectionMonitor() { + return connectionMonitor; + } + + @Override + public EndpointQueueMonitor endpointQueueMonitor() { + return endpointQueueMonitor; + } + @Override public CommandLatencyRecorder commandLatencyRecorder() { return commandLatencyRecorder; diff --git a/src/test/java/biz/paluch/redis/extensibility/LettuceRebindDemo.java b/src/test/java/biz/paluch/redis/extensibility/LettuceRebindDemo.java new file mode 100644 index 0000000000..2898dd4795 --- /dev/null +++ b/src/test/java/biz/paluch/redis/extensibility/LettuceRebindDemo.java @@ -0,0 +1,125 @@ +package biz.paluch.redis.extensibility; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.TimeoutOptions; +import io.lettuce.core.api.push.PushListener; +import io.lettuce.core.api.push.PushMessage; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.event.EventBus; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +public class LettuceRebindDemo { + + public static final Logger logger = Logger.getLogger(LettuceRebindDemo.class.getName()); + + public static final String KEY = "rebind:" + UUID.randomUUID().getLeastSignificantBits(); + + public static void main(String[] args) throws ExecutionException, InterruptedException { + + // NEW! No need for a custom handler + + TimeoutOptions timeoutOpts = TimeoutOptions.builder().timeoutCommands().fixedTimeout(Duration.ofMillis(50)) + // NEW! control that during timeouts we need to relax the timeouts + .proactiveTimeoutsRelaxing(Duration.ofMillis(500)).build(); + ClientOptions options = ClientOptions.builder().timeoutOptions(timeoutOpts).build(); + + RedisClient redisClient = RedisClient.create(RedisURI.Builder.redis("localhost", 6379).build()); + redisClient.setOptions(options); + + // Monitor connection events + EventBus eventBus = redisClient.getResources().eventBus(); + eventBus.get().subscribe(e -> { + logger.info(">>> Event bus received: {} " + e); + }); + + // Subscribe to __rebind channel (REMOVE ONCE WE START RECEIVING THESE WITHOUT SUBSCRIPTION) + StatefulRedisPubSubConnection redis = redisClient.connectPubSub(); + RedisPubSubAsyncCommands commands = redis.async(); + commands.subscribe("__rebind").get(); + + // Used to stop the demo by sending the following command: + // publish __rebind "type=stop_demo" + Control control = new Control(); + redis.addListener(control); + + // Used to initiate the proactive rebind by sending the following command + // publish __rebind "type=rebind;from_ep=localhost:6379;to_ep=localhost:6479;until_s=10" + + ExecutorService executorService = new ThreadPoolExecutor(5, // core pool size + 10, // maximum pool size + 60, TimeUnit.SECONDS, // idle thread keep-alive time + new ArrayBlockingQueue<>(20), // work queue size + new ThreadPoolExecutor.DiscardPolicy()); // rejection policy + + try { + while (control.shouldContinue) { + executorService.execute(new DemoWorker(commands)); + Thread.sleep(1); + } + + if (executorService.awaitTermination(5, TimeUnit.SECONDS)) { + logger.info("Executor service terminated"); + } else { + logger.warning("Executor service did not terminate in the specified time"); + } + + } finally { + executorService.shutdownNow(); + } + + redis.close(); + redisClient.shutdown(); + } + + static class DemoWorker implements Runnable { + + private final RedisPubSubAsyncCommands commands; + + public DemoWorker(RedisPubSubAsyncCommands commands) { + this.commands = commands; + } + + @Override + public void run() { + try { + commands.incr(KEY).get(); + } catch (InterruptedException | ExecutionException e) { + logger.severe("ExecutionException: " + e.getMessage()); + } + } + + } + + static class Control implements PushListener { + + public boolean shouldContinue = true; + + @Override + public void onPushMessage(PushMessage message) { + List content = message.getContent().stream().filter(ez -> ez instanceof ByteBuffer) + .map(ez -> StringCodec.UTF8.decodeKey((ByteBuffer) ez)).collect(Collectors.toList()); + + if (content.stream().anyMatch(c -> c.contains("type=stop_demo"))) { + logger.info("Control received message to stop the demo"); + shouldContinue = false; + } + } + + } + +} diff --git a/src/test/java/io/lettuce/core/ConnectionMonitorIntegrationTests.java b/src/test/java/io/lettuce/core/ConnectionMonitorIntegrationTests.java new file mode 100644 index 0000000000..712f799a4b --- /dev/null +++ b/src/test/java/io/lettuce/core/ConnectionMonitorIntegrationTests.java @@ -0,0 +1,95 @@ +package io.lettuce.core; + +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.metrics.MicrometerConnectionMonitor; +import io.lettuce.core.metrics.MicrometerOptions; +import io.lettuce.core.resource.ClientResources; +import io.lettuce.test.LettuceExtension; +import io.lettuce.test.Wait; +import io.lettuce.test.resource.TestClientResources; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static io.lettuce.core.metrics.MicrometerConnectionMonitor.METRIC_RECONNECTION_INACTIVE_TIME; +import static io.lettuce.core.metrics.MicrometerConnectionMonitor.METRIC_RECONNECTION_ATTEMPTS; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Mark Paluch + */ +@Tag(INTEGRATION_TEST) +@ExtendWith(LettuceExtension.class) +class ConnectionMonitorIntegrationTests extends TestSupport { + + private final MeterRegistry meterRegistry = new SimpleMeterRegistry(); + + private final ClientResources clientResources = TestClientResources.get(); + + @Test + void connectionInactiveTime() { + + MicrometerOptions options = MicrometerOptions.create(); + MicrometerConnectionMonitor monitor = new MicrometerConnectionMonitor(meterRegistry, options); + ClientResources resources = clientResources.mutate().connectionMonitor(monitor).build(); + + RedisClient client = RedisClient.create(resources, RedisURI.Builder.redis(host, port).build()); + + StatefulRedisConnection connection = client.connect(); + + // Force disconnection + connection.sync().quit(); + Wait.untilTrue(() -> !connection.isOpen()).during(Duration.ofSeconds(1)).waitOrTimeout(); + + // Wait for successful reconnection + Wait.untilTrue(connection::isOpen).during(Duration.ofSeconds(1)).waitOrTimeout(); + + // At least one reconnect attempt + assertThat(meterRegistry.find(METRIC_RECONNECTION_ATTEMPTS).counter().count()).isGreaterThanOrEqualTo(1); + + assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timers()).isNotEmpty(); + assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timer().count()).isEqualTo(1); + double totalTime = meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timer().totalTime(TimeUnit.NANOSECONDS); + assertThat(totalTime).isGreaterThan(0); + + connection.close(); + + assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timer().count()).isEqualTo(1); + assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timer().totalTime(TimeUnit.NANOSECONDS)) + .isEqualTo(totalTime); + + } + + @Test + void connectionInactiveTimeAutoReconnectDisabled() { + + MicrometerOptions options = MicrometerOptions.create(); + MicrometerConnectionMonitor monitor = new MicrometerConnectionMonitor(meterRegistry, options); + ClientResources resources = clientResources.mutate().connectionMonitor(monitor).build(); + + ClientOptions clientOptions = ClientOptions.builder().autoReconnect(false).build(); + + RedisClient client = RedisClient.create(resources, RedisURI.Builder.redis(host, port).build()); + client.setOptions(clientOptions); + + try (StatefulRedisConnection connection = client.connect()) { + RedisCommands redis = connection.sync(); + + // Force disconnection + redis.quit(); + Wait.untilTrue(() -> !connection.isOpen()).during(Duration.ofSeconds(1)).waitOrTimeout(); + } + + // Connection is closed, with auto-reconnect disabled, no metrics are recorded + assertThat(meterRegistry.find(METRIC_RECONNECTION_ATTEMPTS).counters()).isEmpty(); + assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timers()).isEmpty(); + } + +} diff --git a/src/test/java/io/lettuce/core/EndpointQueueMonitorIntegrationTests.java b/src/test/java/io/lettuce/core/EndpointQueueMonitorIntegrationTests.java new file mode 100644 index 0000000000..a8a6933992 --- /dev/null +++ b/src/test/java/io/lettuce/core/EndpointQueueMonitorIntegrationTests.java @@ -0,0 +1,111 @@ +package io.lettuce.core; + +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.metrics.EndpointQueueMonitor; +import io.lettuce.core.metrics.MicrometerOptions; +import io.lettuce.core.metrics.MicrometerQueueMonitor; +import io.lettuce.core.resource.ClientResources; +import io.lettuce.test.LettuceExtension; +import io.lettuce.test.Wait; +import io.lettuce.test.resource.TestClientResources; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.concurrent.ExecutionException; + +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Mark Paluch + */ +@Tag(INTEGRATION_TEST) +@ExtendWith(LettuceExtension.class) +class EndpointQueueMonitorIntegrationTests extends TestSupport { + + private final static MeterRegistry meterRegistry = new SimpleMeterRegistry(); + + private static RedisClient client; + + private static ClientResources resources; + + @BeforeAll + static void beforeAll() { + MicrometerOptions options = MicrometerOptions.create(); + EndpointQueueMonitor monitor = new MicrometerQueueMonitor(meterRegistry, options); + resources = TestClientResources.get().mutate().endpointQueueMonitor(monitor).build(); + client = RedisClient.create(resources, RedisURI.Builder.redis(host, port).build()); + } + + @BeforeEach + void setUp() { + client.setOptions(ClientOptions.builder().build()); + } + + @AfterEach + void tearDown() { + meterRegistry.clear(); + } + + @Test + void queueMonitorDisconnectedBuffer() { + + ClientOptions clientOptions = ClientOptions.builder().autoReconnect(false) + .disconnectedBehavior(ClientOptions.DisconnectedBehavior.ACCEPT_COMMANDS).build(); + client.setOptions(clientOptions); + + try (StatefulRedisConnection connection = client.connect()) { + RedisAsyncCommands redis = connection.async(); + + // Force disconnection + redis.quit(); + Wait.untilTrue(() -> !connection.isOpen()).during(Duration.ofSeconds(1)).waitOrTimeout(); + + redis.set("key", "value"); + + assertThat(meterRegistry.find("lettuce.endpoint.disconnected.buffer").gauge().value()).isEqualTo(1); + } + } + + @Test + void queueMonitorCommandsBuffer() { + + RedisClient client = RedisClient.create(resources, RedisURI.Builder.redis(host, port).build()); + + try (StatefulRedisConnection connection = client.connect()) { + RedisAsyncCommands redis = connection.async(); + redis.setAutoFlushCommands(false); + redis.set("key", "value"); + + assertThat(meterRegistry.find("lettuce.endpoint.command.buffer").gauge().value()).isEqualTo(1); + redis.flushCommands(); + assertThat(meterRegistry.find("lettuce.endpoint.command.buffer").gauge().value()).isEqualTo(0); + } + } + + @Test + void queueMonitorCommandHandlerStackSize() throws ExecutionException, InterruptedException { + + try (StatefulRedisConnection connection = client.connect(); + StatefulRedisConnection connection2 = client.connect()) { + RedisAsyncCommands redis = connection.async(); + RedisFuture> blpop = redis.blpop(1, "blpop:key"); + assertThat(meterRegistry.find("lettuce.command.handler.queue").gauge().value()).isEqualTo(1); + + Long lpush = connection2.sync().lpush("blpop:key", "value"); + assertThat(lpush).isEqualTo(1); + assertThat(blpop.get()).isNotNull(); + + assertThat(meterRegistry.find("lettuce.command.handler.queue").gauge().value()).isEqualTo(0); + } + } + +} diff --git a/src/test/java/io/lettuce/core/metrics/MicrometerConnectionMonitorUnitTest.java b/src/test/java/io/lettuce/core/metrics/MicrometerConnectionMonitorUnitTest.java new file mode 100644 index 0000000000..a78e96b9cf --- /dev/null +++ b/src/test/java/io/lettuce/core/metrics/MicrometerConnectionMonitorUnitTest.java @@ -0,0 +1,62 @@ +/* + * Copyright 2011-Present, Redis Ltd. and Contributors + * All rights reserved. + * + * Licensed under the MIT License. + * + * This file contains contributions from third-party contributors + * licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.metrics; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import static io.lettuce.TestTags.UNIT_TEST; +import static io.lettuce.core.metrics.MicrometerConnectionMonitor.LABEL_EPID; +import static io.lettuce.core.metrics.MicrometerConnectionMonitor.METRIC_RECONNECTION_INACTIVE_TIME; +import static org.assertj.core.api.Assertions.assertThat; + +@Tag(UNIT_TEST) +class MicrometerConnectionMonitorUnitTest { + + private final MeterRegistry meterRegistry = new SimpleMeterRegistry(); + + @Test + void disabled() { + + MicrometerOptions options = MicrometerOptions.disabled(); + MicrometerConnectionMonitor monitor = new MicrometerConnectionMonitor(meterRegistry, options); + + monitor.recordDisconnectedTime("1", 1); + assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timers()).isEmpty(); + assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).timers()).isEmpty(); + } + + @Test + void tags() { + + Tags tags = Tags.of("app", "foo"); + MicrometerOptions options = MicrometerOptions.builder().tags(tags).build(); + MicrometerConnectionMonitor monitor = new MicrometerConnectionMonitor(meterRegistry, options); + + monitor.recordDisconnectedTime("1", 1); + + assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).tags(tags).timers()).hasSize(1); + assertThat(meterRegistry.find(METRIC_RECONNECTION_INACTIVE_TIME).tag(LABEL_EPID, "1").timers()).hasSize(1); + } + +} diff --git a/src/test/resources/log4j2-test.xml b/src/test/resources/log4j2-test.xml index 981918b55e..40d2e514e2 100644 --- a/src/test/resources/log4j2-test.xml +++ b/src/test/resources/log4j2-test.xml @@ -11,7 +11,7 @@ - +