Skip to content

Commit

Permalink
fix: change job history cleanup invokation
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmehrotra committed Mar 7, 2024
1 parent 9d90b0e commit 7578e57
Showing 1 changed file with 31 additions and 9 deletions.
40 changes: 31 additions & 9 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Job struct {
ResourceID, ResourceType string
entryID *cron.EntryID
lock *sync.Mutex
lastRun *time.Time
lastHistoryCleanup time.Time
Retention Retention
LastJob *models.JobHistory
initialized bool
Expand Down Expand Up @@ -271,7 +271,7 @@ func (j *Job) cleanupHistory() int {
ctx, span := j.Context.StartSpan("CleanupHistory")
defer span.End()
db := ctx.FastDB()
if err := db.Exec("DELETE FROM job_history WHERE name = ? AND now() - created_at > interval '1 minute' * ?", j.Name, j.Retention.Age.Minutes()).Error; err != nil {
if err := db.Exec("DELETE FROM job_history WHERE name = ? AND resource_id = ? AND now() - created_at > interval '1 minute' * ?", j.Name, j.ResourceID, j.Retention.Age.Minutes()).Error; err != nil {
ctx.Warnf("failed to cleanup history %v", err)
}
query := `WITH ordered_history AS (
Expand All @@ -280,7 +280,7 @@ func (j *Job) cleanupHistory() int {
status,
ROW_NUMBER() OVER (PARTITION by resource_id, name, status ORDER BY created_at DESC)
FROM job_history
WHERE name = ? AND status IN ?
WHERE name = ? AND resource_id = ? AND status IN ?
)
DELETE FROM job_history WHERE id IN (
SELECT id FROM ordered_history WHERE row_number > ?
Expand All @@ -296,7 +296,7 @@ func (j *Job) cleanupHistory() int {
}
count := 0
for _, r := range policies {
tx := db.Exec(query, j.Name, r.statuses, r.count)
tx := db.Exec(query, j.Name, j.ResourceID, r.statuses, r.count)
count += int(tx.RowsAffected)
if tx.Error != nil {
ctx.Warnf("failed to cleanup history: %v", tx.Error)
Expand All @@ -308,10 +308,6 @@ func (j *Job) cleanupHistory() int {

func (j *Job) Run() {
j.init()
if j.lastRun != nil && time.Since(*j.lastRun) > j.Retention.Interval {
defer j.cleanupHistory()
}
j.lastRun = lo.ToPtr(time.Now())
ctx, span := j.Context.StartSpan(j.Name)

defer span.End()
Expand Down Expand Up @@ -349,6 +345,13 @@ func (j *Job) Run() {
defer cancel()
}

if shouldCleanupHistory(j.lastHistoryCleanup, j.Retention.Age) {
defer func() {
j.cleanupHistory()
j.lastHistoryCleanup = time.Now()
}()
}

err := j.Fn(r)
if err != nil {
ctx.Tracef("finished duration=%s, error=%s", time.Since(r.History.TimeStart), err)
Expand All @@ -358,6 +361,22 @@ func (j *Job) Run() {
}
}

func shouldCleanupHistory(lastCleanup time.Time, retentionAge time.Duration) bool {
cleanupInterval := time.Hour * 6

// If retention is more than a day, cleanup every half a day
if retentionAge >= (24 * time.Hour) {
cleanupInterval = 12 * time.Hour
}

// If retention is less than an hour, cleanup every hour
if retentionAge <= (time.Hour) {
cleanupInterval = time.Hour
}

return time.Since(lastCleanup) >= cleanupInterval
}

func getProperty(j *Job, properties map[string]string, property string) (string, bool) {
if val, ok := properties[j.Name+"."+property]; ok {
return val, ok
Expand All @@ -376,8 +395,10 @@ func (j *Job) init() {
if j.initialized {
return
}
properties := j.Context.Properties()

j.lastHistoryCleanup = time.Now()

properties := j.Context.Properties()
if schedule, ok := getProperty(j, properties, "schedule"); ok {
j.Schedule = schedule
}
Expand Down Expand Up @@ -406,6 +427,7 @@ func (j *Job) init() {
j.Debug = debug == "true"
}

// Set default retention if it is unset
if j.Retention.Age.Nanoseconds() == 0 {
j.Retention = Retention{
Success: 1, Failed: 3,
Expand Down

0 comments on commit 7578e57

Please sign in to comment.