diff --git a/stroom-core-shared/src/main/java/stroom/job/shared/JobNode.java b/stroom-core-shared/src/main/java/stroom/job/shared/JobNode.java index 7b2f0cfae2c..0ffe139ee77 100644 --- a/stroom-core-shared/src/main/java/stroom/job/shared/JobNode.java +++ b/stroom-core-shared/src/main/java/stroom/job/shared/JobNode.java @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package stroom.job.shared; @@ -202,37 +218,6 @@ public void setEnabled(final boolean enabled) { this.enabled = enabled; } - - // -------------------------------------------------------------------------------- - - - @Schema - public enum JobType implements HasPrimitiveValue { - UNKNOWN("UNKNOWN", 0), - CRON("Cron", 1), - FREQUENCY("Frequency", 2), - DISTRIBUTED("Distributed", 3); - - public static final PrimitiveValueConverter PRIMITIVE_VALUE_CONVERTER = new PrimitiveValueConverter<>( - JobType.values()); - private final String displayValue; - private final byte primitiveValue; - - JobType(final String displayValue, final int primitiveValue) { - this.displayValue = displayValue; - this.primitiveValue = (byte) primitiveValue; - } - - public String getDisplayValue() { - return displayValue; - } - - @Override - public byte getPrimitiveValue() { - return primitiveValue; - } - } - @Override public String toString() { return "JobNode{" + @@ -267,4 +252,37 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(id); } + + + // -------------------------------------------------------------------------------- + + + @Schema + public enum JobType implements HasPrimitiveValue { + UNKNOWN("UNKNOWN", 0), + CRON("Cron", 1), + FREQUENCY("Frequency", 2), + DISTRIBUTED("Distributed", 3), + ; + + public static final PrimitiveValueConverter PRIMITIVE_VALUE_CONVERTER = new PrimitiveValueConverter<>( + JobType.values()); + + private final String displayValue; + private final byte primitiveValue; + + JobType(final String displayValue, final int primitiveValue) { + this.displayValue = displayValue; + this.primitiveValue = (byte) primitiveValue; + } + + public String getDisplayValue() { + return displayValue; + } + + @Override + public byte getPrimitiveValue() { + return primitiveValue; + } + } } diff --git a/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/JobBootstrap.java b/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/JobBootstrap.java index 41e2f5a1e58..dd56d710104 100644 --- a/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/JobBootstrap.java +++ b/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/JobBootstrap.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 Crown Copyright + * Copyright 2024 Crown Copyright * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,7 +12,6 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ package stroom.job.impl; @@ -30,6 +29,7 @@ import stroom.util.logging.LambdaLoggerFactory; import stroom.util.logging.LogUtil; import stroom.util.shared.ResultPage; +import stroom.util.shared.scheduler.ScheduleType; import jakarta.inject.Inject; import jakarta.inject.Provider; @@ -113,16 +113,12 @@ public void startup() { newJobNode.setNodeName(nodeName); newJobNode.setEnabled(scheduledJob.isEnabled()); - switch (scheduledJob.getSchedule().getType()) { - case CRON: - newJobNode.setJobType(JobType.CRON); - break; - case FREQUENCY: - newJobNode.setJobType(JobType.FREQUENCY); - break; - default: - throw new RuntimeException("Unknown ScheduleType!"); - } + final JobType newJobType = switch (scheduledJob.getSchedule().getType()) { + case ScheduleType.CRON -> JobType.CRON; + case ScheduleType.FREQUENCY -> JobType.FREQUENCY; + default -> throw new RuntimeException("Unknown ScheduleType!"); + }; + newJobNode.setJobType(newJobType); newJobNode.setSchedule(scheduledJob.getSchedule().getExpression()); // Add the job node to the DB if it isn't there already. diff --git a/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/JobNodeService.java b/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/JobNodeService.java index d4aa298aa50..7ff1c50b9ef 100644 --- a/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/JobNodeService.java +++ b/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/JobNodeService.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 Crown Copyright + * Copyright 2017-2024 Crown Copyright * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,7 +12,6 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ package stroom.job.impl; @@ -303,17 +302,17 @@ private void ensureSchedule(final JobNode jobNode) { } private void ensureSchedule(final JobType jobType, final String scheduleExpression) { - // Stop Job Nodes being saved with invalid crons. - if (JobType.CRON.equals(jobType)) { - if (scheduleExpression != null) { - // This will throw a runtime exception if the expression is invalid. - new CronTrigger(scheduleExpression); - } - } - if (JobType.FREQUENCY.equals(jobType)) { - if (scheduleExpression != null) { - // This will throw a runtime exception if the expression is invalid. - new FrequencyTrigger(scheduleExpression); + if (scheduleExpression != null) { + // Stop Job Nodes being saved with invalid crons. + switch (jobType) { + case CRON -> { + // This will throw a runtime exception if the expression is invalid. + new CronTrigger(scheduleExpression); + } + case FREQUENCY -> { + // This will throw a runtime exception if the expression is invalid. + new FrequencyTrigger(scheduleExpression); + } } } } diff --git a/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/JobNodeTrackerCacheImpl.java b/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/JobNodeTrackerCacheImpl.java index 6353683b5d5..24ccd479b48 100644 --- a/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/JobNodeTrackerCacheImpl.java +++ b/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/JobNodeTrackerCacheImpl.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 Crown Copyright + * Copyright 2024 Crown Copyright * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,14 +12,12 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * */ package stroom.job.impl; import stroom.job.shared.FindJobNodeCriteria; import stroom.job.shared.JobNode; -import stroom.job.shared.JobNode.JobType; import stroom.job.shared.JobNodeUtil; import stroom.node.api.NodeInfo; import stroom.task.api.ExecutorProvider; @@ -105,42 +103,43 @@ static class JobNodeTrackersImpl implements JobNodeTrackers { trackersForJobNode.put(jobNode, jobNodeTracker); trackersForJobName.put(jobName, jobNodeTracker); - // Remember trackers for enabled and distributed jobs. - if (JobType.DISTRIBUTED.equals(jobNode.getJobType())) { - if (jobNode.getJob().isEnabled() && jobNode.isEnabled()) { - distributedJobNodeTrackers.add(jobNodeTracker); + switch (jobNode.getJobType()) { + // Remember trackers for enabled and distributed jobs. + case DISTRIBUTED -> { + if (jobNode.getJob().isEnabled() && jobNode.isEnabled()) { + distributedJobNodeTrackers.add(jobNodeTracker); + } } - } - - try { - // Update schedule and frequency times if this job node - // has a job type of cron or frequency. - if (JobType.CRON.equals(jobNode.getJobType()) - || JobType.FREQUENCY.equals(jobNode.getJobType())) { - final String expression = jobNode.getSchedule(); - // Update the schedule cache if the schedule has - // changed. - if (expression != null) { - scheduleValueMap.put(jobNode, expression); - if (previousState != null - && expression.equals(previousState.scheduleValueMap.get(jobNode))) { - schedulerMap.put(jobNode, previousState.schedulerMap.get(jobNode)); - } else { - try { - final Schedule schedule = JobNodeUtil.getSchedule(jobNode); - if (schedule != null) { - schedulerMap.put(jobNode, new SimpleScheduleExec( - TriggerFactory.create(schedule))); + case CRON, FREQUENCY -> { + try { + // Update schedule and frequency times if this job node + // has a job type of cron or frequency. + final String expression = jobNode.getSchedule(); + // Update the schedule cache if the schedule has + // changed. + if (expression != null) { + scheduleValueMap.put(jobNode, expression); + if (previousState != null + && expression.equals(previousState.scheduleValueMap.get(jobNode))) { + schedulerMap.put(jobNode, previousState.schedulerMap.get(jobNode)); + } else { + try { + final Schedule schedule = JobNodeUtil.getSchedule(jobNode); + if (schedule != null) { + schedulerMap.put(jobNode, new SimpleScheduleExec( + TriggerFactory.create(schedule))); + } + } catch (final RuntimeException e) { + LOGGER.error("Problem updating schedule for '" + jobName + "' job : " + + e.getMessage(), e); } - } catch (final RuntimeException e) { - LOGGER.error("Problem updating schedule for '" + jobName + "' job : " - + e.getMessage(), e); } } + } catch (final RuntimeException e) { + LOGGER.error("Problem updating schedule for '" + jobName + "' job : " + e.getMessage(), + e); } } - } catch (final RuntimeException e) { - LOGGER.error("Problem updating schedule for '" + jobName + "' job : " + e.getMessage(), e); } } } catch (final RuntimeException e) { @@ -165,18 +164,20 @@ public SimpleScheduleExec getScheduleExec(final JobNode jobNode) { @Override public void triggerImmediateExecution(final JobNode jobNode) { - if (jobNode != null - && (jobNode.getJobType() == JobType.CRON || jobNode.getJobType() == JobType.FREQUENCY)) { - - schedulerMap.compute(jobNode, (jobNode2, curSimpleScheduleExec) -> { - SimpleScheduleExec newSimpleScheduleExec = curSimpleScheduleExec; - if (newSimpleScheduleExec == null) { - final Schedule schedule = JobNodeUtil.getSchedule(jobNode); - final Trigger trigger = TriggerFactory.create(schedule); - newSimpleScheduleExec = new SimpleScheduleExec(trigger); - } - return newSimpleScheduleExec.cloneWithImmediateExecution(); - }); + if (jobNode != null) { + switch (jobNode.getJobType()) { + case CRON, FREQUENCY -> schedulerMap.compute( + jobNode, + (jobNode2, curSimpleScheduleExec) -> { + if (curSimpleScheduleExec == null) { + final Schedule schedule = JobNodeUtil.getSchedule(jobNode); + final Trigger trigger = TriggerFactory.create(schedule); + return SimpleScheduleExec.createForImmediateExecution(trigger); + } else { + return curSimpleScheduleExec.cloneForImmediateExecution(); + } + }); + } } } } diff --git a/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/ScheduledTaskExecutor.java b/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/ScheduledTaskExecutor.java index c80b2aaee0b..29744703519 100644 --- a/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/ScheduledTaskExecutor.java +++ b/stroom-job/stroom-job-impl/src/main/java/stroom/job/impl/ScheduledTaskExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 Crown Copyright + * Copyright 2024 Crown Copyright * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -238,7 +238,7 @@ private ScheduledJobFunction create(final ScheduledJob scheduledJob) { // Only run one instance of this method at a time. if (running.compareAndSet(false, true)) { try { - boolean enabled = true; + boolean isJobEnabledOnNode = true; SimpleScheduleExec scheduler = null; JobNodeTracker jobNodeTracker; @@ -246,7 +246,7 @@ private ScheduledJobFunction create(final ScheduledJob scheduledJob) { jobNodeTracker = trackers.getTrackerForJobName(scheduledJob.getName()); if (scheduledJob.isManaged()) { - enabled = false; + isJobEnabledOnNode = false; if (jobNodeTracker == null) { LOGGER.error("No job node tracker found for: " + scheduledJob.getName()); } else { @@ -254,7 +254,7 @@ private ScheduledJobFunction create(final ScheduledJob scheduledJob) { if (jobNode == null) { LOGGER.error("Job node tracker has null job node for: " + scheduledJob.getName()); } else { - enabled = jobNode.getJob().isEnabled() + isJobEnabledOnNode = jobNode.getJob().isEnabled() && jobNode.isEnabled(); scheduler = trackers.getScheduleExec(jobNode); } @@ -263,8 +263,9 @@ private ScheduledJobFunction create(final ScheduledJob scheduledJob) { scheduler = getOrCreateScheduler(scheduledJob); } - if (enabled && scheduler != null && scheduler.execute()) { - //TODO log trace + if (scheduler != null + && (isJobEnabledOnNode || scheduler.isRunIfDisabled()) + && scheduler.execute()) { // LOGGER.trace("Returning runnable for method: {} - {} - {}", methodReference, enabled, scheduler); final Provider consumerProvider = scheduledJobsMap.get(scheduledJob); if (jobNodeTracker != null) { @@ -276,7 +277,7 @@ private ScheduledJobFunction create(final ScheduledJob scheduledJob) { } else { LOGGER.trace("Not returning runnable for method: {} - {} - {}", scheduledJob.getName(), - enabled, + isJobEnabledOnNode, scheduler); running.set(false); } diff --git a/stroom-util-shared/src/main/java/stroom/util/shared/scheduler/ScheduleType.java b/stroom-util-shared/src/main/java/stroom/util/shared/scheduler/ScheduleType.java index f2f9cec1e4c..b881519bcc3 100644 --- a/stroom-util-shared/src/main/java/stroom/util/shared/scheduler/ScheduleType.java +++ b/stroom-util-shared/src/main/java/stroom/util/shared/scheduler/ScheduleType.java @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package stroom.util.shared.scheduler; import stroom.docref.HasDisplayValue; @@ -5,7 +21,9 @@ public enum ScheduleType implements HasDisplayValue { INSTANT("Instant"), CRON("Cron"), - FREQUENCY("Frequency"); + FREQUENCY("Frequency"), + ; + private final String displayValue; ScheduleType(final String displayValue) { diff --git a/stroom-util/src/main/java/stroom/util/scheduler/SimpleScheduleExec.java b/stroom-util/src/main/java/stroom/util/scheduler/SimpleScheduleExec.java index 7f8920dd706..1417abadbae 100644 --- a/stroom-util/src/main/java/stroom/util/scheduler/SimpleScheduleExec.java +++ b/stroom-util/src/main/java/stroom/util/scheduler/SimpleScheduleExec.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 Crown Copyright + * Copyright 2024 Crown Copyright * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,16 +29,27 @@ public class SimpleScheduleExec { private final Trigger trigger; private Instant lastExecute; private Instant nextExecute; + private boolean runIfDisabled = false; public SimpleScheduleExec(final Trigger trigger) { this.trigger = trigger; } + public static SimpleScheduleExec createForImmediateExecution(final Trigger trigger) { + return new SimpleScheduleExec(trigger, Instant.now(), true); + } + private SimpleScheduleExec(final Trigger trigger, final Instant nextExecute) { this.trigger = trigger; this.nextExecute = nextExecute; } + private SimpleScheduleExec(final Trigger trigger, final Instant nextExecute, final boolean runIfDisabled) { + this.trigger = trigger; + this.nextExecute = nextExecute; + this.runIfDisabled = runIfDisabled; + } + /** * Should we execute. * @@ -49,16 +60,22 @@ public boolean execute() { } public boolean execute(final Instant now) { + LOGGER.trace("execute() (before) - now: {}, lastExecute: {}, nextExecute: {}, runIfDisabled: {}", + now, lastExecute, nextExecute, runIfDisabled); boolean willExecute = false; if (nextExecute == null) { nextExecute = trigger.getNextExecutionTimeAfter(now); } else if (now.isAfter(nextExecute)) { nextExecute = trigger.getNextExecutionTimeAfter(now); lastExecute = now; + // We are executing, so clear the runIfDisabled state for the next check + // which may be scheduled rather than as a result of the user doing a 'run now'. + runIfDisabled = false; willExecute = true; } - LOGGER.trace("execute() - now: {}, lastExecute: {}, nextExecute: {}, willExecute: {}", - now, lastExecute, nextExecute, willExecute); + LOGGER.trace( + "execute() (after) - now: {}, lastExecute: {}, nextExecute: {}, willExecute: {}, runIfDisabled: {}", + now, lastExecute, nextExecute, willExecute, runIfDisabled); return willExecute; } @@ -69,6 +86,15 @@ public Instant getLastExecutionTime() { return Instant.now(); } + /** + * @return True if this schedule has been marked to disregard the enabled state of the + * job or jobNode. This MUST be called before {@link SimpleScheduleExec#execute()} is called + * as {@link SimpleScheduleExec#execute()} will reset the runIfDisabled state. + */ + public boolean isRunIfDisabled() { + return runIfDisabled; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder(); @@ -83,15 +109,19 @@ public String toString() { sb.append(DateUtil.createNormalDateTimeString(nextExecute)); sb.append("\" "); } + sb.append("runIfDisabled=") + .append(runIfDisabled); return sb.toString(); } - public SimpleScheduleExec cloneWithImmediateExecution() { - // Set the nextExecute time to now, so that the next time execute() is called for this job, it will - // return true and thus run. + /** + * Set the nextExecute time to now, so that the next time execute() is called for this job, it will + * return true and thus run. + */ + public SimpleScheduleExec cloneForImmediateExecution() { final Instant now = Instant.now(); LOGGER.debug("cloneWithImmediateExecution() - now: {}, lastExecute: {}, nextExecute: {}", now, lastExecute, nextExecute); - return new SimpleScheduleExec(trigger, now); + return new SimpleScheduleExec(trigger, now, true); } } diff --git a/unreleased_changes/20241010_123207_271__4532.md b/unreleased_changes/20241010_123207_271__4532.md new file mode 100644 index 00000000000..286d064f134 --- /dev/null +++ b/unreleased_changes/20241010_123207_271__4532.md @@ -0,0 +1,24 @@ +* Issue **#4532** : Fix Run Job Now so that it works when the job or jobNode is disabled. + + +```sh +# ******************************************************************************** +# Issue title: `Run Now` doesn't work if the job or jobNode is disabled +# Issue link: https://github.com/gchq/stroom/issues/4532 +# ******************************************************************************** + +# ONLY the top line will be included as a change entry in the CHANGELOG. +# The entry should be in GitHub flavour markdown and should be written on a SINGLE +# line with no hard breaks. You can have multiple change files for a single GitHub issue. +# The entry should be written in the imperative mood, i.e. 'Fix nasty bug' rather than +# 'Fixed nasty bug'. +# +# Examples of acceptable entries are: +# +# +# * Issue **123** : Fix bug with an associated GitHub issue in this repository +# +# * Issue **namespace/other-repo#456** : Fix bug with an associated GitHub issue in another repository +# +# * Fix bug with no associated GitHub issue. +```