Skip to content

Commit

Permalink
feat: support statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfogre committed Mar 3, 2020
1 parent 1dccb25 commit 530be6e
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 12 deletions.
18 changes: 18 additions & 0 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
type CronMeta interface {
Key() string
Hostname() string
Statistics() Statistics
Jobs() []JobMeta
}

type Cron struct {
Expand Down Expand Up @@ -109,3 +111,19 @@ func (c *Cron) Key() string {
func (c *Cron) Hostname() string {
return c.hostname
}

func (c *Cron) Statistics() Statistics {
ret := Statistics{}
for _, j := range c.jobs {
ret.add(j.statistics)
}
return ret
}

func (c *Cron) Jobs() []JobMeta {
var ret []JobMeta
for _, j := range c.jobs {
ret = append(ret, j)
}
return ret
}
5 changes: 5 additions & 0 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func Test_Cron(t *testing.T) {
c.Run() // should be not working
time.Sleep(10 * time.Second)
<-c.Stop().Done()

t.Logf("cron statistics: %+v", c.Statistics())
for _, j := range c.Jobs() {
t.Logf("job %v statistics: %+v", j.Key(), j.Statistics())
}
}

func TestCron_AddJobs(t *testing.T) {
Expand Down
34 changes: 29 additions & 5 deletions inner_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@ import (
"context"
"fmt"
"runtime/debug"
"sync/atomic"
"time"

"github.com/robfig/cron/v3"
)

type JobMeta interface {
Key() string
Spec() string
Statistics() Statistics
}

type innerJob struct {
cron *Cron
entryID cron.EntryID
Expand All @@ -21,6 +28,7 @@ type innerJob struct {
retryTimes int
retryInterval RetryInterval
noMutex bool
statistics Statistics
}

func (j *innerJob) Key() string {
Expand All @@ -31,6 +39,10 @@ func (j *innerJob) Spec() string {
return j.spec
}

func (j *innerJob) Statistics() Statistics {
return j.statistics
}

func (j *innerJob) Run() {
c := j.cron
entry := j.entryGetter.Entry(j.entryID)
Expand All @@ -45,17 +57,14 @@ func (j *innerJob) Run() {
PlanAt: planAt,
TriedTimes: 0,
}
atomic.AddInt64(&j.statistics.TotalTask, 1)

ctx, cancel := context.WithDeadline(context.WithValue(context.Background(), keyContextTask, task), nextAt)
defer cancel()

skip := false
if j.before != nil && j.before(task) {
skip = true
}

if skip {
task.Skipped = true
atomic.AddInt64(&j.statistics.SkippedTask, 1)
}

if !task.Skipped {
Expand All @@ -65,10 +74,16 @@ func (j *innerJob) Run() {

for i := 0; i < j.retryTimes; i++ {
task.Return = safeRun(ctx, j.run)
atomic.AddInt64(&j.statistics.TotalRun, 1)
if i > 0 {
atomic.AddInt64(&j.statistics.RetriedRun, 1)
}
task.TriedTimes++
if task.Return == nil {
atomic.AddInt64(&j.statistics.PassedRun, 1)
break
}
atomic.AddInt64(&j.statistics.FailedRun, 1)
if ctx.Err() != nil {
break
}
Expand All @@ -86,12 +101,21 @@ func (j *innerJob) Run() {
task.EndAt = &endAt
} else {
task.Missed = true
atomic.AddInt64(&j.statistics.MissedTask, 1)
}
}

if j.after != nil {
j.after(task)
}

if !task.Skipped && !task.Missed {
if task.Return == nil {
atomic.AddInt64(&j.statistics.PassedTask, 1)
} else {
atomic.AddInt64(&j.statistics.FailedTask, 1)
}
}
}

func (j *innerJob) Cron() *Cron {
Expand Down
96 changes: 94 additions & 2 deletions inner_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,9 @@ func Test_innerJob_Run(t *testing.T) {
retryInterval RetryInterval
}
tests := []struct {
name string
fields fields
name string
fields fields
statistics Statistics
}{
{
name: "regular",
Expand All @@ -167,6 +168,17 @@ func Test_innerJob_Run(t *testing.T) {
},
retryTimes: 1,
},
statistics: Statistics{
TotalTask: 1,
PassedTask: 1,
FailedTask: 0,
SkippedTask: 0,
MissedTask: 0,
TotalRun: 1,
PassedRun: 1,
FailedRun: 0,
RetriedRun: 0,
},
},
{
name: "skip",
Expand All @@ -187,6 +199,17 @@ func Test_innerJob_Run(t *testing.T) {
},
retryTimes: 1,
},
statistics: Statistics{
TotalTask: 1,
PassedTask: 0,
FailedTask: 0,
SkippedTask: 1,
MissedTask: 0,
TotalRun: 0,
PassedRun: 0,
FailedRun: 0,
RetriedRun: 0,
},
},
{
name: "retry",
Expand All @@ -210,6 +233,17 @@ func Test_innerJob_Run(t *testing.T) {
},
retryTimes: 10,
},
statistics: Statistics{
TotalTask: 1,
PassedTask: 0,
FailedTask: 1,
SkippedTask: 0,
MissedTask: 0,
TotalRun: 10,
PassedRun: 0,
FailedRun: 10,
RetriedRun: 9,
},
},
{
name: "retry with interval",
Expand All @@ -236,6 +270,17 @@ func Test_innerJob_Run(t *testing.T) {
return time.Duration(triedTimes) * time.Second
},
},
statistics: Statistics{
TotalTask: 1,
PassedTask: 0,
FailedTask: 1,
SkippedTask: 0,
MissedTask: 0,
TotalRun: 3,
PassedRun: 0,
FailedRun: 3,
RetriedRun: 2,
},
},
{
name: "take too long",
Expand All @@ -261,6 +306,17 @@ func Test_innerJob_Run(t *testing.T) {
retryTimes: 5,
retryInterval: nil,
},
statistics: Statistics{
TotalTask: 1,
PassedTask: 0,
FailedTask: 1,
SkippedTask: 0,
MissedTask: 0,
TotalRun: 1,
PassedRun: 0,
FailedRun: 1,
RetriedRun: 0,
},
},
{
name: "miss",
Expand All @@ -281,6 +337,17 @@ func Test_innerJob_Run(t *testing.T) {
},
retryTimes: 1,
},
statistics: Statistics{
TotalTask: 1,
PassedTask: 0,
FailedTask: 0,
SkippedTask: 0,
MissedTask: 1,
TotalRun: 0,
PassedRun: 0,
FailedRun: 0,
RetriedRun: 0,
},
},
{
name: "panic by calling",
Expand All @@ -298,6 +365,17 @@ func Test_innerJob_Run(t *testing.T) {
},
retryTimes: 1,
},
statistics: Statistics{
TotalTask: 1,
PassedTask: 0,
FailedTask: 1,
SkippedTask: 0,
MissedTask: 0,
TotalRun: 1,
PassedRun: 0,
FailedRun: 1,
RetriedRun: 0,
},
},
{
name: "panic by runtime",
Expand All @@ -317,6 +395,17 @@ func Test_innerJob_Run(t *testing.T) {
},
retryTimes: 1,
},
statistics: Statistics{
TotalTask: 1,
PassedTask: 0,
FailedTask: 1,
SkippedTask: 0,
MissedTask: 0,
TotalRun: 1,
PassedRun: 0,
FailedRun: 1,
RetriedRun: 0,
},
},
}
for _, tt := range tests {
Expand All @@ -334,6 +423,9 @@ func Test_innerJob_Run(t *testing.T) {
retryInterval: tt.fields.retryInterval,
}
j.Run()
if got := j.Statistics(); got != tt.statistics {
t.Errorf("Statistics() = %v, want %v", got, tt.statistics)
}
})
}
}
Expand Down
6 changes: 1 addition & 5 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@ import (
"strings"
)

type JobMeta interface {
type Job interface {
Key() string
Spec() string
}

type Job interface {
JobMeta
Run(ctx context.Context) error
Options() []JobOption
}
Expand Down
26 changes: 26 additions & 0 deletions statistics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package dcron

type Statistics struct {
TotalTask int64
PassedTask int64
FailedTask int64
SkippedTask int64
MissedTask int64

TotalRun int64
PassedRun int64
FailedRun int64
RetriedRun int64
}

func (s *Statistics) add(delta Statistics) {
s.TotalTask += delta.TotalTask
s.PassedTask += delta.PassedTask
s.FailedTask += delta.FailedTask
s.SkippedTask += delta.SkippedTask
s.MissedTask += delta.MissedTask
s.TotalRun += delta.TotalRun
s.PassedRun += delta.PassedRun
s.FailedRun += delta.FailedRun
s.RetriedRun += delta.RetriedRun
}

0 comments on commit 530be6e

Please sign in to comment.