-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnon_blocking_channel.go
133 lines (108 loc) · 2.5 KB
/
non_blocking_channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package safe
import (
"math"
"runtime"
"sync"
"sync/atomic"
"unsafe"
)
// TODO: test
// NonBlockingChannel is a generic wrapper around a Go channel whose Push and
// Pop never wait on the channel: both use a select with a default branch.
// It protects against closing the channel twice and against use of a wrapper
// whose channel was never created (all methods become no-ops in that case).
// Pop returns the zero value of T if no item is available on the channel.
// Push increases the overflow counter, and remembers the rejected item, when
// the channel is full.
type NonBlockingChannel[T any] struct {
// lock is held by Clear while it closes and replaces channel.
lock sync.Mutex
// channel is the underlying buffered channel holding the queued items.
channel chan T
// isClosed is set to true by Close; Len, Pop, Push and Range consult it.
isClosed *atomic.Bool
// closeOnce makes sure Close closes the channel a single time.
closeOnce *sync.Once
// overflowCounter counts the Push calls that found the channel full.
overflowCounter uint64
// overflow is the most recent item Push could not enqueue.
overflow T
// initialized is true only for values built by NewNonBlockingChannel that
// own a channel; when false every method returns immediately.
initialized bool
}
// NewNonBlockingChannel creates a NonBlockingChannel that buffers up to size
// items of type T.
//
// A negative size is treated as 0. For sizes above math.MaxUint16 the
// capacity is additionally capped by a rough memory estimate: the number of
// T values that fit in MemStats.Sys - MemStats.Alloc, i.e. memory the runtime
// has obtained from the OS that is not currently occupied by live heap
// objects. Zero-size element types (for example struct{}) need no buffer
// memory, so they skip that cap and always get the requested capacity.
//
// The returned channel is always initialised and ready for use.
func NewNonBlockingChannel[T any](size int) *NonBlockingChannel[T] {
	if size < 0 {
		// Minimum channel size is 0.
		size = 0
	}
	if size > math.MaxUint16 {
		// Only pay for a memory check (ReadMemStats is expensive) when the
		// requested buffer is very large.
		var zero T
		if elemSize := uint64(unsafe.Sizeof(zero)); elemSize > 0 {
			var m runtime.MemStats
			runtime.ReadMemStats(&m)
			// Limit the length by the estimated memory available. The
			// limit < math.MaxInt guard keeps the int conversion lossless.
			if limit := (m.Sys - m.Alloc) / elemSize; limit < math.MaxInt && size > int(limit) {
				size = int(limit)
			}
		}
	}
	return &NonBlockingChannel[T]{
		initialized: true,
		channel:     make(chan T, size),
		isClosed:    new(atomic.Bool),
		closeOnce:   new(sync.Once),
	}
}
// TODO: test
// Clear discards every queued item while keeping the channel's capacity.
//
// The current underlying channel is closed and replaced by a fresh, empty
// channel with the same capacity. Receivers still holding the previous
// channel (obtained from Range) see it closed once they have drained it.
//
// Clear does nothing on an uninitialised or already closed channel: a closed
// NonBlockingChannel stays closed and is not given a new underlying channel.
func (c *NonBlockingChannel[T]) Clear() {
	if !c.initialized {
		return
	}
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.isClosed.Load() {
		return
	}
	// Capture the capacity, not len: len is the number of queued items, and
	// sizing the replacement by it would shrink the buffer (down to an
	// unbuffered channel when Clear is called on an empty one).
	capacity := cap(c.channel)
	close(c.channel)
	c.channel = make(chan T, capacity)
}
// Close marks the channel as closed and closes the underlying Go channel.
//
// It is safe to call Close any number of times; only the first call has an
// effect. Close does nothing on an uninitialised channel.
func (c *NonBlockingChannel[T]) Close() {
	if !c.initialized {
		return
	}
	c.closeOnce.Do(func() {
		// Clear checks isClosed and then closes and replaces c.channel while
		// holding c.lock. Taking the same lock here keeps the two methods
		// from closing the same underlying channel twice, which would panic.
		c.lock.Lock()
		defer c.lock.Unlock()
		c.isClosed.Store(true)
		close(c.channel)
	})
}
// Len reports how many items are currently queued on the channel.
// An uninitialised or closed channel always reports 0.
func (c *NonBlockingChannel[T]) Len() int {
	if c.initialized && !c.isClosed.Load() {
		return len(c.channel)
	}
	return 0
}
// Overflow returns the stored overflow item, i.e. the value most recently
// recorded by Push when the channel was full. It is the zero value of T if
// nothing has been recorded.
func (c *NonBlockingChannel[T]) Overflow() T {
	rejected := c.overflow
	return rejected
}
// OverflowCount returns the current value of the overflow counter, which
// Push increments each time it finds the channel full.
func (c *NonBlockingChannel[T]) OverflowCount() uint64 {
	count := c.overflowCounter
	return count
}
// Pop removes and returns the next queued item without blocking.
// It returns the zero value of T when no item is available, or when the
// channel is uninitialised or closed.
func (c *NonBlockingChannel[T]) Pop() T {
	var next T // stays the zero value unless a receive succeeds
	if c.initialized && !c.isClosed.Load() {
		select {
		case next = <-c.channel:
		default:
			// Nothing queued right now.
		}
	}
	return next
}
// Push enqueues item without waiting for buffer space.
//
// If the channel is full the item is not queued; instead the overflow counter
// is incremented and the item is remembered as the latest overflow value.
// Push does nothing on an uninitialised or closed channel.
//
// The closed-check and the send run under c.lock, the lock Clear holds while
// it closes and replaces the underlying channel. This keeps Push from sending
// on a channel Clear is closing (which would panic) and serialises the
// overflow bookkeeping between concurrent Push calls.
func (c *NonBlockingChannel[T]) Push(item T) {
	if !c.initialized {
		return
	}
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.isClosed.Load() {
		return
	}
	select {
	case c.channel <- item:
	default:
		// Channel full: record the overflow instead of blocking.
		c.overflowCounter++
		c.overflow = item
	}
}
// TODO: test
// Range exposes the underlying channel as receive-only, e.g. for use in a
// for-range loop. It returns nil when the channel is uninitialised or closed;
// note that receiving from a nil channel blocks forever, so callers should
// check the result before ranging over it.
func (c *NonBlockingChannel[T]) Range() <-chan T {
	if c.initialized && !c.isClosed.Load() {
		return c.channel
	}
	return nil
}