Skip to content

Offset current time to previously scheduled date #96

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Sources/Queues/QueuesCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public final class QueuesCommand: Command {
}
}

private func schedule(_ job: AnyScheduledJob) {
private func schedule(_ job: AnyScheduledJob, minCurrentDate: Date? = nil) {
if self.isShuttingDown.load() {
self.application.logger.trace("Application is shutting down, cancelling scheduling \(job.job.name)")
return
Expand All @@ -161,7 +161,7 @@ public final class QueuesCommand: Command {
on: self.eventLoopGroup.next()
)

if let task = job.schedule(context: context) {
if let task = job.schedule(context: context, minCurrentDate: minCurrentDate) {
self.application.logger.trace("Job \(job.job.name) was scheduled successfully")
self.scheduledTasks[job.job.name] = task
task.done.whenComplete { result in
Expand All @@ -170,7 +170,7 @@ public final class QueuesCommand: Command {
context.logger.error("\(job.job.name) failed: \(error)")
case .success: break
}
self.schedule(job)
self.schedule(job, minCurrentDate: task.scheduledDate)
}
}
}
Expand Down
17 changes: 14 additions & 3 deletions Sources/Queues/ScheduledJob.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@ extension AnyScheduledJob {
struct Task {
let task: RepeatedTask
let done: EventLoopFuture<Void>
let scheduledDate: Date
}

func schedule(context: QueueContext) -> Task? {
func schedule(context: QueueContext, minCurrentDate: Date?) -> Task? {
context.logger.trace("Beginning the scheduler process")
guard let date = self.scheduler.nextDate() else {

var current = Date()
if let minCurrentDate = minCurrentDate {
current = max(minCurrentDate, current)
}

guard let date = self.scheduler.nextDate(current: current) else {
context.logger.debug("No date scheduled for \(self.job.name)")
return nil
}
Expand All @@ -45,6 +52,10 @@ extension AnyScheduledJob {
context.logger.trace("Running the scheduled job \(self.job.name)")
self.job.run(context: context).cascade(to: promise)
}
return .init(task: task, done: promise.futureResult)
return .init(
task: task,
done: promise.futureResult,
scheduledDate: date
)
}
}