Skip to content

Commit

Permalink
Support context in Atomic (#23)
Browse files Browse the repository at this point in the history
* chore: upgrade gomock

* feat: atomic with context

* chore: remove _example

* docs: update readme

Close #11

* test: fix cases
  • Loading branch information
wolfogre authored Oct 13, 2023
1 parent f093730 commit 3bc1c27
Show file tree
Hide file tree
Showing 13 changed files with 81 additions and 276 deletions.
102 changes: 26 additions & 76 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,117 +15,67 @@ A distributed cron framework.
go get github.com/gochore/dcron
```

## Example
## Quick Start

First of all, you should implement a distributed atomic operation:
First, implement a distributed atomic operation that only requires support for one method: `SetIfNotExists`.
You can implement it in any way you prefer, such as using Redis `SetNX`.

```go
type Atomic interface {
SetIfNotExists(key, value string) bool
}
```

You can implement it any way you like, for example, via Redis `SetNX`:
import "github.com/redis/go-redis/v9"

```go
type RedisAtomic struct {
client *redis.Client
}

func (m *RedisAtomic) SetIfNotExists(key, value string) bool {
ret := m.client.SetNX(key, value, time.Hour)
func (m *RedisAtomic) SetIfNotExists(ctx context.Context, key, value string) bool {
ret := m.client.SetNX(ctx, key, value, time.Hour)
return ret.Err() == nil && ret.Val()
}
```

Now we can create a cron with that:
Now you can create a cron with that:

```go
ra := &RedisAtomic{
// init redis client
func main() {
atomic := &RedisAtomic{
client: redis.NewClient(&redis.Options{
Addr: "localhost:6379",
}),
}
cron := dcron.NewCron(dcron.WithKey("TestCron"), dcron.WithAtomic(ra))
cron := dcron.NewCron(dcron.WithKey("TestCron"), dcron.WithAtomic(atomic))
}
```

There are many ways to create jobs:
- use `dcron.NewJob`;
- use `dcron.NewJobWithAutoKey`;
- implement interface `dcron.Job`.
Then, create a job and add it to the cron.

```go

func main {
job1 := dcron.NewJob("Job1", "*/15 * * * * *", func(ctx context.Context) error {
if task, ok := dcron.TaskFromContext(ctx); ok {
log.Println("run:", task.Job.Spec(), task.Key)
}
// do something
return nil
})
job2 := dcron.NewJobWithAutoKey("*/20 * * * * *", Job2)
job3 := Job3{}
}

func Job2(ctx context.Context) error {
if task, ok := dcron.TaskFromContext(ctx); ok {
log.Println("run:", task.Job.Spec(), task.Key)
}
// do something
return nil
}

type Job3 struct {
}

func (j Job3) Key() string {
return "Job3"
}

func (j Job3) Spec() string {
return "*/30 * * * * *"
}

func (j Job3) Run(ctx context.Context) error {
if task, ok := dcron.TaskFromContext(ctx); ok {
log.Println("run:", task.Job.Spec(), task.Key)
if err := cron.AddJobs(job1); err != nil {
log.Fatal(err)
}
// do something
return nil
}

func (j Job3) Options() []dcron.JobOption {
return nil
}
```

Finally, add the jobs to the cron, and start it:
Finally, start the cron:

```go
if err := cron.AddJobs(job1, job2, job3); err != nil {
panic(err)
}

cron.Start()
log.Println("cron started")
time.Sleep(time.Minute)
<-cron.Stop().Done()
log.Println("cron stopped")
```

You will see logging:

```text
2020/03/11 15:28:04 cron started
2020/03/11 15:28:15 run: */15 * * * * * dcron:TestCron.Job1@1583911695
2020/03/11 15:28:20 run: */20 * * * * * dcron:TestCron.Job2@1583911700
2020/03/11 15:28:30 run: */30 * * * * * dcron:TestCron.Job3@1583911710
2020/03/11 15:28:30 run: */15 * * * * * dcron:TestCron.Job1@1583911710
2020/03/11 15:28:40 run: */20 * * * * * dcron:TestCron.Job2@1583911720
2020/03/11 15:28:45 run: */15 * * * * * dcron:TestCron.Job1@1583911725
2020/03/11 15:29:00 run: */30 * * * * * dcron:TestCron.Job3@1583911740
2020/03/11 15:29:00 run: */15 * * * * * dcron:TestCron.Job1@1583911740
2020/03/11 15:29:00 run: */20 * * * * * dcron:TestCron.Job2@1583911740
2020/03/11 15:29:04 cron stopped
```
If you start the program multiple times, you will notice that the cron will run the job once every 15 seconds on only one of the processes.

There is the complete example: [_example/main.go](https://github.com/gochore/dcron/tree/_example/main.go).
| process 1 | process 2 | process 3 |
|------------------------------------------------------------------------|------------------------------------------------------------------------|------------------------------------------------------------------------|
| 2023/10/13 11:39:45 cron started | 2023/10/13 11:39:47 cron started | 2023/10/13 11:39:48 cron started |
| | | 2023/10/13 11:40:00 run: */15 * * * * * dcron:TestCron.Job1@1697168400 |
| | 2023/10/13 11:40:15 run: */15 * * * * * dcron:TestCron.Job1@1697168415 | |
| | | 2023/10/13 11:40:30 run: */15 * * * * * dcron:TestCron.Job1@1697168430 |
| 2023/10/13 11:40:45 run: */15 * * * * * dcron:TestCron.Job1@1697168445 | | |
8 changes: 0 additions & 8 deletions _example/go.mod

This file was deleted.

54 changes: 0 additions & 54 deletions _example/go.sum

This file was deleted.

77 changes: 0 additions & 77 deletions _example/main.go

This file was deleted.

8 changes: 6 additions & 2 deletions atomic.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package dcron

//go:generate mockgen -source=atomic.go -destination mock_dcron/atomic.go
import "context"

//go:generate go get go.uber.org/mock/mockgen
//go:generate go run go.uber.org/mock/mockgen -source=atomic.go -destination mock_dcron/atomic.go
//go:generate go mod tidy

// Atomic provides distributed atomic operation for dcron,
// it can be implemented easily via Redis/SQL and so on.
Expand All @@ -9,5 +13,5 @@ type Atomic interface {
// or does nothing and return false.
// Note that the key/value should be kept for at least one minute.
// For example, `SetNX(key, value, time.Minute)` via redis.
SetIfNotExists(key, value string) bool
SetIfNotExists(ctx context.Context, key, value string) bool
}
12 changes: 6 additions & 6 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/robfig/cron/v3"

"github.com/gochore/dcron/mock_dcron"

"github.com/robfig/cron/v3"
"go.uber.org/mock/gomock"
)

func Test_Cron(t *testing.T) {
Expand All @@ -20,7 +20,7 @@ func Test_Cron(t *testing.T) {
c := NewCron(WithKey("test_cron"), WithAtomic(atomic))

atomic.EXPECT().
SetIfNotExists(gomock.Any(), c.Hostname()).
SetIfNotExists(gomock.Any(), gomock.Any(), c.Hostname()).
Return(true).
Times(2)

Expand Down Expand Up @@ -304,8 +304,8 @@ func Test_JobWithGroup(t *testing.T) {
c := NewCron(WithKey("test_cron"), WithAtomic(atomic))

atomic.EXPECT().
SetIfNotExists(gomock.Any(), c.Hostname()).
DoAndReturn(func(key, value interface{}) bool {
SetIfNotExists(gomock.Any(), gomock.Any(), c.Hostname()).
DoAndReturn(func(ctx context.Context, key, value string) bool {
time.Sleep(time.Duration(rand.Int63n(int64(time.Millisecond))))
return true
}).
Expand Down
4 changes: 3 additions & 1 deletion entry_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package dcron

import "github.com/robfig/cron/v3"

//go:generate mockgen -source=entry_getter.go -destination mock_dcron/entry_getter.go
//go:generate go get go.uber.org/mock/mockgen
//go:generate go run go.uber.org/mock/mockgen -source=entry_getter.go -destination mock_dcron/entry_getter.go
//go:generate go mod tidy

type entryGetter interface {
Entry(id cron.EntryID) cron.Entry
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ module github.com/gochore/dcron
go 1.21.3

require (
github.com/golang/mock v1.6.0
github.com/robfig/cron/v3 v3.0.1
go.uber.org/mock v0.3.0
)
27 changes: 2 additions & 25 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,27 +1,4 @@
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
go.uber.org/mock v0.3.0 h1:3mUxI1No2/60yUYax92Pt8eNOEecx2D3lcXZh2NEZJo=
go.uber.org/mock v0.3.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
2 changes: 1 addition & 1 deletion inner_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (j *innerJob) Run() {

if !task.Skipped {
checkAtomic := func() bool {
return j.noMutex || j.cron.atomic == nil || j.cron.atomic.SetIfNotExists(task.Key, c.hostname)
return j.noMutex || j.cron.atomic == nil || j.cron.atomic.SetIfNotExists(ctx, task.Key, c.hostname)
}
needExec := false
if j.group != nil {
Expand Down
Loading

0 comments on commit 3bc1c27

Please sign in to comment.