diff --git a/libs/micronaut-worker-executor-redis/src/main/java/com/agorapulse/worker/redis/RedisJobExecutor.java b/libs/micronaut-worker-executor-redis/src/main/java/com/agorapulse/worker/redis/RedisJobExecutor.java index f67a6ede..5ed6a7ca 100644 --- a/libs/micronaut-worker-executor-redis/src/main/java/com/agorapulse/worker/redis/RedisJobExecutor.java +++ b/libs/micronaut-worker-executor-redis/src/main/java/com/agorapulse/worker/redis/RedisJobExecutor.java @@ -21,6 +21,7 @@ import com.agorapulse.worker.JobConfiguration; import com.agorapulse.worker.JobManager; import com.agorapulse.worker.executor.DistributedJobExecutor; +import com.agorapulse.worker.executor.ExecutorId; import io.lettuce.core.ScriptOutputType; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; @@ -31,7 +32,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import jakarta.inject.Named; import jakarta.inject.Singleton; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -71,13 +71,13 @@ public class RedisJobExecutor implements DistributedJobExecutor { private static final String DECREASE_JOB_COUNT = "return redis.call('decr', KEYS[1])"; private final StatefulRedisConnection connection; - private final String hostname; + private final ExecutorId executorId; private final BeanContext beanContext; private final JobManager jobManager; - public RedisJobExecutor(StatefulRedisConnection connection, @Named(HOSTNAME_PARAMETER_NAME) String hostname, BeanContext beanContext, JobManager jobManager) { + public RedisJobExecutor(StatefulRedisConnection connection, ExecutorId executorId, BeanContext beanContext, JobManager jobManager) { this.connection = connection; - this.hostname = hostname; + this.executorId = executorId; this.beanContext = beanContext; this.jobManager = jobManager; } @@ -87,7 +87,7 @@ public Publisher executeOnlyOnLeader(String jobName, Callable supplier RedisAsyncCommands commands = connection.async(); return readMasterHostname(jobName, commands).flatMap(h -> { - if (hostname.equals(h)) { + if (executorId.id().equals(h)) { return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName))); } return Mono.empty(); @@ -114,7 +114,7 @@ public Publisher executeConcurrently(String jobName, int maxConcurrency, public Publisher executeOnlyOnFollower(String jobName, Callable supplier) { RedisAsyncCommands commands = connection.async(); return readMasterHostname(jobName, commands).flatMap(h -> { - if (!"".equals(h) && h.equals(hostname)) { + if (!"".equals(h) && h.equals(executorId.id())) { return Mono.empty(); } return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(jobName))); @@ -142,7 +142,7 @@ private Mono readMasterHostname(String jobName, RedisAsyncCommands