-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathslowtask_test.go
77 lines (65 loc) · 1.69 KB
/
slowtask_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package metafora
import (
"testing"
"time"
)
// releaseAllBalancer is a test balancer: every Balance call announces itself
// on a channel and then reports the ID of every task known to its context.
type releaseAllBalancer struct {
	// ctx is the BalancerContext supplied to Init; Balance reads its tasks.
	ctx BalancerContext
	// balances receives one value at the start of each Balance call so a
	// test can observe that balancing actually ran.
	balances chan int
}
// Init creates the unbuffered notification channel used by Balance and
// remembers the BalancerContext it was given.
func (b *releaseAllBalancer) Init(c BalancerContext) {
	b.balances = make(chan int)
	b.ctx = c
}
// CanClaim ignores the offered task and always answers NoDelay and true.
func (b *releaseAllBalancer) CanClaim(Task) (time.Time, bool) {
	return NoDelay, true
}
// Balance first sends on b.balances to signal that it was invoked; the send
// blocks until someone receives it. It then returns the IDs of all tasks
// reported by the balancer context (an empty, non-nil slice when there are
// none).
func (b *releaseAllBalancer) Balance() []string {
	b.balances <- 1

	running := b.ctx.Tasks()
	released := make([]string, 0, len(running))
	for _, rt := range running {
		released = append(released, rt.Task().ID())
	}
	return released
}
// TestDoubleRelease runs a handler that never looks at its stop channel and
// only returns once reallyStop is closed. While that handler is running the
// test issues several balance commands (releaseAllBalancer returns every
// running task ID from each one) and verifies that:
//
//   - the balance commands keep being processed, i.e. the main loop is not
//     blocked by the stuck handler;
//   - Shutdown does not return while the handler is still blocked;
//   - Shutdown returns once the handler is finally allowed to exit.
//
// Every wait that is expected to finish promptly is bounded by a timeout so a
// regression fails the test instead of hanging the whole test binary.
func TestDoubleRelease(t *testing.T) {
	t.Parallel()

	// Generous upper bound for steps that should complete almost immediately.
	const waitTimeout = 5 * time.Second

	started := make(chan int)
	reallyStop := make(chan bool)

	h := SimpleHandler(func(task Task, stop <-chan bool) bool {
		// Log before signalling: once started is received the test goroutine
		// moves on, and logging after the test has returned would panic.
		t.Logf("TestDoubleRelease handler recieved %s - blocking until reallyStop closed.", task)
		started <- 1
		// Deliberately ignore stop; only reallyStop lets this handler exit.
		<-reallyStop
		return true
	})

	tc := NewTestCoord()
	b := &releaseAllBalancer{}
	c, err := NewConsumer(tc, h, b)
	if err != nil {
		t.Fatalf("Error creating consumer: %v", err)
	}
	go c.Run()

	// This task's handler won't exit when told to.
	tc.Tasks <- testTask{"1"}
	select {
	case <-started:
	case <-time.After(waitTimeout):
		t.Fatal("Timed out waiting for the handler to start")
	}

	// Make sure balancing/mainloop isn't blocked: each balance command must
	// reach the balancer even though the handler never stops.
	for i := 0; i < 3; i++ {
		tc.Commands <- CommandBalance()
		select {
		case <-b.balances:
		case <-time.After(waitTimeout):
			t.Fatalf("Timed out waiting for balance #%d to run", i+1)
		}
	}

	shutdownComplete := make(chan bool)
	go func() {
		c.Shutdown()
		close(shutdownComplete)
	}()

	// Make sure Shutdown insidiously blocks until we close reallyStop.
	select {
	case <-shutdownComplete:
		t.Fatal("Shutdown completed when it should have blocked indefinitely")
	case <-time.After(100 * time.Millisecond):
	}

	// Close reallyStop and make sure Shutdown actually exits.
	close(reallyStop)
	select {
	case <-shutdownComplete:
	case <-time.After(waitTimeout):
		t.Fatal("Shutdown did not complete after reallyStop was closed")
	}
}