Skip to content

Commit

Permalink
Add Lifecycle interface to RedisCache and simplify creation and closi…
Browse files Browse the repository at this point in the history
…ng of redis client (stop retrying connections for now).
  • Loading branch information
StFS committed Aug 30, 2023
1 parent f68e63d commit 2a20828
Showing 1 changed file with 52 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.hono.deviceconnection.common.Cache;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,7 +41,7 @@
* @param <K> TODO
* @param <V> TODO
*/
public class RedisCache<K, V> implements Cache<K, V> {
public class RedisCache<K, V> implements Cache<K, V>, Lifecycle {

private static final Logger LOG = LoggerFactory.getLogger(RedisCache.class);

Expand All @@ -59,7 +60,7 @@ public class RedisCache<K, V> implements Cache<K, V> {
* @param vertx TODO.
* @param properties TODO.
*/
public RedisCache(final Vertx vertx, final RedisRemoteConfigurationProperties properties) {
private RedisCache(final Vertx vertx, final RedisRemoteConfigurationProperties properties) {

Objects.requireNonNull(vertx);
Objects.requireNonNull(properties);
Expand All @@ -68,9 +69,9 @@ public RedisCache(final Vertx vertx, final RedisRemoteConfigurationProperties pr
this.properties = properties;

LOG.info("Initializing REDIS cache!");
createRedisClient()
.onSuccess( c -> LOG.info("Connected to Redis"))
.onFailure( t -> LOG.error("Could not connect to Redis", t));
//createRedisClient()
// .onSuccess( c -> LOG.info("Connected to Redis"))
// .onFailure( t -> LOG.error("Could not connect to Redis", t));
}

/**
Expand All @@ -90,9 +91,33 @@ public static RedisCache<String, String> from(
return new RedisCache<>(vertx, properties);
}

/**
@Override
public Future<Void> start() {
LOG.info("REDIS: starting cache");
final Promise<Void> promise = Promise.promise();
/*
createRedisClient()
.onSuccess(c -> promise.complete())
.onFailure(promise::fail);
return promise.future();
*/
redis = Redis.createClient(vertx, properties);
redis.connect()
.onSuccess(c -> client = c)
.onFailure(promise::fail);
return promise.future();
}

@Override
public Future<Void> stop() {
LOG.info("REDIS: stopping cache");
redis.close();
return Future.succeededFuture();
}

/*
* Will create a redis client and set up a reconnect handler when there is an exception in the connection.
*/
*
private Future<RedisConnection> createRedisClient() {
final Promise<RedisConnection> promise = Promise.promise();
Expand Down Expand Up @@ -143,6 +168,7 @@ private void attemptReconnect(final int retry) {
vertx.setTimer(backoff, timer -> createRedisClient().onFailure(t -> attemptReconnect(retry + 1)));
}
}
*/

@Override
public Future<JsonObject> checkForCacheAvailability() {
Expand All @@ -151,20 +177,19 @@ public Future<JsonObject> checkForCacheAvailability() {
Objects.requireNonNull(client);

final Promise<JsonObject> promise = Promise.promise();
final RedisAPI redis = RedisAPI.api(client);
redis.ping(List.of())
final RedisAPI api = RedisAPI.api(client);
api.ping(List.of())
.onSuccess(v -> promise.complete(new JsonObject()))
.onFailure(promise::fail);

return promise.future();
}

@Override
public Future<Void> put(final K key, final V value) {
LOG.info("REDIS: put {}={}", key, value);
final Promise<Void> promise = Promise.promise();
final RedisAPI redis = RedisAPI.api(client);
redis.set(List.of(key.toString(), value.toString()))
final RedisAPI api = RedisAPI.api(client);
api.set(List.of(key.toString(), value.toString()))
.onSuccess(v -> promise.complete())
.onFailure(promise::fail);
return promise.future();
Expand All @@ -175,12 +200,12 @@ public Future<Void> put(final K key, final V value, final long lifespan, final T
LOG.info("REDIS: put {}={} ({} {})", key, value, lifespan, lifespanUnit);
final long millis = lifespanUnit.toMillis(lifespan);
final Promise<Void> promise = Promise.promise();
final RedisAPI redis = RedisAPI.api(client);
final RedisAPI api = RedisAPI.api(client);
final List<String> params = new ArrayList<>(List.of(key.toString(), value.toString()));
if (millis > 0) {
params.addAll(List.of("PX", String.valueOf(millis)));
}
redis.set(params)
api.set(params)
.onSuccess(v -> promise.complete())
.onFailure(promise::fail);
return promise.future();
Expand All @@ -190,12 +215,12 @@ public Future<Void> put(final K key, final V value, final long lifespan, final T
public Future<Void> putAll(final Map<? extends K, ? extends V> data) {
LOG.info("REDIS: putAll ({})", data.size());
final Promise<Void> promise = Promise.promise();
final RedisAPI redis = RedisAPI.api(client);
final RedisAPI api = RedisAPI.api(client);
final List<String> keyValues = new ArrayList<>(data.size() * 2);
data.forEach((k, v) -> {
keyValues.add(k.toString());
keyValues.add(v.toString()); });
redis.mset(keyValues)
api.mset(keyValues)
.onSuccess(v -> promise.complete())
.onFailure(promise::fail);
return promise.future();
Expand All @@ -206,16 +231,16 @@ public Future<Void> putAll(final Map<? extends K, ? extends V> data, final long
LOG.info("REDIS: putAll ({}) ({} {})", data.size(), lifespan, lifespanUnit);
final Promise<Void> promise = Promise.promise();
final long millis = lifespanUnit.toMillis(lifespan);
final RedisAPI redis = RedisAPI.api(client);
redis.multi();
final RedisAPI api = RedisAPI.api(client);
api.multi();
data.forEach((k, v) -> {
final List<String> params = new ArrayList<>(List.of(k.toString(), v.toString()));
if (millis > 0) {
params.addAll(List.of("PX", String.valueOf(millis)));
}
redis.set(params);
api.set(params);
});
redis.exec()
api.exec()
.onSuccess(v -> promise.complete())
.onFailure(promise::fail);
return promise.future();
Expand All @@ -225,8 +250,8 @@ public Future<Void> putAll(final Map<? extends K, ? extends V> data, final long
public Future<V> get(final K key) {
LOG.info("REDIS: get {}", key);
final Promise<V> promise = Promise.promise();
final RedisAPI redis = RedisAPI.api(client);
redis.get(key.toString())
final RedisAPI api = RedisAPI.api(client);
api.get(key.toString())
.onSuccess(v -> promise.complete((V) v) )
.onFailure(promise::fail);
return promise.future();
Expand All @@ -237,8 +262,8 @@ public Future<Boolean> remove(final K key, final V value) {
LOG.info("REDIS: remove {}={}", key, value);
//TODO: why is the value being passed here? Do we need to use that?
final Promise<Boolean> promise = Promise.promise();
final RedisAPI redis = RedisAPI.api(client);
redis.del(List.of(key.toString()))
final RedisAPI api = RedisAPI.api(client);
api.del(List.of(key.toString()))
.onSuccess(v -> promise.complete(true))
.onFailure(promise::fail);
return promise.future();
Expand All @@ -249,19 +274,19 @@ public Future<Map<K, V>> getAll(final Set<? extends K> keys) {
LOG.info("REDIS: getAll {}", keys.size());
final Promise<Map<K, V>> promise = Promise.promise();

final RedisAPI redis = RedisAPI.api(client);
final RedisAPI api = RedisAPI.api(client);
// Make sure the keys are in order and we can pop off the front
final LinkedList<String> keyList = new LinkedList<>(keys.stream().map(String::valueOf).toList());
keyList.forEach(i -> LOG.info("REDIS: Item: {}", i));
final Map<K, V> result = new HashMap<>(keyList.size());
redis.mget(keyList)
api.mget(keyList)
.onComplete(v -> {
LOG.info("REDIS: Got {} items back...", v.result().stream().toList().size());
v.result().forEach(i -> {
LOG.info("Iterating through result list: {}", i);
try {
if (i != null) { // TODO: this is kinda strange but some results are null and the BasicCache does not include those in the returned result. Ask about/investigate.
result.put((K) keyList.removeFirst(), i == null ? null : (V) i.toString());
result.put((K) keyList.removeFirst(), (V) i.toString());
}
} catch (Exception e) {
LOG.info(" - got exception {}", e.getMessage());
Expand Down

0 comments on commit 2a20828

Please sign in to comment.