-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetadata_cluster.go
287 lines (270 loc) · 8.22 KB
/
metadata_cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
package kafka
import (
"fmt"
"sort"
"github.com/Shopify/sarama"
"github.com/spf13/cast"
)
// ClusterMeta contains metadata for a Kafka Cluster.
type ClusterMeta struct {
BrokerIDs []int32
Brokers []string
Topics []string
Groups []string
Controller int32
APIMaxVersions map[int16]int16
ErrorStack []string
}
// BrokerCount returns the number of brokers.
func (cm ClusterMeta) BrokerCount() int {
return len(cm.Brokers)
}
// TopicCount returns the number of Topics.
func (cm ClusterMeta) TopicCount() int {
return len(cm.Topics)
}
// GroupCount returns the number of Groups.
func (cm ClusterMeta) GroupCount() int {
return len(cm.Groups)
}
// BrokerList returns the list of brokers.
func (kc *KClient) BrokerList() ([]string, error) {
var brokerlist []string
res, err := kc.ReqMetadata()
if err != nil {
return brokerlist, err
}
for _, b := range res.Brokers {
brokerlist = append(brokerlist, b.Addr())
}
return brokerlist, nil
}
// BrokerIDMap returns broker addresses by their corresponding IDs.
func (kc *KClient) BrokerIDMap() (map[int32]string, error) {
brokerMap := make(map[int32]string, len(kc.brokers))
res, err := kc.ReqMetadata()
if err != nil {
return brokerMap, err
}
for _, b := range res.Brokers {
brokerMap[b.ID()] = b.Addr()
}
return brokerMap, nil
}
// GetClusterMeta returns Cluster Metadata.
func (kc *KClient) GetClusterMeta() (ClusterMeta, error) {
cm := ClusterMeta{}
res, err := kc.ReqMetadata()
if err != nil {
return cm, err
}
grps, errs := kc.ListGroups()
if len(errs) > 0 {
cm.ErrorStack = append(cm.ErrorStack, errs...)
}
cm.Controller = res.ControllerID
for _, b := range res.Brokers {
id := b.ID()
addr := b.Addr()
broker := string(addr + "/" + cast.ToString(id))
cm.Brokers = append(cm.Brokers, broker)
cm.BrokerIDs = append(cm.BrokerIDs, id)
}
for _, t := range res.Topics {
cm.Topics = append(cm.Topics, t.Name)
}
cm.APIMaxVersions, err = kc.GetAPIVersions()
if err != nil {
if len(grps) > 0 {
cm.ErrorStack = append(cm.ErrorStack, err.Error())
} else {
return cm, err
}
}
cm.Groups = grps
sort.Strings(cm.Groups)
sort.Strings(cm.Brokers)
sort.Strings(cm.Topics)
return cm, nil
}
// ReqMetadata returns a metadata response from the first responsive broker.
func (kc *KClient) ReqMetadata() (*sarama.MetadataResponse, error) {
var res *sarama.MetadataResponse
var err error
var req = sarama.MetadataRequest{
AllowAutoTopicCreation: false,
}
for _, b := range kc.brokers {
res, err = b.GetMetadata(&req)
if err == nil {
return res, nil
}
}
return nil, fmt.Errorf("No Metadata Response Received")
}
// APIKey Codes
const (
APIKeyProduce int16 = iota
APIKeyFetch
APIKeyListOffsets
APIKeyMetadata
APIKeyLeaderAndIsr
APIKeyStopReplica
APIKeyUpdateMetadata
APIKeyControlledShutdown
APIKeyOffsetCommit
APIKeyOffsetFetch
APIKeyFindCoordinator
APIKeyJoinGroup
APIKeyHeartbeat
APIKeyLeaveGroup
APIKeySyncGroup
APIKeyDescribeGroups
APIKeyListGroups
APIKeySaslHandshake
APIKeyAPIVersions
APIKeyCreateTopics
APIKeyDeleteTopics
APIKeyDeleteRecords
APIKeyInitProducerID
APIKeyOffsetForLeaderEpoch
APIKeyAddPartitionsToTxn
APIKeyAddOffsetsToTxn
APIKeyEndTxn
APIKeyWriteTxnMarkers
APIKeyTxnOffsetCommit
APIKeyDescribeAcls
APIKeyCreateAcls
APIKeyDeleteAcls
APIKeyDescribeConfigs
APIKeyAlterConfigs
APIKeyAlterReplicaLogDirs
APIKeyDescribeLogDirs
APIKeySaslAuthenticate
APIKeyCreatePartitions
APIKeyCreateDelegationToken
APIKeyRenewDelegationToken
APIKeyExpireDelegationToken
APIKeyDescribeDelegationToken
APIKeyDeleteGroups
APIKeyElectPreferredLeaders
APIKeyIncrementalAlterConfigs
APIKeyAlterPartitionReassignments
APIKeyListPartitionReassignments
APIKeyOffsetDelete
APIKeyDescribeClientQuotas
APIKeyAlterClientQuotas
)
// APIDescriptions for APIs.
// https://kafka.apache.org/protocol
var APIDescriptions = map[int16]string{
APIKeyProduce: "Produce",
APIKeyFetch: "Fetch",
APIKeyListOffsets: "ListOffsets",
APIKeyMetadata: "Metadata",
APIKeyLeaderAndIsr: "LeaderAndIsr",
APIKeyStopReplica: "StopReplica",
APIKeyUpdateMetadata: "UpdateMetadata",
APIKeyControlledShutdown: "ControlledShutdown",
APIKeyOffsetCommit: "OffsetCommit",
APIKeyOffsetFetch: "OffsetFetch",
APIKeyFindCoordinator: "FindCoordinator",
APIKeyJoinGroup: "JoinGroup",
APIKeyHeartbeat: "Heartbeat",
APIKeyLeaveGroup: "LeaveGroup",
APIKeySyncGroup: "SyncGroup",
APIKeyDescribeGroups: "DescribeGroups",
APIKeyListGroups: "ListGroups",
APIKeySaslHandshake: "SaslHandshake",
APIKeyAPIVersions: "ApiVersions",
APIKeyCreateTopics: "CreateTopics",
APIKeyDeleteTopics: "DeleteTopics",
APIKeyDeleteRecords: "DeleteRecords",
APIKeyInitProducerID: "InitProducerId",
APIKeyOffsetForLeaderEpoch: "OffsetForLeaderEpoch",
APIKeyAddPartitionsToTxn: "AddPartitionsToTxn",
APIKeyAddOffsetsToTxn: "AddOffsetsToTxn",
APIKeyEndTxn: "EndTxn",
APIKeyWriteTxnMarkers: "WriteTxnMarkers",
APIKeyTxnOffsetCommit: "TxnOffsetCommit",
APIKeyDescribeAcls: "DescribeAcls",
APIKeyCreateAcls: "CreateAcls",
APIKeyDeleteAcls: "DeleteAcls",
APIKeyDescribeConfigs: "DescribeConfigs",
APIKeyAlterConfigs: "AlterConfigs",
APIKeyAlterReplicaLogDirs: "AlterReplicaLogDirs",
APIKeyDescribeLogDirs: "DescribeLogDirs",
APIKeySaslAuthenticate: "SaslAuthenticate",
APIKeyCreatePartitions: "CreatePartitions",
APIKeyCreateDelegationToken: "CreateDelegationToken",
APIKeyRenewDelegationToken: "RenewDelegationToken",
APIKeyExpireDelegationToken: "ExpireDelegationToken",
APIKeyDescribeDelegationToken: "DescribeDelegationToken",
APIKeyDeleteGroups: "DeleteGroups",
APIKeyElectPreferredLeaders: "ElectPreferredLeaders",
APIKeyIncrementalAlterConfigs: "IncrementalAlterConfigs",
APIKeyAlterPartitionReassignments: "AlterPartitionReassignments",
APIKeyListPartitionReassignments: "ListPartitionReassignments",
APIKeyOffsetDelete: "OffsetDelete",
APIKeyDescribeClientQuotas: "DescribeClientQuotas",
APIKeyAlterClientQuotas: "AlterClientQuotas",
}
// Kafka API Versions:
var (
MinKafkaVersion = sarama.MinVersion
MaxKafkaVersion = sarama.MaxVersion
VER210KafkaVersion = sarama.V2_1_0_0
RecKafkaVersion = sarama.V1_1_0_0
MinCreatePartsVer = sarama.V1_0_0_0
MinDeleteRecordsVer = sarama.V0_11_0_0
MinTopicOpsVer = sarama.V0_10_1_0
)
// GetAPIVersions returns a API version Mapping.
func (kc *KClient) GetAPIVersions() (apiMaxVers map[int16]int16, err error) {
apiMaxVers = make(map[int16]int16)
apiVers, err := kc.apiVersions()
if err != nil {
return
}
for _, api := range apiVers.ApiKeys {
apiMaxVers[api.ApiKey] = api.MaxVersion
}
return
}
func (kc *KClient) apiVersions() (*sarama.ApiVersionsResponse, error) {
var apiRes *sarama.ApiVersionsResponse
controller, err := kc.cl.Controller()
if err != nil {
return apiRes, err
}
apiReq := sarama.ApiVersionsRequest{}
apiRes, err = controller.ApiVersions(&apiReq)
if err != nil {
return apiRes, err
}
return apiRes, nil
}
// BrokerAPIVersions returns the available API Versions for the given broker.
func BrokerAPIVersions(conf *sarama.Config, broker string) (apiMaxVers map[int16]int16, err error) {
b := sarama.NewBroker(broker)
if conf == nil {
conf = GetConf()
conf.ClientID = makeHex(3)
conf.Version = RecKafkaVersion
}
b.Open(conf)
apiReq := sarama.ApiVersionsRequest{}
apiVers, err := b.ApiVersions(&apiReq)
if err != nil {
return
}
apiMaxVers = make(map[int16]int16)
for _, api := range apiVers.ApiKeys {
apiMaxVers[api.ApiKey] = api.MaxVersion
}
return
}
// MatchKafkaVersion parses the given versiona and returns the corresponding KafkaVersion from sarama.
func MatchKafkaVersion(version string) (sarama.KafkaVersion, error) {
return sarama.ParseKafkaVersion(version)
}