Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer committed Jul 23, 2024
1 parent e20cbed commit e89a32f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 28 deletions.
19 changes: 11 additions & 8 deletions extensions/impl/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/failpoint"
kafkago "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
Expand All @@ -35,6 +36,7 @@ type KafkaSink struct {
headersMap map[string]string
headerTemplate string
saslConf *saslConf
mechanism sasl.Mechanism
}

type kafkaConf struct {
Expand Down Expand Up @@ -87,6 +89,14 @@ func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) err
if err != nil {
return err
}
mechanism, err := k.saslConf.GetMechanism()
failpoint.Inject("kafkaErr", func(val failpoint.Value) {
err = mockKakfaSourceErr(val.(int), mechanismErr)
})
if err != nil {
return err
}
k.mechanism = mechanism
k.tlsConfig = tlsConfig
k.kc = c
err = k.setHeaders()
Expand All @@ -97,13 +107,6 @@ func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) err
}

func (k *KafkaSink) buildKafkaWriter() error {
mechanism, err := k.saslConf.GetMechanism()
failpoint.Inject("kafkaErr", func(val failpoint.Value) {
err = mockKakfaSourceErr(val.(int), mechanismErr)
})
if err != nil {
return err
}
brokers := strings.Split(k.kc.Brokers, ",")
w := &kafkago.Writer{
Addr: kafkago.TCP(brokers...),
Expand All @@ -116,7 +119,7 @@ func (k *KafkaSink) buildKafkaWriter() error {
RequiredAcks: kafkago.RequiredAcks(k.kc.RequiredACKs),
BatchSize: 1,
Transport: &kafkago.Transport{
SASL: mechanism,
SASL: k.mechanism,
TLS: k.tlsConfig,
},
}
Expand Down
15 changes: 10 additions & 5 deletions extensions/impl/kafka/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,16 @@ func TestKafkaSink(t *testing.T) {
"brokers": "localhost:9092",
}
require.NoError(t, ks.Provision(ctx, configs))
require.Error(t, ks.Connect(ctx))
require.NoError(t, ks.Connect(ctx))
mockT := testx.MockTuple{
Map: map[string]any{"1": 1},
}
_, err := ks.collect(ctx, mockT)
require.Error(t, err)
msgs, err := ks.collect(ctx, mockT)
require.Len(t, msgs, 1)
require.NoError(t, err)
require.NoError(t, ks.Close(ctx))

for i := mockErrStart + 1; i < offsetErr; i++ {
for i := mockErrStart + 1; i < mockErrEnd; i++ {
failpoint.Enable("github.com/lf-edge/ekuiper/v2/extensions/impl/kafka/kafkaErr", fmt.Sprintf("return(%v)", i))
require.Error(t, ks.Provision(ctx, configs), i)
}
Expand All @@ -100,22 +101,26 @@ func TestKafkaSinkBuildMsg(t *testing.T) {
"headers": map[string]any{
"a": "{{.a}}",
},
"key": "{{.a}}",
}
ks := &KafkaSink{}
ctx := mockContext.NewMockContext("1", "2")
require.NoError(t, ks.Provision(ctx, configs))
require.NoError(t, ks.Connect(ctx))
item := map[string]any{
"a": 1,
}
d, _ := json.Marshal(item)
mockT := testx.MockTuple{
Map: item,
Template: map[string]string{"a": "1"},
Template: map[string]string{"a": "1", "key": "1"},
}
msg, err := ks.buildMsg(ctx, mockT, d)
require.NoError(t, err)
require.Equal(t, "a", msg.Headers[0].Key)
b := make([]uint8, 0, 8)
b = strconv.AppendInt(b, int64(1), 10)
require.Equal(t, b, msg.Headers[0].Value)
require.Equal(t, []byte("1"), msg.Key)
require.NoError(t, ks.Close(ctx))
}
26 changes: 12 additions & 14 deletions extensions/impl/kafka/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type KafkaSource struct {
tlsConfig *tls.Config
sc *kafkaSourceConf
saslConf *saslConf
mechanism sasl.Mechanism
}

type kafkaSourceConf struct {
Expand Down Expand Up @@ -116,6 +117,15 @@ func (k *KafkaSource) Provision(ctx api.StreamContext, configs map[string]any) e
return err
}
k.saslConf = saslConf
mechanism, err := k.saslConf.GetMechanism()
failpoint.Inject("kafkaErr", func(val failpoint.Value) {
err = mockKakfaSourceErr(val.(int), mechanismErr)
})
if err != nil {
conf.Log.Errorf("kafka sasl mechanism error: %v", err)
return err
}
k.mechanism = mechanism
conf.Log.Infof("kafka source got configured.")
return nil
}
Expand All @@ -125,28 +135,17 @@ func (k *KafkaSource) Close(ctx api.StreamContext) error {
}

func (k *KafkaSource) Connect(ctx api.StreamContext) error {
mechanism, err := k.saslConf.GetMechanism()
failpoint.Inject("kafkaErr", func(val failpoint.Value) {
err = mockKakfaSourceErr(val.(int), mechanismErr)
})
if err != nil {
conf.Log.Errorf("kafka sasl mechanism error: %v", err)
return err
}
readerConfig := k.sc.GetReaderConfig()
conf.Log.Infof("topic: %s, brokers: %v", readerConfig.Topic, readerConfig.Brokers)
readerConfig.Dialer = &kafkago.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: k.tlsConfig,
SASLMechanism: mechanism,
SASLMechanism: k.mechanism,
}
reader := kafkago.NewReader(readerConfig)
k.reader = reader
err = k.reader.SetOffset(kafkago.LastOffset)
failpoint.Inject("kafkaErr", func(val failpoint.Value) {
err = mockKakfaSourceErr(val.(int), offsetErr)
})
err := k.reader.SetOffset(kafkago.LastOffset)
if err != nil {
return err

Check warning on line 150 in extensions/impl/kafka/source.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/source.go#L150

Added line #L150 was not covered by tests
}
Expand Down Expand Up @@ -257,7 +256,6 @@ const (
castConfErr
saslConfErr
mechanismErr
offsetErr
mockErrEnd
)

Expand Down
2 changes: 1 addition & 1 deletion extensions/impl/kafka/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestKafkaSource(t *testing.T) {
"brokers": "localhost:9092",
}
require.NoError(t, ks.Provision(ctx, configs))
require.Error(t, ks.Connect(ctx))
require.NoError(t, ks.Connect(ctx))
require.NoError(t, ks.Close(ctx))

for i := mockErrStart + 1; i < mockErrEnd; i++ {
Expand Down

0 comments on commit e89a32f

Please sign in to comment.