Skip to content

Commit

Permalink
error handling for message consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Dec 3, 2024
1 parent 4ffe4a8 commit 7efa111
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,21 @@ abstract class AbstractJobExecutorSpec extends Specification {
// concurrent jobs are at most n-times
jobs.count { it.concurrent.get() == 1 } == 2

// concurrent consumer jobs should handle (workers * max messages) messages
List<String> remainingRegularMessages = queues.getMessages(LongRunningJob.REGULAR_CONSUMER_QUEUE_NAME, Argument.STRING)
remainingRegularMessages.size() == 1
jobs.consumedRegularMessages.flatten().flatten().size() == 9
List<String> consumedRegularMessages = jobs.consumedRegularMessages.flatten().flatten()

// concurrent consumer jobs should handle (concurrency * max messages) messages
List<String> remainingConcurrentMessages = queues.getMessages(LongRunningJob.CONCURRENT_CONSUMER_QUEUE_NAME, Argument.STRING)
remainingConcurrentMessages.size() == 4
jobs.consumedConcurrentMessages.flatten().flatten().size() == 6
List<String> consumeConcurrentMessage = jobs.consumedConcurrentMessages.flatten().flatten()

// concurrent consumer jobs should handle (workers * max messages) messages
LongRunningJob.FAILING_MESSAGE in remainingRegularMessages
remainingRegularMessages.size() == 2
consumedRegularMessages.size() == 8

// concurrent consumer jobs should handle (concurrency * max messages) messages
LongRunningJob.FAILING_MESSAGE in remainingConcurrentMessages
remainingConcurrentMessages.size() == 5
consumeConcurrentMessage.size() == 5

// leader job is executed only on leader
jobs.count { it.leader.get() == 1 } == 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class LongRunningJob {

public static final String CONCURRENT_CONSUMER_QUEUE_NAME = 'concurrent-queue'
public static final String REGULAR_CONSUMER_QUEUE_NAME = 'normal-queue'
public static final String FAILING_MESSAGE = '5'

final AtomicInteger producer = new AtomicInteger()
final AtomicInteger leader = new AtomicInteger()
Expand Down Expand Up @@ -101,14 +102,18 @@ class LongRunningJob {
@Job(initialDelay = JOBS_INITIAL_DELAY)
@Consumes(value = CONCURRENT_CONSUMER_QUEUE_NAME, maxMessages = 3)
void executeConcurrentConsumer(String message) {
runLongTask()
if (FAILING_MESSAGE == message) {
throw new IllegalStateException('Failing concurrent message')
}
consumedConcurrentMessages.add(message)
}

@Job(initialDelay = JOBS_INITIAL_DELAY)
@Consumes(value = REGULAR_CONSUMER_QUEUE_NAME, maxMessages = 3)
void executeRegularConsumer(String message) {
runLongTask()
if (FAILING_MESSAGE == message) {
throw new IllegalStateException('Failing regular message')
}
consumedRegularMessages.add(message)
}

Expand All @@ -122,7 +127,7 @@ class LongRunningJob {
@Override
@SuppressWarnings('LineLength')
String toString() {
return "LongRunningJob{producer=$producer, leader=$leader, follower=$follower, consecutive=$consecutive, unlimited=$unlimited, concurrent=$concurrent, fork=$fork}"
return "LongRunningJob{producer=$producer, leader=$leader, follower=$follower, consecutive=$consecutive, unlimited=$unlimited, concurrent=$concurrent, fork=$fork, consumedConcurrentMessages=$consumedConcurrentMessages, consumedRegularMessages=$consumedRegularMessages}"
}

@SuppressWarnings('Instanceof')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,5 @@ default long getDurationMillis() {

@Nullable Throwable getException();

DefaultJobRunStatus copy();
DefaultJobRunStatus copy(String idSuffix);
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ public JobRunStatus getStatus() {
}

@Override
public JobRunContext copy() {
return new DefaultJobRunContext(status.copy(), onMessage, onError, onFinished, onResult, onExecuted, onSkipped);
public JobRunContext createChildContext(String isSuffix) {
return new DefaultJobRunContext(status.copy(isSuffix), onMessage, onError, onFinished, onResult, onExecuted, onSkipped);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ public void executed() {
}

@Override
public DefaultJobRunStatus copy() {
return new DefaultJobRunStatus(id, name, started, finished, exception, executionCount);
public DefaultJobRunStatus copy(String idSuffix) {
return new DefaultJobRunStatus(id + "-" + idSuffix, name, started, finished, exception, executionCount);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,119 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* Job context is passed to the job execution and allows to listen to various events during the job execution.
*/
public interface JobRunContext {

/**
* Creates a new job run context with the given status.
* @param status the status of the job
* @return new job run context
*/
static JobRunContext create(JobRunStatus status) {
return new DefaultJobRunContext(status);
}

/**
* Registers a listener for the message event. The listener is called when the message is received from the queue
* with the given message or with <code>null</code> when the job is not a consumer. The listener is called only once
* for non-consumer jobs and for each message for the consumer jobs.
*
* @param onMessage the listener to be called when the message is received
* @return this context
*/
JobRunContext onMessage(BiConsumer<JobRunStatus, QueueMessage<?>> onMessage);

/**
* Registers a listener for the error event. The listener is called when the job execution throws an exception. This
* can happen only once for non-consumer jobs and up to message count for consumer jobs.
* @param onError the listener to be called when the job execution throws an exception
* @return this context
*/
JobRunContext onError(BiConsumer<JobRunStatus, Throwable> onError);

/**
* Registers a listener for the finished event. The listener is called when the job execution is finished. This
* can happen at most once.
*
* @param onFinished the listener to be called when the job execution is finished
* @return this context
*/
JobRunContext onFinished(Consumer<JobRunStatus> onFinished);

/**
* Registers a listener for the result event. The listener is called for every generated result from producer jobs.
* @param onResult the listener to be called when the job execution generates a result
* @return this context
*/
JobRunContext onResult(BiConsumer<JobRunStatus, Object> onResult);

/**
* Registers a listener for the executed event. The listener is called when the job execution is executed because
* there are no restrictions that would prevent the execution.
*
* @param onExecuted the listener to be called when the job execution is executed
* @return this context
*/
JobRunContext onExecuted(Consumer<JobRunStatus> onExecuted);


/**
* Registers a listener for the skipped event. The listener is called when the job execution is skipped because
* there are restrictions that prevents the execution e.g. concurrency limit or leader/follower restrictions.
*
* @param onSkipped the listener to be called when the job execution is skipped
* @return this context
*/
JobRunContext onSkipped(Consumer<JobRunStatus> onSkipped);

/**
* Signals new incoming message.
* @param event the message or <code>null</code> if the job is not a consumer
*/
void message(@Nullable QueueMessage<?> event);

/**
* Singals an error during the job execution.
* @param error the error
*/
void error(Throwable error);

/**
* Signals the job execution is finished.
*/
void finished();

/**
* Signals the job execution produced a result.
* @param result the result of the job execution
*/
void result(@Nullable Object result);

/**
* Signals the job is executed.
*/
void executed();

/**
* Signals the job is skipped.
*/
void skipped();

/**
* Returns the status of the job.
* @return the status of the job
*/
JobRunStatus getStatus();

JobRunContext copy();
/**
* Creates a copy of the job run context. The copy will still execute all the listeners but the original context
* won't be affected by any new listeners added to the copy.
*
* @param idSuffix the id suffix for the new context
* @return the copy of the job run context
*/
JobRunContext createChildContext(String idSuffix);

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ public class LocalQueues implements JobQueues {
static class LocalQueue {

private final ConcurrentNavigableMap<String, Object> messages = new ConcurrentSkipListMap<>();
private final String prefix = UUID.randomUUID() + "-";
private final String format = UUID.randomUUID() + "-%015d";
private final AtomicLong counter = new AtomicLong(1);

void add(Object message) {
messages.put(prefix + counter.getAndIncrement(), message);
messages.put(format.formatted(counter.getAndIncrement()), message);
}

<T> QueueMessage<T> readMessage(ConversionService env, Argument<T> argument) {
Expand All @@ -61,7 +61,11 @@ <T> QueueMessage<T> readMessage(ConversionService env, Argument<T> argument) {
entry.getKey(),
env.convertRequired(entry.getValue(), argument),
() -> messages.remove(entry.getKey()),
() -> messages.put(entry.getKey(), entry.getValue())
() -> {
// ensure the message is removed and add it to the end of the queue
messages.remove(entry.getKey());
add(entry.getValue());
}
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,30 @@ public <B> void invoke(MethodJob<B, ?> job, B bean, JobRunContext context) {
method.getArguments()[0]
)
)
.doOnNext(context::message)
.flatMap(message -> {
Object result = method.invoke(bean, message.getMessage());
JobRunContext messageContext = context.createChildContext(message.getId());
try {
messageContext.message(message);

if (result == null) {
return Mono.empty();
}
Object result = method.invoke(bean, message.getMessage());

message.delete();

if (result instanceof Publisher<?> p) {
return Flux.from(p);
if (result == null) {
return Mono.empty();
}

if (result instanceof Publisher<?> p) {
return Flux.from(p);
}

return Mono.just(result);
} catch (Throwable e) {
message.requeue();
messageContext.error(e);
return Mono.empty();
}

return Mono.just(result);
});
}));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.List;
import java.util.Optional;

class MethodJob<B, R> extends AbstractJob {
public class MethodJob<B, R> extends AbstractJob {

private final ExecutableMethod<B, R> method;

Expand Down

0 comments on commit 7efa111

Please sign in to comment.