diff --git a/Sources/Queues/QueuesCommand.swift b/Sources/Queues/QueuesCommand.swift index b9ff152..3b1fc52 100644 --- a/Sources/Queues/QueuesCommand.swift +++ b/Sources/Queues/QueuesCommand.swift @@ -144,7 +144,7 @@ public final class QueuesCommand: Command { } } - private func schedule(_ job: AnyScheduledJob) { + private func schedule(_ job: AnyScheduledJob, minCurrentDate: Date? = nil) { if self.isShuttingDown.load() { self.application.logger.trace("Application is shutting down, cancelling scheduling \(job.job.name)") return @@ -161,7 +161,7 @@ public final class QueuesCommand: Command { on: self.eventLoopGroup.next() ) - if let task = job.schedule(context: context) { + if let task = job.schedule(context: context, minCurrentDate: minCurrentDate) { self.application.logger.trace("Job \(job.job.name) was scheduled successfully") self.scheduledTasks[job.job.name] = task task.done.whenComplete { result in @@ -170,7 +170,7 @@ public final class QueuesCommand: Command { context.logger.error("\(job.job.name) failed: \(error)") case .success: break } - self.schedule(job) + self.schedule(job, minCurrentDate: task.scheduledDate) } } } diff --git a/Sources/Queues/ScheduledJob.swift b/Sources/Queues/ScheduledJob.swift index 6c57c72..94a320f 100644 --- a/Sources/Queues/ScheduledJob.swift +++ b/Sources/Queues/ScheduledJob.swift @@ -26,11 +26,18 @@ extension AnyScheduledJob { struct Task { let task: RepeatedTask let done: EventLoopFuture + let scheduledDate: Date } - func schedule(context: QueueContext) -> Task? { + func schedule(context: QueueContext, minCurrentDate: Date?) -> Task? { context.logger.trace("Beginning the scheduler process") - guard let date = self.scheduler.nextDate() else { + + var current = Date() + if let minCurrentDate = minCurrentDate { + current = max(minCurrentDate, current) + } + + guard let date = self.scheduler.nextDate(current: current) else { context.logger.debug("No date scheduled for \(self.job.name)") return nil } @@ -45,6 +52,10 @@ extension AnyScheduledJob { context.logger.trace("Running the scheduled job \(self.job.name)") self.job.run(context: context).cascade(to: promise) } - return .init(task: task, done: promise.futureResult) + return .init( + task: task, + done: promise.futureResult, + scheduledDate: date + ) } }