diff --git a/docs/guide/src/docs/asciidoc/installation.adoc b/docs/guide/src/docs/asciidoc/installation.adoc index 400f27b7..f954e7d0 100644 --- a/docs/guide/src/docs/asciidoc/installation.adoc +++ b/docs/guide/src/docs/asciidoc/installation.adoc @@ -12,6 +12,8 @@ repositories { } dependencies { + // minimal dependency with local queue and executor + // select some of the following dependencies to enable more features implementation 'com.agorapulse:micronaut-worker:{project-version}' // to enable /jobs endpoint @@ -29,6 +31,9 @@ dependencies { // to enable Redis queues integration implementation 'com.agorapulse:micronaut-worker-queues-redis:{project-version}' + // to enable running jobs as CLI apps + implementation 'com.agorapulse:micronaut-worker-runner:{project-version}' + // you also need Redis configuration on the classpath depending on your Micronaut version // for Micronaut 1.x implementation 'io.micronaut.configuration:micronaut-redis-lettuce' diff --git a/docs/guide/src/docs/asciidoc/introduction.adoc b/docs/guide/src/docs/asciidoc/introduction.adoc index c78fb90b..363e4ffe 100644 --- a/docs/guide/src/docs/asciidoc/introduction.adoc +++ b/docs/guide/src/docs/asciidoc/introduction.adoc @@ -11,6 +11,7 @@ Micronaut Worker is a library for advanced scheduling and work distribution in M * Job execution events `JobExecutionStartedEvent`, `JobExecutionFinishedEvent` and `JobExecutionResultEvent` * Built in support for https://github.com/agorapulse/micronaut-snitch[Micronaut Snitch] * Built in support for https://agorapulse.github.io/micronaut-console[Micronaut Console] + * Ability to execute a single job from the CLI (for e.g. https://aws.amazon.com/batch/ or https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definitions.html) Unlike the `@Scheduled` annotation, Micronaut Worker annotations are not repeatable, but they can be combined in meaningful ways. For example, a method annotated with `@FixedRate('10m') @InitialDelay('1m')` executes every diff --git a/docs/guide/src/docs/asciidoc/usage.adoc b/docs/guide/src/docs/asciidoc/usage.adoc index 2bc7780d..abafd7b7 100644 --- a/docs/guide/src/docs/asciidoc/usage.adoc +++ b/docs/guide/src/docs/asciidoc/usage.adoc @@ -256,6 +256,18 @@ include::{root-dir}/libs/micronaut-worker/src/main/java/com/agorapulse/worker/ev TIP: If https://github.com/agorapulse/micronaut-snitch[Micronaut Snitch] is present on the classpath and configured with the name of the job, the `snitch` method is called automatically after successful execution. +== CLI Runner + +You can run a single job from the command line using the `com.agorapulse.worker.runner.JobRunner` class as the main class. The arguments are the names of the jobs to run. All other jobs are disabled, even when enabled in the configuration (see corner cases below). The application will run until all jobs are finished. + +[source,shell] +.Run Job from CLI +---- +java -cp myapp-shadow.jar com.agorapulse.worker.runner.JobRunner sample-job other-job +---- + +WARNING: In some corner cases, some unrelated jobs can still be executed if they have a very short delay or frequency if they are manually enabled in the configuration. Please, prefer annotation driven jobs over configuring them manually in the configuration to avoid this issue. + == Management You can use `jobs` management endpoint, by default located at `/jobs`, to see the status of all the jobs in the application. @@ -280,7 +292,7 @@ each job. The name is the lower-camel-case version of the job name, e.g., `sampl include::{root-dir}/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/variables.txt[] ---- -TIP: The job variables are instances of `JobAccessor`, which also provides methods `run()` and `enqueue(message)` to let you easily trigger jobs from the console. +TIP: The job variables are instances of `JobAccessor`, which also provides methods `run()` and `enqueue(message)` to let you easily trigger jobs from the console. You can also use method `reconfigure(consumer)` that changes the in-memory configuration of the job and reschedules it. A simple script with just variable `jobs` will print the status of the current job execution. Depending on which console endpoint you choose, you get either a text or JSON summary. @@ -297,13 +309,20 @@ include::{root-dir}/libs/micronaut-worker/src/test/resources/com/agorapulse/work include::{root-dir}/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/listJobsResponse.txt[] ---- - [source] .Job Manager Script - JSON Result ---- include::{root-dir}/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/listJobsResponse.json[] ---- +Reconfiguring the job will try to change the configuration of the job and reschedule it if it's still enabled. + +[source,options="nowrap"] +.Job Manager Script - Reconfigure +---- +include::{root-dir}/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/reconfigure.groovy[] +---- + Returning a job variable from the script will render details for that job. [source,groovy] diff --git a/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueues.java b/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueues.java index c7e3806d..81fc3f56 100644 --- a/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueues.java +++ b/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueues.java @@ -29,8 +29,10 @@ import io.lettuce.core.support.BoundedPoolConfig; import io.micronaut.core.type.Argument; import io.micronaut.jackson.JacksonConfiguration; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import java.time.Duration; import java.util.List; @@ -61,9 +63,9 @@ public RedisQueues(ObjectMapper objectMapper, RedisClient client, RedisPoolConfi pool = new BoundedAsyncPool<>(new ConnectionFactory(client), config); } - @Override @SuppressWarnings("unchecked") - public void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument, Consumer action) { + @Override + public Publisher readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument) { TransactionResult result = withTransaction(redisCommands -> { String key = getKey(queueName); redisCommands.zrange(key, 0, maxNumberOfMessages - 1); @@ -71,7 +73,7 @@ public void readMessages(String queueName, int maxNumberOfMessages, Duration }); if (result == null) { - return; + return Flux.empty(); } @@ -83,14 +85,14 @@ public void readMessages(String queueName, int maxNumberOfMessages, Duration List messages = (List) firstResponse; - messages.forEach(body -> { + return Flux.fromIterable(messages).handle((body, sink) -> { try { - action.accept(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()))); + sink.next(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()))); } catch (JsonProcessingException e) { if (argument.equalsType(Argument.STRING)) { - action.accept((T) body); + sink.next((T) body); } else { - throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e); + sink.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e)); } } }); diff --git a/libs/micronaut-worker-queues-sqs-v1/micronaut-worker-queues-sqs-v1.gradle b/libs/micronaut-worker-queues-sqs-v1/micronaut-worker-queues-sqs-v1.gradle index 88bbdbe9..873316a0 100644 --- a/libs/micronaut-worker-queues-sqs-v1/micronaut-worker-queues-sqs-v1.gradle +++ b/libs/micronaut-worker-queues-sqs-v1/micronaut-worker-queues-sqs-v1.gradle @@ -20,6 +20,7 @@ dependencies { api "com.agorapulse:micronaut-aws-sdk-sqs:$micronautAwsSdkVersion" implementation 'io.micronaut:micronaut-jackson-databind' + implementation 'io.micronaut.reactor:micronaut-reactor' testImplementation project(':micronaut-worker-tck') diff --git a/libs/micronaut-worker-queues-sqs-v1/src/main/java/com/agorapulse/worker/sqs/v1/SqsQueues.java b/libs/micronaut-worker-queues-sqs-v1/src/main/java/com/agorapulse/worker/sqs/v1/SqsQueues.java index e0d452cb..a6cffe05 100644 --- a/libs/micronaut-worker-queues-sqs-v1/src/main/java/com/agorapulse/worker/sqs/v1/SqsQueues.java +++ b/libs/micronaut-worker-queues-sqs-v1/src/main/java/com/agorapulse/worker/sqs/v1/SqsQueues.java @@ -24,11 +24,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.micronaut.core.type.Argument; import io.micronaut.jackson.JacksonConfiguration; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.time.Duration; import java.util.Arrays; import java.util.Collection; -import java.util.function.Consumer; import java.util.stream.Collectors; public class SqsQueues implements JobQueues { @@ -42,10 +44,10 @@ public SqsQueues(SimpleQueueService simpleQueueService, ObjectMapper objectMappe } @Override - public void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument, Consumer action) { - simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).forEach(m -> { - readMessageInternal(queueName, argument, action, m.getBody(), m.getReceiptHandle(), true); - }); + public Publisher readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument) { + return Flux.merge(simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).stream().map(m -> + readMessageInternal(queueName, argument, m.getBody(), m.getReceiptHandle(), true) + ).toList()); } @Override @@ -70,28 +72,27 @@ public void sendRawMessage(String queueName, Object result) { } } - private void readMessageInternal(String queueName, Argument argument, Consumer action, String body, String handle, boolean tryReformat) { + private Mono readMessageInternal(String queueName, Argument argument, String body, String handle, boolean tryReformat) { try { - action.accept(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()))); + Mono result = Mono.just(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()))); simpleQueueService.deleteMessage(queueName, handle); + return result; } catch (JsonProcessingException e) { if (tryReformat) { if (String.class.isAssignableFrom(argument.getType())) { - action.accept(argument.getType().cast(body)); + Mono result = Mono.just(argument.getType().cast(body)); simpleQueueService.deleteMessage(queueName, handle); - return; + return result; } if (Collection.class.isAssignableFrom(argument.getType())) { if (argument.getTypeParameters().length > 0 && CharSequence.class.isAssignableFrom(argument.getTypeParameters()[0].getType())) { String quoted = Arrays.stream(body.split(",\\s*")).map(s -> "\"" + s + "\"").collect(Collectors.joining(",")); - readMessageInternal(queueName, argument, action, "[" + quoted + "]", handle, false); - return; + return readMessageInternal(queueName, argument, "[" + quoted + "]", handle, false); } - readMessageInternal(queueName, argument, action, "[" + body + "]", handle, false); - return; + return readMessageInternal(queueName, argument, "[" + body + "]", handle, false); } } - throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e); + return Mono.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e)); } } } diff --git a/libs/micronaut-worker-queues-sqs-v1/src/test/groovy/com/agorapulse/worker/sqs/v1/SqsQueuesUnitSpec.groovy b/libs/micronaut-worker-queues-sqs-v1/src/test/groovy/com/agorapulse/worker/sqs/v1/SqsQueuesUnitSpec.groovy index df042d92..451fcd74 100644 --- a/libs/micronaut-worker-queues-sqs-v1/src/test/groovy/com/agorapulse/worker/sqs/v1/SqsQueuesUnitSpec.groovy +++ b/libs/micronaut-worker-queues-sqs-v1/src/test/groovy/com/agorapulse/worker/sqs/v1/SqsQueuesUnitSpec.groovy @@ -23,6 +23,7 @@ import com.amazonaws.services.sqs.model.AmazonSQSException import com.amazonaws.services.sqs.model.Message import com.fasterxml.jackson.databind.ObjectMapper import io.micronaut.core.type.Argument +import reactor.core.publisher.Flux import spock.lang.Shared import spock.lang.Specification @@ -42,10 +43,9 @@ class SqsQueuesUnitSpec extends Specification { void 'message is deleted once read'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) { - values << it - } + List> values = Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) + ).collectList().block() then: values values.size() == 2 @@ -71,10 +71,9 @@ class SqsQueuesUnitSpec extends Specification { void 'can read legacy messages'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long)) { - values << it - } + List> values = Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long)) + ).collectList().block() then: values values.size() == 2 @@ -101,10 +100,9 @@ class SqsQueuesUnitSpec extends Specification { void 'can read legacy string messages'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String)) { - values << it - } + List> values = Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String)) + ).collectList().block() then: values values.size() == 2 @@ -131,10 +129,9 @@ class SqsQueuesUnitSpec extends Specification { void 'message not deleted on error'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) { - values << it - } + Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) + ).collectList().block() then: thrown IllegalArgumentException diff --git a/libs/micronaut-worker-queues-sqs-v2/src/main/java/com/agorapulse/worker/sqs/v2/SqsQueues.java b/libs/micronaut-worker-queues-sqs-v2/src/main/java/com/agorapulse/worker/sqs/v2/SqsQueues.java index 8070ead6..7c6758cb 100644 --- a/libs/micronaut-worker-queues-sqs-v2/src/main/java/com/agorapulse/worker/sqs/v2/SqsQueues.java +++ b/libs/micronaut-worker-queues-sqs-v2/src/main/java/com/agorapulse/worker/sqs/v2/SqsQueues.java @@ -25,12 +25,12 @@ import io.micronaut.jackson.JacksonConfiguration; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import software.amazon.awssdk.services.sqs.model.SqsException; import java.time.Duration; import java.util.Arrays; import java.util.Collection; -import java.util.function.Consumer; import java.util.stream.Collectors; public class SqsQueues implements JobQueues { @@ -44,10 +44,10 @@ public SqsQueues(SimpleQueueService simpleQueueService, ObjectMapper objectMappe } @Override - public void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument, Consumer action) { - simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).forEach(m -> { - readMessageInternal(queueName, argument, action, m.body(), m.receiptHandle(), true); - }); + public Publisher readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument) { + return Flux.merge(simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).stream().map(m -> + readMessageInternal(queueName, argument, m.body(), m.receiptHandle(), true) + ).toList()); } @Override @@ -78,28 +78,27 @@ public void sendRawMessages(String queueName, Publisher result) { Flux.from(simpleQueueService.sendMessages(queueName, Flux.from(result).map(String::valueOf))).subscribe(); } - private void readMessageInternal(String queueName, Argument argument, Consumer action, String body, String handle, boolean tryReformat) { + private Mono readMessageInternal(String queueName, Argument argument, String body, String handle, boolean tryReformat) { try { - action.accept(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()))); + Mono result = Mono.just(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()))); simpleQueueService.deleteMessage(queueName, handle); + return result; } catch (JsonProcessingException e) { if (tryReformat) { if (String.class.isAssignableFrom(argument.getType())) { - action.accept(argument.getType().cast(body)); + Mono result = Mono.just(argument.getType().cast(body)); simpleQueueService.deleteMessage(queueName, handle); - return; + return result; } if (Collection.class.isAssignableFrom(argument.getType())) { if (argument.getTypeParameters().length > 0 && CharSequence.class.isAssignableFrom(argument.getTypeParameters()[0].getType())) { String quoted = Arrays.stream(body.split(",\\s*")).map(s -> "\"" + s + "\"").collect(Collectors.joining(",")); - readMessageInternal(queueName, argument, action, "[" + quoted + "]", handle, false); - return; + return readMessageInternal(queueName, argument, "[" + quoted + "]", handle, false); } - readMessageInternal(queueName, argument, action, "[" + body + "]", handle, false); - return; + return readMessageInternal(queueName, argument, "[" + body + "]", handle, false); } } - throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e); + return Mono.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e)); } } diff --git a/libs/micronaut-worker-queues-sqs-v2/src/test/groovy/com/agorapulse/worker/sqs/v2/SqsQueuesUnitSpec.groovy b/libs/micronaut-worker-queues-sqs-v2/src/test/groovy/com/agorapulse/worker/sqs/v2/SqsQueuesUnitSpec.groovy index a916ef55..8bf6a9a7 100644 --- a/libs/micronaut-worker-queues-sqs-v2/src/test/groovy/com/agorapulse/worker/sqs/v2/SqsQueuesUnitSpec.groovy +++ b/libs/micronaut-worker-queues-sqs-v2/src/test/groovy/com/agorapulse/worker/sqs/v2/SqsQueuesUnitSpec.groovy @@ -21,6 +21,7 @@ import com.agorapulse.micronaut.amazon.awssdk.sqs.SimpleQueueService import com.agorapulse.worker.queue.JobQueues import com.fasterxml.jackson.databind.ObjectMapper import io.micronaut.core.type.Argument +import reactor.core.publisher.Flux import software.amazon.awssdk.services.sqs.model.Message import software.amazon.awssdk.services.sqs.model.SqsException import spock.lang.Shared @@ -42,10 +43,9 @@ class SqsQueuesUnitSpec extends Specification { void 'message is deleted once read'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) { - values << it - } + List> values = Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) + ).collectList().block() then: values values.size() == 2 @@ -71,10 +71,9 @@ class SqsQueuesUnitSpec extends Specification { void 'can read legacy messages'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long)) { - values << it - } + List> values = Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long)) + ).collectList().block() then: values values.size() == 2 @@ -101,10 +100,9 @@ class SqsQueuesUnitSpec extends Specification { void 'can read legacy string messages'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String)) { - values << it - } + List> values = Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String)) + ).collectList().block() then: values values.size() == 2 @@ -131,10 +129,9 @@ class SqsQueuesUnitSpec extends Specification { void 'message not deleted on error'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) { - values << it - } + Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) + ).collectList().block() then: thrown IllegalArgumentException diff --git a/libs/micronaut-worker-runner/micronaut-worker-runner.gradle b/libs/micronaut-worker-runner/micronaut-worker-runner.gradle new file mode 100644 index 00000000..43220ca2 --- /dev/null +++ b/libs/micronaut-worker-runner/micronaut-worker-runner.gradle @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +dependencies { + api project(':micronaut-worker') + api 'io.micronaut:micronaut-function' + + implementation 'io.micronaut.reactor:micronaut-reactor' + + // verify everything works as expected even with server environment present + testImplementation 'io.micronaut:micronaut-http-server-netty' +} diff --git a/libs/micronaut-worker-runner/src/main/java/com/agorapulse/worker/runner/JobRunner.java b/libs/micronaut-worker-runner/src/main/java/com/agorapulse/worker/runner/JobRunner.java new file mode 100644 index 00000000..cc04a93b --- /dev/null +++ b/libs/micronaut-worker-runner/src/main/java/com/agorapulse/worker/runner/JobRunner.java @@ -0,0 +1,157 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.runner; + +import com.agorapulse.worker.Job; +import com.agorapulse.worker.JobManager; +import com.agorapulse.worker.report.JobReport; +import io.micronaut.context.ApplicationContext; +import io.micronaut.context.ApplicationContextBuilder; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.cli.CommandLine; +import io.micronaut.function.executor.FunctionInitializer; +import jakarta.inject.Inject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Optional; + +@SuppressWarnings("java:S6813") +public class JobRunner extends FunctionInitializer { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobRunner.class); + + public static void main(String[] args) throws IOException { + try (JobRunner runner = new JobRunner()) { + runner.run(args); + } + } + + @Inject + private JobManager jobManager; + + public JobRunner() { + super(); + } + + public JobRunner(ApplicationContext applicationContext) { + super(applicationContext); + } + + public JobRunner(ApplicationContext applicationContext, boolean inject) { + super(applicationContext, inject); + } + + public void run(String[] args) throws IOException { + CommandLine cli = CommandLine.build().parse(args); + + run(args, ignored -> { + if (!run(cli.getRemainingArgs())) { + throw new IllegalStateException("Error running jobs! See the logs for more details."); + } + return true; + }); + } + + @Override + protected @NonNull ApplicationContextBuilder newApplicationContextBuilder() { + return super.newApplicationContextBuilder().environments("job"); + } + + private boolean run(List jobNames) { + if (jobNames.isEmpty()) { + LOGGER.error("No job name provided"); + return false; + } + + for (String jobName : jobManager.getJobNames()) { + if (!jobNames.contains(jobName)) { + jobManager.getJob(jobName).filter(job -> job.getConfiguration().isEnabled()).ifPresent(job -> { + LOGGER.warn("Job '{}' is not in the list of jobs to run, but it is enabled. Disabling it.", jobName); + jobManager.reconfigure(jobName, c -> c.setEnabled(false)); + }); + } + } + + boolean result = true; + + for (String jobName : jobNames) { + try { + Optional optionalJob = jobManager.getJob(jobName); + + if (optionalJob.isEmpty()) { + LOGGER.error("Job '{}' not found", jobName); + continue; + } + + Job job = optionalJob.get(); + + job.forceRun(); + } catch (Exception e) { + LOGGER.error("Error running job '{}'", jobName, e); + + result = false; + } + } + + waitUntilAllJobsAreFinished(jobNames); + + for (String jobName : jobNames) { + Optional optionalJob = jobManager.getJob(jobName); + + if (optionalJob.isPresent()) { + Job job = optionalJob.get(); + + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Job '{}' executed in {}", jobName, JobReport.humanReadableFormat(job.getStatus().getLastDuration())); + } + + if (job.getStatus().getLastException() != null) { + // the exception is already logged + result = false; + } + } + } + + return result; + } + + private void waitUntilAllJobsAreFinished(List jobNames) { + Flux.fromIterable(jobNames) + .map(jobManager::getJob) + .filter(Optional::isPresent) + .map(Optional::get) + .flatMap(JobRunner::waitUntilFinished) + .blockLast(); + } + + private static Mono waitUntilFinished(Job job) { + return Mono.defer(() -> { + if (job.getStatus().getLastDuration() != null) { + return Mono.empty(); + } + return Mono.delay(Duration.ofMillis(100)).then(waitUntilFinished(job)); + }); + } + +} diff --git a/libs/micronaut-worker-runner/src/test/groovy/com/agorapulse/worker/runner/JobExecutionRecorder.java b/libs/micronaut-worker-runner/src/test/groovy/com/agorapulse/worker/runner/JobExecutionRecorder.java new file mode 100644 index 00000000..1460cd36 --- /dev/null +++ b/libs/micronaut-worker-runner/src/test/groovy/com/agorapulse/worker/runner/JobExecutionRecorder.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.runner; + +import com.agorapulse.worker.event.JobExecutionFinishedEvent; +import com.agorapulse.worker.event.JobExecutionResultEvent; +import com.agorapulse.worker.event.JobExecutionStartedEvent; +import io.micronaut.runtime.event.annotation.EventListener; +import jakarta.inject.Singleton; + +import java.util.ArrayList; +import java.util.List; + +@Singleton +public class JobExecutionRecorder { + + private final List startedEvents = new ArrayList<>(); + private final List finishedEvents = new ArrayList<>(); + private final List resultEvents = new ArrayList<>(); + + @EventListener + public void onJobStarted(JobExecutionStartedEvent event) { + startedEvents.add(event); + } + + @EventListener + public void onJobFinished(JobExecutionFinishedEvent event) { + finishedEvents.add(event); + } + + @EventListener + public void onJobResult(JobExecutionResultEvent event) { + resultEvents.add(event); + } + + public final List getStartedEvents() { + return List.copyOf(startedEvents); + } + + public final List getFinishedEvents() { + return List.copyOf(finishedEvents); + } + + public final List getResultEvents() { + return List.copyOf(resultEvents); + } + + @Override + public String toString() { + return "JobExecutionRecorder{startedEvents=%s, finishedEvents=%s, resultEvents=%s}".formatted(startedEvents, finishedEvents, resultEvents); + } + +} diff --git a/libs/micronaut-worker-runner/src/test/groovy/com/agorapulse/worker/runner/JobRunnerSpec.groovy b/libs/micronaut-worker-runner/src/test/groovy/com/agorapulse/worker/runner/JobRunnerSpec.groovy new file mode 100644 index 00000000..851053d6 --- /dev/null +++ b/libs/micronaut-worker-runner/src/test/groovy/com/agorapulse/worker/runner/JobRunnerSpec.groovy @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.runner + +import com.agorapulse.worker.JobManager +import io.micronaut.context.ApplicationContext +import io.micronaut.context.annotation.Property +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import jakarta.inject.Inject +import spock.lang.Specification + +@MicronautTest(rebuildContext = true) +class JobRunnerSpec extends Specification { + + @Inject ApplicationContext context + @Inject TestFunctionExitHandler exitHandler + @Inject JobExecutionRecorder recorder + @Inject JobManager jobManager + + void 'single job is executed'() { + when: + JobRunner runner = new JobRunner(context) + runner.run('test-job-one') + then: + 'test-job-one' in recorder.finishedEvents*.name + + exitHandler.success + } + + void 'runner waits until all events are generated'() { + when: + JobRunner runner = new JobRunner(context) + runner.run('test-job-two') + then: + 'test-job-two' in recorder.finishedEvents*.name + + recorder.resultEvents.any { result -> result.name == 'test-job-two' && result.result == 'foo' } + + exitHandler.success + } + + void 'job failure is propagated'() { + when: + JobRunner runner = new JobRunner(context) + runner.run('test-job-three') + then: + 'test-job-three' in recorder.finishedEvents*.name + + !exitHandler.success + exitHandler.error instanceof IllegalStateException + } + + void 'job generation failure is propagated'() { + when: + JobRunner runner = new JobRunner(context) + runner.run('test-job-four') + then: + 'test-job-four' in recorder.finishedEvents*.name + + !exitHandler.success + exitHandler.error instanceof IllegalStateException + } + + void 'consumer job is executed'() { + when: + JobRunner runner = new JobRunner(context) + jobManager.enqueue('test-job-five', 'foo') + runner.run('test-job-five') + then: + 'test-job-five' in recorder.finishedEvents*.name + + recorder.startedEvents.any { start -> start.name == 'test-job-five' && start.message.orElse(null) == 'foo' } + + exitHandler.success + } + + void 'pipe job waits until all messages are produced'() { + when: + JobRunner runner = new JobRunner(context) + jobManager.enqueue('test-job-six', 'Foo') + runner.run('test-job-six') + then: + 'test-job-six' in recorder.finishedEvents*.name + + recorder.startedEvents.any { start -> start.name == 'test-job-six' && start.message.orElse(null) == 'Foo' } + recorder.resultEvents.any { result -> result.name == 'test-job-six' && result.result == 'FOO' } + recorder.resultEvents.any { result -> result.name == 'test-job-six' && result.result == 'foo' } + + exitHandler.success + } + + @Property(name = 'worker.jobs.test-job-three.enabled', value = 'true') + @Property(name = 'worker.jobs.test-job-three.initial-delay', value = '20ms') + void 'only selected jobs are executed ignoring the enabled setting'() { + when: + JobRunner runner = new JobRunner(context) + runner.run('test-job-one', 'test-job-two') + and: + Thread.sleep(30) + then: + 'test-job-one' in recorder.finishedEvents*.name + 'test-job-two' in recorder.finishedEvents*.name + 'test-job-three' !in recorder.finishedEvents*.name + + exitHandler.success + } + +} diff --git a/libs/micronaut-worker-runner/src/test/groovy/com/agorapulse/worker/runner/TestFunctionExitHandler.java b/libs/micronaut-worker-runner/src/test/groovy/com/agorapulse/worker/runner/TestFunctionExitHandler.java new file mode 100644 index 00000000..697947f9 --- /dev/null +++ b/libs/micronaut-worker-runner/src/test/groovy/com/agorapulse/worker/runner/TestFunctionExitHandler.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.runner; + +import io.micronaut.context.annotation.Primary; +import io.micronaut.function.executor.FunctionExitHandler; +import jakarta.inject.Singleton; + +@Primary +@Singleton +public class TestFunctionExitHandler implements FunctionExitHandler { + + private boolean success; + private Exception error; + + @Override + public void exitWithError(Exception error, boolean debug) { + this.error = error; + } + + @Override + public void exitWithSuccess() { + success = true; + } + + @Override + public void exitWithNoData() { + success = true; + } + + public boolean isSuccess() { + return success; + } + + public Exception getError() { + return error; + } + + +} diff --git a/libs/micronaut-worker-runner/src/test/groovy/com/agorapulse/worker/runner/TestJob.java b/libs/micronaut-worker-runner/src/test/groovy/com/agorapulse/worker/runner/TestJob.java new file mode 100644 index 00000000..2cbf70af --- /dev/null +++ b/libs/micronaut-worker-runner/src/test/groovy/com/agorapulse/worker/runner/TestJob.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.runner; + +import com.agorapulse.worker.annotation.Job; +import jakarta.inject.Singleton; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.Locale; + +@Singleton +public class TestJob { + + @Job("test-job-one") + public void recordingJobOne() { + // do nothing + } + + @Job("test-job-two") + public Flux recordingJobTwo() { + return Flux.from(Mono.delay(Duration.ofMillis(300)).map(ignore -> "foo")); + } + + @Job("test-job-three") + public void recordingJobThree() { + throw new UnsupportedOperationException("This job is supposed to fail"); + } + + @Job("test-job-four") + public Flux recordingJobFour() { + return Flux.error(new UnsupportedOperationException("This job is supposed to fail")); + } + + @Job("test-job-five") + public void recordingJobFive(String ignored) { + // do nothing + } + + @Job("test-job-six") + public Flux recordingJobSix(String input) { + return Flux.just(input.toUpperCase(Locale.ROOT), input.toLowerCase(Locale.ROOT)); + } + +} diff --git a/libs/micronaut-worker-runner/src/test/resources/log4j2-test.xml b/libs/micronaut-worker-runner/src/test/resources/log4j2-test.xml new file mode 100644 index 00000000..7449703d --- /dev/null +++ b/libs/micronaut-worker-runner/src/test/resources/log4j2-test.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + diff --git a/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/queue/AbstractQueuesSpec.groovy b/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/queue/AbstractQueuesSpec.groovy index 5e83dbb8..a4fd29b8 100644 --- a/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/queue/AbstractQueuesSpec.groovy +++ b/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/queue/AbstractQueuesSpec.groovy @@ -22,6 +22,7 @@ import io.micronaut.context.ApplicationContext import io.micronaut.core.async.publisher.Publishers import io.micronaut.core.type.Argument import io.micronaut.inject.qualifiers.Qualifiers +import reactor.core.publisher.Flux import spock.lang.Specification import jakarta.inject.Inject @@ -55,14 +56,11 @@ abstract class AbstractQueuesSpec extends Specification { void 'can send raw messages to queue'() { given: JobQueues queues = context.getBean(JobQueues, Qualifiers.byName(name)) - List messages = [] when: queues.sendRawMessage('foo', 'one') queues.sendRawMessages('foo', Publishers.just('two')) and: - queues.readMessages('foo', 2, Duration.ofSeconds(1), Argument.STRING) { message -> - messages << message - } + List messages = Flux.from(queues.readMessages('foo', 2, Duration.ofSeconds(1), Argument.STRING)).collectList().block() then: messages == ['one', 'two'] } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/Job.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/Job.java index ebc5312a..b67a66b7 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/Job.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/Job.java @@ -17,8 +17,11 @@ */ package com.agorapulse.worker; +import com.agorapulse.worker.configuration.MutableJobConfiguration; import com.agorapulse.worker.job.SimpleJob; +import java.util.function.Consumer; + /** * Job is a {@link Runnable} with a name. */ @@ -30,4 +33,10 @@ static Job create(JobConfiguration configuration, Runnable task) { void forceRun(); + /** + * Allows changing the configuration of the job if possible only for the lifetime of the application. + * @param configuration the configuration to be changed + */ + void configure(Consumer configuration); + } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobManager.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobManager.java index 47e68c43..456ca5c8 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobManager.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobManager.java @@ -17,6 +17,7 @@ */ package com.agorapulse.worker; +import com.agorapulse.worker.configuration.MutableJobConfiguration; import io.micronaut.core.naming.NameUtils; import java.util.Optional; @@ -96,4 +97,12 @@ default void forceRun(Class jobClass, String methodName) { default void enqueue(Class> jobClass, T message) { enqueue(getDefaultJobName(jobClass), message); } + + /** + * Reconfigures the job with the given name if possible and reschedules it + * + * @param jobName the name of the job + * @param configuration the configuration to be applied + */ + void reconfigure(String jobName, Consumer configuration); } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobRunStatus.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobRunStatus.java index e8d233f3..eeddc238 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobRunStatus.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/JobRunStatus.java @@ -19,6 +19,7 @@ import com.agorapulse.worker.job.DefaultJobRunStatus; import com.agorapulse.worker.json.DurationSerializer; +import com.agorapulse.worker.report.JobReport; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; @@ -45,6 +46,10 @@ default Duration getDuration() { return Duration.between(started, finished); } + default String getHumanReadableDuration() { + return JobReport.humanReadableFormat(getDuration()); + } + default long getDurationMillis() { return getDuration().toMillis(); } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Cron.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Cron.java index 0b21d7b2..b84b68ab 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Cron.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Cron.java @@ -19,6 +19,7 @@ import io.micronaut.context.annotation.AliasFor; import io.micronaut.scheduling.TaskExecutors; +import jakarta.inject.Named; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -40,6 +41,7 @@ String value(); @AliasFor(annotation = Job.class, member = "value") + @AliasFor(annotation = Named.class, member = "value") String name() default ""; /** diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedDelay.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedDelay.java index 890bb940..896e40c0 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedDelay.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedDelay.java @@ -19,6 +19,7 @@ import io.micronaut.context.annotation.AliasFor; import io.micronaut.scheduling.TaskExecutors; +import jakarta.inject.Named; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -40,6 +41,7 @@ String value(); @AliasFor(annotation = Job.class, member = "value") + @AliasFor(annotation = Named.class, member = "value") String name() default ""; /** diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedRate.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedRate.java index d343746f..cc0e480c 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedRate.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/FixedRate.java @@ -19,6 +19,7 @@ import io.micronaut.context.annotation.AliasFor; import io.micronaut.scheduling.TaskExecutors; +import jakarta.inject.Named; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -40,6 +41,7 @@ String value(); @AliasFor(annotation = Job.class, member = "value") + @AliasFor(annotation = Named.class, member = "value") String name() default ""; /** diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/InitialDelay.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/InitialDelay.java index 4bef97eb..07acfb05 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/InitialDelay.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/InitialDelay.java @@ -19,6 +19,7 @@ import io.micronaut.context.annotation.AliasFor; import io.micronaut.scheduling.TaskExecutors; +import jakarta.inject.Named; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -40,6 +41,7 @@ String value(); @AliasFor(annotation = Job.class, member = "value") + @AliasFor(annotation = Named.class, member = "value") String name() default ""; /** diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Job.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Job.java index b282d583..bcf712eb 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Job.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/annotation/Job.java @@ -17,9 +17,12 @@ */ package com.agorapulse.worker.annotation; +import io.micronaut.context.annotation.AliasFor; import io.micronaut.context.annotation.Executable; import io.micronaut.context.annotation.Parallel; +import io.micronaut.core.annotation.EntryPoint; import io.micronaut.scheduling.TaskExecutors; +import jakarta.inject.Named; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -36,6 +39,7 @@ @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Executable(processOnStartup = true) @Parallel +@EntryPoint public @interface Job { /** @@ -46,6 +50,7 @@ * * @return the name of the job used for configuration */ + @AliasFor(annotation = Named.class, member = "value") String value() default ""; diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/console/ConsoleJobManager.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/console/ConsoleJobManager.java index c2ea5f25..b338f1ea 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/console/ConsoleJobManager.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/console/ConsoleJobManager.java @@ -19,11 +19,13 @@ import com.agorapulse.worker.Job; import com.agorapulse.worker.JobManager; +import com.agorapulse.worker.configuration.MutableJobConfiguration; import com.fasterxml.jackson.annotation.JsonValue; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.function.Consumer; import java.util.stream.Collectors; public class ConsoleJobManager implements JobManager { @@ -64,4 +66,9 @@ List getJobs() { return getJobNames().stream().map(name -> new JobAccessor(name, this)).collect(Collectors.toList()); } + @Override + public void reconfigure(String jobName, Consumer configuration) { + delegate.reconfigure(jobName, configuration); + } + } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/console/JobAccessor.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/console/JobAccessor.java index 9df73c54..3147fd6e 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/console/JobAccessor.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/console/JobAccessor.java @@ -17,11 +17,17 @@ */ package com.agorapulse.worker.console; -import com.agorapulse.worker.*; +import com.agorapulse.worker.Job; +import com.agorapulse.worker.JobConfiguration; +import com.agorapulse.worker.JobInfo; +import com.agorapulse.worker.JobManager; +import com.agorapulse.worker.JobStatus; +import com.agorapulse.worker.configuration.MutableJobConfiguration; import com.agorapulse.worker.report.JobReport; import java.util.Collections; import java.util.Optional; +import java.util.function.Consumer; public class JobAccessor implements JobInfo { @@ -41,6 +47,11 @@ public void enqueue(Object o) { jobManager.enqueue(jobName, o); } + public JobAccessor reconfigure(Consumer configurator) { + jobManager.reconfigure(jobName, configurator); + return this; + } + @Override public String toString() { return JobReport.report(jobManager, Collections.singleton(jobName)); diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobEventsLogger.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobEventsLogger.java index 42526ce5..9dc4a442 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobEventsLogger.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobEventsLogger.java @@ -24,6 +24,8 @@ import jakarta.inject.Singleton; +import java.util.Optional; + @Singleton public class JobEventsLogger { @@ -32,8 +34,9 @@ public class JobEventsLogger { @EventListener void onJobExecutionStarted(JobExecutionStartedEvent event) { if (LOGGER.isDebugEnabled()) { - if (event.getMessage().isPresent()) { - LOGGER.debug("Starting job {}#{} with message {}", event.getName(), event.getId(), event.getMessage().get()); + Optional message = event.getMessage(); + if (message.isPresent()) { + LOGGER.debug("Starting job {}#{} with message {}", event.getName(), event.getId(), message.get()); } else { LOGGER.debug("Starting job {}#{}", event.getName(), event.getId()); } @@ -55,7 +58,7 @@ void onJobExecutionResult(JobExecutionResultEvent event) { @EventListener void onJobExecutionFinished(JobExecutionFinishedEvent event) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Finished executing job {}#{} in {} (some results can still be generated asynchronously later)", event.getName(), event.getStatus().getId(), event.getStatus().getDuration()); + LOGGER.debug("Finished executing job {}#{} in {}", event.getName(), event.getStatus().getId(), event.getStatus().getHumanReadableDuration()); } } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionFinishedEvent.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionFinishedEvent.java index a7fc1828..998db6bc 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionFinishedEvent.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionFinishedEvent.java @@ -42,4 +42,8 @@ public JobRunStatus getStatus() { return status; } + @Override + public String toString() { + return "JobExecutionFinishedEvent{name='%s', status=%s}".formatted(name, status); + } } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionResultEvent.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionResultEvent.java index d08227b0..ab379868 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionResultEvent.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionResultEvent.java @@ -46,4 +46,9 @@ public String getId() { public String getName() { return name; } + + @Override + public String toString() { + return "JobExecutionResultEvent{name='%s', id='%s', result=%s}".formatted(name, id, result); + } } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionStartedEvent.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionStartedEvent.java index 12283305..a0b33788 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionStartedEvent.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/event/JobExecutionStartedEvent.java @@ -55,4 +55,9 @@ public Optional getMessage() { return Optional.ofNullable(message); } + @Override + public String toString() { + return "JobExecutionStartedEvent{name='%s', id='%s', message=%s}".formatted(name, id, message); + } + } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/groovy/WorkerExtensions.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/groovy/WorkerExtensions.java index 722b6f24..f4354f9a 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/groovy/WorkerExtensions.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/groovy/WorkerExtensions.java @@ -127,4 +127,23 @@ public static Job register( return job; } + public static void reconfigure( + JobManager self, + String name, + @DelegatesTo(value = MutableJobConfiguration.class, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, options = "com.agorapulse.worker.configuration.MutableJobConfiguration") + Closure configurator + ) { + self.reconfigure(name, ConsumerWithDelegate.create(configurator)); + } + + public static void reconfigure( + JobAccessor self, + @DelegatesTo(value = MutableJobConfiguration.class, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = SimpleType.class, options = "com.agorapulse.worker.configuration.MutableJobConfiguration") + Closure configurator + ) { + self.reconfigure(ConsumerWithDelegate.create(configurator)); + } + } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/AbstractJob.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/AbstractJob.java index aa264eb3..b3cc2613 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/AbstractJob.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/AbstractJob.java @@ -17,14 +17,17 @@ */ package com.agorapulse.worker.job; -import com.agorapulse.worker.Job; import com.agorapulse.worker.JobConfiguration; import com.agorapulse.worker.JobStatus; +import com.agorapulse.worker.configuration.MutableJobConfiguration; -public abstract class AbstractJob implements Job { +import java.util.function.Consumer; + +public abstract class AbstractJob implements MutableCancelableJob { private final JobConfiguration configuration; private final ConcurrentJobStatus status; + private Runnable cancelAction = () -> {}; protected AbstractJob(JobConfiguration configuration) { this.configuration = configuration; @@ -53,5 +56,34 @@ public final void forceRun() { status.run(this::doRun); } + @Override + public void configure(Consumer change) { + if (configuration instanceof MutableJobConfiguration c) { + change.accept(c); + } else { + throw new IllegalStateException("The configuration of the job is not mutable!"); + } + } + + @Override + public String getSource() { + return ""; + } + protected abstract void doRun(JobRunContext context); + + @Override + public void cancelAction(Runnable runnable) { + Runnable current = this.cancelAction; + this.cancelAction = () -> { + current.run(); + runnable.run(); + }; + } + + @Override + public void cancel() { + cancelAction.run(); + } + } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/CancelableJob.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/CancelableJob.java new file mode 100644 index 00000000..7bf4d0bd --- /dev/null +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/CancelableJob.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.job; + +import com.agorapulse.worker.Job; + +public interface CancelableJob extends Job { + + void cancel(); + +} diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/ConcurrentJobStatus.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/ConcurrentJobStatus.java index c9c835e6..4ec79222 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/ConcurrentJobStatus.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/ConcurrentJobStatus.java @@ -17,6 +17,7 @@ */ package com.agorapulse.worker.job; +import com.agorapulse.worker.JobRunStatus; import com.agorapulse.worker.JobStatus; import com.agorapulse.worker.json.DurationSerializer; import com.agorapulse.worker.json.StacktraceSerializer; @@ -91,22 +92,25 @@ public void run(Consumer doRun) { executionCount.incrementAndGet(); lastTriggered.set(status.getStarted()); - JobRunContext context = JobRunContext.create(status).onError((s, ex) -> { - lastException.set(ex); - status.fail(ex); - }); - - try { - doRun.accept(context); - } finally { - status.finish(); - executionCount.decrementAndGet(); - lastFinished.set(status.getFinished()); - lastDuration.set(status.getDuration()); - lastId.set(status.getId()); - } - - context.finished(status); + JobRunContext context = JobRunContext.create(status) + .onFinished(s -> { + status.finish(); + recordLastStatus(s); + }) + .onError((s, ex) -> { + status.fail(ex); + lastException.set(ex); + recordLastStatus(s); + }); + + doRun.accept(context); + } + + private void recordLastStatus(JobRunStatus s) { + lastFinished.set(s.getFinished()); + lastDuration.set(s.getDuration()); + lastId.set(s.getId()); + executionCount.decrementAndGet(); } } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/DefaultJobRunStatus.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/DefaultJobRunStatus.java index 3684fa2c..f807d13d 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/DefaultJobRunStatus.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/DefaultJobRunStatus.java @@ -82,4 +82,8 @@ public void fail(Throwable exception) { this.exception = exception; } + @Override + public String toString() { + return "DefaultJobRunStatus{id='%s', name='%s', started=%s, finished=%s, exception=%s}".formatted(id, name, started, finished, exception); + } } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/JobRunContext.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/JobRunContext.java index e8e3d67a..f177ccb3 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/JobRunContext.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/JobRunContext.java @@ -67,7 +67,7 @@ public void error(Throwable error) { onError.accept(status, error); } - public void finished(JobRunStatus status) { + public void finished() { onFinished.accept(status); } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/MutableCancelableJob.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/MutableCancelableJob.java new file mode 100644 index 00000000..ab209d8b --- /dev/null +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/MutableCancelableJob.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2021-2024 Agorapulse. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.agorapulse.worker.job; + +public interface MutableCancelableJob extends CancelableJob { + + /** + * Updates the action being called when the job is canceled. + * @param runnable the action to be called when the job is canceled + */ + void cancelAction(Runnable runnable); + +} diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/SimpleJob.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/SimpleJob.java index f4549ba3..fc9f496d 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/SimpleJob.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/job/SimpleJob.java @@ -37,6 +37,7 @@ public String getSource() { protected void doRun(JobRunContext context) { try { task.run(); + context.finished(); } catch (Throwable th) { context.error(th); } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/local/LocalQueues.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/local/LocalQueues.java index 1103b3ed..fc68554a 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/local/LocalQueues.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/local/LocalQueues.java @@ -22,15 +22,17 @@ import io.micronaut.context.annotation.Secondary; import io.micronaut.context.env.Environment; import io.micronaut.core.type.Argument; - import jakarta.inject.Named; import jakarta.inject.Singleton; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; -import java.util.function.Consumer; @Secondary @Singleton @@ -46,15 +48,19 @@ public LocalQueues(Environment environment) { } @Override - public void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument, Consumer action) { + public Publisher readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument) { ConcurrentLinkedDeque objects = queues.get(queueName); if (objects == null) { - return; + return Flux.empty(); } + List results = new ArrayList<>(); + for (int i = 0; i < maxNumberOfMessages && !objects.isEmpty(); i++) { - action.accept(environment.convertRequired(objects.removeFirst(), argument)); + results.add(environment.convertRequired(objects.removeFirst(), argument)); } + + return Flux.fromIterable(results); } @Override diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/manager/DefaultJobManager.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/manager/DefaultJobManager.java index 407ae734..35d6599a 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/manager/DefaultJobManager.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/manager/DefaultJobManager.java @@ -20,6 +20,9 @@ import com.agorapulse.worker.Job; import com.agorapulse.worker.JobConfiguration; import com.agorapulse.worker.JobManager; +import com.agorapulse.worker.JobScheduler; +import com.agorapulse.worker.configuration.MutableJobConfiguration; +import com.agorapulse.worker.job.CancelableJob; import com.agorapulse.worker.queue.JobQueues; import com.agorapulse.worker.report.JobReport; import io.micronaut.context.BeanContext; @@ -32,6 +35,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; @Singleton public class DefaultJobManager implements JobManager { @@ -39,9 +43,11 @@ public class DefaultJobManager implements JobManager { private final ConcurrentMap tasks = new ConcurrentHashMap<>(); private final BeanContext beanContext; + private final JobScheduler jobScheduler; - public DefaultJobManager(List tasksFromContext, BeanContext beanContext) { + public DefaultJobManager(List tasksFromContext, BeanContext beanContext, JobScheduler jobScheduler) { this.beanContext = beanContext; + this.jobScheduler = jobScheduler; tasksFromContext.forEach(this::registerInternal); } @@ -84,4 +90,18 @@ public void enqueue(String jobName, Object payload) { } ); } + + @Override + public void reconfigure(String jobName, Consumer configuration) { + getJob(jobName).ifPresent(job -> { + job.configure(configuration); + if (job instanceof CancelableJob cj) { + cj.cancel(); + if (job.getConfiguration().isEnabled()) { + jobScheduler.schedule(cj); + } + } + }); + } + } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java index 3ea87a0d..d549943b 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java @@ -26,11 +26,10 @@ import io.micronaut.context.BeanContext; import io.micronaut.inject.ExecutableMethod; import io.micronaut.inject.qualifiers.Qualifiers; +import jakarta.inject.Singleton; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import jakarta.inject.Singleton; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -55,7 +54,6 @@ public DefaultMethodJobInvoker( this.distributedJobExecutor = distributedJobExecutor; } - @SuppressWarnings("unchecked") public void invoke(MethodJob job, B bean, JobRunContext callback) { ExecutableMethod method = job.getMethod(); JobConfiguration configuration = job.getConfiguration(); @@ -78,20 +76,22 @@ public void invoke(MethodJob job, B bean, JobRunContext callback) { Function, Publisher> executor = executor(configuration.getName(), leaderOnly, followerOnly, concurrency); if (method.getArguments().length == 0) { - handleResult(configuration, callback, executor.apply(() -> method.invoke(bean))); callback.message(null); + handleResult(configuration, callback, executor.apply(() -> method.invoke(bean))); } else if (method.getArguments().length == 1) { JobConfiguration.ConsumerQueueConfiguration queueConfiguration = configuration.getConsumer(); - queues(queueConfiguration.getQueueType()).readMessages( - queueConfiguration.getQueueName(), - queueConfiguration.getMaxMessages() < 1 ? 1 : queueConfiguration.getMaxMessages(), - Optional.ofNullable(queueConfiguration.getWaitingTime()).orElse(Duration.ZERO), - method.getArguments()[0], - message -> { - callback.message(message); - handleResult(configuration, callback, executor.apply(() -> method.invoke(bean, message))); - } - ); + Publisher results = Flux.from( + queues(queueConfiguration.getQueueType()).readMessages( + queueConfiguration.getQueueName(), + queueConfiguration.getMaxMessages() < 1 ? 1 : queueConfiguration.getMaxMessages(), + Optional.ofNullable(queueConfiguration.getWaitingTime()).orElse(Duration.ZERO), + method.getArguments()[0] + ) + ) + .doOnNext(callback::message) + .flatMap(message -> executor.apply(() -> method.invoke(bean, message))); + + handleResult(configuration, callback, results); } else { LOGGER.error("Too many arguments for " + method + "! The job method wasn't executed!"); } @@ -114,6 +114,7 @@ protected void handleResult(JobConfiguration configuration, JobRunContext callba Object result = Flux.from(resultPublisher).blockFirst(); if (result == null) { + callback.finished(); return; } @@ -122,7 +123,10 @@ protected void handleResult(JobConfiguration configuration, JobRunContext callba JobQueues sender = queues(configuration.getProducer().getQueueType()); if (result instanceof Publisher) { - Flux resultFLux = Flux.from((Publisher) result).doOnNext(callback::result); + Flux resultFLux = Flux.from((Publisher) result) + .doOnNext(callback::result) + .doOnError(callback::error) + .doFinally(signalType -> callback.finished()); sender.sendMessages(queueName, resultFLux); return; } @@ -134,13 +138,14 @@ protected void handleResult(JobConfiguration configuration, JobRunContext callba callback.result(result); sender.sendMessage(queueName, result); + callback.finished(); } private JobQueues queues(String qualifier) { return context.findBean( - JobQueues.class, - qualifier == null ? null : Qualifiers.byName(qualifier) - ) + JobQueues.class, + qualifier == null ? null : Qualifiers.byName(qualifier) + ) .orElseGet(() -> context.getBean(JobQueues.class)); } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/MethodJob.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/MethodJob.java index 3aff44ef..8718bd22 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/MethodJob.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/MethodJob.java @@ -71,7 +71,7 @@ public ExecutableMethod getMethod() { } @Override - @SuppressWarnings({"rawtypes", "unchecked"}) + @SuppressWarnings({"unchecked"}) protected void doRun(JobRunContext context) { io.micronaut.context.Qualifier qualifer = beanDefinition .getAnnotationTypeByStereotype(Qualifier.class) @@ -84,8 +84,9 @@ protected void doRun(JobRunContext context) { bean = (B) beanContext.getBean(beanType, qualifer); context - .onMessage((status, message) -> beanContext.getEventPublisher(JobExecutionStartedEvent.class).publishEvent(new JobExecutionStartedEvent(getName(), status.getId()))) + .onMessage((status, message) -> beanContext.getEventPublisher(JobExecutionStartedEvent.class).publishEvent(new JobExecutionStartedEvent(getName(), status.getId(), message))) .onFinished(status -> beanContext.getEventPublisher(JobExecutionFinishedEvent.class).publishEvent(new JobExecutionFinishedEvent(getName(), status))) + .onError((status, ex) -> beanContext.getEventPublisher(JobExecutionFinishedEvent.class).publishEvent(new JobExecutionFinishedEvent(getName(), status))) .onResult((status, result) -> beanContext.getEventPublisher(JobExecutionResultEvent.class).publishEvent(new JobExecutionResultEvent(getName(), status.getId(), result))); jobMethodInvoker.invoke(this, bean, context); @@ -93,19 +94,24 @@ protected void doRun(JobRunContext context) { } catch (Throwable e) { context.error(e); - io.micronaut.context.Qualifier qualifier = Qualifiers.byTypeArguments(beanType, e.getClass()); - Collection> definitions = beanContext.getBeanDefinitions(TaskExceptionHandler.class, qualifier); - Optional> mostSpecific = definitions.stream().filter(def -> { - List> typeArguments = def.getTypeArguments(TaskExceptionHandler.class); - if (typeArguments.size() == 2) { - return typeArguments.get(0).getType() == beanType && typeArguments.get(1).getType() == e.getClass(); - } - return false; - }).findFirst(); - - TaskExceptionHandler finalHandler = mostSpecific.map(bd -> beanContext.getBean(bd.getBeanType(), qualifier)).orElse(taskExceptionHandler); - finalHandler.handle(bean, e); + handleException(e, beanType, bean); } } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void handleException(Throwable e, Class beanType, B bean) { + io.micronaut.context.Qualifier qualifier = Qualifiers.byTypeArguments(beanType, e.getClass()); + Collection> definitions = beanContext.getBeanDefinitions(TaskExceptionHandler.class, qualifier); + Optional> mostSpecific = definitions.stream().filter(def -> { + List> typeArguments = def.getTypeArguments(TaskExceptionHandler.class); + if (typeArguments.size() == 2) { + return typeArguments.get(0).getType() == beanType && typeArguments.get(1).getType() == e.getClass(); + } + return false; + }).findFirst(); + + TaskExceptionHandler finalHandler = mostSpecific.map(bd -> beanContext.getBean(bd.getBeanType(), qualifier)).orElse(taskExceptionHandler); + finalHandler.handle(bean, e); + } } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/queue/JobQueues.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/queue/JobQueues.java index e9576bbe..76670684 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/queue/JobQueues.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/queue/JobQueues.java @@ -26,7 +26,22 @@ public interface JobQueues { - void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument, Consumer action); + /** + * Reads messages from the queue and processes them with the given action. + * @param queueName the name of the queue + * @param maxNumberOfMessages the maximal number of messages to read + * @param waitTime the maximal time to wait for the messages + * @param argument the argument type + * @param action the action to process the message + * @param the type of the message + * @deprecated Use {@link #readMessages(String, int, Duration, Argument)} instead + */ + @Deprecated(forRemoval = true) + default void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument, Consumer action) { + Flux.from(readMessages(queueName, maxNumberOfMessages, waitTime, argument)).subscribe(action::accept); + } + + Publisher readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument); default void sendRawMessages(String queueName, Publisher result) { Flux.from(result).subscribe(message -> sendRawMessage(queueName, message)); diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/schedule/DefaultJobScheduler.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/schedule/DefaultJobScheduler.java index c6663e56..7ce32152 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/schedule/DefaultJobScheduler.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/schedule/DefaultJobScheduler.java @@ -17,21 +17,25 @@ */ package com.agorapulse.worker.schedule; +import com.agorapulse.worker.Job; import com.agorapulse.worker.JobConfiguration; import com.agorapulse.worker.JobConfigurationException; import com.agorapulse.worker.JobScheduler; +import com.agorapulse.worker.job.MutableCancelableJob; import io.micronaut.context.BeanContext; import io.micronaut.context.annotation.Requires; import io.micronaut.core.util.StringUtils; import io.micronaut.inject.qualifiers.Qualifiers; import io.micronaut.scheduling.ScheduledExecutorTaskScheduler; import io.micronaut.scheduling.TaskScheduler; +import jakarta.inject.Singleton; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import jakarta.inject.Singleton; import java.io.Closeable; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedDeque; @@ -49,10 +53,10 @@ public class DefaultJobScheduler implements JobScheduler, Closeable { private final Queue> scheduledTasks = new ConcurrentLinkedDeque<>(); /** - * @param beanContext The bean context for DI of beans annotated with {@link jakarta.inject.Inject} + * @param beanContext The bean context for DI of beans annotated with {@link jakarta.inject.Inject} */ public DefaultJobScheduler( - BeanContext beanContext + BeanContext beanContext ) { this.beanContext = beanContext; } @@ -71,6 +75,27 @@ public void schedule(com.agorapulse.worker.Job job) { JobConfiguration configuration = job.getConfiguration(); TaskScheduler taskScheduler = getTaskScheduler(job); + List> scheduled = new ArrayList<>(); + + for (int i = 0; i < configuration.getFork(); i++) { + scheduled.addAll(doSchedule(job, configuration, taskScheduler)); + } + + if (job instanceof MutableCancelableJob mj) { + mj.cancelAction(() -> { + for (ScheduledFuture scheduledTask : scheduled) { + if (!scheduledTask.isCancelled()) { + scheduledTask.cancel(false); + } + } + }); + } + + scheduledTasks.addAll(scheduled); + } + + private List> doSchedule(Job job, JobConfiguration configuration, TaskScheduler taskScheduler) { + List> scheduled = new ArrayList<>(); Duration initialDelay = configuration.getInitialDelay(); if (StringUtils.isNotEmpty(configuration.getCron())) { @@ -81,7 +106,8 @@ public void schedule(com.agorapulse.worker.Job job) { LOG.debug("Scheduling cron job {} [{}] for {}", configuration.getName(), configuration.getCron(), job.getSource()); } try { - taskScheduler.schedule(configuration.getCron(), job); + ScheduledFuture scheduledFuture = taskScheduler.schedule(configuration.getCron(), job); + scheduled.add(scheduledFuture); } catch (IllegalArgumentException e) { throw new JobConfigurationException(job, "Failed to schedule job " + configuration.getName() + " declared in " + job.getSource() + ". Invalid CRON expression: " + configuration.getCron(), e); } @@ -91,10 +117,8 @@ public void schedule(com.agorapulse.worker.Job job) { LOG.debug("Scheduling fixed rate job {} [{}] for {}", configuration.getName(), duration, job.getSource()); } - for (int i = 0; i < configuration.getFork(); i++) { - ScheduledFuture scheduledFuture = taskScheduler.scheduleAtFixedRate(initialDelay, duration, job); - scheduledTasks.add(scheduledFuture); - } + ScheduledFuture scheduledFuture = taskScheduler.scheduleAtFixedRate(initialDelay, duration, job); + scheduled.add(scheduledFuture); } else if (configuration.getFixedDelay() != null) { Duration duration = configuration.getFixedDelay(); @@ -102,18 +126,15 @@ public void schedule(com.agorapulse.worker.Job job) { LOG.debug("Scheduling fixed delay task {} [{}] for {}", configuration.getName(), duration, job.getSource()); } - for (int i = 0; i < configuration.getFork(); i++) { - ScheduledFuture scheduledFuture = taskScheduler.scheduleWithFixedDelay(initialDelay, duration, job); - scheduledTasks.add(scheduledFuture); - } + ScheduledFuture scheduledFuture = taskScheduler.scheduleWithFixedDelay(initialDelay, duration, job); + scheduled.add(scheduledFuture); } else if (initialDelay != null) { - for (int i = 0; i < configuration.getFork(); i++) { - ScheduledFuture scheduledFuture = taskScheduler.schedule(initialDelay, job); - scheduledTasks.add(scheduledFuture); - } + ScheduledFuture scheduledFuture = taskScheduler.schedule(initialDelay, job); + scheduled.add(scheduledFuture); } else { - throw new JobConfigurationException(job, "Failed to schedule job " + configuration.getName() + " declared in " + job.getSource() + ". Invalid definition"); + throw new JobConfigurationException(job, "Failed to schedule job " + configuration.getName() + " declared in " + job.getSource() + ". Invalid definition"); } + return scheduled; } private TaskScheduler getTaskScheduler(com.agorapulse.worker.Job job) { @@ -122,8 +143,8 @@ private TaskScheduler getTaskScheduler(com.agorapulse.worker.Job job) { if (!optionalTaskScheduler.isPresent()) { optionalTaskScheduler = beanContext.findBean(ExecutorService.class, Qualifiers.byName(configuration.getScheduler())) - .filter(ScheduledExecutorService.class::isInstance) - .map(ScheduledExecutorTaskScheduler::new); + .filter(ScheduledExecutorService.class::isInstance) + .map(ScheduledExecutorTaskScheduler::new); } return optionalTaskScheduler.orElseThrow(() -> new JobConfigurationException(job, "No scheduler of type TaskScheduler configured for name: " + configuration.getScheduler())); diff --git a/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/JobManagerSpec.groovy b/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/JobManagerSpec.groovy index 10aae706..4273ae50 100644 --- a/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/JobManagerSpec.groovy +++ b/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/JobManagerSpec.groovy @@ -18,6 +18,7 @@ package com.agorapulse.worker import com.agorapulse.worker.annotation.FixedRate +import com.agorapulse.worker.annotation.InitialDelay import io.micronaut.context.annotation.Property import io.micronaut.context.annotation.Requires import io.micronaut.test.extensions.spock.annotation.MicronautTest @@ -38,6 +39,7 @@ class JobManagerSpec extends Specification { @Inject JobManager manager @Inject ConsumerJob consumerJob + @Inject InLongFutureJob inLongFutureJob void 'can register new jobs'() { given: @@ -129,6 +131,22 @@ class JobManagerSpec extends Specification { executed.get() } + void 'can reconfigure'() { + expect: + 'in-long-future-job' in manager.jobNames + when: + manager.enqueue(InLongFutureJob, 'Hello') + + manager.reconfigure('in-long-future-job') { + enabled true + initialDelay Duration.ofMillis(1) + } + + Thread.sleep(100) + then: + inLongFutureJob.messages.contains('Hello') + } + } @Singleton @@ -144,3 +162,17 @@ class ConsumerJob implements Consumer { } } + +@Singleton +@Requires(env = JobManagerSpec.MANAGER_SPEC_ENVIRONMENT) +class InLongFutureJob implements Consumer { + + List messages = [] + + @Override + @InitialDelay('24h') + void accept(String message) { + messages << message + } + +} diff --git a/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/console/ConsoleSpec.groovy b/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/console/ConsoleSpec.groovy index 9a2578c2..eade6f22 100644 --- a/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/console/ConsoleSpec.groovy +++ b/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/console/ConsoleSpec.groovy @@ -28,7 +28,7 @@ import spock.lang.Specification import jakarta.inject.Inject import jakarta.inject.Singleton -@MicronautTest(environments = CONSOLE_SPEC_ENVIRONMENT) +@MicronautTest(environments = CONSOLE_SPEC_ENVIRONMENT, rebuildContext = true) @Property(name = 'worker.jobs.sample-job.enabled', value = 'true') @Property(name = 'console.enabled', value = 'true') class ConsoleSpec extends Specification { @@ -93,6 +93,18 @@ class ConsoleSpec extends Specification { } } + void 'reconfigure job'() { + expect: + gru.test { + post '/console/execute/result', { + content('reconfigure.groovy', 'text/groovy') + } + expect { + text 'reconfigureResponse.txt' + } + } + } + void 'render job'() { expect: gru.test { diff --git a/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/console/JobAccessorSpec.groovy b/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/console/JobAccessorSpec.groovy index b59bab6b..9b23604f 100644 --- a/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/console/JobAccessorSpec.groovy +++ b/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/console/JobAccessorSpec.groovy @@ -39,4 +39,11 @@ class JobAccessorSpec extends Specification { 1 * jobManager.enqueue('test-job', 'foo') } + void 'reconfigure'() { + when: + accessor.reconfigure { } + then: + 1 * jobManager.reconfigure('test-job', _) + } + } diff --git a/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/reconfigure.groovy b/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/reconfigure.groovy new file mode 100644 index 00000000..cba0eea5 --- /dev/null +++ b/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/reconfigure.groovy @@ -0,0 +1,6 @@ +import java.time.Duration + +sampleJob.reconfigure { + enabled true + initialDelay Duration.ofMillis(1) +} diff --git a/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/reconfigureResponse.txt b/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/reconfigureResponse.txt new file mode 100644 index 00000000..ec747fa4 --- /dev/null +++ b/libs/micronaut-worker/src/test/resources/com/agorapulse/worker/console/ConsoleSpec/reconfigureResponse.txt @@ -0,0 +1 @@ +null \ No newline at end of file