diff --git a/job/job.go b/job/job.go index fbd5fafc..6e4d41ce 100644 --- a/job/job.go +++ b/job/job.go @@ -96,7 +96,7 @@ type Job struct { ResourceID, ResourceType string entryID *cron.EntryID lock *sync.Mutex - lastRun *time.Time + lastHistoryCleanup time.Time Retention Retention LastJob *models.JobHistory initialized bool @@ -271,7 +271,7 @@ func (j *Job) cleanupHistory() int { ctx, span := j.Context.StartSpan("CleanupHistory") defer span.End() db := ctx.FastDB() - if err := db.Exec("DELETE FROM job_history WHERE name = ? AND now() - created_at > interval '1 minute' * ?", j.Name, j.Retention.Age.Minutes()).Error; err != nil { + if err := db.Exec("DELETE FROM job_history WHERE name = ? AND resource_id = ? AND now() - created_at > interval '1 minute' * ?", j.Name, j.ResourceID, j.Retention.Age.Minutes()).Error; err != nil { ctx.Warnf("failed to cleanup history %v", err) } query := `WITH ordered_history AS ( @@ -280,7 +280,7 @@ func (j *Job) cleanupHistory() int { status, ROW_NUMBER() OVER (PARTITION by resource_id, name, status ORDER BY created_at DESC) FROM job_history - WHERE name = ? AND status IN ? + WHERE name = ? AND resource_id = ? AND status IN ? ) DELETE FROM job_history WHERE id IN ( SELECT id FROM ordered_history WHERE row_number > ? 
@@ -296,7 +296,7 @@ func (j *Job) cleanupHistory() int { } count := 0 for _, r := range policies { - tx := db.Exec(query, j.Name, r.statuses, r.count) + tx := db.Exec(query, j.Name, j.ResourceID, r.statuses, r.count) count += int(tx.RowsAffected) if tx.Error != nil { ctx.Warnf("failed to cleanup history: %v", tx.Error) @@ -308,10 +308,6 @@ func (j *Job) cleanupHistory() int { func (j *Job) Run() { j.init() - if j.lastRun != nil && time.Since(*j.lastRun) > j.Retention.Interval { - defer j.cleanupHistory() - } - j.lastRun = lo.ToPtr(time.Now()) ctx, span := j.Context.StartSpan(j.Name) defer span.End() @@ -349,6 +345,13 @@ func (j *Job) Run() { defer cancel() } + if shouldCleanupHistory(j.lastHistoryCleanup, j.Retention.Age) { + defer func() { + j.cleanupHistory() + j.lastHistoryCleanup = time.Now() + }() + } + err := j.Fn(r) if err != nil { ctx.Tracef("finished duration=%s, error=%s", time.Since(r.History.TimeStart), err) @@ -358,6 +361,22 @@ func (j *Job) Run() { } } +func shouldCleanupHistory(lastCleanup time.Time, retentionAge time.Duration) bool { + cleanupInterval := time.Hour * 6 + + // If retention is a day or more, clean up every half a day + if retentionAge >= (24 * time.Hour) { + cleanupInterval = 12 * time.Hour + } + + // If retention is an hour or less, clean up every hour + if retentionAge <= (time.Hour) { + cleanupInterval = time.Hour + } + + return time.Since(lastCleanup) >= cleanupInterval +} + func getProperty(j *Job, properties map[string]string, property string) (string, bool) { if val, ok := properties[j.Name+"."+property]; ok { return val, ok @@ -376,8 +395,10 @@ func (j *Job) init() { if j.initialized { return } - properties := j.Context.Properties() + j.lastHistoryCleanup = time.Now() + + properties := j.Context.Properties() if schedule, ok := getProperty(j, properties, "schedule"); ok { j.Schedule = schedule } @@ -406,6 +427,7 @@ func (j *Job) init() { j.Debug = debug == "true" } + // Set default retention if it is unset if 
j.Retention.Age.Nanoseconds() == 0 { j.Retention = Retention{ Success: 1, Failed: 3,