From 2a20828fbf1c9c7a4628b88607ef213fd5620d28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20Freyr=20Stef=C3=A1nsson?= Date: Wed, 30 Aug 2023 22:43:52 +0000 Subject: [PATCH] Add Lifecycle interface to RedisCache and simplify creation and closing of redis client (stop retrying connections for now). --- .../redis/client/RedisCache.java | 79 ++++++++++++------- 1 file changed, 52 insertions(+), 27 deletions(-) diff --git a/client-device-connection-redis/src/main/java/org/eclipse/hono/deviceconnection/redis/client/RedisCache.java b/client-device-connection-redis/src/main/java/org/eclipse/hono/deviceconnection/redis/client/RedisCache.java index 00aefa95c5..50c5691651 100644 --- a/client-device-connection-redis/src/main/java/org/eclipse/hono/deviceconnection/redis/client/RedisCache.java +++ b/client-device-connection-redis/src/main/java/org/eclipse/hono/deviceconnection/redis/client/RedisCache.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.hono.deviceconnection.common.Cache; +import org.eclipse.hono.util.Lifecycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,7 @@ * @param TODO * @param TODO */ -public class RedisCache implements Cache { +public class RedisCache implements Cache, Lifecycle { private static final Logger LOG = LoggerFactory.getLogger(RedisCache.class); @@ -59,7 +60,7 @@ public class RedisCache implements Cache { * @param vertx TODO. * @param properties TODO. */ - public RedisCache(final Vertx vertx, final RedisRemoteConfigurationProperties properties) { + private RedisCache(final Vertx vertx, final RedisRemoteConfigurationProperties properties) { Objects.requireNonNull(vertx); Objects.requireNonNull(properties); @@ -68,9 +69,9 @@ public RedisCache(final Vertx vertx, final RedisRemoteConfigurationProperties pr this.properties = properties; LOG.info("Initializing REDIS cache!"); - createRedisClient() - .onSuccess( c -> LOG.info("Connected to Redis")) - .onFailure( t -> LOG.error("Could not connect to Redis", t)); + //createRedisClient() + // .onSuccess( c -> LOG.info("Connected to Redis")) + // .onFailure( t -> LOG.error("Could not connect to Redis", t)); } /** @@ -90,9 +91,33 @@ public static RedisCache from( return new RedisCache<>(vertx, properties); } - /** + @Override + public Future start() { + LOG.info("REDIS: starting cache"); + final Promise promise = Promise.promise(); + /* + createRedisClient() + .onSuccess(c -> promise.complete()) + .onFailure(promise::fail); + return promise.future(); + */ + redis = Redis.createClient(vertx, properties); + redis.connect() + .onSuccess(c -> client = c) + .onFailure(promise::fail); + return promise.future(); + } + + @Override + public Future stop() { + LOG.info("REDIS: stopping cache"); + redis.close(); + return Future.succeededFuture(); + } + + /* * Will create a redis client and set up a reconnect handler when there is an exception in the connection. - */ + * private Future createRedisClient() { final Promise promise = Promise.promise(); @@ -143,6 +168,7 @@ private void attemptReconnect(final int retry) { vertx.setTimer(backoff, timer -> createRedisClient().onFailure(t -> attemptReconnect(retry + 1))); } } + */ @Override public Future checkForCacheAvailability() { @@ -151,11 +177,10 @@ public Future checkForCacheAvailability() { Objects.requireNonNull(client); final Promise promise = Promise.promise(); - final RedisAPI redis = RedisAPI.api(client); - redis.ping(List.of()) + final RedisAPI api = RedisAPI.api(client); + api.ping(List.of()) .onSuccess(v -> promise.complete(new JsonObject())) .onFailure(promise::fail); - return promise.future(); } @@ -163,8 +188,8 @@ public Future checkForCacheAvailability() { public Future put(final K key, final V value) { LOG.info("REDIS: put {}={}", key, value); final Promise promise = Promise.promise(); - final RedisAPI redis = RedisAPI.api(client); - redis.set(List.of(key.toString(), value.toString())) + final RedisAPI api = RedisAPI.api(client); + api.set(List.of(key.toString(), value.toString())) .onSuccess(v -> promise.complete()) .onFailure(promise::fail); return promise.future(); @@ -175,12 +200,12 @@ public Future put(final K key, final V value, final long lifespan, final T LOG.info("REDIS: put {}={} ({} {})", key, value, lifespan, lifespanUnit); final long millis = lifespanUnit.toMillis(lifespan); final Promise promise = Promise.promise(); - final RedisAPI redis = RedisAPI.api(client); + final RedisAPI api = RedisAPI.api(client); final List params = new ArrayList<>(List.of(key.toString(), value.toString())); if (millis > 0) { params.addAll(List.of("PX", String.valueOf(millis))); } - redis.set(params) + api.set(params) .onSuccess(v -> promise.complete()) .onFailure(promise::fail); return promise.future(); @@ -190,12 +215,12 @@ public Future put(final K key, final V value, final long lifespan, final T public Future putAll(final Map data) { LOG.info("REDIS: putAll ({})", data.size()); final Promise promise = Promise.promise(); - final RedisAPI redis = RedisAPI.api(client); + final RedisAPI api = RedisAPI.api(client); final List keyValues = new ArrayList<>(data.size() * 2); data.forEach((k, v) -> { keyValues.add(k.toString()); keyValues.add(v.toString()); }); - redis.mset(keyValues) + api.mset(keyValues) .onSuccess(v -> promise.complete()) .onFailure(promise::fail); return promise.future(); @@ -206,16 +231,16 @@ public Future putAll(final Map data, final long LOG.info("REDIS: putAll ({}) ({} {})", data.size(), lifespan, lifespanUnit); final Promise promise = Promise.promise(); final long millis = lifespanUnit.toMillis(lifespan); - final RedisAPI redis = RedisAPI.api(client); - redis.multi(); + final RedisAPI api = RedisAPI.api(client); + api.multi(); data.forEach((k, v) -> { final List params = new ArrayList<>(List.of(k.toString(), v.toString())); if (millis > 0) { params.addAll(List.of("PX", String.valueOf(millis))); } - redis.set(params); + api.set(params); }); - redis.exec() + api.exec() .onSuccess(v -> promise.complete()) .onFailure(promise::fail); return promise.future(); @@ -225,8 +250,8 @@ public Future putAll(final Map data, final long public Future get(final K key) { LOG.info("REDIS: get {}", key); final Promise promise = Promise.promise(); - final RedisAPI redis = RedisAPI.api(client); - redis.get(key.toString()) + final RedisAPI api = RedisAPI.api(client); + api.get(key.toString()) .onSuccess(v -> promise.complete((V) v) ) .onFailure(promise::fail); return promise.future(); @@ -237,8 +262,8 @@ public Future remove(final K key, final V value) { LOG.info("REDIS: remove {}={}", key, value); //TODO: why is the value being passed here? Do we need to use that? final Promise promise = Promise.promise(); - final RedisAPI redis = RedisAPI.api(client); - redis.del(List.of(key.toString())) + final RedisAPI api = RedisAPI.api(client); + api.del(List.of(key.toString())) .onSuccess(v -> promise.complete(true)) .onFailure(promise::fail); return promise.future(); @@ -249,19 +274,19 @@ public Future> getAll(final Set keys) { LOG.info("REDIS: getAll {}", keys.size()); final Promise> promise = Promise.promise(); - final RedisAPI redis = RedisAPI.api(client); + final RedisAPI api = RedisAPI.api(client); // Make sure the keys are in order and we can pop off the front final LinkedList keyList = new LinkedList<>(keys.stream().map(String::valueOf).toList()); keyList.forEach(i -> LOG.info("REDIS: Item: {}", i)); final Map result = new HashMap<>(keyList.size()); - redis.mget(keyList) + api.mget(keyList) .onComplete(v -> { LOG.info("REDIS: Got {} items back...", v.result().stream().toList().size()); v.result().forEach(i -> { LOG.info("Iterating through result list: {}", i); try { if (i != null) { // TODO: this is kinda strange but some results are null and the BasicCache does not include those in the returned result. Ask about/investigate. - result.put((K) keyList.removeFirst(), i == null ? null : (V) i.toString()); + result.put((K) keyList.removeFirst(), (V) i.toString()); } } catch (Exception e) { LOG.info(" - got exception {}", e.getMessage());