-
-
Notifications
You must be signed in to change notification settings - Fork 52
/
process.go
88 lines (71 loc) · 1.75 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package neffos
import (
"sync"
"sync/atomic"
)
// processes is a collection of `process`, keyed by name.
// Every access to the underlying map made by `get` goes through `locker`.
type processes struct {
	// entries maps a name to its `process`.
	entries map[string]*process

	// locker guards `entries`.
	locker *sync.RWMutex
}

// newProcesses returns an empty `processes` collection with its map and
// lock allocated, ready to be used by `get`.
func newProcesses() *processes {
	return &processes{
		entries: make(map[string]*process),
		locker:  new(sync.RWMutex),
	}
}

// get returns the `process` registered under "name".
// When no entry exists yet, a new `process` (with its `finished` channel
// allocated) is created, stored and returned.
//
// Callers asking for the same name always receive the same `process`,
// even when they race on the very first lookup.
func (p *processes) get(name string) *process {
	// Fast path: an existing entry only needs the read lock.
	p.locker.RLock()
	entry := p.entries[name]
	p.locker.RUnlock()
	if entry != nil {
		return entry
	}

	p.locker.Lock()
	defer p.locker.Unlock()

	// Look again now that we hold the write lock: another goroutine may have
	// registered the entry between our RUnlock and Lock. Without this second
	// look-up the entry would be overwritten and the two callers would end up
	// holding different processes for the same name.
	if entry = p.entries[name]; entry != nil {
		return entry
	}

	entry = &process{
		finished: make(chan struct{}),
	}
	p.entries[name] = entry
	return entry
}
// process is used on connections on specific actions that need to wait for an answer from the other side.
// Take for example the `Conn#handleMessage.tryNamespace` which waits for `Conn#askConnect` to finish on the specific namespace.
//
// It bundles two independent signals:
//   - a one-shot broadcast: `Signal` closes the channel returned by `Finished`;
//   - a start/done gate: `Wait` blocks between `Start` and `Done`.
//
// The `finished` channel must be allocated by whoever creates the process;
// `Signal` on a nil channel panics.
type process struct {
	// done is flipped from 0 to 1, atomically, by the first `Done` call.
	done uint32

	// finished is closed by `Signal` and exposed read-only by `Finished`.
	finished chan struct{}

	// waiting is incremented by `Start` and decremented by the first `Done`.
	waiting sync.WaitGroup

	// signalOnce makes sure `finished` is closed at most once.
	signalOnce sync.Once
}

// Signal closes the `finished` channel, which fires `Finished`.
// Only the first call closes the channel; any further call is a no-op,
// because closing an already closed channel would panic.
func (p *process) Signal() {
	p.signalOnce.Do(func() {
		close(p.finished)
	})
}

// Finished returns the read-only channel of `finished`.
// It gets fired when `Signal` is called.
func (p *process) Finished() <-chan struct{} {
	return p.finished
}

// Done marks the process as done and calls the internal WaitGroup's `Done` method.
// Only the first call has an effect, later calls return immediately.
// `Start` must have been called before, otherwise the WaitGroup counter
// becomes negative and the WaitGroup panics.
func (p *process) Done() {
	if atomic.CompareAndSwapUint32(&p.done, 0, 1) {
		p.waiting.Done()
	}
}

// Wait waits on the internal `WaitGroup`. See `Done` too.
// It returns immediately when the process is already marked as done.
func (p *process) Wait() {
	if p.isDone() {
		return
	}

	p.waiting.Wait()
}

// Start makes future `Wait` calls to hold until `Done`.
func (p *process) Start() {
	p.waiting.Add(1)
}

// isDone reports whether `Done` has been called on this process.
func (p *process) isDone() bool {
	return atomic.LoadUint32(&p.done) == 1
}