From 0fbd62f6321d07b4df105720e618688e252de8ec Mon Sep 17 00:00:00 2001 From: Vladimir Orany Date: Tue, 3 Dec 2024 17:44:19 +0100 Subject: [PATCH] fair distribution to the forked consumers --- .../worker/redis/RedisJobExecutorSpec.groovy | 1 + .../executor/AbstractJobExecutorSpec.groovy | 24 +++- .../worker/tck/executor/LongRunningJob.groovy | 14 +- .../processor/DefaultMethodJobInvoker.java | 121 ++++++++++++++---- .../worker/schedule/DefaultJobScheduler.java | 48 +++---- .../worker/local/LocalJobExecutorSpec.groovy | 1 + 6 files changed, 148 insertions(+), 61 deletions(-) diff --git a/libs/micronaut-worker-executor-redis/src/test/groovy/com/agorapulse/worker/redis/RedisJobExecutorSpec.groovy b/libs/micronaut-worker-executor-redis/src/test/groovy/com/agorapulse/worker/redis/RedisJobExecutorSpec.groovy index 97f83f8f..50d0540b 100644 --- a/libs/micronaut-worker-executor-redis/src/test/groovy/com/agorapulse/worker/redis/RedisJobExecutorSpec.groovy +++ b/libs/micronaut-worker-executor-redis/src/test/groovy/com/agorapulse/worker/redis/RedisJobExecutorSpec.groovy @@ -51,6 +51,7 @@ class RedisJobExecutorSpec extends AbstractJobExecutorSpec { 'worker.jobs.long-running-job-execute-unlimited.enabled': 'true', 'worker.jobs.long-running-job-execute-concurrent.enabled': 'true', 'worker.jobs.long-running-job-execute-concurrent-consumer.enabled': 'true', + 'worker.jobs.long-running-job-execute-fork-consumer.enabled': 'true', 'worker.jobs.long-running-job-execute-regular-consumer.enabled': 'true', 'worker.jobs.long-running-job-execute-fork.enabled': 'true' ) diff --git a/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/executor/AbstractJobExecutorSpec.groovy b/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/executor/AbstractJobExecutorSpec.groovy index daaa14aa..292f4be0 100644 --- a/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/executor/AbstractJobExecutorSpec.groovy +++ b/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/executor/AbstractJobExecutorSpec.groovy @@ -39,12 +39,14 @@ abstract class AbstractJobExecutorSpec extends Specification { initialDelay: 5 ) - @Retry(count = 10) + @SuppressWarnings('AbcMetric') + @Retry(count = 10, condition = { System.getenv('CI') }) void 'jobs executed appropriate times on three servers'() { given: LocalQueues queues = LocalQueues.create().tap { sendMessages(LongRunningJob.CONCURRENT_CONSUMER_QUEUE_NAME, Flux.range(1, 10).map(Object::toString)) sendMessages(LongRunningJob.REGULAR_CONSUMER_QUEUE_NAME, Flux.range(1, 10).map(Object::toString)) + sendMessages(LongRunningJob.FORKED_CONSUMER_QUEUE_NAME, Flux.range(1, 15).map(Object::toString)) } ApplicationContext one = buildContext(queues) ApplicationContext two = buildContext(queues) @@ -72,18 +74,30 @@ abstract class AbstractJobExecutorSpec extends Specification { // concurrent jobs are at most n-times jobs.count { it.concurrent.get() == 1 } == 2 - List remainingRegularMessages = queues.getMessages(LongRunningJob.REGULAR_CONSUMER_QUEUE_NAME, Argument.STRING) - List consumedRegularMessages = jobs.consumedRegularMessages.flatten().flatten() + // forked consumer jobs should handle (workers * fork * max messages) messages + List remainingForkMessages = queues.getMessages(LongRunningJob.FORKED_CONSUMER_QUEUE_NAME, Argument.STRING) + List consumedForkMessages = jobs.consumedForkMessages.flatten().flatten() - List remainingConcurrentMessages = queues.getMessages(LongRunningJob.CONCURRENT_CONSUMER_QUEUE_NAME, Argument.STRING) - List consumeConcurrentMessage = jobs.consumedConcurrentMessages.flatten().flatten() + LongRunningJob.FAILING_MESSAGE in remainingForkMessages + remainingForkMessages.size() == 4 + consumedForkMessages.size() == 11 + + jobOne.consumedForkMessages + jobTwo.consumedForkMessages + jobThree.consumedForkMessages // concurrent consumer jobs should handle (workers * max messages) messages + List remainingRegularMessages = queues.getMessages(LongRunningJob.REGULAR_CONSUMER_QUEUE_NAME, Argument.STRING) + List consumedRegularMessages = jobs.consumedRegularMessages.flatten().flatten() + LongRunningJob.FAILING_MESSAGE in remainingRegularMessages remainingRegularMessages.size() == 2 consumedRegularMessages.size() == 8 // concurrent consumer jobs should handle (concurrency * max messages) messages + List remainingConcurrentMessages = queues.getMessages(LongRunningJob.CONCURRENT_CONSUMER_QUEUE_NAME, Argument.STRING) + List consumeConcurrentMessage = jobs.consumedConcurrentMessages.flatten().flatten() + LongRunningJob.FAILING_MESSAGE in remainingConcurrentMessages remainingConcurrentMessages.size() == 5 consumeConcurrentMessage.size() == 5 diff --git a/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/executor/LongRunningJob.groovy b/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/executor/LongRunningJob.groovy index 49c6b743..56308f72 100644 --- a/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/executor/LongRunningJob.groovy +++ b/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/executor/LongRunningJob.groovy @@ -45,6 +45,7 @@ class LongRunningJob { public static final String CONCURRENT_CONSUMER_QUEUE_NAME = 'concurrent-queue' public static final String REGULAR_CONSUMER_QUEUE_NAME = 'normal-queue' + public static final String FORKED_CONSUMER_QUEUE_NAME = 'fork-queue' public static final String FAILING_MESSAGE = '5' final AtomicInteger producer = new AtomicInteger() @@ -55,6 +56,7 @@ class LongRunningJob { final AtomicInteger concurrent = new AtomicInteger() final Queue consumedConcurrentMessages = new ConcurrentLinkedQueue() final Queue consumedRegularMessages = new ConcurrentLinkedQueue() + final Queue consumedForkMessages = new ConcurrentLinkedQueue() final AtomicInteger fork = new AtomicInteger() @Job(initialDelay = JOBS_INITIAL_DELAY) @@ -108,6 +110,16 @@ class LongRunningJob { consumedConcurrentMessages.add(message) } + @Fork(2) + @Job(initialDelay = JOBS_INITIAL_DELAY) + @Consumes(value = FORKED_CONSUMER_QUEUE_NAME, maxMessages = 4) + void executeForkConsumer(String message) { + if (FAILING_MESSAGE == message) { + throw new IllegalStateException('Failing fork message') + } + consumedForkMessages.add(message) + } + @Job(initialDelay = JOBS_INITIAL_DELAY) @Consumes(value = REGULAR_CONSUMER_QUEUE_NAME, maxMessages = 3) void executeRegularConsumer(String message) { @@ -127,7 +139,7 @@ class LongRunningJob { @Override @SuppressWarnings('LineLength') String toString() { - return "LongRunningJob{producer=$producer, leader=$leader, follower=$follower, consecutive=$consecutive, unlimited=$unlimited, concurrent=$concurrent, fork=$fork, consumedConcurrentMessages=$consumedConcurrentMessages, consumedRegularMessages=$consumedRegularMessages}" + return "LongRunningJob{producer=$producer, leader=$leader, follower=$follower, consecutive=$consecutive, unlimited=$unlimited, concurrent=$concurrent, fork=$fork, consumedConcurrentMessages=$consumedConcurrentMessages, consumedRegularMessages=$consumedRegularMessages, consumedForkMessages=$consumedForkMessages}" } @SuppressWarnings('Instanceof') diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java index a49121ae..92ce27d6 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java @@ -19,29 +19,42 @@ import com.agorapulse.worker.Job; import com.agorapulse.worker.JobConfiguration; +import com.agorapulse.worker.JobConfigurationException; import com.agorapulse.worker.executor.DistributedJobExecutor; import com.agorapulse.worker.job.JobRunContext; import com.agorapulse.worker.queue.JobQueues; +import com.agorapulse.worker.queue.QueueMessage; import io.micronaut.context.BeanContext; +import io.micronaut.context.event.ApplicationEventListener; import io.micronaut.inject.ExecutableMethod; import io.micronaut.inject.qualifiers.Qualifiers; +import io.micronaut.runtime.context.scope.Refreshable; +import io.micronaut.runtime.context.scope.refresh.RefreshEvent; import jakarta.inject.Singleton; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import java.time.Duration; +import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.function.Function; @Singleton -public class DefaultMethodJobInvoker implements MethodJobInvoker { +@Refreshable +public class DefaultMethodJobInvoker implements MethodJobInvoker, ApplicationEventListener { private static final Logger LOGGER = LoggerFactory.getLogger(Job.class); + private final Map schedulersCache = new ConcurrentHashMap<>(); + private final BeanContext context; private final DistributedJobExecutor distributedJobExecutor; @@ -53,49 +66,91 @@ public DefaultMethodJobInvoker( this.distributedJobExecutor = distributedJobExecutor; } + @Override + public void onApplicationEvent(RefreshEvent event) { + schedulersCache.clear(); + } + public void invoke(MethodJob job, B bean, JobRunContext context) { ExecutableMethod method = job.getMethod(); JobConfiguration configuration = job.getConfiguration(); if (method.getArguments().length == 0) { context.message(null); - handleResult(configuration, context, executor(context, configuration).apply(() -> method.invoke(bean))); + handleResult(configuration, context, executor(context, configuration).apply(() -> { + if (configuration.getFork() > 1) { + return Flux.range(0, configuration.getFork()) + .parallel(configuration.getFork()) + .runOn(getScheduler(job)) + .flatMap(i -> { + JobRunContext forkedContext = context.createChildContext(i.toString()); + try { + Object result = method.invoke(bean); + + if (result == null) { + return Mono.empty(); + } + + if (result instanceof Publisher p) { + return Flux.from(p); + } + + return Mono.just(result); + } catch (Exception e) { + forkedContext.error(e); + return Mono.empty(); + } + }); + } + + return method.invoke(bean); + })); } else if (method.getArguments().length == 1) { handleResult(configuration, context, executor(context, configuration).apply(() -> { JobConfiguration.ConsumerQueueConfiguration queueConfiguration = configuration.getConsumer(); - return Flux.from( - queues(queueConfiguration.getQueueType()).readMessages( - queueConfiguration.getQueueName(), - queueConfiguration.getMaxMessages() < 1 ? 1 : queueConfiguration.getMaxMessages(), - Optional.ofNullable(queueConfiguration.getWaitingTime()).orElse(Duration.ZERO), - method.getArguments()[0] - ) + Flux> messages = Flux.from( + queues(queueConfiguration.getQueueType()).readMessages( + queueConfiguration.getQueueName(), + queueConfiguration.getMaxMessages() < 1 ? 1 : queueConfiguration.getMaxMessages(), + Optional.ofNullable(queueConfiguration.getWaitingTime()).orElse(Duration.ZERO), + method.getArguments()[0] ) - .flatMap(message -> { - JobRunContext messageContext = context.createChildContext(message.getId()); - try { - messageContext.message(message); - - Object result = method.invoke(bean, message.getMessage()); + ); - message.delete(); + Function, Publisher> messageProcessor = message -> { + JobRunContext messageContext = context.createChildContext(message.getId()); + try { + messageContext.message(message); - if (result == null) { - return Mono.empty(); - } + Object result = method.invoke(bean, message.getMessage()); - if (result instanceof Publisher p) { - return Flux.from(p); - } + message.delete(); - return Mono.just(result); - } catch (Throwable e) { - message.requeue(); - messageContext.error(e); + if (result == null) { return Mono.empty(); } - }); + if (result instanceof Publisher p) { + return Flux.from(p); + } + + return Mono.just(result); + } catch (Exception e) { + message.requeue(); + messageContext.error(e); + return Mono.empty(); + } + + }; + + if (configuration.getFork() > 1) { + return messages + .parallel(configuration.getFork()) + .runOn(getScheduler(job)) + .flatMap(messageProcessor); + } + + return messages.flatMap(messageProcessor); })); } else { LOGGER.error("Too many arguments for {}! The job method wasn't executed!", method); @@ -146,6 +201,10 @@ protected void handleResult(JobConfiguration configuration, JobRunContext callba callback.finished(); } + private Scheduler getScheduler(MethodJob job) { + return schedulersCache.computeIfAbsent(job.getConfiguration().getScheduler(), s -> Schedulers.fromExecutor(getExecutor(job))); + } + private JobQueues queues(String qualifier) { return context.findBean( JobQueues.class, @@ -154,4 +213,12 @@ private JobQueues queues(String qualifier) { .orElseGet(() -> context.getBean(JobQueues.class)); } + private ExecutorService getExecutor(Job job) { + JobConfiguration configuration = job.getConfiguration(); + + return context + .findBean(ExecutorService.class, Qualifiers.byName(configuration.getScheduler())) + .orElseThrow(() -> new JobConfigurationException(job, "No scheduler of type TaskScheduler configured for name: " + configuration.getScheduler())); + } + } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/schedule/DefaultJobScheduler.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/schedule/DefaultJobScheduler.java index 7ce32152..43333c58 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/schedule/DefaultJobScheduler.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/schedule/DefaultJobScheduler.java @@ -34,8 +34,6 @@ import java.io.Closeable; import java.time.Duration; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedDeque; @@ -75,27 +73,20 @@ public void schedule(com.agorapulse.worker.Job job) { JobConfiguration configuration = job.getConfiguration(); TaskScheduler taskScheduler = getTaskScheduler(job); - List> scheduled = new ArrayList<>(); - - for (int i = 0; i < configuration.getFork(); i++) { - scheduled.addAll(doSchedule(job, configuration, taskScheduler)); - } + ScheduledFuture scheduled = doSchedule(job, configuration, taskScheduler); if (job instanceof MutableCancelableJob mj) { mj.cancelAction(() -> { - for (ScheduledFuture scheduledTask : scheduled) { - if (!scheduledTask.isCancelled()) { - scheduledTask.cancel(false); - } + if (!scheduled.isCancelled()) { + scheduled.cancel(false); } }); } - scheduledTasks.addAll(scheduled); + scheduledTasks.add(scheduled); } - private List> doSchedule(Job job, JobConfiguration configuration, TaskScheduler taskScheduler) { - List> scheduled = new ArrayList<>(); + private ScheduledFuture doSchedule(Job job, JobConfiguration configuration, TaskScheduler taskScheduler) { Duration initialDelay = configuration.getInitialDelay(); if (StringUtils.isNotEmpty(configuration.getCron())) { @@ -106,35 +97,36 @@ private List> doSchedule(Job job, JobConfiguration configurat LOG.debug("Scheduling cron job {} [{}] for {}", configuration.getName(), configuration.getCron(), job.getSource()); } try { - ScheduledFuture scheduledFuture = taskScheduler.schedule(configuration.getCron(), job); - scheduled.add(scheduledFuture); + return taskScheduler.schedule(configuration.getCron(), job); } catch (IllegalArgumentException e) { throw new JobConfigurationException(job, "Failed to schedule job " + configuration.getName() + " declared in " + job.getSource() + ". Invalid CRON expression: " + configuration.getCron(), e); } - } else if (configuration.getFixedRate() != null) { + } + + if (configuration.getFixedRate() != null) { Duration duration = configuration.getFixedRate(); if (LOG.isDebugEnabled()) { LOG.debug("Scheduling fixed rate job {} [{}] for {}", configuration.getName(), duration, job.getSource()); } - ScheduledFuture scheduledFuture = taskScheduler.scheduleAtFixedRate(initialDelay, duration, job); - scheduled.add(scheduledFuture); - } else if (configuration.getFixedDelay() != null) { + return taskScheduler.scheduleAtFixedRate(initialDelay, duration, job); + } + + if (configuration.getFixedDelay() != null) { Duration duration = configuration.getFixedDelay(); if (LOG.isDebugEnabled()) { LOG.debug("Scheduling fixed delay task {} [{}] for {}", configuration.getName(), duration, job.getSource()); } - ScheduledFuture scheduledFuture = taskScheduler.scheduleWithFixedDelay(initialDelay, duration, job); - scheduled.add(scheduledFuture); - } else if (initialDelay != null) { - ScheduledFuture scheduledFuture = taskScheduler.schedule(initialDelay, job); - scheduled.add(scheduledFuture); - } else { - throw new JobConfigurationException(job, "Failed to schedule job " + configuration.getName() + " declared in " + job.getSource() + ". Invalid definition"); + return taskScheduler.scheduleWithFixedDelay(initialDelay, duration, job); + } + + if (initialDelay != null) { + return taskScheduler.schedule(initialDelay, job); } - return scheduled; + + throw new JobConfigurationException(job, "Failed to schedule job " + configuration.getName() + " declared in " + job.getSource() + ". Invalid definition"); } private TaskScheduler getTaskScheduler(com.agorapulse.worker.Job job) { diff --git a/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/local/LocalJobExecutorSpec.groovy b/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/local/LocalJobExecutorSpec.groovy index 2bd571cb..54fdec8f 100644 --- a/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/local/LocalJobExecutorSpec.groovy +++ b/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/local/LocalJobExecutorSpec.groovy @@ -56,6 +56,7 @@ class LocalJobExecutorSpec extends AbstractJobExecutorSpec { 'worker.jobs.long-running-job-execute-unlimited.enabled': 'true', 'worker.jobs.long-running-job-execute-concurrent.enabled': 'true', 'worker.jobs.long-running-job-execute-concurrent-consumer.enabled': 'true', + 'worker.jobs.long-running-job-execute-fork-consumer.enabled': 'true', 'worker.jobs.long-running-job-execute-regular-consumer.enabled': 'true', 'worker.jobs.long-running-job-execute-fork.enabled': 'true' )