From f7466ea37d994396e26da2a8b7ca33e6691abafd Mon Sep 17 00:00:00 2001 From: Conor Mongey Date: Sat, 3 Feb 2018 20:34:14 +0000 Subject: [PATCH] Add Describe + AlterConfigs (#1014) Add DescribeConfig and AlterConfig --- alter_configs_request.go | 120 +++++++++++++++++++ alter_configs_request_test.go | 91 +++++++++++++++ alter_configs_response.go | 95 +++++++++++++++ alter_configs_response_test.go | 45 +++++++ broker.go | 21 ++++ config_resource_type.go | 15 +++ describe_configs_request.go | 91 +++++++++++++++ describe_configs_request_test.go | 90 ++++++++++++++ describe_configs_response.go | 188 ++++++++++++++++++++++++++++++ describe_configs_response_test.go | 60 ++++++++++ request.go | 4 + 11 files changed, 820 insertions(+) create mode 100644 alter_configs_request.go create mode 100644 alter_configs_request_test.go create mode 100644 alter_configs_response.go create mode 100644 alter_configs_response_test.go create mode 100644 config_resource_type.go create mode 100644 describe_configs_request.go create mode 100644 describe_configs_request_test.go create mode 100644 describe_configs_response.go create mode 100644 describe_configs_response_test.go diff --git a/alter_configs_request.go b/alter_configs_request.go new file mode 100644 index 000000000..48c44ead6 --- /dev/null +++ b/alter_configs_request.go @@ -0,0 +1,120 @@ +package sarama + +type AlterConfigsRequest struct { + Resources []*AlterConfigsResource + ValidateOnly bool +} + +type AlterConfigsResource struct { + Type ConfigResourceType + Name string + ConfigEntries map[string]*string +} + +func (acr *AlterConfigsRequest) encode(pe packetEncoder) error { + if err := pe.putArrayLength(len(acr.Resources)); err != nil { + return err + } + + for _, r := range acr.Resources { + if err := r.encode(pe); err != nil { + return err + } + } + + pe.putBool(acr.ValidateOnly) + return nil +} + +func (acr *AlterConfigsRequest) decode(pd packetDecoder, version int16) error { + resourceCount, err := pd.getArrayLength() + if err != nil { + return err + } + + acr.Resources = make([]*AlterConfigsResource, resourceCount) + for i := range acr.Resources { + r := &AlterConfigsResource{} + err = r.decode(pd, version) + if err != nil { + return err + } + acr.Resources[i] = r + } + + validateOnly, err := pd.getBool() + if err != nil { + return err + } + + acr.ValidateOnly = validateOnly + + return nil +} + +func (ac *AlterConfigsResource) encode(pe packetEncoder) error { + pe.putInt8(int8(ac.Type)) + + if err := pe.putString(ac.Name); err != nil { + return err + } + + if err := pe.putArrayLength(len(ac.ConfigEntries)); err != nil { + return err + } + for configKey, configValue := range ac.ConfigEntries { + if err := pe.putString(configKey); err != nil { + return err + } + if err := pe.putNullableString(configValue); err != nil { + return err + } + } + + return nil +} + +func (ac *AlterConfigsResource) decode(pd packetDecoder, version int16) error { + t, err := pd.getInt8() + if err != nil { + return err + } + ac.Type = ConfigResourceType(t) + + name, err := pd.getString() + if err != nil { + return err + } + ac.Name = name + + n, err := pd.getArrayLength() + if err != nil { + return err + } + + if n > 0 { + ac.ConfigEntries = make(map[string]*string, n) + for i := 0; i < n; i++ { + configKey, err := pd.getString() + if err != nil { + return err + } + if ac.ConfigEntries[configKey], err = pd.getNullableString(); err != nil { + return err + } + } + } + return err +} + +func (acr *AlterConfigsRequest) key() int16 { + return 33 +} + +func (acr *AlterConfigsRequest) version() int16 { + return 0 +} + +func (acr *AlterConfigsRequest) requiredVersion() KafkaVersion { + return V0_11_0_0 +} diff --git a/alter_configs_request_test.go b/alter_configs_request_test.go new file mode 100644 index 000000000..bfeff4491 --- /dev/null +++ b/alter_configs_request_test.go @@ -0,0 +1,91 @@ +package sarama + +import "testing" + +var ( + emptyAlterConfigsRequest = []byte{ + 0, 0, 0, 0, // 0 configs + 0, // don't Validate + } + + singleAlterConfigsRequest = []byte{ + 0, 0, 0, 1, // 1 config + 2, // a topic + 0, 3, 'f', 'o', 'o', // topic name: foo + 0, 0, 0, 1, //1 config name + 0, 10, // 10 chars + 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + 0, 4, + '1', '0', '0', '0', + 0, // don't validate + } + + doubleAlterConfigsRequest = []byte{ + 0, 0, 0, 2, // 2 config + 2, // a topic + 0, 3, 'f', 'o', 'o', // topic name: foo + 0, 0, 0, 1, //1 config name + 0, 10, // 10 chars + 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + 0, 4, + '1', '0', '0', '0', + 2, // a topic + 0, 3, 'b', 'a', 'r', // topic name: foo + 0, 0, 0, 2, //2 config + 0, 10, // 10 chars + 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + 0, 4, + '1', '0', '0', '0', + 0, 12, // 12 chars + 'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's', + 0, 4, + '1', '0', '0', '0', + 0, // don't validate + } +) + +func TestAlterConfigsRequest(t *testing.T) { + var request *AlterConfigsRequest + + request = &AlterConfigsRequest{ + Resources: []*AlterConfigsResource{}, + } + testRequest(t, "no requests", request, emptyAlterConfigsRequest) + + configValue := "1000" + request = &AlterConfigsRequest{ + Resources: []*AlterConfigsResource{ + &AlterConfigsResource{ + Type: TopicResource, + Name: "foo", + ConfigEntries: map[string]*string{ + "segment.ms": &configValue, + }, + }, + }, + } + + testRequest(t, "one config", request, singleAlterConfigsRequest) + + request = &AlterConfigsRequest{ + Resources: []*AlterConfigsResource{ + &AlterConfigsResource{ + Type: TopicResource, + Name: "foo", + ConfigEntries: map[string]*string{ + "segment.ms": &configValue, + }, + }, + &AlterConfigsResource{ + Type: TopicResource, + Name: "bar", + ConfigEntries: map[string]*string{ + "segment.ms": &configValue, + "retention.ms": &configValue, + }, + }, + }, + } + + testRequest(t, "two configs", request, doubleAlterConfigsRequest) +} diff --git a/alter_configs_response.go b/alter_configs_response.go new file mode 100644 index 000000000..29b09e1ff --- /dev/null +++ b/alter_configs_response.go @@ -0,0 +1,95 @@ +package sarama + +import "time" + +type AlterConfigsResponse struct { + ThrottleTime time.Duration + Resources []*AlterConfigsResourceResponse +} + +type AlterConfigsResourceResponse struct { + ErrorCode int16 + ErrorMsg string + Type ConfigResourceType + Name string +} + +func (ct *AlterConfigsResponse) encode(pe packetEncoder) error { + pe.putInt32(int32(ct.ThrottleTime / time.Millisecond)) + + if err := pe.putArrayLength(len(ct.Resources)); err != nil { + return err + } + + for i := range ct.Resources { + pe.putInt16(ct.Resources[i].ErrorCode) + err := pe.putString(ct.Resources[i].ErrorMsg) + if err != nil { + return nil + } + pe.putInt8(int8(ct.Resources[i].Type)) + err = pe.putString(ct.Resources[i].Name) + if err != nil { + return nil + } + } + + return nil +} + +func (acr *AlterConfigsResponse) decode(pd packetDecoder, version int16) error { + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + acr.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + responseCount, err := pd.getArrayLength() + if err != nil { + return err + } + + acr.Resources = make([]*AlterConfigsResourceResponse, responseCount) + + for i := range acr.Resources { + acr.Resources[i] = new(AlterConfigsResourceResponse) + + errCode, err := pd.getInt16() + if err != nil { + return err + } + acr.Resources[i].ErrorCode = errCode + + e, err := pd.getString() + if err != nil { + return err + } + acr.Resources[i].ErrorMsg = e + + t, err := pd.getInt8() + if err != nil { + return err + } + acr.Resources[i].Type = ConfigResourceType(t) + + name, err := pd.getString() + if err != nil { + return err + } + acr.Resources[i].Name = name + } + + return nil +} + +func (r *AlterConfigsResponse) key() int16 { + return 32 +} + +func (r *AlterConfigsResponse) version() int16 { + return 0 +} + +func (r *AlterConfigsResponse) requiredVersion() KafkaVersion { + return V0_11_0_0 +} diff --git a/alter_configs_response_test.go b/alter_configs_response_test.go new file mode 100644 index 000000000..459202870 --- /dev/null +++ b/alter_configs_response_test.go @@ -0,0 +1,45 @@ +package sarama + +import ( + "testing" +) + +var ( + alterResponseEmpty = []byte{ + 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // no configs + } + + alterResponsePopulated = []byte{ + 0, 0, 0, 0, //throttle + 0, 0, 0, 1, // response + 0, 0, //errorcode + 0, 0, //string + 2, // topic + 0, 3, 'f', 'o', 'o', + } +) + +func TestAlterConfigsResponse(t *testing.T) { + var response *AlterConfigsResponse + + response = &AlterConfigsResponse{ + Resources: []*AlterConfigsResourceResponse{}, + } + testVersionDecodable(t, "empty", response, alterResponseEmpty, 0) + if len(response.Resources) != 0 { + t.Error("Expected no groups") + } + + response = &AlterConfigsResponse{ + Resources: []*AlterConfigsResourceResponse{ + &AlterConfigsResourceResponse{ + ErrorCode: 0, + ErrorMsg: "", + Type: TopicResource, + Name: "foo", + }, + }, + } + testResponse(t, "response with error", response, alterResponsePopulated) +} diff --git a/broker.go b/broker.go index 9aa04196d..b759f8f78 100644 --- a/broker.go +++ b/broker.go @@ -483,6 +483,27 @@ func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCom return response, nil } +func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) { + response := new(DescribeConfigsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) { + response := new(AlterConfigsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() diff --git a/config_resource_type.go b/config_resource_type.go new file mode 100644 index 000000000..848cc9c90 --- /dev/null +++ b/config_resource_type.go @@ -0,0 +1,15 @@ +package sarama + +type ConfigResourceType int8 + +// Taken from : +// https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes + +const ( + UnknownResource ConfigResourceType = 0 + AnyResource ConfigResourceType = 1 + TopicResource ConfigResourceType = 2 + GroupResource ConfigResourceType = 3 + ClusterResource ConfigResourceType = 4 + BrokerResource ConfigResourceType = 5 +) diff --git a/describe_configs_request.go b/describe_configs_request.go new file mode 100644 index 000000000..7a7cffc3f --- /dev/null +++ b/describe_configs_request.go @@ -0,0 +1,91 @@ +package sarama + +type ConfigResource struct { + Type ConfigResourceType + Name string + ConfigNames []string +} + +type DescribeConfigsRequest struct { + Resources []*ConfigResource +} + +func (r *DescribeConfigsRequest) encode(pe packetEncoder) error { + if err := pe.putArrayLength(len(r.Resources)); err != nil { + return err + } + + for _, c := range r.Resources { + pe.putInt8(int8(c.Type)) + if err := pe.putString(c.Name); err != nil { + return err + } + + if len(c.ConfigNames) == 0 { + pe.putInt32(-1) + continue + } + if err := pe.putStringArray(c.ConfigNames); err != nil { + return err + } + } + + return nil +} + +func (r *DescribeConfigsRequest) decode(pd packetDecoder, version int16) (err error) { + n, err := pd.getArrayLength() + if err != nil { + return err + } + + r.Resources = make([]*ConfigResource, n) + + for i := 0; i < n; i++ { + r.Resources[i] = &ConfigResource{} + t, err := pd.getInt8() + if err != nil { + return err + } + r.Resources[i].Type = ConfigResourceType(t) + name, err := pd.getString() + if err != nil { + return err + } + r.Resources[i].Name = name + + confLength, err := pd.getArrayLength() + + if err != nil { + return err + } + + if confLength == -1 { + continue + } + + cfnames := make([]string, confLength) + for i := 0; i < confLength; i++ { + s, err := pd.getString() + if err != nil { + return err + } + cfnames[i] = s + } + r.Resources[i].ConfigNames = cfnames + } + + return nil +} + +func (r *DescribeConfigsRequest) key() int16 { + return 32 +} + +func (r *DescribeConfigsRequest) version() int16 { + return 0 +} + +func (r *DescribeConfigsRequest) requiredVersion() KafkaVersion { + return V0_11_0_0 +} diff --git a/describe_configs_request_test.go b/describe_configs_request_test.go new file mode 100644 index 000000000..ca0fd0495 --- /dev/null +++ b/describe_configs_request_test.go @@ -0,0 +1,90 @@ +package sarama + +import "testing" + +var ( + emptyDescribeConfigsRequest = []byte{ + 0, 0, 0, 0, // 0 configs + } + + singleDescribeConfigsRequest = []byte{ + 0, 0, 0, 1, // 1 config + 2, // a topic + 0, 3, 'f', 'o', 'o', // topic name: foo + 0, 0, 0, 1, //1 config name + 0, 10, // 10 chars + 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + } + + doubleDescribeConfigsRequest = []byte{ + 0, 0, 0, 2, // 2 configs + 2, // a topic + 0, 3, 'f', 'o', 'o', // topic name: foo + 0, 0, 0, 2, //2 config name + 0, 10, // 10 chars + 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + 0, 12, // 12 chars + 'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's', + 2, // a topic + 0, 3, 'b', 'a', 'r', // topic name: foo + 0, 0, 0, 1, // 1 config + 0, 10, // 10 chars + 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + } + + singleDescribeConfigsRequestAllConfigs = []byte{ + 0, 0, 0, 1, // 1 config + 2, // a topic + 0, 3, 'f', 'o', 'o', // topic name: foo + 255, 255, 255, 255, // no configs + } +) + +func TestDescribeConfigsRequest(t *testing.T) { + var request *DescribeConfigsRequest + + request = &DescribeConfigsRequest{ + Resources: []*ConfigResource{}, + } + testRequest(t, "no requests", request, emptyDescribeConfigsRequest) + + configs := []string{"segment.ms"} + request = &DescribeConfigsRequest{ + Resources: []*ConfigResource{ + &ConfigResource{ + Type: TopicResource, + Name: "foo", + ConfigNames: configs, + }, + }, + } + + testRequest(t, "one config", request, singleDescribeConfigsRequest) + + request = &DescribeConfigsRequest{ + Resources: []*ConfigResource{ + &ConfigResource{ + Type: TopicResource, + Name: "foo", + ConfigNames: []string{"segment.ms", "retention.ms"}, + }, + &ConfigResource{ + Type: TopicResource, + Name: "bar", + ConfigNames: []string{"segment.ms"}, + }, + }, + } + testRequest(t, "two configs", request, doubleDescribeConfigsRequest) + + request = &DescribeConfigsRequest{ + Resources: []*ConfigResource{ + &ConfigResource{ + Type: TopicResource, + Name: "foo", + }, + }, + } + + testRequest(t, "one topic, all configs", request, singleDescribeConfigsRequestAllConfigs) +} diff --git a/describe_configs_response.go b/describe_configs_response.go new file mode 100644 index 000000000..6e5d30e4f --- /dev/null +++ b/describe_configs_response.go @@ -0,0 +1,188 @@ +package sarama + +import "time" + +type DescribeConfigsResponse struct { + ThrottleTime time.Duration + Resources []*ResourceResponse +} + +type ResourceResponse struct { + ErrorCode int16 + ErrorMsg string + Type ConfigResourceType + Name string + Configs []*ConfigEntry +} + +type ConfigEntry struct { + Name string + Value string + ReadOnly bool + Default bool + Sensitive bool +} + +func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) { + pe.putInt32(int32(r.ThrottleTime / time.Millisecond)) + if err = pe.putArrayLength(len(r.Resources)); err != nil { + return err + } + + for _, c := range r.Resources { + if err = c.encode(pe); err != nil { + return err + } + } + return nil +} + +func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) { + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + n, err := pd.getArrayLength() + if err != nil { + return err + } + + r.Resources = make([]*ResourceResponse, n) + for i := 0; i < n; i++ { + rr := &ResourceResponse{} + if err := rr.decode(pd, version); err != nil { + return err + } + r.Resources[i] = rr + } + + return nil +} + +func (r *DescribeConfigsResponse) key() int16 { + return 32 +} + +func (r *DescribeConfigsResponse) version() int16 { + return 0 +} + +func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion { + return V0_11_0_0 +} + +func (r *ResourceResponse) encode(pe packetEncoder) (err error) { + pe.putInt16(r.ErrorCode) + + if err = pe.putString(r.ErrorMsg); err != nil { + return err + } + + pe.putInt8(int8(r.Type)) + + if err = pe.putString(r.Name); err != nil { + return err + } + + if err = pe.putArrayLength(len(r.Configs)); err != nil { + return err + } + + for _, c := range r.Configs { + if err = c.encode(pe); err != nil { + return err + } + } + return nil +} + +func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) { + ec, err := pd.getInt16() + if err != nil { + return err + } + r.ErrorCode = ec + + em, err := pd.getString() + if err != nil { + return err + } + r.ErrorMsg = em + + t, err := pd.getInt8() + if err != nil { + return err + } + r.Type = ConfigResourceType(t) + + name, err := pd.getString() + if err != nil { + return err + } + r.Name = name + + n, err := pd.getArrayLength() + if err != nil { + return err + } + + r.Configs = make([]*ConfigEntry, n) + for i := 0; i < n; i++ { + c := &ConfigEntry{} + if err := c.decode(pd, version); err != nil { + return err + } + r.Configs[i] = c + } + return nil +} + +func (r *ConfigEntry) encode(pe packetEncoder) (err error) { + if err = pe.putString(r.Name); err != nil { + return err + } + + if err = pe.putString(r.Value); err != nil { + return err + } + + pe.putBool(r.ReadOnly) + pe.putBool(r.Default) + pe.putBool(r.Sensitive) + return nil +} + +func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) { + name, err := pd.getString() + if err != nil { + return err + } + r.Name = name + + value, err := pd.getString() + if err != nil { + return err + } + r.Value = value + + read, err := pd.getBool() + if err != nil { + return err + } + r.ReadOnly = read + + de, err := pd.getBool() + if err != nil { + return err + } + r.Default = de + + sensitive, err := pd.getBool() + if err != nil { + return err + } + r.Sensitive = sensitive + return nil +} diff --git a/describe_configs_response_test.go b/describe_configs_response_test.go new file mode 100644 index 000000000..e3dcbac39 --- /dev/null +++ b/describe_configs_response_test.go @@ -0,0 +1,60 @@ +package sarama + +import ( + "testing" +) + +var ( + describeConfigsResponseEmpty = []byte{ + 0, 0, 0, 0, //throttle + 0, 0, 0, 0, // no configs + } + + describeConfigsResponsePopulated = []byte{ + 0, 0, 0, 0, //throttle + 0, 0, 0, 1, // response + 0, 0, //errorcode + 0, 0, //string + 2, // topic + 0, 3, 'f', 'o', 'o', + 0, 0, 0, 1, //configs + 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + 0, 4, '1', '0', '0', '0', + 0, // ReadOnly + 0, // Default + 0, // Sensitive + } +) + +func TestDescribeConfigsResponse(t *testing.T) { + var response *DescribeConfigsResponse + + response = &DescribeConfigsResponse{ + Resources: []*ResourceResponse{}, + } + testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0) + if len(response.Resources) != 0 { + t.Error("Expected no groups") + } + + response = &DescribeConfigsResponse{ + Resources: []*ResourceResponse{ + &ResourceResponse{ + ErrorCode: 0, + ErrorMsg: "", + Type: TopicResource, + Name: "foo", + Configs: []*ConfigEntry{ + &ConfigEntry{ + Name: "segment.ms", + Value: "1000", + ReadOnly: false, + Default: false, + Sensitive: false, + }, + }, + }, + }, + } + testResponse(t, "response with error", response, describeConfigsResponsePopulated) +} diff --git a/request.go b/request.go index 42203edf8..5f7cb76e9 100644 --- a/request.go +++ b/request.go @@ -134,6 +134,10 @@ func allocateBody(key, version int16) protocolBody { return &CreateAclsRequest{} case 31: return &DeleteAclsRequest{} + case 32: + return &DescribeConfigsRequest{} + case 33: + return &AlterConfigsRequest{} case 37: return &CreatePartitionsRequest{} }