diff --git a/libs/micronaut-worker-executor-redis/src/main/java/com/agorapulse/worker/redis/RedisJobExecutor.java b/libs/micronaut-worker-executor-redis/src/main/java/com/agorapulse/worker/redis/RedisJobExecutor.java index 1114df8e..b7ad2115 100644 --- a/libs/micronaut-worker-executor-redis/src/main/java/com/agorapulse/worker/redis/RedisJobExecutor.java +++ b/libs/micronaut-worker-executor-redis/src/main/java/com/agorapulse/worker/redis/RedisJobExecutor.java @@ -105,7 +105,6 @@ public Publisher executeOnlyOnLeader(JobRunContext context, Callable s return Mono.fromCallable(supplier).subscribeOn(Schedulers.fromExecutorService(getExecutorService(context.getStatus().getName()))); } eventPublisher.publishEvent(JobExecutorEvent.leaderOnly(EXECUTOR_TYPE, JobExecutorEvent.Execution.SKIP, context.getStatus(), executorId.id())); - context.skipped(); return Mono.empty(); }).flux(); } @@ -120,7 +119,6 @@ public Publisher executeConcurrently(JobRunContext context, int maxConcur LOGGER.trace("Skipping execution of the job {} as the concurrency level {} is already reached", context.getStatus().getName(), maxConcurrency); } eventPublisher.publishEvent(JobExecutorEvent.concurrent(EXECUTOR_TYPE, JobExecutorEvent.Execution.SKIP, context.getStatus(), maxConcurrency, executorId.id())); - context.skipped(); return decreaseCurrentExecutionCount(context.getStatus().getName(), commands).flatMap(decreased -> Mono.empty()); } @@ -137,7 +135,6 @@ public Publisher executeOnlyOnFollower(JobRunContext context, Callable return readMasterHostname(context.getStatus().getName(), commands).flatMap(h -> { if (!"".equals(h) && h.equals(executorId.id())) { eventPublisher.publishEvent(JobExecutorEvent.followerOnly(EXECUTOR_TYPE, JobExecutorEvent.Execution.SKIP, context.getStatus(), executorId.id())); - context.skipped(); return Mono.empty(); } context.executed(); diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobRunStatus.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobRunStatus.java index ea1fefd8..20fe687c 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobRunStatus.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobRunStatus.java @@ -64,5 +64,5 @@ default long getDurationMillis() { @Nullable Throwable getException(); - DefaultJobRunStatus copy(String idSuffix); + JobRunStatus copy(String idSuffix); } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionStartedEvent.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionStartedEvent.java index d8d55f05..040f82a1 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionStartedEvent.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionStartedEvent.java @@ -56,6 +56,10 @@ public Optional getMessage() { return Optional.ofNullable(message).map(QueueMessage::getMessage); } + public Optional getMessageId() { + return Optional.ofNullable(message).map(QueueMessage::getId); + } + @Override public String toString() { return "JobExecutionStartedEvent{name='%s', id='%s', message=%s}".formatted(name, id, message); diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/DefaultJobRunContext.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/DefaultJobRunContext.java index 8f72156d..dccf4973 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/DefaultJobRunContext.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/DefaultJobRunContext.java @@ -33,20 +33,18 @@ public class DefaultJobRunContext implements JobRunContext { private Consumer onFinished = s -> { }; private BiConsumer onResult = (aStatus, r) -> { }; private Consumer onExecuted = s -> { }; - private Consumer onSkipped = s -> { }; public DefaultJobRunContext(JobRunStatus status) { this.status = status; } - private DefaultJobRunContext(JobRunStatus status, BiConsumer> onMessage, BiConsumer onError, Consumer onFinished, BiConsumer onResult, Consumer onExecuted, Consumer onSkipped) { + private DefaultJobRunContext(JobRunStatus status, BiConsumer> onMessage, BiConsumer onError, Consumer onFinished, BiConsumer onResult, Consumer onExecuted) { this.status = status; this.onMessage = onMessage; this.onError = onError; this.onFinished = onFinished; this.onResult = onResult; this.onExecuted = onExecuted; - this.onSkipped = onSkipped; } @Override @@ -79,12 +77,6 @@ public JobRunContext onExecuted(Consumer onExecuted) { return this; } - @Override - public JobRunContext onSkipped(Consumer onSkipped) { - this.onSkipped = this.onSkipped.andThen(onSkipped); - return this; - } - @Override public void message(@Nullable QueueMessage event) { onMessage.accept(status, event); @@ -110,11 +102,6 @@ public void executed() { onExecuted.accept(status); } - @Override - public void skipped() { - onSkipped.accept(status); - } - @Override public JobRunStatus getStatus() { return status; @@ -122,7 +109,7 @@ public JobRunStatus getStatus() { @Override public JobRunContext createChildContext(String isSuffix) { - return new DefaultJobRunContext(status.copy(isSuffix), onMessage, onError, onFinished, onResult, onExecuted, onSkipped); + return new DefaultJobRunContext(status.copy(isSuffix), onMessage, onError, onFinished, onResult, onExecuted); } } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/JobRunContext.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/JobRunContext.java index 2792e680..6f07e07c 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/JobRunContext.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/JobRunContext.java @@ -82,15 +82,6 @@ static JobRunContext create(JobRunStatus status) { JobRunContext onExecuted(Consumer onExecuted); - /** - * Registers a listener for the skipped event. The listener is called when the job execution is skipped because - * there are restrictions that prevents the execution e.g. concurrency limit or leader/follower restrictions. - * - * @param onSkipped the listener to be called when the job execution is skipped - * @return this context - */ - JobRunContext onSkipped(Consumer onSkipped); - /** * Signals new incoming message. * @param event the message or null if the job is not a consumer @@ -119,11 +110,6 @@ static JobRunContext create(JobRunStatus status) { */ void executed(); - /** - * Signals the job is skipped. - */ - void skipped(); - /** * Returns the status of the job. * @return the status of the job diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/local/LocalJobExecutor.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/local/LocalJobExecutor.java index 888b3f53..0aaf947f 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/local/LocalJobExecutor.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/local/LocalJobExecutor.java @@ -88,7 +88,6 @@ private Publisher doExecuteConcurrently(JobExecutorEvent.Type type, JobR if (increasedCount > concurrency) { counts.get(status.getName()).decrementAndGet(); eventPublisher.publishEvent(new JobExecutorEvent(EXECUTOR_TYPE, type, JobExecutorEvent.Execution.SKIP, status, concurrency, executorId.id())); - context.skipped(); return null; }