Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offset current time to previously scheduled date #96

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Sources/Queues/QueuesCommand.swift
Original file line number Diff line number Diff line change
@@ -144,7 +144,7 @@ public final class QueuesCommand: Command {
}
}

private func schedule(_ job: AnyScheduledJob) {
private func schedule(_ job: AnyScheduledJob, minCurrentDate: Date? = nil) {
if self.isShuttingDown.load() {
self.application.logger.trace("Application is shutting down, cancelling scheduling \(job.job.name)")
return
@@ -161,7 +161,7 @@ public final class QueuesCommand: Command {
on: self.eventLoopGroup.next()
)

if let task = job.schedule(context: context) {
if let task = job.schedule(context: context, minCurrentDate: minCurrentDate) {
self.application.logger.trace("Job \(job.job.name) was scheduled successfully")
self.scheduledTasks[job.job.name] = task
task.done.whenComplete { result in
@@ -170,7 +170,7 @@ public final class QueuesCommand: Command {
context.logger.error("\(job.job.name) failed: \(error)")
case .success: break
}
self.schedule(job)
self.schedule(job, minCurrentDate: task.scheduledDate)
}
}
}
17 changes: 14 additions & 3 deletions Sources/Queues/ScheduledJob.swift
Original file line number Diff line number Diff line change
@@ -26,11 +26,18 @@ extension AnyScheduledJob {
struct Task {
let task: RepeatedTask
let done: EventLoopFuture<Void>
let scheduledDate: Date
}

func schedule(context: QueueContext) -> Task? {
func schedule(context: QueueContext, minCurrentDate: Date?) -> Task? {
context.logger.trace("Beginning the scheduler process")
guard let date = self.scheduler.nextDate() else {

var current = Date()
if let minCurrentDate = minCurrentDate {
current = max(minCurrentDate, current)
}

guard let date = self.scheduler.nextDate(current: current) else {
context.logger.debug("No date scheduled for \(self.job.name)")
return nil
}
@@ -45,6 +52,10 @@ extension AnyScheduledJob {
context.logger.trace("Running the scheduled job \(self.job.name)")
self.job.run(context: context).cascade(to: promise)
}
return .init(task: task, done: promise.futureResult)
return .init(
task: task,
done: promise.futureResult,
scheduledDate: date
)
}
}