Skip to content

Commit

Permalink
Merge pull request #4533 from gchq/gh-4532-run-job-now
Browse files Browse the repository at this point in the history
PR for #4532 - `Run Now` doesn't work if the job or jobNode is disabled
  • Loading branch information
at055612 authored Oct 10, 2024
2 parents 3d45bba + 71585ff commit 9215f51
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 116 deletions.
80 changes: 49 additions & 31 deletions stroom-core-shared/src/main/java/stroom/job/shared/JobNode.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package stroom.job.shared;


Expand Down Expand Up @@ -202,37 +218,6 @@ public void setEnabled(final boolean enabled) {
this.enabled = enabled;
}


// --------------------------------------------------------------------------------


@Schema
public enum JobType implements HasPrimitiveValue {
UNKNOWN("UNKNOWN", 0),
CRON("Cron", 1),
FREQUENCY("Frequency", 2),
DISTRIBUTED("Distributed", 3);

public static final PrimitiveValueConverter<JobType> PRIMITIVE_VALUE_CONVERTER = new PrimitiveValueConverter<>(
JobType.values());
private final String displayValue;
private final byte primitiveValue;

JobType(final String displayValue, final int primitiveValue) {
this.displayValue = displayValue;
this.primitiveValue = (byte) primitiveValue;
}

public String getDisplayValue() {
return displayValue;
}

@Override
public byte getPrimitiveValue() {
return primitiveValue;
}
}

@Override
public String toString() {
return "JobNode{" +
Expand Down Expand Up @@ -267,4 +252,37 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(id);
}


// --------------------------------------------------------------------------------


@Schema
public enum JobType implements HasPrimitiveValue {
UNKNOWN("UNKNOWN", 0),
CRON("Cron", 1),
FREQUENCY("Frequency", 2),
DISTRIBUTED("Distributed", 3),
;

public static final PrimitiveValueConverter<JobType> PRIMITIVE_VALUE_CONVERTER = new PrimitiveValueConverter<>(
JobType.values());

private final String displayValue;
private final byte primitiveValue;

JobType(final String displayValue, final int primitiveValue) {
this.displayValue = displayValue;
this.primitiveValue = (byte) primitiveValue;
}

public String getDisplayValue() {
return displayValue;
}

@Override
public byte getPrimitiveValue() {
return primitiveValue;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017 Crown Copyright
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package stroom.job.impl;
Expand All @@ -30,6 +29,7 @@
import stroom.util.logging.LambdaLoggerFactory;
import stroom.util.logging.LogUtil;
import stroom.util.shared.ResultPage;
import stroom.util.shared.scheduler.ScheduleType;

import jakarta.inject.Inject;
import jakarta.inject.Provider;
Expand Down Expand Up @@ -113,16 +113,12 @@ public void startup() {
newJobNode.setNodeName(nodeName);
newJobNode.setEnabled(scheduledJob.isEnabled());

switch (scheduledJob.getSchedule().getType()) {
case CRON:
newJobNode.setJobType(JobType.CRON);
break;
case FREQUENCY:
newJobNode.setJobType(JobType.FREQUENCY);
break;
default:
throw new RuntimeException("Unknown ScheduleType!");
}
final JobType newJobType = switch (scheduledJob.getSchedule().getType()) {
case ScheduleType.CRON -> JobType.CRON;
case ScheduleType.FREQUENCY -> JobType.FREQUENCY;
default -> throw new RuntimeException("Unknown ScheduleType!");
};
newJobNode.setJobType(newJobType);
newJobNode.setSchedule(scheduledJob.getSchedule().getExpression());

// Add the job node to the DB if it isn't there already.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017 Crown Copyright
* Copyright 2017-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package stroom.job.impl;
Expand Down Expand Up @@ -303,17 +302,17 @@ private void ensureSchedule(final JobNode jobNode) {
}

private void ensureSchedule(final JobType jobType, final String scheduleExpression) {
// Stop Job Nodes being saved with invalid crons.
if (JobType.CRON.equals(jobType)) {
if (scheduleExpression != null) {
// This will throw a runtime exception if the expression is invalid.
new CronTrigger(scheduleExpression);
}
}
if (JobType.FREQUENCY.equals(jobType)) {
if (scheduleExpression != null) {
// This will throw a runtime exception if the expression is invalid.
new FrequencyTrigger(scheduleExpression);
if (scheduleExpression != null) {
// Stop Job Nodes being saved with invalid crons.
switch (jobType) {
case CRON -> {
// This will throw a runtime exception if the expression is invalid.
new CronTrigger(scheduleExpression);
}
case FREQUENCY -> {
// This will throw a runtime exception if the expression is invalid.
new FrequencyTrigger(scheduleExpression);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017 Crown Copyright
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -12,14 +12,12 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package stroom.job.impl;

import stroom.job.shared.FindJobNodeCriteria;
import stroom.job.shared.JobNode;
import stroom.job.shared.JobNode.JobType;
import stroom.job.shared.JobNodeUtil;
import stroom.node.api.NodeInfo;
import stroom.task.api.ExecutorProvider;
Expand Down Expand Up @@ -105,42 +103,43 @@ static class JobNodeTrackersImpl implements JobNodeTrackers {
trackersForJobNode.put(jobNode, jobNodeTracker);
trackersForJobName.put(jobName, jobNodeTracker);

// Remember trackers for enabled and distributed jobs.
if (JobType.DISTRIBUTED.equals(jobNode.getJobType())) {
if (jobNode.getJob().isEnabled() && jobNode.isEnabled()) {
distributedJobNodeTrackers.add(jobNodeTracker);
switch (jobNode.getJobType()) {
// Remember trackers for enabled and distributed jobs.
case DISTRIBUTED -> {
if (jobNode.getJob().isEnabled() && jobNode.isEnabled()) {
distributedJobNodeTrackers.add(jobNodeTracker);
}
}
}

try {
// Update schedule and frequency times if this job node
// has a job type of cron or frequency.
if (JobType.CRON.equals(jobNode.getJobType())
|| JobType.FREQUENCY.equals(jobNode.getJobType())) {
final String expression = jobNode.getSchedule();
// Update the schedule cache if the schedule has
// changed.
if (expression != null) {
scheduleValueMap.put(jobNode, expression);
if (previousState != null
&& expression.equals(previousState.scheduleValueMap.get(jobNode))) {
schedulerMap.put(jobNode, previousState.schedulerMap.get(jobNode));
} else {
try {
final Schedule schedule = JobNodeUtil.getSchedule(jobNode);
if (schedule != null) {
schedulerMap.put(jobNode, new SimpleScheduleExec(
TriggerFactory.create(schedule)));
case CRON, FREQUENCY -> {
try {
// Update schedule and frequency times if this job node
// has a job type of cron or frequency.
final String expression = jobNode.getSchedule();
// Update the schedule cache if the schedule has
// changed.
if (expression != null) {
scheduleValueMap.put(jobNode, expression);
if (previousState != null
&& expression.equals(previousState.scheduleValueMap.get(jobNode))) {
schedulerMap.put(jobNode, previousState.schedulerMap.get(jobNode));
} else {
try {
final Schedule schedule = JobNodeUtil.getSchedule(jobNode);
if (schedule != null) {
schedulerMap.put(jobNode, new SimpleScheduleExec(
TriggerFactory.create(schedule)));
}
} catch (final RuntimeException e) {
LOGGER.error("Problem updating schedule for '" + jobName + "' job : "
+ e.getMessage(), e);
}
} catch (final RuntimeException e) {
LOGGER.error("Problem updating schedule for '" + jobName + "' job : "
+ e.getMessage(), e);
}
}
} catch (final RuntimeException e) {
LOGGER.error("Problem updating schedule for '" + jobName + "' job : " + e.getMessage(),
e);
}
}
} catch (final RuntimeException e) {
LOGGER.error("Problem updating schedule for '" + jobName + "' job : " + e.getMessage(), e);
}
}
} catch (final RuntimeException e) {
Expand All @@ -165,18 +164,20 @@ public SimpleScheduleExec getScheduleExec(final JobNode jobNode) {

@Override
public void triggerImmediateExecution(final JobNode jobNode) {
if (jobNode != null
&& (jobNode.getJobType() == JobType.CRON || jobNode.getJobType() == JobType.FREQUENCY)) {

schedulerMap.compute(jobNode, (jobNode2, curSimpleScheduleExec) -> {
SimpleScheduleExec newSimpleScheduleExec = curSimpleScheduleExec;
if (newSimpleScheduleExec == null) {
final Schedule schedule = JobNodeUtil.getSchedule(jobNode);
final Trigger trigger = TriggerFactory.create(schedule);
newSimpleScheduleExec = new SimpleScheduleExec(trigger);
}
return newSimpleScheduleExec.cloneWithImmediateExecution();
});
if (jobNode != null) {
switch (jobNode.getJobType()) {
case CRON, FREQUENCY -> schedulerMap.compute(
jobNode,
(jobNode2, curSimpleScheduleExec) -> {
if (curSimpleScheduleExec == null) {
final Schedule schedule = JobNodeUtil.getSchedule(jobNode);
final Trigger trigger = TriggerFactory.create(schedule);
return SimpleScheduleExec.createForImmediateExecution(trigger);
} else {
return curSimpleScheduleExec.cloneForImmediateExecution();
}
});
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 Crown Copyright
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -238,23 +238,23 @@ private ScheduledJobFunction create(final ScheduledJob scheduledJob) {
// Only run one instance of this method at a time.
if (running.compareAndSet(false, true)) {
try {
boolean enabled = true;
boolean isJobEnabledOnNode = true;
SimpleScheduleExec scheduler = null;
JobNodeTracker jobNodeTracker;

final JobNodeTrackers trackers = jobNodeTrackerCache.getTrackers();
jobNodeTracker = trackers.getTrackerForJobName(scheduledJob.getName());

if (scheduledJob.isManaged()) {
enabled = false;
isJobEnabledOnNode = false;
if (jobNodeTracker == null) {
LOGGER.error("No job node tracker found for: " + scheduledJob.getName());
} else {
final JobNode jobNode = jobNodeTracker.getJobNode();
if (jobNode == null) {
LOGGER.error("Job node tracker has null job node for: " + scheduledJob.getName());
} else {
enabled = jobNode.getJob().isEnabled()
isJobEnabledOnNode = jobNode.getJob().isEnabled()
&& jobNode.isEnabled();
scheduler = trackers.getScheduleExec(jobNode);
}
Expand All @@ -263,8 +263,9 @@ private ScheduledJobFunction create(final ScheduledJob scheduledJob) {
scheduler = getOrCreateScheduler(scheduledJob);
}

if (enabled && scheduler != null && scheduler.execute()) {
//TODO log trace
if (scheduler != null
&& (isJobEnabledOnNode || scheduler.isRunIfDisabled())
&& scheduler.execute()) {
// LOGGER.trace("Returning runnable for method: {} - {} - {}", methodReference, enabled, scheduler);
final Provider<Runnable> consumerProvider = scheduledJobsMap.get(scheduledJob);
if (jobNodeTracker != null) {
Expand All @@ -276,7 +277,7 @@ private ScheduledJobFunction create(final ScheduledJob scheduledJob) {
} else {
LOGGER.trace("Not returning runnable for method: {} - {} - {}",
scheduledJob.getName(),
enabled,
isJobEnabledOnNode,
scheduler);
running.set(false);
}
Expand Down
Loading

0 comments on commit 9215f51

Please sign in to comment.