Skip to content

Commit

Permalink
feat: write json and string methods for producer and consumer configs (
Browse files Browse the repository at this point in the history
  • Loading branch information
oktaykcr committed Aug 24, 2024
1 parent c27bd17 commit a7389e8
Show file tree
Hide file tree
Showing 10 changed files with 513 additions and 1 deletion.
19 changes: 19 additions & 0 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,22 @@ func GetBalancerReferenceHash() Balancer {
func GetBalancerRoundRobin() Balancer {
return &kafka.RoundRobin{}
}

func GetBalancerString(balancer Balancer) string {
switch balancer.(type) {
case *kafka.CRC32Balancer:
return "CRC32Balancer"
case *kafka.Hash:
return "Hash"
case *kafka.LeastBytes:
return "LeastBytes"
case *kafka.Murmur2Balancer:
return "Murmur2Balancer"
case *kafka.ReferenceHash:
return "ReferenceHash"
case *kafka.RoundRobin:
return "RoundRobin"
default:
return "Unknown"
}
}
52 changes: 52 additions & 0 deletions balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,55 @@ func TestGetBalancerRoundRobinh(t *testing.T) {
t.Errorf("Expected *kafka.RoundRobin, got %s", reflect.TypeOf(balancer).String())
}
}

func TestGetBalancerString(t *testing.T) {

Check failure on line 69 in balancer_test.go

View workflow job for this annotation

GitHub Actions / build

File is not `gofumpt`-ed (gofumpt)
tests := []struct {
name string
balancer Balancer
want string
}{
{
name: "Should_Return_CRC32Balancer",
balancer: GetBalancerCRC32(),
want: "CRC32Balancer",
},
{
name: "Should_Return_Hash",
balancer: GetBalancerHash(),
want: "Hash",
},
{
name: "Should_Return_LeastBytes",
balancer: GetBalancerLeastBytes(),
want: "LeastBytes",
},
{
name: "Should_Return_Murmur2Balancer",
balancer: GetBalancerMurmur2Balancer(),
want: "Murmur2Balancer",
},
{
name: "Should_Return_ReferenceHash",
balancer: GetBalancerReferenceHash(),
want: "ReferenceHash",
},
{
name: "Should_Return_RoundRobin",
balancer: GetBalancerRoundRobin(),
want: "RoundRobin",
},
{
name: "Should_Return_Unknown",
balancer: nil,
want: "Unknown",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetBalancerString(tt.balancer); got != tt.want {
t.Errorf("GetBalancerString() = %v, want %v", got, tt.want)
}
})
}
}
55 changes: 55 additions & 0 deletions consumer_config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package kafka

import (
"bytes"
"encoding/json"
"fmt"
"regexp"
"strings"
"time"

"github.com/segmentio/kafka-go"
Expand Down Expand Up @@ -61,6 +66,47 @@ type ConsumerConfig struct {
MetricPrefix string
}

func (cfg RetryConfiguration) Json() string {
return fmt.Sprintf(`{"Brokers": ["%s"], "Topic": %q, "StartTimeCron": %q, "WorkDuration": %q, "MaxRetry": %d, "VerifyTopicOnStartup": %t, "Rack": %q}`,

Check failure on line 70 in consumer_config.go

View workflow job for this annotation

GitHub Actions / build

line is 152 characters (lll)
strings.Join(cfg.Brokers, "\", \""), cfg.Topic, cfg.StartTimeCron,
cfg.WorkDuration, cfg.MaxRetry, cfg.VerifyTopicOnStartup, cfg.Rack)
}

func (cfg *BatchConfiguration) Json() string {
if cfg == nil {
return "{}"
}
return fmt.Sprintf(`{"MessageGroupLimit": %d}`, cfg.MessageGroupLimit)
}

func (cfg ReaderConfig) Json() string {
return fmt.Sprintf(`{"Brokers": ["%s"], "GroupId": %q, "GroupTopics": ["%s"], "MaxWait": %q, "CommitInterval": %q, "StartOffset": %q}`,
strings.Join(cfg.Brokers, "\", \""), cfg.GroupID, strings.Join(cfg.GroupTopics, "\", \""),
cfg.MaxWait, cfg.CommitInterval, kcronsumer.ToStringOffset(cfg.StartOffset))
}

func (cfg *ConsumerConfig) Json() string {
if cfg == nil {
return "{}"
}
return fmt.Sprintf(`{"ClientID": %q, "Reader": %s, "BatchConfiguration": %s, "MessageGroupDuration": %q, "TransactionalRetry": %t, "Concurrency": %d, "RetryEnabled": %t, "RetryConfiguration": %s, "VerifyTopicOnStartup": %t, "Rack": %q, "SASL": %s, "TLS": %s}`,

Check failure on line 92 in consumer_config.go

View workflow job for this annotation

GitHub Actions / build

line is 261 characters (lll)
cfg.ClientID, cfg.Reader.Json(), cfg.BatchConfiguration.Json(),
cfg.MessageGroupDuration, *cfg.TransactionalRetry, cfg.Concurrency,
cfg.RetryEnabled, cfg.RetryConfiguration.Json(), cfg.VerifyTopicOnStartup,
cfg.Rack, cfg.SASL.Json(), cfg.TLS.Json())
}

func (cfg *ConsumerConfig) JsonPretty() string {

Check warning on line 99 in consumer_config.go

View workflow job for this annotation

GitHub Actions / build

var-naming: method JsonPretty should be JSONPretty (revive)
return jsonPretty(cfg.Json())
}

func (cfg *ConsumerConfig) String() string {
re := regexp.MustCompile(`"(\w+)"\s*:`)
modifiedString := re.ReplaceAllString(cfg.Json(), `$1:`)
modifiedString = modifiedString[1 : len(modifiedString)-1]
return modifiedString
}

func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
cronsumerCfg := kcronsumer.Config{
MetricPrefix: cfg.RetryConfiguration.MetricPrefix,
Expand Down Expand Up @@ -266,3 +312,12 @@ func (cfg *ConsumerConfig) setDefaults() {
func NewBoolPtr(value bool) *bool {
return &value
}

func jsonPretty(jsonString string) string {
var out bytes.Buffer
err := json.Indent(&out, []byte(jsonString), "", "\t")
if err != nil {
return jsonString
}
return out.String()
}
171 changes: 171 additions & 0 deletions consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,174 @@ func TestConsumerConfig_getTopics(t *testing.T) {
}
})
}

func Test_jsonPretty(t *testing.T) {
tests := []struct {
name string
input string
expected string
}{
{
name: "Simple JSON",
input: `{"key1":"value1","key2":2}`,
expected: "{\n\t\"key1\": \"value1\",\n\t\"key2\": 2\n}",
},
{
name: "Nested JSON",
input: `{"key1":"value1","key2":{"nestedKey1":1,"nestedKey2":2},"key3":[1,2,3]}`,
expected: "{\n\t\"key1\": \"value1\",\n\t\"key2\": {\n\t\t\"nestedKey1\": 1,\n\t\t\"nestedKey2\": 2\n\t},\n\t\"key3\": [\n\t\t1,\n\t\t2,\n\t\t3\n\t]\n}",

Check failure on line 207 in consumer_config_test.go

View workflow job for this annotation

GitHub Actions / build

line is 156 characters (lll)
},
{
name: "Invalid JSON",
input: `{"key1": "value1", "key2": 2`,
expected: `{"key1": "value1", "key2": 2`,
},
{
name: "Empty JSON",
input: ``,
expected: ``,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := jsonPretty(tt.input)
if got != tt.expected {
t.Errorf("jsonPretty() = %v, want %v", got, tt.expected)
}
})
}
}

func TestConsumerConfig_Json(t *testing.T) {
t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) {
// Given
var config *ConsumerConfig
expected := "{}"
// When
result := config.Json()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
t.Run("Should_Convert_To_Json", func(t *testing.T) {
// Given
expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"GroupId\": \"test-consumer.0\", \"GroupTopics\": [\"test-updated.0\"], \"MaxWait\": \"2s\", \"CommitInterval\": \"1s\", \"StartOffset\": \"earliest\"}, \"BatchConfiguration\": {\"MessageGroupLimit\": 100}, \"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, \"RetryEnabled\": true, \"RetryConfiguration\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"Topic\": \"test-exception.0\", \"StartTimeCron\": \"*/2 * * * *\", \"WorkDuration\": \"1m0s\", \"MaxRetry\": 3, \"VerifyTopicOnStartup\": true, \"Rack\": \"\"}, \"VerifyTopicOnStartup\": true, \"Rack\": \"stage\", \"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, \"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}"

Check failure on line 245 in consumer_config_test.go

View workflow job for this annotation

GitHub Actions / build

line is 931 characters (lll)
// When
result := getConsumerConfigExample().Json()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
t.Run("Should_Convert_To_Json_Without_Inner_Object", func(t *testing.T) {
// Given
expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"\"], \"GroupId\": \"\", \"GroupTopics\": [\"\"], \"MaxWait\": \"0s\", \"CommitInterval\": \"0s\", \"StartOffset\": \"earliest\"}, \"BatchConfiguration\": {}, \"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, \"RetryEnabled\": true, \"RetryConfiguration\": {\"Brokers\": [\"\"], \"Topic\": \"\", \"StartTimeCron\": \"\", \"WorkDuration\": \"0s\", \"MaxRetry\": 0, \"VerifyTopicOnStartup\": false, \"Rack\": \"\"}, \"VerifyTopicOnStartup\": true, \"Rack\": \"stage\", \"SASL\": {}, \"TLS\": {}}"

Check failure on line 255 in consumer_config_test.go

View workflow job for this annotation

GitHub Actions / build

line is 619 characters (lll)
// When
result := getConsumerConfigWithoutInnerObjectExample().Json()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
}

func TestConsumerConfig_String(t *testing.T) {
t.Run("Should_Convert_To_String", func(t *testing.T) {
// Given
expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], GroupId: \"test-consumer.0\", GroupTopics: [\"test-updated.0\"], MaxWait: \"2s\", CommitInterval: \"1s\", StartOffset: \"earliest\"}, BatchConfiguration: {MessageGroupLimit: 100}, MessageGroupDuration: \"20ns\", TransactionalRetry: false, Concurrency: 10, RetryEnabled: true, RetryConfiguration: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], Topic: \"test-exception.0\", StartTimeCron: \"*/2 * * * *\", WorkDuration: \"1m0s\", MaxRetry: 3, VerifyTopicOnStartup: true, Rack: \"\"}, VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}"

Check failure on line 268 in consumer_config_test.go

View workflow job for this annotation

GitHub Actions / build

line is 805 characters (lll)
// When
result := getConsumerConfigExample().String()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
t.Run("Should_Convert_To_String_Without_Inner_Object", func(t *testing.T) {
// Given
expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"\"], GroupId: \"\", GroupTopics: [\"\"], MaxWait: \"0s\", CommitInterval: \"0s\", StartOffset: \"earliest\"}, BatchConfiguration: {}, MessageGroupDuration: \"20ns\", TransactionalRetry: false, Concurrency: 10, RetryEnabled: true, RetryConfiguration: {Brokers: [\"\"], Topic: \"\", StartTimeCron: \"\", WorkDuration: \"0s\", MaxRetry: 0, VerifyTopicOnStartup: false, Rack: \"\"}, VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {}, TLS: {}"

Check failure on line 278 in consumer_config_test.go

View workflow job for this annotation

GitHub Actions / build

line is 517 characters (lll)
// When
result := getConsumerConfigWithoutInnerObjectExample().String()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
}

func TestConsumerConfig_JsonPretty(t *testing.T) {
t.Run("Should_Convert_To_Pretty_Json", func(t *testing.T) {
// Given
expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"Reader\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"GroupId\": \"test-consumer.0\",\n\t\t\"GroupTopics\": [\n\t\t\t\"test-updated.0\"\n\t\t],\n\t\t\"MaxWait\": \"2s\",\n\t\t\"CommitInterval\": \"1s\",\n\t\t\"StartOffset\": \"earliest\"\n\t},\n\t\"BatchConfiguration\": {\n\t\t\"MessageGroupLimit\": 100\n\t},\n\t\"MessageGroupDuration\": \"20ns\",\n\t\"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"RetryEnabled\": true,\n\t\"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"Topic\": \"test-exception.0\",\n\t\t\"StartTimeCron\": \"*/2 * * * *\",\n\t\t\"WorkDuration\": \"1m0s\",\n\t\t\"MaxRetry\": 3,\n\t\t\"VerifyTopicOnStartup\": true,\n\t\t\"Rack\": \"\"\n\t},\n\t\"VerifyTopicOnStartup\": true,\n\t\"Rack\": \"stage\",\n\t\"SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}"
// When
result := getConsumerConfigExample().JsonPretty()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
t.Run("Should_Convert_To_Pretty_Json_Without_Inner_Object", func(t *testing.T) {
// Given
expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"Reader\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"GroupId\": \"\",\n\t\t\"GroupTopics\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"MaxWait\": \"0s\",\n\t\t\"CommitInterval\": \"0s\",\n\t\t\"StartOffset\": \"earliest\"\n\t},\n\t\"BatchConfiguration\": {},\n\t\"MessageGroupDuration\": \"20ns\",\n\t\"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"RetryEnabled\": true,\n\t\"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"Topic\": \"\",\n\t\t\"StartTimeCron\": \"\",\n\t\t\"WorkDuration\": \"0s\",\n\t\t\"MaxRetry\": 0,\n\t\t\"VerifyTopicOnStartup\": false,\n\t\t\"Rack\": \"\"\n\t},\n\t\"VerifyTopicOnStartup\": true,\n\t\"Rack\": \"stage\",\n\t\"SASL\": {},\n\t\"TLS\": {}\n}"
// When
result := getConsumerConfigWithoutInnerObjectExample().JsonPretty()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
}

func getConsumerConfigExample() *ConsumerConfig {
return &ConsumerConfig{
Rack: "stage",
ClientID: "test-consumer-client-id",
Reader: ReaderConfig{
Brokers: []string{"broker-1.test.com", "broker-2.test.com"},
GroupID: "test-consumer.0",
GroupTopics: []string{"test-updated.0"},
MaxWait: 2 * time.Second,
CommitInterval: time.Second,
},
BatchConfiguration: &BatchConfiguration{
MessageGroupLimit: 100,
},
MessageGroupDuration: 20,
TransactionalRetry: NewBoolPtr(false),
Concurrency: 10,
RetryEnabled: true,
RetryConfiguration: RetryConfiguration{
Brokers: []string{"broker-1.test.com", "broker-2.test.com"},
Topic: "test-exception.0",
StartTimeCron: "*/2 * * * *",
WorkDuration: time.Minute * 1,
MaxRetry: 3,
VerifyTopicOnStartup: true,
},
VerifyTopicOnStartup: true,
TLS: &TLSConfig{
RootCAPath: "resources/ca",
IntermediateCAPath: "resources/intCa",
},
SASL: &SASLConfig{
Type: "scram",
Username: "user",
Password: "pass",
},
}
}

func getConsumerConfigWithoutInnerObjectExample() *ConsumerConfig {
return &ConsumerConfig{
Rack: "stage",
ClientID: "test-consumer-client-id",
Reader: ReaderConfig{},
MessageGroupDuration: 20,
TransactionalRetry: NewBoolPtr(false),
Concurrency: 10,
RetryEnabled: true,
RetryConfiguration: RetryConfiguration{},
VerifyTopicOnStartup: true,
}
}
8 changes: 8 additions & 0 deletions mechanism.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"fmt"

Check failure on line 4 in mechanism.go

View workflow job for this annotation

GitHub Actions / build

File is not `gofumpt`-ed (gofumpt)
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
Expand Down Expand Up @@ -37,3 +38,10 @@ func (s *SASLConfig) plain() sasl.Mechanism {
func (s *SASLConfig) IsEmpty() bool {
return s == nil
}

func (s *SASLConfig) Json() string {
if s == nil {
return "{}"
}
return fmt.Sprintf(`{"Mechanism": %q, "Username": %q, "Password": %q}`, s.Type, s.Username, s.Password)
}
34 changes: 34 additions & 0 deletions mechanism_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package kafka

import "testing"

func TestSASLConfig_Json(t *testing.T) {
t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) {
// Given
var cfg *SASLConfig

expected := "{}"
// When
result := cfg.Json()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
t.Run("Should_Convert_To_Json", func(t *testing.T) {
// Given
cfg := &SASLConfig{
Type: "scram",
Username: "user",
Password: "pass",
}

expected := "{\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}"
// When
result := cfg.Json()
// Then
if result != expected {
t.Fatal("result must be equal to expected")
}
})
}
Loading

0 comments on commit a7389e8

Please sign in to comment.