Skip to content

Commit

Permalink
Merge pull request #16 from gochore/dev
Browse files Browse the repository at this point in the history
support statistics
  • Loading branch information
wolfogre authored Mar 3, 2020
2 parents 088b999 + 530be6e commit bf9e852
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 30 deletions.
28 changes: 27 additions & 1 deletion cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/robfig/cron/v3"
)

type CronMeta interface {
Key() string
Hostname() string
Statistics() Statistics
Jobs() []JobMeta
}

type Cron struct {
Expand All @@ -21,16 +24,23 @@ type Cron struct {
cron *cron.Cron
atomic Atomic
jobs []*innerJob
location *time.Location
}

func NewCron(options ...CronOption) *Cron {
ret := &Cron{
cron: cron.New(cron.WithSeconds(), cron.WithLogger(cron.DiscardLogger)),
location: time.Local,
}
ret.hostname, _ = os.Hostname()
for _, option := range options {
option(ret)
}

ret.cron = cron.New(
cron.WithSeconds(),
cron.WithLogger(cron.DiscardLogger),
cron.WithLocation(ret.location),
)
return ret
}

Expand Down Expand Up @@ -101,3 +111,19 @@ func (c *Cron) Key() string {
func (c *Cron) Hostname() string {
return c.hostname
}

func (c *Cron) Statistics() Statistics {
ret := Statistics{}
for _, j := range c.jobs {
ret.add(j.statistics)
}
return ret
}

func (c *Cron) Jobs() []JobMeta {
var ret []JobMeta
for _, j := range c.jobs {
ret = append(ret, j)
}
return ret
}
8 changes: 8 additions & 0 deletions cron_option.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package dcron

import "time"

type CronOption func(c *Cron)

func WithKey(key string) CronOption {
Expand All @@ -19,3 +21,9 @@ func WithAtomic(atomic Atomic) CronOption {
c.atomic = atomic
}
}

func WithLocation(loc *time.Location) CronOption {
return func(c *Cron) {
c.location = loc
}
}
33 changes: 33 additions & 0 deletions cron_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dcron

import (
"testing"
"time"

"github.com/gochore/dcron/mock_dcron"
)
Expand Down Expand Up @@ -98,3 +99,35 @@ func TestWithAtomic(t *testing.T) {
})
}
}

func TestWithLocation(t *testing.T) {
type args struct {
loc *time.Location
}
tests := []struct {
name string
args args
want CronOption
check func(t *testing.T, option CronOption)
}{
{
name: "regular",
args: args{
loc: time.UTC,
},
check: func(t *testing.T, option CronOption) {
c := NewCron()
option(c)
if c.location != time.UTC {
t.Fatal(c.location)
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := WithLocation(tt.args.loc)
tt.check(t, got)
})
}
}
5 changes: 5 additions & 0 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func Test_Cron(t *testing.T) {
c.Run() // should be not working
time.Sleep(10 * time.Second)
<-c.Stop().Done()

t.Logf("cron statistics: %+v", c.Statistics())
for _, j := range c.Jobs() {
t.Logf("job %v statistics: %+v", j.Key(), j.Statistics())
}
}

func TestCron_AddJobs(t *testing.T) {
Expand Down
50 changes: 30 additions & 20 deletions inner_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@ package dcron
import (
"context"
"fmt"
"runtime"
"runtime/debug"
"strings"
"sync/atomic"
"time"

"github.com/robfig/cron/v3"
)

type JobMeta interface {
Key() string
Spec() string
Statistics() Statistics
}

type innerJob struct {
cron *Cron
entryID cron.EntryID
Expand All @@ -23,6 +28,7 @@ type innerJob struct {
retryTimes int
retryInterval RetryInterval
noMutex bool
statistics Statistics
}

func (j *innerJob) Key() string {
Expand All @@ -33,6 +39,10 @@ func (j *innerJob) Spec() string {
return j.spec
}

func (j *innerJob) Statistics() Statistics {
return j.statistics
}

func (j *innerJob) Run() {
c := j.cron
entry := j.entryGetter.Entry(j.entryID)
Expand All @@ -47,17 +57,14 @@ func (j *innerJob) Run() {
PlanAt: planAt,
TriedTimes: 0,
}
atomic.AddInt64(&j.statistics.TotalTask, 1)

ctx, cancel := context.WithDeadline(context.WithValue(context.Background(), keyContextTask, task), nextAt)
defer cancel()

skip := false
if j.before != nil && j.before(task) {
skip = true
}

if skip {
task.Skipped = true
atomic.AddInt64(&j.statistics.SkippedTask, 1)
}

if !task.Skipped {
Expand All @@ -67,10 +74,16 @@ func (j *innerJob) Run() {

for i := 0; i < j.retryTimes; i++ {
task.Return = safeRun(ctx, j.run)
atomic.AddInt64(&j.statistics.TotalRun, 1)
if i > 0 {
atomic.AddInt64(&j.statistics.RetriedRun, 1)
}
task.TriedTimes++
if task.Return == nil {
atomic.AddInt64(&j.statistics.PassedRun, 1)
break
}
atomic.AddInt64(&j.statistics.FailedRun, 1)
if ctx.Err() != nil {
break
}
Expand All @@ -88,12 +101,21 @@ func (j *innerJob) Run() {
task.EndAt = &endAt
} else {
task.Missed = true
atomic.AddInt64(&j.statistics.MissedTask, 1)
}
}

if j.after != nil {
j.after(task)
}

if !task.Skipped && !task.Missed {
if task.Return == nil {
atomic.AddInt64(&j.statistics.PassedTask, 1)
} else {
atomic.AddInt64(&j.statistics.FailedTask, 1)
}
}
}

func (j *innerJob) Cron() *Cron {
Expand All @@ -103,19 +125,7 @@ func (j *innerJob) Cron() *Cron {
func safeRun(ctx context.Context, run RunFunc) (err error) {
defer func() {
if r := recover(); r != nil {
pc := make([]uintptr, 16)
n := runtime.Callers(0, pc)
for _, p := range pc[:n] {
fn := runtime.FuncForPC(p)
if fn != nil {
file, line := fn.FileLine(p)
if !strings.Contains(fn.Name(), "runtime") {
err = fmt.Errorf("panic(%v) at %s:%d", r, file, line)
return
}
}
}
err = fmt.Errorf("panic(%v): %s", r, debug.Stack())
err = fmt.Errorf("%v: %s", r, debug.Stack())
}
}()
return run(ctx)
Expand Down
Loading

0 comments on commit bf9e852

Please sign in to comment.