-
Notifications
You must be signed in to change notification settings - Fork 13
/
verify_topic_test.go
130 lines (110 loc) · 2.55 KB
/
verify_topic_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package kafka
import (
"context"
"errors"
"testing"
"github.com/segmentio/kafka-go"
)
// mockKafkaClientWrapper is a test double whose Metadata result is driven by
// its flags: wantErr makes Metadata return an error, and wantExistTopic
// controls whether "topic1" is reported without an error.
type mockKafkaClientWrapper struct {
	wantErr, wantExistTopic bool
}
// GetClient returns a freshly allocated, zero-valued kafka.Client on every
// call; the mock's flags do not influence the result.
func (m mockKafkaClientWrapper) GetClient() *kafka.Client {
	return new(kafka.Client)
}
// Metadata fakes a metadata lookup; both arguments are ignored.
//
// When wantErr is set it returns a nil response and a "metadataReqErr" error.
// Otherwise it returns a response listing "topic1" and "topic2": "topic2"
// never carries an error, while "topic1" carries
// kafka.UnknownTopicOrPartition unless wantExistTopic is set.
func (m mockKafkaClientWrapper) Metadata(_ context.Context, _ *kafka.MetadataRequest) (*kafka.MetadataResponse, error) {
	if m.wantErr {
		return nil, errors.New("metadataReqErr")
	}

	// Declared as a plain error so the "exists" case stays a true nil
	// interface value, exactly like an explicit `Error: nil`.
	var topic1Err error
	if !m.wantExistTopic {
		topic1Err = kafka.UnknownTopicOrPartition
	}

	resp := &kafka.MetadataResponse{
		Topics: []kafka.Topic{
			{Name: "topic1", Error: topic1Err},
			{Name: "topic2"},
		},
	}
	return resp, nil
}
func Test_kafkaClientWrapper_VerifyTopics(t *testing.T) {
t.Run("Should_Return_Error_When_Metadata_Request_Has_Failed", func(t *testing.T) {
// Given
mockClient := mockKafkaClientWrapper{wantErr: true}
cfg := &ConsumerConfig{}
// When
_, err := verifyTopics(mockClient, cfg)
// Then
if err == nil {
t.Error("metadata request must be failed!")
}
})
t.Run("Should_Return_False_When_Given_Topic_Does_Not_Exist", func(t *testing.T) {
// Given
mockClient := mockKafkaClientWrapper{wantExistTopic: false}
cfg := &ConsumerConfig{
Reader: ReaderConfig{
Topic: "topic1",
},
}
// When
exist, err := verifyTopics(mockClient, cfg)
// Then
if exist {
t.Errorf("topic %s must not exist", cfg.Reader.Topic)
}
if err != nil {
t.Error("err must be nil")
}
})
t.Run("Should_Return_True_When_Given_Topic_Exist", func(t *testing.T) {
// Given
mockClient := mockKafkaClientWrapper{wantExistTopic: true}
cfg := &ConsumerConfig{
Reader: ReaderConfig{
Topic: "topic1",
},
}
// When
exist, err := verifyTopics(mockClient, cfg)
// Then
if !exist {
t.Errorf("topic %s must exist", cfg.Reader.Topic)
}
if err != nil {
t.Error("err must be nil")
}
})
}
// Test_newKafkaClient checks that newKafkaClient succeeds for a config with a
// single broker and that the resulting client is addressed at that broker.
func Test_newKafkaClient(t *testing.T) {
	// Given
	cfg := &ConsumerConfig{
		Reader: ReaderConfig{
			Topic:   "topic",
			Brokers: []string{"127.0.0.1:9092"},
		},
	}

	// When
	client, err := newKafkaClient(cfg)

	// Then
	// The error must be checked before the client is touched: when
	// construction fails the returned client is not usable, and dereferencing
	// it would panic instead of producing a readable test failure.
	if err != nil {
		t.Fatal("err must be nil")
	}

	// Guard against an unset address so a missing Addr is reported as a
	// failed assertion rather than a nil-interface method call panic.
	addr := client.GetClient().Addr
	if addr == nil || addr.String() != "127.0.0.1:9092" {
		t.Errorf("broker address must be 127.0.0.1:9092")
	}
}
// Test_kClient_GetClient verifies that GetClient on a zero-valued mock
// wrapper yields a non-nil client.
func Test_kClient_GetClient(t *testing.T) {
	var wrapper mockKafkaClientWrapper

	if got := wrapper.GetClient(); got == nil {
		t.Error("client must not be nil")
	}
}