Skip to content

Commit

Permalink
Add producer/consumer test for goredis
Browse files Browse the repository at this point in the history
  • Loading branch information
marselester committed Mar 18, 2023
1 parent 503243b commit bed471f
Showing 1 changed file with 52 additions and 7 deletions.
59 changes: 52 additions & 7 deletions celery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import (

"github.com/go-kit/log"

"github.com/marselester/gopher-celery/goredis"
"github.com/marselester/gopher-celery/protocol"
)

func TestExecuteTaskPanic(t *testing.T) {
a := NewApp()
a.Register(
app := NewApp()
app.Register(
"myproject.apps.myapp.tasks.mytask",
"important",
func(ctx context.Context, p *TaskParam) error {
Expand All @@ -35,7 +36,7 @@ func TestExecuteTaskPanic(t *testing.T) {
}

want := "unexpected task error"
err := a.executeTask(context.Background(), &m)
err := app.executeTask(context.Background(), &m)
if !strings.HasPrefix(err.Error(), want) {
t.Errorf("expected %q got %q", want, err)
}
Expand Down Expand Up @@ -100,18 +101,18 @@ func TestExecuteTaskMiddlewares(t *testing.T) {
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
a := NewApp(
app := NewApp(
WithMiddlewares(tc.middlewares...),
)
a.Register(
app.Register(
"myproject.apps.myapp.tasks.mytask",
"important",
func(ctx context.Context, p *TaskParam) error {
return fmt.Errorf("task")
},
)

err := a.executeTask(ctx, &m)
err := app.executeTask(ctx, &m)
if !strings.HasPrefix(err.Error(), tc.want) {
t.Errorf("expected %q got %q", tc.want, err)
}
Expand Down Expand Up @@ -157,7 +158,7 @@ func TestProduceAndConsume(t *testing.T) {
}
}

func TestProduceAndConsume_100times(t *testing.T) {
func TestProduceAndConsume100times(t *testing.T) {
app := NewApp(WithLogger(log.NewJSONLogger(os.Stderr)))
for i := 0; i < 100; i++ {
err := app.Delay(
Expand Down Expand Up @@ -197,3 +198,47 @@ func TestProduceAndConsume_100times(t *testing.T) {
t.Errorf("expected sum %d got %d", want, sum)
}
}

func TestGoredisProduceAndConsume100times(t *testing.T) {
app := NewApp(
WithBroker(goredis.NewBroker()),
WithLogger(log.NewJSONLogger(os.Stderr)),
)
for i := 0; i < 100; i++ {
err := app.Delay(
"myproject.apps.myapp.tasks.mytask",
"important",
2,
3,
)
if err != nil {
t.Fatal(err)
}
}

// The test finishes either when ctx times out or all the tasks finish.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
t.Cleanup(cancel)

var sum int32
app.Register(
"myproject.apps.myapp.tasks.mytask",
"important",
func(ctx context.Context, p *TaskParam) error {
p.NameArgs("a", "b")
atomic.AddInt32(
&sum,
int32(p.MustInt("a")+p.MustInt("b")),
)
return nil
},
)
if err := app.Run(ctx); err != nil {
t.Error(err)
}

var want int32 = 500
if want != sum {
t.Errorf("expected sum %d got %d", want, sum)
}
}

0 comments on commit bed471f

Please sign in to comment.