Skip to content

Add metrics for tracking total disconnected time and reconnection attempts #3220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/ConnectionBuilder.java
Original file line number Diff line number Diff line change
@@ -155,7 +155,7 @@ protected ConnectionWatchdog createConnectionWatchdog() {

ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap,
clientResources.timer(), clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener,
connection, clientResources.eventBus(), endpoint);
connection, clientResources.eventBus(), endpoint, clientResources.connectionMonitor());

endpoint.registerConnectionWatchdog(watchdog);

39 changes: 37 additions & 2 deletions src/main/java/io/lettuce/core/TimeoutOptions.java
Original file line number Diff line number Diff line change
@@ -24,18 +24,26 @@
@SuppressWarnings("serial")
public class TimeoutOptions implements Serializable {

public static final Duration DISABLED_TIMEOUT = Duration.ZERO.minusSeconds(1);

public static final boolean DEFAULT_TIMEOUT_COMMANDS = false;

public static final Duration DEFAULT_RELAXED_TIMEOUT = DISABLED_TIMEOUT;

private final boolean timeoutCommands;

private final boolean applyConnectionTimeout;

private final Duration relaxedTimeout;

private final TimeoutSource source;

private TimeoutOptions(boolean timeoutCommands, boolean applyConnectionTimeout, TimeoutSource source) {
private TimeoutOptions(boolean timeoutCommands, boolean applyConnectionTimeout, TimeoutSource source,
Duration relaxedTimeout) {

this.timeoutCommands = timeoutCommands;
this.applyConnectionTimeout = applyConnectionTimeout;
this.relaxedTimeout = relaxedTimeout;
this.source = source;
}

@@ -84,6 +92,8 @@ public static class Builder {

private boolean applyConnectionTimeout = false;

private Duration relaxedTimeout = DEFAULT_RELAXED_TIMEOUT;

private TimeoutSource source;

/**
@@ -107,6 +117,24 @@ public Builder timeoutCommands(boolean enabled) {
return this;
}

/**
* Enable proactive timeout relaxing. Disabled by default, see {@link #DEFAULT_RELAXED_TIMEOUT}.
* <p/>
* If the Redis server supports this, the client could listen to notifications that the current endpoint is about to go
* down as part of some maintenance activity, for example. In such cases, the driver could extend the existing timeout
* settings for existing commands to make sure they do not time out during this process either as part of the offline
* buffer or while waiting for a reply.
*
* @param duration {@link Duration} to relax timeouts proactively, must not be {@code null}.
* @return {@code this}
*/
public Builder proactiveTimeoutsRelaxing(Duration duration) {
LettuceAssert.notNull(duration, "Duration must not be null");

this.relaxedTimeout = duration;
return this;
}

/**
* Set a fixed timeout for all commands.
*
@@ -158,7 +186,7 @@ public TimeoutOptions build() {
}
}

return new TimeoutOptions(timeoutCommands, applyConnectionTimeout, source);
return new TimeoutOptions(timeoutCommands, applyConnectionTimeout, source, relaxedTimeout);
}

}
@@ -177,6 +205,13 @@ public boolean isApplyConnectionTimeout() {
return applyConnectionTimeout;
}

/**
* @return the {@link Duration} to relax timeouts proactively, {@link #DISABLED_TIMEOUT} if disabled.
*/
public Duration getRelaxedTimeout() {
return relaxedTimeout;
}

/**
* @return the timeout source to determine the timeout for a {@link RedisCommand}. Can be {@code null} if
* {@link #isTimeoutCommands()} is {@code false}.
Original file line number Diff line number Diff line change
@@ -1091,6 +1091,7 @@ private CompletionStage<Partitions> fetchPartitions(Iterable<RedisURI> topologyR
getClusterClientOptions().getSocketOptions().getConnectTimeout(), useDynamicRefreshSources());

return topology.thenApply(partitions -> {
logger.debug("Topology Refresh Views: {} ", partitions);

if (partitions.isEmpty()) {
throw new RedisException(String.format("Cannot retrieve initial cluster partitions from initial URIs %s",
@@ -1108,7 +1109,7 @@ private CompletionStage<Partitions> fetchPartitions(Iterable<RedisURI> topologyR
}

topologyRefreshScheduler.activateTopologyRefreshIfNeeded();

logger.debug("Topology Refresh loadedPartitions: {}", loadedPartitions);
return loadedPartitions;
});
}
33 changes: 33 additions & 0 deletions src/main/java/io/lettuce/core/metrics/ConnectionMonitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.lettuce.core.metrics;

public interface ConnectionMonitor {

static ConnectionMonitor disabled() {

return new ConnectionMonitor() {

@Override
public void recordDisconnectedTime(String epid, long time) {

}

@Override
public void incrementReconnectionAttempts(String epid) {

}

@Override
public boolean isEnabled() {
return false;
}

};
}

void recordDisconnectedTime(String epid, long time);

void incrementReconnectionAttempts(String epid);

boolean isEnabled();

}
76 changes: 76 additions & 0 deletions src/main/java/io/lettuce/core/metrics/EndpointQueueMonitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.lettuce.core.metrics;

import io.lettuce.core.internal.LettuceAssert;

import java.io.Serializable;
import java.util.Objects;
import java.util.function.Supplier;

public interface EndpointQueueMonitor {

class QueueId implements Serializable {

private final String queueName;

private final String epId;

protected QueueId(String name, String epId) {
LettuceAssert.notNull(name, "name must not be null");

this.queueName = name;
this.epId = epId;
}

public static QueueId create(String queueName, String epId) {
return new QueueId(queueName, epId);
}

public String getEpId() {
return epId;
}

public String getQueueName() {
return queueName;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof QueueId))
return false;
QueueId queueId = (QueueId) o;
return Objects.equals(queueName, queueId.queueName) && Objects.equals(epId, queueId.epId);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[").append("epId=").append(epId).append(']');
return sb.toString();
}

}

static EndpointQueueMonitor disabled() {

return new EndpointQueueMonitor() {

@Override
public void observeQueueSize(QueueId queueId, Supplier<Number> queueSizeSupplier) {

}

@Override
public boolean isEnabled() {
return false;
}

};
}

void observeQueueSize(QueueId queueId, Supplier<Number> queueSizeSupplier);

boolean isEnabled();

}
138 changes: 138 additions & 0 deletions src/main/java/io/lettuce/core/metrics/MicrometerConnectionMonitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2011-Present, Redis Ltd. and Contributors
* All rights reserved.
*
* Licensed under the MIT License.
*
* This file contains contributions from third-party contributors
* licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.metrics;

import io.lettuce.core.internal.LettuceAssert;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
* Micrometer implementation for tracking connection metrics.
*
* <ul>
* <li>Time from having connection disconnected till successfully reconnected:
* <ul>
* <li>lettuce.reconnection.inactive.duration</li>
* <li>Description: Measures the time between a connection being disconnected and successfully reconnected.</li>
* </ul>
* </li>
* <li>Number of reconnection attempts:
* <ul>
* <li>lettuce.reconnection.attempts.count</li>
* <li>Description: Tracks the number of reconnection attempts made during a disconnection.</li>
* </ul>
* </li>
* </ul>
*
* @author Ivo Gaydajiev
* @since 6.7
*/
public class MicrometerConnectionMonitor implements ConnectionMonitor {

static final String LABEL_EPID = "epid";

// Track the time between a connection being disconnected and successfully reconnected or closed
public static final String METRIC_RECONNECTION_INACTIVE_TIME = "lettuce.reconnection.inactive.duration";

public static final String METRIC_RECONNECTION_ATTEMPTS = "lettuce.reconnection.attempts";

private final MeterRegistry meterRegistry;

private final MicrometerOptions options;

private final Map<MonitoredConnectionId, Timer> disconnectedTimers = new ConcurrentHashMap<>();

private final Map<MonitoredConnectionId, Counter> reconnectionAttempts = new ConcurrentHashMap<>();

/**
* Create a new {@link MicrometerConnectionMonitor} instance given {@link MeterRegistry} and {@link MicrometerOptions}.
*
* @param meterRegistry
* @param options
*/
public MicrometerConnectionMonitor(MeterRegistry meterRegistry, MicrometerOptions options) {

LettuceAssert.notNull(meterRegistry, "MeterRegistry must not be null");
LettuceAssert.notNull(options, "MicrometerOptions must not be null");

this.meterRegistry = meterRegistry;
this.options = options;
}

@Override
public void recordDisconnectedTime(String epid, long time) {

if (!isEnabled()) {
return;
}

MonitoredConnectionId connectionId = createId(epid);
Timer inavtiveConnectionTimer = disconnectedTimers.computeIfAbsent(connectionId, this::inactiveConnectionTimer);
inavtiveConnectionTimer.record(time, TimeUnit.NANOSECONDS);
}

@Override
public void incrementReconnectionAttempts(String epid) {

if (!isEnabled()) {
return;
}

MonitoredConnectionId connectionId = createId(epid);

Counter recconectionAttemptsCounter = reconnectionAttempts.computeIfAbsent(connectionId, this::reconnectAttempts);
recconectionAttemptsCounter.increment();
}

@Override
public boolean isEnabled() {
return options.isEnabled();
}

private MonitoredConnectionId createId(String epId) {
return MonitoredConnectionId.create(epId);
}

protected Timer inactiveConnectionTimer(MonitoredConnectionId connectionId) {
Timer.Builder timer = Timer.builder(METRIC_RECONNECTION_INACTIVE_TIME)
.description("Time taken for successful reconnection").tag(LABEL_EPID, connectionId.epId())
.tags(options.tags());

if (options.isHistogram()) {
timer.publishPercentileHistogram().publishPercentiles(options.targetPercentiles())
.minimumExpectedValue(options.minLatency()).maximumExpectedValue(options.maxLatency());
}

return timer.register(meterRegistry);
}

protected Counter reconnectAttempts(MonitoredConnectionId connectionId) {
Counter.Builder timer = Counter.builder(METRIC_RECONNECTION_ATTEMPTS).description("Number of reconnection attempts")
.tag(LABEL_EPID, connectionId.epId()).tags(options.tags());

return timer.register(meterRegistry);
}

}
Loading