Skip to content

Commit

Permalink
Add context test
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Jan 26, 2021
1 parent 6adc18d commit 0c8d07a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
module github.com/Jeffail/tunny

go 1.13
9 changes: 5 additions & 4 deletions tunny.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package tunny

import (
"context"
"errors"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -218,7 +219,7 @@ func (p *Pool) ProcessTimed(
// the result. If the context cancels before the job has finished the worker will
// be interrupted and ErrJobTimedOut will be returned. ProcessCtx can be
// called safely by any goroutines.
func (p *Pool) ProcessCtx(ctx context.Context, i interface{}) (interface{}, error) {
func (p *Pool) ProcessCtx(ctx context.Context, payload interface{}) (interface{}, error) {
atomic.AddInt64(&p.queuedJobs, 1)
defer atomic.AddInt64(&p.queuedJobs, -1)

Expand All @@ -231,14 +232,14 @@ func (p *Pool) ProcessCtx(ctx context.Context, i interface{}) (interface{}, erro
return nil, ErrPoolNotRunning
}
case <-ctx.Done():
return nil, ErrJobTimedOut
return nil, ctx.Err()
}

select {
case request.jobChan <- payload:
case <-ctx.Done():
request.interruptFunc()
return nil, ErrJobTimedOut
return nil, ctx.Err()
}

select {
Expand All @@ -248,7 +249,7 @@ func (p *Pool) ProcessCtx(ctx context.Context, i interface{}) (interface{}, erro
}
case <-ctx.Done():
request.interruptFunc()
return nil, ErrJobTimedOut
return nil, ctx.Err()
}

return payload, nil
Expand Down
19 changes: 19 additions & 0 deletions tunny_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package tunny

import (
"context"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -106,6 +107,24 @@ func TestFuncJobTimed(t *testing.T) {
}
}

func TestFuncJobCtx(t *testing.T) {
pool := NewFunc(10, func(in interface{}) interface{} {
intVal := in.(int)
return intVal * 2
})
defer pool.Close()

for i := 0; i < 10; i++ {
ret, err := pool.ProcessCtx(context.Background(), 10)
if err != nil {
t.Fatalf("Failed to process: %v", err)
}
if exp, act := 20, ret.(int); exp != act {
t.Errorf("Wrong result: %v != %v", act, exp)
}
}
}

func TestCallbackJob(t *testing.T) {
pool := NewCallback(10)
defer pool.Close()
Expand Down

2 comments on commit 0c8d07a

@jarri-abidi
Copy link
Contributor

@jarri-abidi jarri-abidi commented on 0c8d07a Jan 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Jeffail I'll add another test for testing with a canceled context. Is it cool if I add a sub-test within the same TestFuncJobCtx func that you added here?
Also will add this to docs next to ProcessTimeout.

@Jeffail
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jarri-abidi, sure, sounds good 👍

Please sign in to comment.