Skip to content

Commit

Permalink
WIP Job Runner
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Nov 26, 2024
1 parent af2b7f4 commit ba834e2
Show file tree
Hide file tree
Showing 14 changed files with 364 additions and 3 deletions.
25 changes: 25 additions & 0 deletions libs/micronaut-worker-runner/micronaut-worker-runner.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2021-2024 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
dependencies {
api project(':micronaut-worker')
api 'io.micronaut:micronaut-function'

testImplementation 'io.micronaut.reactor:micronaut-reactor'
// verify everything works as expected even with server environment present
testImplementation 'io.micronaut:micronaut-http-server-netty'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package com.agorapulse.worker.runner;

import com.agorapulse.worker.Job;
import com.agorapulse.worker.JobManager;
import com.agorapulse.worker.report.JobReport;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.ApplicationContextBuilder;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.cli.CommandLine;
import io.micronaut.function.executor.FunctionInitializer;
import jakarta.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@SuppressWarnings("java:S6813")
public class JobRunner extends FunctionInitializer {

private static final Logger LOGGER = LoggerFactory.getLogger(JobRunner.class);

public static void main(String[] args) throws IOException {
try (JobRunner runner = new JobRunner()) {
runner.run(args);
}
}

@Inject
private JobManager jobManager;

public JobRunner() {
super();
}

public JobRunner(ApplicationContext applicationContext) {
super(applicationContext);
}

public JobRunner(ApplicationContext applicationContext, boolean inject) {
super(applicationContext, inject);
}

public void run(String[] args) throws IOException {
CommandLine cli = CommandLine.build().parse(args);

run(args, ignored -> {
if (!run(cli.getRemainingArgs())) {
throw new IllegalStateException("Error running jobs! See the logs for more details.");
}
return true;
});
}

@Override
protected @NonNull ApplicationContextBuilder newApplicationContextBuilder() {
return super.newApplicationContextBuilder().environments("job");
}

private boolean run(List<String> jobNames) {
if (jobNames.isEmpty()) {
LOGGER.error("No job name provided");
return false;
}

boolean result = true;

for (String jobName : jobNames) {
try {
Optional<Job> optionalJob = jobManager.getJob(jobName);

if (optionalJob.isEmpty()) {
LOGGER.error("Job '{}' not found", jobName);
continue;
}

Job job = optionalJob.get();

job.forceRun();

waitUntilFinished(job);

if (LOGGER.isInfoEnabled()) {
LOGGER.info("Job '{}' executed in {}", jobName, JobReport.humanReadableFormat(job.getStatus().getLastDuration()));
}

if (job.getStatus().getLastException() != null) {
// the exception is already logged
result = false;
}
} catch (Exception e) {
LOGGER.error("Error running job '{}'", jobName, e);

result = false;
}
}

return result;
}

private static void waitUntilFinished(Job job) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
if (job.getStatus().getLastDuration() != null) {
scheduler.shutdown();
}
}, 0, 100, TimeUnit.MILLISECONDS);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.agorapulse.worker.runner

import com.agorapulse.worker.event.JobExecutionFinishedEvent
import com.agorapulse.worker.event.JobExecutionResultEvent
import com.agorapulse.worker.event.JobExecutionStartedEvent
import groovy.transform.CompileStatic
import io.micronaut.runtime.event.annotation.EventListener
import jakarta.inject.Singleton

@Singleton
@CompileStatic
class JobExecutionRecorder {

final List<JobExecutionStartedEvent> startedEvents = []
final List<JobExecutionFinishedEvent> finishedEvents = []
final List<JobExecutionResultEvent> resultEvents = []

@EventListener
void onJobStarted(JobExecutionStartedEvent event) {
startedEvents.add(event)
}

@EventListener
void onJobFinished(JobExecutionFinishedEvent event) {
finishedEvents.add(event)
}

@EventListener
void onJobResult(JobExecutionResultEvent event) {
resultEvents.add(event)
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.agorapulse.worker.runner

import io.micronaut.context.ApplicationContext
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import spock.lang.Specification

@MicronautTest(rebuildContext = true)
class JobRunnerSpec extends Specification {

@Inject ApplicationContext context
@Inject TestJob job
@Inject TestFunctionExitHandler exitHandler
@Inject JobExecutionRecorder recorder

void 'single job is executed'() {
when:
JobRunner runner = new JobRunner(context)
runner.run('test-job-one')
then:
'test-job-one' in recorder.finishedEvents*.name
'test-job-two' !in recorder.finishedEvents*.name
'test-job-three' !in recorder.finishedEvents*.name

exitHandler.success
}

void 'runner waits until all events are generated'() {
when:
JobRunner runner = new JobRunner(context)
runner.run('test-job-two')
then:
'test-job-one' !in recorder.finishedEvents*.name
'test-job-two' in recorder.finishedEvents*.name
'test-job-three' !in recorder.finishedEvents*.name

recorder.resultEvents.any { result -> result.name == 'test-job-two' && result.result == 'foo' }

exitHandler.success
}

void 'job failure is propagated'() {
when:
JobRunner runner = new JobRunner(context)
runner.run('test-job-three')
then:
'test-job-one' !in recorder.finishedEvents*.name
'test-job-two' !in recorder.finishedEvents*.name
'test-job-three' in recorder.finishedEvents*.name

!exitHandler.success
exitHandler.error instanceof IllegalStateException
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.agorapulse.worker.runner;

import io.micronaut.context.annotation.Primary;
import io.micronaut.function.executor.FunctionExitHandler;
import jakarta.inject.Singleton;

@Primary
@Singleton
public class TestFunctionExitHandler implements FunctionExitHandler {

private boolean success;
private Exception error;

@Override
public void exitWithError(Exception error, boolean debug) {
this.error = error;
}

@Override
public void exitWithSuccess() {
success = true;
}

@Override
public void exitWithNoData() {
success = true;
}

public boolean isSuccess() {
return success;
}

public Exception getError() {
return error;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.agorapulse.worker.runner;

import com.agorapulse.worker.annotation.Job;
import io.micronaut.context.BeanContext;
import jakarta.inject.Singleton;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

@Singleton
public class TestJob {

public record ExecutionEvent(String job) {}

private final BeanContext context;

public TestJob(BeanContext context) {
this.context = context;
}

@Job("test-job-one")
public void recordingJobOne() {
context.getEventPublisher(ExecutionEvent.class).publishEvent(new ExecutionEvent("test-job-one"));
}

@Job("test-job-two")
public Flux<String> recordingJobTwo() {
context.getEventPublisher(ExecutionEvent.class).publishEvent(new ExecutionEvent("test-job-two"));
return Flux.from(Mono.delay(Duration.ofMillis(300)).map(ignore -> "foo"));
}

@Job("test-job-three")
public void recordingJobThree() {
context.getEventPublisher(ExecutionEvent.class).publishEvent(new ExecutionEvent("test-job-three"));
throw new UnsupportedOperationException("This job is supposed to fail");
}

}
35 changes: 35 additions & 0 deletions libs/micronaut-worker-runner/src/test/resources/log4j2-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
SPDX-License-Identifier: Apache-2.0
Copyright 2021-2024 Agorapulse.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<Configuration packages="org.apache.logging.log4j.core">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
<Logger name="com.agorapulse.worker" level="trace" additivity="false">
<AppenderRef ref="Console" />
</Logger>
</Loggers>
</Configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.agorapulse.worker.job.DefaultJobRunStatus;
import com.agorapulse.worker.json.DurationSerializer;
import com.agorapulse.worker.report.JobReport;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
Expand All @@ -45,6 +46,10 @@ default Duration getDuration() {
return Duration.between(started, finished);
}

default String getHumanReadableDuration() {
return JobReport.humanReadableFormat(getDuration());
}

default long getDurationMillis() {
return getDuration().toMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.micronaut.context.annotation.AliasFor;
import io.micronaut.scheduling.TaskExecutors;
import jakarta.inject.Named;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
Expand All @@ -40,6 +41,7 @@
String value();

@AliasFor(annotation = Job.class, member = "value")
@AliasFor(annotation = Named.class, member = "value")
String name() default "";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.micronaut.context.annotation.AliasFor;
import io.micronaut.scheduling.TaskExecutors;
import jakarta.inject.Named;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
Expand All @@ -40,6 +41,7 @@
String value();

@AliasFor(annotation = Job.class, member = "value")
@AliasFor(annotation = Named.class, member = "value")
String name() default "";

/**
Expand Down
Loading

0 comments on commit ba834e2

Please sign in to comment.