From a7389e89ec96cbb16a7c61a49b5660a48484c367 Mon Sep 17 00:00:00 2001 From: oktaykcr Date: Sat, 24 Aug 2024 18:29:46 +0300 Subject: [PATCH] feat: write json and string methods for producer and consumer configs (#140) --- balancer.go | 19 +++++ balancer_test.go | 52 ++++++++++++ consumer_config.go | 55 +++++++++++++ consumer_config_test.go | 171 ++++++++++++++++++++++++++++++++++++++++ mechanism.go | 8 ++ mechanism_test.go | 34 ++++++++ producer_config.go | 27 +++++++ producer_config_test.go | 111 +++++++++++++++++++++++++- tls.go | 7 ++ tls_test.go | 30 +++++++ 10 files changed, 513 insertions(+), 1 deletion(-) create mode 100644 mechanism_test.go diff --git a/balancer.go b/balancer.go index 9319c67..de5abc5 100644 --- a/balancer.go +++ b/balancer.go @@ -27,3 +27,22 @@ func GetBalancerReferenceHash() Balancer { func GetBalancerRoundRobin() Balancer { return &kafka.RoundRobin{} } + +func GetBalancerString(balancer Balancer) string { + switch balancer.(type) { + case *kafka.CRC32Balancer: + return "CRC32Balancer" + case *kafka.Hash: + return "Hash" + case *kafka.LeastBytes: + return "LeastBytes" + case *kafka.Murmur2Balancer: + return "Murmur2Balancer" + case *kafka.ReferenceHash: + return "ReferenceHash" + case *kafka.RoundRobin: + return "RoundRobin" + default: + return "Unknown" + } +} diff --git a/balancer_test.go b/balancer_test.go index 11da72d..331f6f6 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -64,3 +64,55 @@ func TestGetBalancerRoundRobinh(t *testing.T) { t.Errorf("Expected *kafka.RoundRobin, got %s", reflect.TypeOf(balancer).String()) } } + +func TestGetBalancerString(t *testing.T) { + + tests := []struct { + name string + balancer Balancer + want string + }{ + { + name: "Should_Return_CRC32Balancer", + balancer: GetBalancerCRC32(), + want: "CRC32Balancer", + }, + { + name: "Should_Return_Hash", + balancer: GetBalancerHash(), + want: "Hash", + }, + { + name: "Should_Return_LeastBytes", + balancer: GetBalancerLeastBytes(), + want: "LeastBytes", + }, + { + name: "Should_Return_Murmur2Balancer", + balancer: GetBalancerMurmur2Balancer(), + want: "Murmur2Balancer", + }, + { + name: "Should_Return_ReferenceHash", + balancer: GetBalancerReferenceHash(), + want: "ReferenceHash", + }, + { + name: "Should_Return_RoundRobin", + balancer: GetBalancerRoundRobin(), + want: "RoundRobin", + }, + { + name: "Should_Return_Unknown", + balancer: nil, + want: "Unknown", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := GetBalancerString(tt.balancer); got != tt.want { + t.Errorf("GetBalancerString() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/consumer_config.go b/consumer_config.go index e3cf8d1..ceec7b1 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -1,6 +1,11 @@ package kafka import ( + "bytes" + "encoding/json" + "fmt" + "regexp" + "strings" "time" "github.com/segmentio/kafka-go" @@ -61,6 +66,47 @@ type ConsumerConfig struct { MetricPrefix string } +func (cfg RetryConfiguration) Json() string { + return fmt.Sprintf(`{"Brokers": ["%s"], "Topic": %q, "StartTimeCron": %q, "WorkDuration": %q, "MaxRetry": %d, "VerifyTopicOnStartup": %t, "Rack": %q}`, + strings.Join(cfg.Brokers, "\", \""), cfg.Topic, cfg.StartTimeCron, + cfg.WorkDuration, cfg.MaxRetry, cfg.VerifyTopicOnStartup, cfg.Rack) +} + +func (cfg *BatchConfiguration) Json() string { + if cfg == nil { + return "{}" + } + return fmt.Sprintf(`{"MessageGroupLimit": %d}`, cfg.MessageGroupLimit) +} + +func (cfg ReaderConfig) Json() string { + return fmt.Sprintf(`{"Brokers": ["%s"], "GroupId": %q, "GroupTopics": ["%s"], "MaxWait": %q, "CommitInterval": %q, "StartOffset": %q}`, + strings.Join(cfg.Brokers, "\", \""), cfg.GroupID, strings.Join(cfg.GroupTopics, "\", \""), + cfg.MaxWait, cfg.CommitInterval, kcronsumer.ToStringOffset(cfg.StartOffset)) +} + +func (cfg *ConsumerConfig) Json() string { + if cfg == nil { + return "{}" + } + return fmt.Sprintf(`{"ClientID": %q, "Reader": %s, "BatchConfiguration": %s, "MessageGroupDuration": %q, "TransactionalRetry": %t, "Concurrency": %d, "RetryEnabled": %t, "RetryConfiguration": %s, "VerifyTopicOnStartup": %t, "Rack": %q, "SASL": %s, "TLS": %s}`, + cfg.ClientID, cfg.Reader.Json(), cfg.BatchConfiguration.Json(), + cfg.MessageGroupDuration, *cfg.TransactionalRetry, cfg.Concurrency, + cfg.RetryEnabled, cfg.RetryConfiguration.Json(), cfg.VerifyTopicOnStartup, + cfg.Rack, cfg.SASL.Json(), cfg.TLS.Json()) +} + +func (cfg *ConsumerConfig) JsonPretty() string { + return jsonPretty(cfg.Json()) +} + +func (cfg *ConsumerConfig) String() string { + re := regexp.MustCompile(`"(\w+)"\s*:`) + modifiedString := re.ReplaceAllString(cfg.Json(), `$1:`) + modifiedString = modifiedString[1 : len(modifiedString)-1] + return modifiedString +} + func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { cronsumerCfg := kcronsumer.Config{ MetricPrefix: cfg.RetryConfiguration.MetricPrefix, @@ -266,3 +312,12 @@ func (cfg *ConsumerConfig) setDefaults() { func NewBoolPtr(value bool) *bool { return &value } + +func jsonPretty(jsonString string) string { + var out bytes.Buffer + err := json.Indent(&out, []byte(jsonString), "", "\t") + if err != nil { + return jsonString + } + return out.String() +} diff --git a/consumer_config_test.go b/consumer_config_test.go index 98a8333..2fc9586 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -189,3 +189,174 @@ func TestConsumerConfig_getTopics(t *testing.T) { } }) } + +func Test_jsonPretty(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + { + name: "Simple JSON", + input: `{"key1":"value1","key2":2}`, + expected: "{\n\t\"key1\": \"value1\",\n\t\"key2\": 2\n}", + }, + { + name: "Nested JSON", + input: `{"key1":"value1","key2":{"nestedKey1":1,"nestedKey2":2},"key3":[1,2,3]}`, + expected: "{\n\t\"key1\": \"value1\",\n\t\"key2\": {\n\t\t\"nestedKey1\": 1,\n\t\t\"nestedKey2\": 2\n\t},\n\t\"key3\": [\n\t\t1,\n\t\t2,\n\t\t3\n\t]\n}", + }, + { + name: "Invalid JSON", + input: `{"key1": "value1", "key2": 2`, + expected: `{"key1": "value1", "key2": 2`, + }, + { + name: "Empty JSON", + input: ``, + expected: ``, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := jsonPretty(tt.input) + if got != tt.expected { + t.Errorf("jsonPretty() = %v, want %v", got, tt.expected) + } + }) + } +} + +func TestConsumerConfig_Json(t *testing.T) { + t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) { + // Given + var config *ConsumerConfig + expected := "{}" + // When + result := config.Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json", func(t *testing.T) { + // Given + expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"GroupId\": \"test-consumer.0\", \"GroupTopics\": [\"test-updated.0\"], \"MaxWait\": \"2s\", \"CommitInterval\": \"1s\", \"StartOffset\": \"earliest\"}, \"BatchConfiguration\": {\"MessageGroupLimit\": 100}, \"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, \"RetryEnabled\": true, \"RetryConfiguration\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"Topic\": \"test-exception.0\", \"StartTimeCron\": \"*/2 * * * *\", \"WorkDuration\": \"1m0s\", \"MaxRetry\": 3, \"VerifyTopicOnStartup\": true, \"Rack\": \"\"}, \"VerifyTopicOnStartup\": true, \"Rack\": \"stage\", \"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, \"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}" + // When + result := getConsumerConfigExample().Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"\"], \"GroupId\": \"\", \"GroupTopics\": [\"\"], \"MaxWait\": \"0s\", \"CommitInterval\": \"0s\", \"StartOffset\": \"earliest\"}, \"BatchConfiguration\": {}, \"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, \"RetryEnabled\": true, \"RetryConfiguration\": {\"Brokers\": [\"\"], \"Topic\": \"\", \"StartTimeCron\": \"\", \"WorkDuration\": \"0s\", \"MaxRetry\": 0, \"VerifyTopicOnStartup\": false, \"Rack\": \"\"}, \"VerifyTopicOnStartup\": true, \"Rack\": \"stage\", \"SASL\": {}, \"TLS\": {}}" + // When + result := getConsumerConfigWithoutInnerObjectExample().Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func TestConsumerConfig_String(t *testing.T) { + t.Run("Should_Convert_To_String", func(t *testing.T) { + // Given + expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], GroupId: \"test-consumer.0\", GroupTopics: [\"test-updated.0\"], MaxWait: \"2s\", CommitInterval: \"1s\", StartOffset: \"earliest\"}, BatchConfiguration: {MessageGroupLimit: 100}, MessageGroupDuration: \"20ns\", TransactionalRetry: false, Concurrency: 10, RetryEnabled: true, RetryConfiguration: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], Topic: \"test-exception.0\", StartTimeCron: \"*/2 * * * *\", WorkDuration: \"1m0s\", MaxRetry: 3, VerifyTopicOnStartup: true, Rack: \"\"}, VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}" + // When + result := getConsumerConfigExample().String() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_String_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"\"], GroupId: \"\", GroupTopics: [\"\"], MaxWait: \"0s\", CommitInterval: \"0s\", StartOffset: \"earliest\"}, BatchConfiguration: {}, MessageGroupDuration: \"20ns\", TransactionalRetry: false, Concurrency: 10, RetryEnabled: true, RetryConfiguration: {Brokers: [\"\"], Topic: \"\", StartTimeCron: \"\", WorkDuration: \"0s\", MaxRetry: 0, VerifyTopicOnStartup: false, Rack: \"\"}, VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {}, TLS: {}" + // When + result := getConsumerConfigWithoutInnerObjectExample().String() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func TestConsumerConfig_JsonPretty(t *testing.T) { + t.Run("Should_Convert_To_Pretty_Json", func(t *testing.T) { + // Given + expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"Reader\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"GroupId\": \"test-consumer.0\",\n\t\t\"GroupTopics\": [\n\t\t\t\"test-updated.0\"\n\t\t],\n\t\t\"MaxWait\": \"2s\",\n\t\t\"CommitInterval\": \"1s\",\n\t\t\"StartOffset\": \"earliest\"\n\t},\n\t\"BatchConfiguration\": {\n\t\t\"MessageGroupLimit\": 100\n\t},\n\t\"MessageGroupDuration\": \"20ns\",\n\t\"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"RetryEnabled\": true,\n\t\"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"Topic\": \"test-exception.0\",\n\t\t\"StartTimeCron\": \"*/2 * * * *\",\n\t\t\"WorkDuration\": \"1m0s\",\n\t\t\"MaxRetry\": 3,\n\t\t\"VerifyTopicOnStartup\": true,\n\t\t\"Rack\": \"\"\n\t},\n\t\"VerifyTopicOnStartup\": true,\n\t\"Rack\": \"stage\",\n\t\"SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}" + // When + result := getConsumerConfigExample().JsonPretty() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Pretty_Json_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"Reader\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"GroupId\": \"\",\n\t\t\"GroupTopics\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"MaxWait\": \"0s\",\n\t\t\"CommitInterval\": \"0s\",\n\t\t\"StartOffset\": \"earliest\"\n\t},\n\t\"BatchConfiguration\": {},\n\t\"MessageGroupDuration\": \"20ns\",\n\t\"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"RetryEnabled\": true,\n\t\"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"Topic\": \"\",\n\t\t\"StartTimeCron\": \"\",\n\t\t\"WorkDuration\": \"0s\",\n\t\t\"MaxRetry\": 0,\n\t\t\"VerifyTopicOnStartup\": false,\n\t\t\"Rack\": \"\"\n\t},\n\t\"VerifyTopicOnStartup\": true,\n\t\"Rack\": \"stage\",\n\t\"SASL\": {},\n\t\"TLS\": {}\n}" + // When + result := getConsumerConfigWithoutInnerObjectExample().JsonPretty() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func getConsumerConfigExample() *ConsumerConfig { + return &ConsumerConfig{ + Rack: "stage", + ClientID: "test-consumer-client-id", + Reader: ReaderConfig{ + Brokers: []string{"broker-1.test.com", "broker-2.test.com"}, + GroupID: "test-consumer.0", + GroupTopics: []string{"test-updated.0"}, + MaxWait: 2 * time.Second, + CommitInterval: time.Second, + }, + BatchConfiguration: &BatchConfiguration{ + MessageGroupLimit: 100, + }, + MessageGroupDuration: 20, + TransactionalRetry: NewBoolPtr(false), + Concurrency: 10, + RetryEnabled: true, + RetryConfiguration: RetryConfiguration{ + Brokers: []string{"broker-1.test.com", "broker-2.test.com"}, + Topic: "test-exception.0", + StartTimeCron: "*/2 * * * *", + WorkDuration: time.Minute * 1, + MaxRetry: 3, + VerifyTopicOnStartup: true, + }, + VerifyTopicOnStartup: true, + TLS: &TLSConfig{ + RootCAPath: "resources/ca", + IntermediateCAPath: "resources/intCa", + }, + SASL: &SASLConfig{ + Type: "scram", + Username: "user", + Password: "pass", + }, + } +} + +func getConsumerConfigWithoutInnerObjectExample() *ConsumerConfig { + return &ConsumerConfig{ + Rack: "stage", + ClientID: "test-consumer-client-id", + Reader: ReaderConfig{}, + MessageGroupDuration: 20, + TransactionalRetry: NewBoolPtr(false), + Concurrency: 10, + RetryEnabled: true, + RetryConfiguration: RetryConfiguration{}, + VerifyTopicOnStartup: true, + } +} diff --git a/mechanism.go b/mechanism.go index 5e89bf6..ceb0fe6 100644 --- a/mechanism.go +++ b/mechanism.go @@ -1,6 +1,7 @@ package kafka import ( + "fmt" "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/plain" "github.com/segmentio/kafka-go/sasl/scram" @@ -37,3 +38,10 @@ func (s *SASLConfig) plain() sasl.Mechanism { func (s *SASLConfig) IsEmpty() bool { return s == nil } + +func (s *SASLConfig) Json() string { + if s == nil { + return "{}" + } + return fmt.Sprintf(`{"Mechanism": %q, "Username": %q, "Password": %q}`, s.Type, s.Username, s.Password) +} diff --git a/mechanism_test.go b/mechanism_test.go new file mode 100644 index 0000000..1436fab --- /dev/null +++ b/mechanism_test.go @@ -0,0 +1,34 @@ +package kafka + +import "testing" + +func TestSASLConfig_Json(t *testing.T) { + t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) { + // Given + var cfg *SASLConfig + + expected := "{}" + // When + result := cfg.Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json", func(t *testing.T) { + // Given + cfg := &SASLConfig{ + Type: "scram", + Username: "user", + Password: "pass", + } + + expected := "{\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}" + // When + result := cfg.Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} diff --git a/producer_config.go b/producer_config.go index 1feeecd..1d8a908 100644 --- a/producer_config.go +++ b/producer_config.go @@ -1,6 +1,9 @@ package kafka import ( + "fmt" + "regexp" + "strings" "time" "go.opentelemetry.io/otel" @@ -29,6 +32,11 @@ type WriterConfig struct { AllowAutoTopicCreation bool } +func (cfg WriterConfig) Json() string { + return fmt.Sprintf(`{"Brokers": ["%s"], "Balancer": %q, "Compression": %q}`, + strings.Join(cfg.Brokers, "\", \""), GetBalancerString(cfg.Balancer), cfg.Compression.String()) +} + type TransportConfig struct { MetadataTopics []string DialTimeout time.Duration @@ -46,6 +54,25 @@ type ProducerConfig struct { DistributedTracingEnabled bool } +func (cfg *ProducerConfig) String() string { + re := regexp.MustCompile(`"(\w+)"\s*:`) + modifiedString := re.ReplaceAllString(cfg.Json(), `$1:`) + modifiedString = modifiedString[1 : len(modifiedString)-1] + return modifiedString +} + +func (cfg *ProducerConfig) Json() string { + if cfg == nil { + return "{}" + } + return fmt.Sprintf(`{"Writer": %s, "ClientID": %q, "DistributedTracingEnabled": %t, "SASL": %s, "TLS": %s}`, + cfg.Writer.Json(), cfg.ClientID, cfg.DistributedTracingEnabled, cfg.SASL.Json(), cfg.TLS.Json()) +} + +func (cfg *ProducerConfig) JsonPretty() string { + return jsonPretty(cfg.Json()) +} + func (cfg *ProducerConfig) newKafkaTransport() (*kafka.Transport, error) { transport := &Transport{ Transport: &kafka.Transport{ diff --git a/producer_config_test.go b/producer_config_test.go index 95bf956..d8f0fbd 100644 --- a/producer_config_test.go +++ b/producer_config_test.go @@ -1,6 +1,9 @@ package kafka -import "testing" +import ( + "github.com/segmentio/kafka-go" + "testing" +) func TestProducerConfig_setDefaults(t *testing.T) { // Given @@ -17,3 +20,109 @@ func TestProducerConfig_setDefaults(t *testing.T) { t.Fatal("Propagator cannot be null") } } + +func TestProducerConfig_Json(t *testing.T) { + t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) { + // Given + var config *ProducerConfig + expected := "{}" + // When + result := config.Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json", func(t *testing.T) { + // Given + expected := "{\"Writer\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"Balancer\": \"Hash\", \"Compression\": \"gzip\"}, \"ClientID\": \"test-consumer-client-id\", \"DistributedTracingEnabled\": false, \"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, \"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}" + // When + result := getProducerConfigExample().Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "{\"Writer\": {\"Brokers\": [\"\"], \"Balancer\": \"Unknown\", \"Compression\": \"uncompressed\"}, \"ClientID\": \"test-consumer-client-id\", \"DistributedTracingEnabled\": false, \"SASL\": {}, \"TLS\": {}}" + // When + result := getProducerConfigWithoutInnerObjectExample().Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func TestProducerConfig_JsonPretty(t *testing.T) { + t.Run("Should_Convert_To_Pretty_Json", func(t *testing.T) { + // Given + expected := "{\n\t\"Writer\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"Balancer\": \"Hash\",\n\t\t\"Compression\": \"gzip\"\n\t},\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"DistributedTracingEnabled\": false,\n\t\"SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}" + // When + result := getProducerConfigExample().JsonPretty() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Pretty_Json_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "{\n\t\"Writer\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"Balancer\": \"Unknown\",\n\t\t\"Compression\": \"uncompressed\"\n\t},\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"DistributedTracingEnabled\": false,\n\t\"SASL\": {},\n\t\"TLS\": {}\n}" + // When + result := getProducerConfigWithoutInnerObjectExample().JsonPretty() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func TestProducerConfig_String(t *testing.T) { + t.Run("Should_Convert_To_String", func(t *testing.T) { + // Given + expected := "Writer: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], Balancer: \"Hash\", Compression: \"gzip\"}, ClientID: \"test-consumer-client-id\", DistributedTracingEnabled: false, SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}" + // When + result := getProducerConfigExample().String() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_String_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "Writer: {Brokers: [\"\"], Balancer: \"Unknown\", Compression: \"uncompressed\"}, ClientID: \"test-consumer-client-id\", DistributedTracingEnabled: false, SASL: {}, TLS: {}" + // When + result := getProducerConfigWithoutInnerObjectExample().String() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func getProducerConfigExample() *ProducerConfig { + return &ProducerConfig{ + ClientID: "test-consumer-client-id", + Writer: WriterConfig{ + Balancer: GetBalancerHash(), + Brokers: []string{"broker-1.test.com", "broker-2.test.com"}, + Compression: kafka.Gzip, + }, + TLS: &TLSConfig{ + RootCAPath: "resources/ca", + IntermediateCAPath: "resources/intCa", + }, + SASL: &SASLConfig{ + Type: "scram", + Username: "user", + Password: "pass", + }, + } +} + +func getProducerConfigWithoutInnerObjectExample() *ProducerConfig { + return &ProducerConfig{ + ClientID: "test-consumer-client-id", + } +} diff --git a/tls.go b/tls.go index d6ebf6b..dbb08e7 100644 --- a/tls.go +++ b/tls.go @@ -33,3 +33,10 @@ func (c *TLSConfig) TLSConfig() (*tls.Config, error) { func (c *TLSConfig) IsEmpty() bool { return c == nil || c.RootCAPath == "" && c.IntermediateCAPath == "" } + +func (c *TLSConfig) Json() string { + if c == nil { + return "{}" + } + return fmt.Sprintf(`{"RootCAPath": %q, "IntermediateCAPath": %q}`, c.RootCAPath, c.IntermediateCAPath) +} diff --git a/tls_test.go b/tls_test.go index e3351e0..f138507 100644 --- a/tls_test.go +++ b/tls_test.go @@ -65,3 +65,33 @@ func TestTLSConfig_IsEmpty(t *testing.T) { }) } } + +func TestTLSConfig_Json(t *testing.T) { + t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) { + // Given + var cfg *TLSConfig + + expected := "{}" + // When + result := cfg.Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json", func(t *testing.T) { + // Given + cfg := &TLSConfig{ + RootCAPath: "resources/ca", + IntermediateCAPath: "resources/intCa", + } + + expected := "{\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}" + // When + result := cfg.Json() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +}