-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadmin_offsets.go
237 lines (219 loc) · 5.85 KB
/
admin_offsets.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
)
// Sentinel offsets understood by a client's GetOffset method and by
// ConsumePartition in place of a concrete offset.
const (
	// OffsetOldest selects the oldest offset still available on the broker
	// for a partition. Pass it to GetOffset to look that offset up, or to
	// ConsumePartition to start consuming from the oldest retained message.
	OffsetOldest int64 = -2
	// OffsetNewest selects the log head offset, i.e. the offset that will be
	// assigned to the next message produced to the partition. Pass it to
	// GetOffset to look that offset up, or to ConsumePartition to consume only
	// new messages.
	OffsetNewest int64 = -1
)
// OffsetAdmin is used for managing offsets.
type OffsetAdmin interface {
Group(group string) OffsetAdmin
Topic(topic string) OffsetAdmin
Valid() bool
GetOffsetLag(partition int32) (int64, int64, error)
GetTotalLag(partitions []int32) (GroupLag, error)
GetGroupOffsets(partitions []int32) (GroupOffsetMap, error)
ResetOffset(partition int32, targetOffset int64) error
}
// offsetAdmin is the OffsetAdmin implementation backed by a sarama client.
type offsetAdmin struct {
	// grp and top are the target consumer group and topic, set through the
	// Group and Topic methods; both must be non-empty for Valid to hold.
	grp, top string
	// client is the sarama client used for offset lookups and for creating
	// offset managers.
	client sarama.Client
	// om is the offset manager most recently created by one of the methods.
	om sarama.OffsetManager
	// pom is the partition offset manager most recently obtained from om.
	pom sarama.PartitionOffsetManager
}
// GroupLag summarises how far a consumer group trails a topic.
type GroupLag struct {
	// Group and Topic identify the consumer group and topic measured.
	Group, Topic string
	// PartitionOffset and PartitionLag hold, keyed by partition number, the
	// group's offset and its lag on that partition.
	PartitionOffset, PartitionLag map[int32]int64
	// TotalLag is the sum of the PartitionLag values.
	TotalLag int64
}
// grpPartLag is the per-partition result passed from the worker goroutines
// back to the collecting loop.
type grpPartLag struct {
	// partition is the partition number the values belong to.
	partition int32
	// offset is the group's offset on the partition; lag is how far that
	// offset trails the partition's newest offset.
	offset, lag int64
}
// GroupOffsetMap holds a consumer group's offsets on a topic, per partition.
type GroupOffsetMap struct {
	// Group and Topic identify the consumer group and topic queried.
	Group, Topic string
	// PartitionOffset maps a partition number to the group's offset on it.
	PartitionOffset map[int32]int64
}
// OffSetAdmin returns a new OffsetAdmin bound to this client's underlying
// sarama client. No group or topic is selected yet; chain Group and Topic on
// the result before calling the offset operations.
func (kc *KClient) OffSetAdmin() OffsetAdmin {
	admin := new(offsetAdmin)
	admin.client = kc.cl
	return admin
}
// Group selects the consumer group that subsequent operations act on. It
// updates the receiver in place and returns it so calls can be chained.
func (oa *offsetAdmin) Group(group string) OffsetAdmin {
	oa.grp = group
	return oa
}
// Topic selects the topic that subsequent operations act on. It updates the
// receiver in place and returns it so calls can be chained.
func (oa *offsetAdmin) Topic(topic string) OffsetAdmin {
	oa.top = topic
	return oa
}
// Valid reports whether the admin has a target: it is true only when both a
// group and a topic have been set to non-empty values.
func (oa *offsetAdmin) Valid() bool {
	return oa.grp != "" && oa.top != ""
}
// GetOffsetLag returns the group's current offset on the given partition and
// the lag between that offset and the partition's newest offset.
//
// A group offset of -1 is replaced by the partition's newest offset, so such a
// group reports a lag of 0. An error is returned when no group/topic has been
// selected, or when creating the offset managers or looking up the newest
// offset fails; the numeric results are zero in that case.
func (oa *offsetAdmin) GetOffsetLag(partition int32) (groupOffset int64, partitionLag int64, err error) {
	if !oa.Valid() {
		return 0, 0, fmt.Errorf("No specified Group and/or Topic")
	}

	// The managers live in locals rather than on the receiver so that a failed
	// call cannot leave a stale manager behind for a later one.
	om, err := sarama.NewOffsetManagerFromClient(oa.grp, oa.client)
	if err != nil {
		return 0, 0, err
	}
	pom, err := om.ManagePartition(oa.top, partition)
	if err != nil {
		// Do not leak the offset manager; the ManagePartition failure is the
		// error worth reporting, so the Close result is dropped.
		_ = om.Close()
		return 0, 0, err
	}
	// Release both managers on every remaining exit path, including the
	// GetOffset failure below. The close order (offset manager first) and the
	// best-effort handling of the Close results are kept as before.
	defer func() {
		_ = om.Close()
		_ = pom.Close()
	}()

	// The second return value of NextOffset is not needed here.
	groupOffset, _ = pom.NextOffset()
	partOffset, err := oa.client.GetOffset(oa.top, partition, sarama.OffsetNewest)
	if err != nil {
		return 0, 0, err
	}
	if groupOffset == -1 {
		groupOffset = partOffset
	}
	return groupOffset, partOffset - groupOffset, nil
}
// GetTotalLag returns, for the given partitions, the group's offset and lag on
// each partition together with the summed lag.
//
// Partitions are queried concurrently, one goroutine per partition. A group
// offset of -1 on a partition is replaced by that partition's newest offset,
// so it contributes a lag of 0. If the group/topic is not selected, or any
// partition fails, the first error observed is returned with a zero GroupLag.
func (oa *offsetAdmin) GetTotalLag(partitions []int32) (groupLag GroupLag, err error) {
	if !oa.Valid() {
		return GroupLag{}, fmt.Errorf("No specified Group and/or Topic")
	}
	om, err := sarama.NewOffsetManagerFromClient(oa.grp, oa.client)
	if err != nil {
		return GroupLag{}, err
	}

	// partResult is one partition's outcome. Failures travel through the
	// channel as well, so every goroutine sends exactly once and the collector
	// below can never block on a worker that bailed out early.
	type partResult struct {
		grpPartLag
		err error
	}
	// Buffered to the number of workers so no send ever blocks.
	results := make(chan partResult, len(partitions))

	for _, partition := range partitions {
		go func(part int32) {
			// Each worker owns its partition manager and its error; nothing
			// is shared through the receiver or the enclosing function.
			pom, err := om.ManagePartition(oa.top, part)
			if err != nil {
				results <- partResult{err: err}
				return
			}
			// Deferred so the manager is released on the error path too, and
			// still only after the result has been sent.
			defer func() { _ = pom.Close() }()

			// The second return value of NextOffset is not needed here.
			groupOffset, _ := pom.NextOffset()
			partOffset, err := oa.client.GetOffset(oa.top, part, sarama.OffsetNewest)
			if err != nil {
				results <- partResult{err: err}
				return
			}
			if groupOffset == -1 {
				groupOffset = partOffset
			}
			results <- partResult{grpPartLag: grpPartLag{
				partition: part,
				offset:    groupOffset,
				lag:       partOffset - groupOffset,
			}}
		}(partition)
	}

	partitionOff := make(map[int32]int64, len(partitions))
	partitionLag := make(map[int32]int64, len(partitions))
	var totalLag int64
	// Drain one result per worker, even after a failure, so the offset
	// manager is only closed once every worker has reported.
	for range partitions {
		res := <-results
		if res.err != nil {
			if err == nil {
				err = res.err
			}
			continue
		}
		partitionOff[res.partition] = res.offset
		partitionLag[res.partition] = res.lag
		totalLag += res.lag
	}

	// Best-effort close, as before: a Close failure does not invalidate the
	// offsets that were already read.
	_ = om.Close()
	if err != nil {
		return GroupLag{}, err
	}
	return GroupLag{
		Group:           oa.grp,
		Topic:           oa.top,
		PartitionOffset: partitionOff,
		PartitionLag:    partitionLag,
		TotalLag:        totalLag,
	}, nil
}
// GetGroupOffsets returns the group's offset, as reported by the partition
// offset manager's NextOffset, for each of the given partitions.
//
// Partitions are queried concurrently, one goroutine per partition. If the
// group/topic is not selected, or any partition cannot be managed, the first
// error observed is returned with a zero GroupOffsetMap.
func (oa *offsetAdmin) GetGroupOffsets(partitions []int32) (groupOffsetMap GroupOffsetMap, err error) {
	if !oa.Valid() {
		return GroupOffsetMap{}, fmt.Errorf("No specified Group and/or Topic")
	}
	om, err := sarama.NewOffsetManagerFromClient(oa.grp, oa.client)
	if err != nil {
		return GroupOffsetMap{}, err
	}

	// partResult is one partition's outcome. Failures travel through the
	// channel as well, so every goroutine sends exactly once and the collector
	// below can never block on a worker that bailed out early.
	type partResult struct {
		grpPartLag
		err error
	}
	// Buffered to the number of workers so no send ever blocks.
	results := make(chan partResult, len(partitions))

	for _, partition := range partitions {
		go func(part int32) {
			// Each worker owns its partition manager and its error; nothing
			// is shared through the receiver or the enclosing function.
			pom, err := om.ManagePartition(oa.top, part)
			if err != nil {
				results <- partResult{err: err}
				return
			}
			// The second return value of NextOffset is not needed here.
			offset, _ := pom.NextOffset()
			results <- partResult{grpPartLag: grpPartLag{partition: part, offset: offset}}
			// Closed after the send, matching the previous ordering.
			_ = pom.Close()
		}(partition)
	}

	partitionOff := make(map[int32]int64, len(partitions))
	// Drain one result per worker, even after a failure, so the offset
	// manager is only closed once every worker has reported.
	for range partitions {
		res := <-results
		if res.err != nil {
			if err == nil {
				err = res.err
			}
			continue
		}
		partitionOff[res.partition] = res.offset
	}

	// Best-effort close, as before: a Close failure does not invalidate the
	// offsets that were already read.
	_ = om.Close()
	if err != nil {
		return GroupOffsetMap{}, err
	}
	return GroupOffsetMap{
		Group:           oa.grp,
		Topic:           oa.top,
		PartitionOffset: partitionOff,
	}, nil
}
// ResetOffset resets the group's offset on the given partition to
// targetOffset, with empty metadata.
//
// It returns an error when no group/topic has been selected, when the offset
// managers cannot be created, or when closing them fails. Both managers are
// always closed; if both Close calls fail, the offset manager's error is the
// one returned.
func (oa *offsetAdmin) ResetOffset(partition int32, targetOffset int64) (err error) {
	if !oa.Valid() {
		return fmt.Errorf("No specified Group and/or Topic")
	}

	// The managers live in locals rather than on the receiver so that a failed
	// call cannot leave a stale manager behind for a later one.
	om, err := sarama.NewOffsetManagerFromClient(oa.grp, oa.client)
	if err != nil {
		return err
	}
	pom, err := om.ManagePartition(oa.top, partition)
	if err != nil {
		// Do not leak the offset manager; the ManagePartition failure is the
		// error worth reporting, so the Close result is dropped.
		_ = om.Close()
		return err
	}

	pom.ResetOffset(targetOffset, "")

	// Close the offset manager first, as before, but close the partition
	// manager even when that first Close fails so it is never left open.
	err = om.Close()
	if cerr := pom.Close(); err == nil {
		err = cerr
	}
	return err
}