fair distribution to the forked consumers
musketyr committed Dec 3, 2024
1 parent 7efa111 commit 0fbd62f
Showing 6 changed files with 148 additions and 61 deletions.
@@ -51,6 +51,7 @@ class RedisJobExecutorSpec extends AbstractJobExecutorSpec {
'worker.jobs.long-running-job-execute-unlimited.enabled': 'true',
'worker.jobs.long-running-job-execute-concurrent.enabled': 'true',
'worker.jobs.long-running-job-execute-concurrent-consumer.enabled': 'true',
'worker.jobs.long-running-job-execute-fork-consumer.enabled': 'true',
'worker.jobs.long-running-job-execute-regular-consumer.enabled': 'true',
'worker.jobs.long-running-job-execute-fork.enabled': 'true'
)

@@ -39,12 +39,14 @@ abstract class AbstractJobExecutorSpec extends Specification {
initialDelay: 5
)

@Retry(count = 10)
@SuppressWarnings('AbcMetric')
@Retry(count = 10, condition = { System.getenv('CI') })
void 'jobs executed appropriate times on three servers'() {
given:
LocalQueues queues = LocalQueues.create().tap {
sendMessages(LongRunningJob.CONCURRENT_CONSUMER_QUEUE_NAME, Flux.range(1, 10).map(Object::toString))
sendMessages(LongRunningJob.REGULAR_CONSUMER_QUEUE_NAME, Flux.range(1, 10).map(Object::toString))
sendMessages(LongRunningJob.FORKED_CONSUMER_QUEUE_NAME, Flux.range(1, 15).map(Object::toString))
}
ApplicationContext one = buildContext(queues)
ApplicationContext two = buildContext(queues)
@@ -72,18 +74,30 @@ abstract class AbstractJobExecutorSpec extends Specification {
// concurrent jobs are at most n-times
jobs.count { it.concurrent.get() == 1 } == 2

List<String> remainingRegularMessages = queues.getMessages(LongRunningJob.REGULAR_CONSUMER_QUEUE_NAME, Argument.STRING)
List<String> consumedRegularMessages = jobs.consumedRegularMessages.flatten().flatten()
// forked consumer jobs should handle (workers * max messages) messages, shared fairly between the forks
List<String> remainingForkMessages = queues.getMessages(LongRunningJob.FORKED_CONSUMER_QUEUE_NAME, Argument.STRING)
List<String> consumedForkMessages = jobs.consumedForkMessages.flatten().flatten()

List<String> remainingConcurrentMessages = queues.getMessages(LongRunningJob.CONCURRENT_CONSUMER_QUEUE_NAME, Argument.STRING)
List<String> consumeConcurrentMessage = jobs.consumedConcurrentMessages.flatten().flatten()
LongRunningJob.FAILING_MESSAGE in remainingForkMessages
remainingForkMessages.size() == 4
consumedForkMessages.size() == 11

jobOne.consumedForkMessages
jobTwo.consumedForkMessages
jobThree.consumedForkMessages

// regular consumer jobs should handle (workers * max messages) messages
List<String> remainingRegularMessages = queues.getMessages(LongRunningJob.REGULAR_CONSUMER_QUEUE_NAME, Argument.STRING)
List<String> consumedRegularMessages = jobs.consumedRegularMessages.flatten().flatten()

LongRunningJob.FAILING_MESSAGE in remainingRegularMessages
remainingRegularMessages.size() == 2
consumedRegularMessages.size() == 8

// concurrent consumer jobs should handle (concurrency * max messages) messages
List<String> remainingConcurrentMessages = queues.getMessages(LongRunningJob.CONCURRENT_CONSUMER_QUEUE_NAME, Argument.STRING)
List<String> consumeConcurrentMessage = jobs.consumedConcurrentMessages.flatten().flatten()

LongRunningJob.FAILING_MESSAGE in remainingConcurrentMessages
remainingConcurrentMessages.size() == 5
consumeConcurrentMessage.size() == 5
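
The assertion numbers above follow from each worker reading a single batch of maxMessages per run, with the forks sharing that batch rather than multiplying it. A back-of-the-envelope check, assuming three workers and the maxMessages values declared in LongRunningJob below (plain Java, illustrative names, not part of the test suite):

int workers = 3;

// fork queue: 15 messages sent, @Fork(2) consumer with maxMessages = 4
int forkRead = workers * 4;                  // one batch of 4 per worker -> 12
int forkConsumed = forkRead - 1;             // '5' fails and is requeued -> 11
int forkRemaining = 15 - forkConsumed;       // 3 never read + failing '5' -> 4

// regular queue: 10 messages sent, maxMessages = 3
int regularRead = workers * 3;               // 9
int regularConsumed = regularRead - 1;       // 8
int regularRemaining = 10 - regularConsumed; // 2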

@@ -45,6 +45,7 @@ class LongRunningJob {

public static final String CONCURRENT_CONSUMER_QUEUE_NAME = 'concurrent-queue'
public static final String REGULAR_CONSUMER_QUEUE_NAME = 'normal-queue'
public static final String FORKED_CONSUMER_QUEUE_NAME = 'fork-queue'
public static final String FAILING_MESSAGE = '5'

final AtomicInteger producer = new AtomicInteger()
@@ -55,6 +56,7 @@ class LongRunningJob {
final AtomicInteger concurrent = new AtomicInteger()
final Queue<String> consumedConcurrentMessages = new ConcurrentLinkedQueue()
final Queue<String> consumedRegularMessages = new ConcurrentLinkedQueue()
final Queue<String> consumedForkMessages = new ConcurrentLinkedQueue()
final AtomicInteger fork = new AtomicInteger()

@Job(initialDelay = JOBS_INITIAL_DELAY)
@@ -108,6 +110,16 @@ class LongRunningJob {
consumedConcurrentMessages.add(message)
}

@Fork(2)
@Job(initialDelay = JOBS_INITIAL_DELAY)
@Consumes(value = FORKED_CONSUMER_QUEUE_NAME, maxMessages = 4)
void executeForkConsumer(String message) {
if (FAILING_MESSAGE == message) {
throw new IllegalStateException('Failing fork message')
}
consumedForkMessages.add(message)
}

@Job(initialDelay = JOBS_INITIAL_DELAY)
@Consumes(value = REGULAR_CONSUMER_QUEUE_NAME, maxMessages = 3)
void executeRegularConsumer(String message) {
@@ -127,7 +139,7 @@ class LongRunningJob {
@Override
@SuppressWarnings('LineLength')
String toString() {
return "LongRunningJob{producer=$producer, leader=$leader, follower=$follower, consecutive=$consecutive, unlimited=$unlimited, concurrent=$concurrent, fork=$fork, consumedConcurrentMessages=$consumedConcurrentMessages, consumedRegularMessages=$consumedRegularMessages}"
return "LongRunningJob{producer=$producer, leader=$leader, follower=$follower, consecutive=$consecutive, unlimited=$unlimited, concurrent=$concurrent, fork=$fork, consumedConcurrentMessages=$consumedConcurrentMessages, consumedRegularMessages=$consumedRegularMessages, consumedForkMessages=$consumedForkMessages}"
}

@SuppressWarnings('Instanceof')
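
For reference, the shape of the new forked consumer in isolation: a minimal sketch, assuming the library's usual annotation package (com.agorapulse.worker.annotation); the queue name, fork count, and delay here are illustrative, not taken from this commit.

import com.agorapulse.worker.annotation.Consumes;
import com.agorapulse.worker.annotation.Fork;
import com.agorapulse.worker.annotation.Job;
import jakarta.inject.Singleton;

@Singleton
public class AuditJob {

    // two forks share each batch of up to ten messages read from the queue;
    // throwing from the method requeues the offending message
    @Fork(2)
    @Job(initialDelay = "10s")
    @Consumes(value = "audit-queue", maxMessages = 10)
    public void consume(String message) {
        // process the message here
    }
}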

@@ -19,29 +19,42 @@

import com.agorapulse.worker.Job;
import com.agorapulse.worker.JobConfiguration;
import com.agorapulse.worker.JobConfigurationException;
import com.agorapulse.worker.executor.DistributedJobExecutor;
import com.agorapulse.worker.job.JobRunContext;
import com.agorapulse.worker.queue.JobQueues;
import com.agorapulse.worker.queue.QueueMessage;
import io.micronaut.context.BeanContext;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.inject.ExecutableMethod;
import io.micronaut.inject.qualifiers.Qualifiers;
import io.micronaut.runtime.context.scope.Refreshable;
import io.micronaut.runtime.context.scope.refresh.RefreshEvent;
import jakarta.inject.Singleton;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

@Singleton
public class DefaultMethodJobInvoker implements MethodJobInvoker {
@Refreshable
public class DefaultMethodJobInvoker implements MethodJobInvoker, ApplicationEventListener<RefreshEvent> {

private static final Logger LOGGER = LoggerFactory.getLogger(Job.class);

private final Map<String, Scheduler> schedulersCache = new ConcurrentHashMap<>();

private final BeanContext context;
private final DistributedJobExecutor distributedJobExecutor;

@@ -53,49 +66,91 @@ public DefaultMethodJobInvoker(
this.distributedJobExecutor = distributedJobExecutor;
}

@Override
public void onApplicationEvent(RefreshEvent event) {
schedulersCache.clear();
}

public <B> void invoke(MethodJob<B, ?> job, B bean, JobRunContext context) {
ExecutableMethod<B, ?> method = job.getMethod();
JobConfiguration configuration = job.getConfiguration();

if (method.getArguments().length == 0) {
context.message(null);
handleResult(configuration, context, executor(context, configuration).apply(() -> method.invoke(bean)));
handleResult(configuration, context, executor(context, configuration).apply(() -> {
if (configuration.getFork() > 1) {
return Flux.range(0, configuration.getFork())
.parallel(configuration.getFork())
.runOn(getScheduler(job))
.flatMap(i -> {
JobRunContext forkedContext = context.createChildContext(i.toString());
try {
Object result = method.invoke(bean);

if (result == null) {
return Mono.empty();
}

if (result instanceof Publisher<?> p) {
return Flux.from(p);
}

return Mono.just(result);
} catch (Exception e) {
forkedContext.error(e);
return Mono.empty();
}
});
}

return method.invoke(bean);
}));
} else if (method.getArguments().length == 1) {
handleResult(configuration, context, executor(context, configuration).apply(() -> {
JobConfiguration.ConsumerQueueConfiguration queueConfiguration = configuration.getConsumer();
return Flux.from(
queues(queueConfiguration.getQueueType()).readMessages(
queueConfiguration.getQueueName(),
queueConfiguration.getMaxMessages() < 1 ? 1 : queueConfiguration.getMaxMessages(),
Optional.ofNullable(queueConfiguration.getWaitingTime()).orElse(Duration.ZERO),
method.getArguments()[0]
)
Flux<? extends QueueMessage<?>> messages = Flux.from(
queues(queueConfiguration.getQueueType()).readMessages(
queueConfiguration.getQueueName(),
queueConfiguration.getMaxMessages() < 1 ? 1 : queueConfiguration.getMaxMessages(),
Optional.ofNullable(queueConfiguration.getWaitingTime()).orElse(Duration.ZERO),
method.getArguments()[0]
)
.flatMap(message -> {
JobRunContext messageContext = context.createChildContext(message.getId());
try {
messageContext.message(message);

Object result = method.invoke(bean, message.getMessage());
);

message.delete();
Function<QueueMessage<?>, Publisher<?>> messageProcessor = message -> {
JobRunContext messageContext = context.createChildContext(message.getId());
try {
messageContext.message(message);

if (result == null) {
return Mono.empty();
}
Object result = method.invoke(bean, message.getMessage());

if (result instanceof Publisher<?> p) {
return Flux.from(p);
}
message.delete();

return Mono.just(result);
} catch (Throwable e) {
message.requeue();
messageContext.error(e);
if (result == null) {
return Mono.empty();
}

});
if (result instanceof Publisher<?> p) {
return Flux.from(p);
}

return Mono.just(result);
} catch (Exception e) {
message.requeue();
messageContext.error(e);
return Mono.empty();
}

};

if (configuration.getFork() > 1) {
return messages
.parallel(configuration.getFork())
.runOn(getScheduler(job))
.flatMap(messageProcessor);
}

return messages.flatMap(messageProcessor);
}));
} else {
LOGGER.error("Too many arguments for {}! The job method wasn't executed!", method);
@@ -146,6 +201,10 @@ protected void handleResult(JobConfiguration configuration, JobRunContext callback
callback.finished();
}

private <B> Scheduler getScheduler(MethodJob<B, ?> job) {
return schedulersCache.computeIfAbsent(job.getConfiguration().getScheduler(), s -> Schedulers.fromExecutor(getExecutor(job)));
}

private JobQueues queues(String qualifier) {
return context.findBean(
JobQueues.class,
@@ -154,4 +213,12 @@ private JobQueues queues(String qualifier) {
.orElseGet(() -> context.getBean(JobQueues.class));
}

private ExecutorService getExecutor(Job job) {
JobConfiguration configuration = job.getConfiguration();

return context
.findBean(ExecutorService.class, Qualifiers.byName(configuration.getScheduler()))
.orElseThrow(() -> new JobConfigurationException(job, "No executor service configured for name: " + configuration.getScheduler()));
}

}
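
The heart of the change is in the two fork > 1 branches above: instead of scheduling a job fork-times, a single run now fans out over a Reactor ParallelFlux whose rails receive items round-robin, which is what makes the distribution fair. A self-contained sketch of that pattern (plain Reactor, illustrative names, not the library's API):

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FairForkDemo {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // one batch of eight items, fanned out over two rails: the rails are
        // fed round-robin, so both "forks" get a fair share of the batch
        Flux.range(1, 8)
            .parallel(2)                              // split into two rails
            .runOn(Schedulers.fromExecutor(executor)) // each rail runs on the executor
            .doOnNext(i -> System.out.printf("%s handled %d%n", Thread.currentThread().getName(), i))
            .sequential()
            .blockLast();

        executor.shutdown();
    }
}

With two rails the eight items alternate between the pool's threads, mirroring how a forked consumer now splits one batch of queue messages instead of reading fork separate batches.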

@@ -34,8 +34,6 @@

import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -75,27 +73,20 @@ public void schedule(com.agorapulse.worker.Job job) {
JobConfiguration configuration = job.getConfiguration();
TaskScheduler taskScheduler = getTaskScheduler(job);

List<ScheduledFuture<?>> scheduled = new ArrayList<>();

for (int i = 0; i < configuration.getFork(); i++) {
scheduled.addAll(doSchedule(job, configuration, taskScheduler));
}
ScheduledFuture<?> scheduled = doSchedule(job, configuration, taskScheduler);

if (job instanceof MutableCancelableJob mj) {
mj.cancelAction(() -> {
for (ScheduledFuture<?> scheduledTask : scheduled) {
if (!scheduledTask.isCancelled()) {
scheduledTask.cancel(false);
}
if (!scheduled.isCancelled()) {
scheduled.cancel(false);
}
});
}

scheduledTasks.addAll(scheduled);
scheduledTasks.add(scheduled);
}

private List<ScheduledFuture<?>> doSchedule(Job job, JobConfiguration configuration, TaskScheduler taskScheduler) {
List<ScheduledFuture<?>> scheduled = new ArrayList<>();
private ScheduledFuture<?> doSchedule(Job job, JobConfiguration configuration, TaskScheduler taskScheduler) {
Duration initialDelay = configuration.getInitialDelay();

if (StringUtils.isNotEmpty(configuration.getCron())) {
@@ -106,35 +97,36 @@ private List<ScheduledFuture<?>> doSchedule(Job job, JobConfiguration configuration)
LOG.debug("Scheduling cron job {} [{}] for {}", configuration.getName(), configuration.getCron(), job.getSource());
}
try {
ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(configuration.getCron(), job);
scheduled.add(scheduledFuture);
return taskScheduler.schedule(configuration.getCron(), job);
} catch (IllegalArgumentException e) {
throw new JobConfigurationException(job, "Failed to schedule job " + configuration.getName() + " declared in " + job.getSource() + ". Invalid CRON expression: " + configuration.getCron(), e);
}
} else if (configuration.getFixedRate() != null) {
}

if (configuration.getFixedRate() != null) {
Duration duration = configuration.getFixedRate();
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling fixed rate job {} [{}] for {}", configuration.getName(), duration, job.getSource());
}

ScheduledFuture<?> scheduledFuture = taskScheduler.scheduleAtFixedRate(initialDelay, duration, job);
scheduled.add(scheduledFuture);
} else if (configuration.getFixedDelay() != null) {
return taskScheduler.scheduleAtFixedRate(initialDelay, duration, job);
}

if (configuration.getFixedDelay() != null) {
Duration duration = configuration.getFixedDelay();

if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling fixed delay task {} [{}] for {}", configuration.getName(), duration, job.getSource());
}

ScheduledFuture<?> scheduledFuture = taskScheduler.scheduleWithFixedDelay(initialDelay, duration, job);
scheduled.add(scheduledFuture);
} else if (initialDelay != null) {
ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(initialDelay, job);
scheduled.add(scheduledFuture);
} else {
throw new JobConfigurationException(job, "Failed to schedule job " + configuration.getName() + " declared in " + job.getSource() + ". Invalid definition");
return taskScheduler.scheduleWithFixedDelay(initialDelay, duration, job);
}

if (initialDelay != null) {
return taskScheduler.schedule(initialDelay, job);
}
return scheduled;

throw new JobConfigurationException(job, "Failed to schedule job " + configuration.getName() + " declared in " + job.getSource() + ". Invalid definition");
}

private TaskScheduler getTaskScheduler(com.agorapulse.worker.Job job) {
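
Because forking now happens inside the invoker, the scheduler keeps exactly one ScheduledFuture per job and cancellation no longer loops over fork copies. A condensed sketch of that bookkeeping (plain JDK scheduling with simplified signatures, not the actual class):

import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class OneFuturePerJobScheduler implements AutoCloseable {

    private final Queue<ScheduledFuture<?>> scheduledTasks = new ConcurrentLinkedDeque<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void schedule(Runnable job, Duration initialDelay) {
        // a single future per job is enough; forks are handled downstream by the invoker
        scheduledTasks.add(scheduler.schedule(job, initialDelay.toMillis(), TimeUnit.MILLISECONDS));
    }

    @Override
    public void close() {
        for (ScheduledFuture<?> task : scheduledTasks) {
            if (!task.isCancelled()) {
                task.cancel(false);
            }
        }
        scheduler.shutdown();
    }
}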

@@ -56,6 +56,7 @@ class LocalJobExecutorSpec extends AbstractJobExecutorSpec {
'worker.jobs.long-running-job-execute-unlimited.enabled': 'true',
'worker.jobs.long-running-job-execute-concurrent.enabled': 'true',
'worker.jobs.long-running-job-execute-concurrent-consumer.enabled': 'true',
'worker.jobs.long-running-job-execute-fork-consumer.enabled': 'true',
'worker.jobs.long-running-job-execute-regular-consumer.enabled': 'true',
'worker.jobs.long-running-job-execute-fork.enabled': 'true'
)
Expand Down
