forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdeleteacls.go
114 lines (95 loc) · 3.34 KB
/
deleteacls.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package kafka
import (
"context"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/deleteacls"
)
// DeleteACLsRequest represents a request sent to a kafka broker to delete
// ACLs.
type DeleteACLsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// List of ACL filters to use for deletion.
Filters []DeleteACLsFilter
}
type DeleteACLsFilter struct {
ResourceTypeFilter ResourceType
ResourceNameFilter string
ResourcePatternTypeFilter PatternType
PrincipalFilter string
HostFilter string
Operation ACLOperationType
PermissionType ACLPermissionType
}
// DeleteACLsResponse represents a response from a kafka broker to an ACL
// deletion request.
type DeleteACLsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration
// List of the results from the deletion request.
Results []DeleteACLsResult
}
type DeleteACLsResult struct {
Error error
MatchingACLs []DeleteACLsMatchingACLs
}
type DeleteACLsMatchingACLs struct {
Error error
ResourceType ResourceType
ResourceName string
ResourcePatternType PatternType
Principal string
Host string
Operation ACLOperationType
PermissionType ACLPermissionType
}
// DeleteACLs sends ACLs deletion request to a kafka broker and returns the
// response.
func (c *Client) DeleteACLs(ctx context.Context, req *DeleteACLsRequest) (*DeleteACLsResponse, error) {
filters := make([]deleteacls.RequestFilter, 0, len(req.Filters))
for _, filter := range req.Filters {
filters = append(filters, deleteacls.RequestFilter{
ResourceTypeFilter: int8(filter.ResourceTypeFilter),
ResourceNameFilter: filter.ResourceNameFilter,
ResourcePatternTypeFilter: int8(filter.ResourcePatternTypeFilter),
PrincipalFilter: filter.PrincipalFilter,
HostFilter: filter.HostFilter,
Operation: int8(filter.Operation),
PermissionType: int8(filter.PermissionType),
})
}
m, err := c.roundTrip(ctx, req.Addr, &deleteacls.Request{
Filters: filters,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).DeleteACLs: %w", err)
}
res := m.(*deleteacls.Response)
results := make([]DeleteACLsResult, 0, len(res.FilterResults))
for _, result := range res.FilterResults {
matchingACLs := make([]DeleteACLsMatchingACLs, 0, len(result.MatchingACLs))
for _, matchingACL := range result.MatchingACLs {
matchingACLs = append(matchingACLs, DeleteACLsMatchingACLs{
Error: makeError(matchingACL.ErrorCode, matchingACL.ErrorMessage),
ResourceType: ResourceType(matchingACL.ResourceType),
ResourceName: matchingACL.ResourceName,
ResourcePatternType: PatternType(matchingACL.ResourcePatternType),
Principal: matchingACL.Principal,
Host: matchingACL.Host,
Operation: ACLOperationType(matchingACL.Operation),
PermissionType: ACLPermissionType(matchingACL.PermissionType),
})
}
results = append(results, DeleteACLsResult{
Error: makeError(result.ErrorCode, result.ErrorMessage),
MatchingACLs: matchingACLs,
})
}
ret := &DeleteACLsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Results: results,
}
return ret, nil
}