Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor redis utils #382

Merged
merged 1 commit into from
Feb 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.example.age.module.store.redis;

import dagger.Module;
import dagger.Provides;
import jakarta.inject.Singleton;
import redis.clients.jedis.JedisPooled;

/**
* Dagger module that binds {@link JedisPooled}.
* <p>
* Depends on an unbound {@link RedisConfig}.
*/
@Module
interface RedisClientModule {

@Provides
@Singleton
static JedisPooled bindJedisPooled(RedisConfig config) {
return new JedisPooled(config.url().toString());
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
package org.example.age.module.store.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.inject.Named;
import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import org.example.age.service.module.store.PendingStore;
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.Pipeline;
Expand All @@ -21,73 +13,54 @@
final class RedisPendingStore<V> implements PendingStore<V> {

private final JedisPooled client;
private final RedisUtils utils;
private final String redisKeyPrefix;
private final Class<V> valueType;
private final ObjectMapper mapper;
private final ExecutorService worker;

public RedisPendingStore(
JedisPooled client,
String name,
Class<V> valueType,
ObjectMapper mapper,
@Named("worker") ExecutorService worker) {
public RedisPendingStore(JedisPooled client, RedisUtils utils, String name, Class<V> valueType) {
this.client = client;
this.utils = utils;
this.redisKeyPrefix = String.format("age:pending:%s", name);
this.valueType = valueType;
this.mapper = mapper;
this.worker = worker;
}

@Override
public CompletionStage<Void> put(String key, V value, OffsetDateTime expiration) {
return CompletableFuture.supplyAsync(() -> putSync(key, value, expiration), worker);
return utils.runAsync(() -> putSync(key, value, expiration));
}

@Override
public CompletionStage<Optional<V>> tryGet(String key) {
return CompletableFuture.supplyAsync(() -> tryGetSync(key), worker);
return utils.runAsync(() -> tryGetSync(key));
}

@Override
public CompletionStage<Optional<V>> tryRemove(String key) {
return CompletableFuture.supplyAsync(() -> tryRemoveSync(key), worker);
return utils.runAsync(() -> tryRemoveSync(key));
}

private Void putSync(String key, V value, OffsetDateTime expiration) {
long expiresInS = toExpiresInSeconds(expiration);
long expiresInS = utils.toExpiresInSeconds(expiration);
if (expiresInS <= 0) {
return null;
}

String redisKey = getRedisKey(key);
String json = serialize(value);
String redisKey = utils.getRedisKey(redisKeyPrefix, key);
String json = utils.serialize(value);
client.set(redisKey, json, new SetParams().ex(expiresInS));
return null;
}

private Optional<V> tryGetSync(String key) {
String redisKey = getRedisKey(key);
String redisKey = utils.getRedisKey(redisKeyPrefix, key);
String json = client.get(redisKey);
return (json != null) ? Optional.of(deserialize(json)) : Optional.empty();
return (json != null) ? Optional.of(utils.deserialize(json, valueType)) : Optional.empty();
}

private Optional<V> tryRemoveSync(String key) {
String redisKey = getRedisKey(key);
String redisKey = utils.getRedisKey(redisKeyPrefix, key);
String json = del(redisKey);
return (json != null) ? Optional.of(deserialize(json)) : Optional.empty();
}

/** Gets the key for Redis. */
private String getRedisKey(String key) {
return String.format("%s:%s", redisKeyPrefix, key);
}

/** Converts an expiration to a duration in seconds, rounding up. */
private long toExpiresInSeconds(OffsetDateTime expiration) {
Duration expiresIn = Duration.between(OffsetDateTime.now(ZoneOffset.UTC), expiration);
expiresIn = expiresIn.plusSeconds(1).minusNanos(1).truncatedTo(ChronoUnit.SECONDS);
return expiresIn.toSeconds();
return (json != null) ? Optional.of(utils.deserialize(json, valueType)) : Optional.empty();
}

/** Deletes a key from Redis, returning the value. */
Expand All @@ -99,22 +72,4 @@ private String del(String redisKey) {
return jsonResponse.get();
}
}

/** Serializes the value to JSON. */
private String serialize(V value) {
try {
return mapper.writeValueAsString(value);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Deserializes a value from JSON. */
private V deserialize(String json) {
try {
return mapper.readValue(json, valueType);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import dagger.Binds;
import dagger.Module;
import dagger.Provides;
import jakarta.inject.Singleton;
import java.util.concurrent.ExecutorService;
import org.example.age.service.module.store.PendingStoreRepository;
import redis.clients.jedis.JedisPooled;

/**
* Dagger module that binds {@link PendingStoreRepository}.
Expand All @@ -18,18 +15,10 @@
* <li>{@link ObjectMapper}
* <li><code>@Named {@link ExecutorService}</code>
* </ul>
* <p>
* Requires sticky sessions to work in a distributed environment.
*/
@Module
@Module(includes = RedisClientModule.class)
public interface RedisPendingStoreModule {

@Binds
PendingStoreRepository bindPendingStoreRepository(RedisPendingStoreRepository impl);

@Provides
@Singleton
static JedisPooled bindJedisPooled(RedisConfig config) {
return new JedisPooled(config.url().toString());
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package org.example.age.module.store.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.example.age.service.module.store.PendingStore;
import org.example.age.service.module.store.PendingStoreRepository;
import redis.clients.jedis.JedisPooled;
Expand All @@ -17,17 +14,14 @@
final class RedisPendingStoreRepository implements PendingStoreRepository {

private final JedisPooled client;
private final ObjectMapper mapper;
private final ExecutorService worker;
private final RedisUtils utils;

private final Map<String, PendingStore<?>> stores = Collections.synchronizedMap(new HashMap<>());

@Inject
public RedisPendingStoreRepository(
JedisPooled client, ObjectMapper mapper, @Named("worker") ExecutorService worker) {
public RedisPendingStoreRepository(JedisPooled client, RedisUtils utils) {
this.client = client;
this.mapper = mapper;
this.worker = worker;
this.utils = utils;
}

@SuppressWarnings("unchecked")
Expand All @@ -38,6 +32,6 @@ public <V> PendingStore<V> get(String name, Class<V> valueType) {

/** Creates a pending store. */
private <V> PendingStore<V> create(String name, Class<V> valueType) {
return new RedisPendingStore<>(client, name, valueType, mapper, worker);
return new RedisPendingStore<>(client, utils, name, valueType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.example.age.module.store.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

/** Utilities for working with Redis. */
@Singleton
final class RedisUtils {

private final ObjectMapper mapper;
private final ExecutorService worker;

@Inject
public RedisUtils(ObjectMapper mapper, @Named("worker") ExecutorService worker) {
this.mapper = mapper;
this.worker = worker;
}

/** Runs a task (that issues Redis commands) asynchronously on a worker thread. */
public <V> CompletionStage<V> runAsync(Supplier<V> task) {
return CompletableFuture.supplyAsync(task, worker);
}

/** Gets a Redis key. */
public String getRedisKey(String prefix, String key) {
return String.format("%s:%s", prefix, key);
}

/** Converts an expiration to a duration in seconds, rounding up. */
public long toExpiresInSeconds(OffsetDateTime expiration) {
Duration expiresIn = Duration.between(OffsetDateTime.now(ZoneOffset.UTC), expiration);
expiresIn = expiresIn.plusSeconds(1).minusNanos(1).truncatedTo(ChronoUnit.SECONDS);
return expiresIn.toSeconds();
}

/** Serializes a value to JSON. */
public <V> String serialize(V value) {
try {
return mapper.writeValueAsString(value);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Deserializes a value from JSON. */
public <V> V deserialize(String json, Class<V> valueType) {
try {
return mapper.readValue(json, valueType);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Loading