Skip to content

Commit

Permalink
restore kafka
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer committed Jul 22, 2024
1 parent be44639 commit 1c269ea
Show file tree
Hide file tree
Showing 12 changed files with 1,309 additions and 2 deletions.
245 changes: 245 additions & 0 deletions extensions/impl/kafka/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"crypto/tls"
"encoding/json"
"fmt"
"strings"

"github.com/pingcap/failpoint"
kafkago "github.com/segmentio/kafka-go"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/cert"
)

// KafkaSink publishes rule output messages to a kafka topic.
type KafkaSink struct {
	// writer is the kafka-go producer, built in buildKafkaWriter during Provision.
	writer *kafkago.Writer
	// kc is the parsed sink configuration.
	kc *kafkaConf
	// tlsConfig holds the TLS settings from cert.GenTLSConfig; used for both
	// the Connect ping and the writer transport.
	tlsConfig *tls.Config
	// headersMap holds per-key header templates when "headers" is configured as a map.
	headersMap map[string]string
	// headerTemplate holds a single template (expected to render a JSON object)
	// when "headers" is configured as a string.
	headerTemplate string
}

// kafkaConf is the user-facing sink configuration decoded from the rule properties.
type kafkaConf struct {
	// Brokers is a comma-separated list of broker addresses; required.
	Brokers string `json:"brokers"`
	// Topic is the destination kafka topic; required.
	Topic string `json:"topic"`
	// MaxAttempts is the writer's retry limit (defaults to 1 in Provision).
	MaxAttempts int `json:"maxAttempts"`
	// RequiredACKs is passed to kafkago.RequiredAcks (defaults to -1 in Provision).
	RequiredACKs int `json:"requiredACKs"`
	// Key is an optional template rendered into the message key.
	Key string `json:"key"`
	// Headers is either a map of per-key string templates or a single
	// template string; anything else is rejected by setHeaders.
	Headers interface{} `json:"headers"`
}

// validate checks that the mandatory topic and brokers settings are present.
func (c *kafkaConf) validate() error {
	switch {
	case c.Topic == "":
		return fmt.Errorf("topic can not be empty")
	case c.Brokers == "":
		return fmt.Errorf("brokers can not be empty")
	default:
		return nil
	}
}

func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) error {
c := &kafkaConf{
RequiredACKs: -1,
MaxAttempts: 1,
}
err := cast.MapToStruct(configs, c)
failpoint.Inject("kafkaErr", func(val failpoint.Value) {
err = mockKakfaSourceErr(val.(int), castConfErr)
})
if err != nil {
return err
}
err = c.validate()
if err != nil {
return err
}
sc, err := getSaslConf(configs)
failpoint.Inject("kafkaErr", func(val failpoint.Value) {
err = mockKakfaSourceErr(val.(int), saslConfErr)
})
if err != nil {
return err
}
if err := sc.Validate(); err != nil {
return err

Check warning on line 82 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L82

Added line #L82 was not covered by tests
}
tlsConfig, err := cert.GenTLSConfig(configs, "kafka-sink")
if err != nil {
return err
}
k.tlsConfig = tlsConfig
k.kc = c
err = k.setHeaders()
if err != nil {
return err
}
return k.buildKafkaWriter(sc)
}

// buildKafkaWriter assembles the kafka-go writer from the parsed
// configuration, the SASL mechanism and the TLS settings.
func (k *KafkaSink) buildKafkaWriter(sc *saslConf) error {
	mechanism, err := sc.GetMechanism()
	// failpoint hook used by unit tests to simulate a SASL mechanism failure
	failpoint.Inject("kafkaErr", func(val failpoint.Value) {
		err = mockKakfaSourceErr(val.(int), mechanismErr)
	})
	if err != nil {
		return err
	}
	addrs := strings.Split(k.kc.Brokers, ",")
	k.writer = &kafkago.Writer{
		Addr:  kafkago.TCP(addrs...),
		Topic: k.kc.Topic,
		// Murmur2 is the kafka java-client default balancer, so messages land
		// on the same partitions a java producer would pick.
		Balancer:               &kafkago.Murmur2Balancer{},
		Async:                  false,
		AllowAutoTopicCreation: true,
		MaxAttempts:            k.kc.MaxAttempts,
		RequiredAcks:           kafkago.RequiredAcks(k.kc.RequiredACKs),
		BatchSize:              1,
		Transport: &kafkago.Transport{
			SASL: mechanism,
			TLS:  k.tlsConfig,
		},
	}
	return nil
}

// Close releases the kafka writer. It is safe to call even when Provision
// failed before the writer was built (writer is nil in that case, and the
// original code would panic on k.writer.Close()).
func (k *KafkaSink) Close(ctx api.StreamContext) error {
	if k.writer == nil {
		return nil
	}
	return k.writer.Close()
}

func (k *KafkaSink) Connect(ctx api.StreamContext) error {
for _, broker := range strings.Split(k.kc.Brokers, ",") {
err := ping(k.tlsConfig, broker)
if err != nil {
return err
}
}
return nil

Check warning on line 136 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L136

Added line #L136 was not covered by tests
}

func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) error {
return k.collect(ctx, item.ToMap())

Check warning on line 140 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L139-L140

Added lines #L139 - L140 were not covered by tests
}

func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) error {
for _, data := range items.ToMaps() {
err := k.collect(ctx, data)
if err != nil {
return err

Check warning on line 147 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L143-L147

Added lines #L143 - L147 were not covered by tests
}
}
return nil

Check warning on line 150 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L150

Added line #L150 was not covered by tests
}

func (k *KafkaSink) collect(ctx api.StreamContext, data map[string]any) error {
ds, err := json.Marshal(data)
if err != nil {
return err

Check warning on line 156 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L156

Added line #L156 was not covered by tests
}
var messages []kafkago.Message
msg, err := k.buildMsg(ctx, data, ds)
if err != nil {
return err

Check warning on line 161 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L161

Added line #L161 was not covered by tests
}
messages = append(messages, msg)
return k.writer.WriteMessages(ctx, messages...)
}

func (k *KafkaSink) buildMsg(ctx api.StreamContext, item interface{}, decodedBytes []byte) (kafkago.Message, error) {
msg := kafkago.Message{Value: decodedBytes}
if len(k.kc.Key) > 0 {
newKey, err := ctx.ParseTemplate(k.kc.Key, item)
if err != nil {
return kafkago.Message{}, fmt.Errorf("parse kafka key error: %v", err)

Check warning on line 172 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L170-L172

Added lines #L170 - L172 were not covered by tests
}
msg.Key = []byte(newKey)

Check warning on line 174 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L174

Added line #L174 was not covered by tests
}
headers, err := k.parseHeaders(ctx, item)
if err != nil {
return kafkago.Message{}, fmt.Errorf("parse kafka headers error: %v", err)

Check warning on line 178 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L178

Added line #L178 was not covered by tests
}
msg.Headers = headers
return msg, nil
}

func (k *KafkaSink) setHeaders() error {
if k.kc.Headers == nil {
return nil
}
switch h := k.kc.Headers.(type) {
case map[string]interface{}:
kafkaHeaders := make(map[string]string)
for key, value := range h {
if sv, ok := value.(string); ok {
kafkaHeaders[key] = sv
}
}
k.headersMap = kafkaHeaders
return nil
case string:
k.headerTemplate = h
return nil

Check warning on line 200 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L198-L200

Added lines #L198 - L200 were not covered by tests
default:
return fmt.Errorf("kafka headers must be a map[string]string or a string")
}
}

func (k *KafkaSink) parseHeaders(ctx api.StreamContext, data interface{}) ([]kafkago.Header, error) {
if len(k.headersMap) > 0 {
var kafkaHeaders []kafkago.Header
for k, v := range k.headersMap {
value, err := ctx.ParseTemplate(v, data)
if err != nil {
return nil, fmt.Errorf("parse kafka header map failed, err:%v", err)

Check warning on line 212 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L212

Added line #L212 was not covered by tests
}
kafkaHeaders = append(kafkaHeaders, kafkago.Header{
Key: k,
Value: []byte(value),
})
}
return kafkaHeaders, nil
} else if len(k.headerTemplate) > 0 {
headers := make(map[string]string)
s, err := ctx.ParseTemplate(k.headerTemplate, data)
if err != nil {
return nil, fmt.Errorf("parse kafka header template failed, err:%v", err)

Check warning on line 224 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L221-L224

Added lines #L221 - L224 were not covered by tests
}
if err := json.Unmarshal([]byte(s), &headers); err != nil {
return nil, err

Check warning on line 227 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L226-L227

Added lines #L226 - L227 were not covered by tests
}
var kafkaHeaders []kafkago.Header
for key, value := range headers {
kafkaHeaders = append(kafkaHeaders, kafkago.Header{
Key: key,
Value: []byte(value),
})

Check warning on line 234 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L229-L234

Added lines #L229 - L234 were not covered by tests
}
return kafkaHeaders, nil

Check warning on line 236 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L236

Added line #L236 was not covered by tests
}
return nil, nil
}

func GetSink() api.Sink {
return &KafkaSink{}

Check warning on line 242 in extensions/impl/kafka/sink.go

View check run for this annotation

Codecov / codecov/patch

extensions/impl/kafka/sink.go#L241-L242

Added lines #L241 - L242 were not covered by tests
}

// Compile-time check that KafkaSink satisfies api.TupleCollector, using the
// idiomatic nil-pointer conversion, which allocates nothing.
var _ api.TupleCollector = (*KafkaSink)(nil)
112 changes: 112 additions & 0 deletions extensions/impl/kafka/sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"encoding/json"
"fmt"
"strconv"
"testing"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"

mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
)

// TestKafkaSink exercises Provision with a set of invalid configurations,
// then a valid one, and finally drives every mocked failpoint error.
//
// Fix over the original: the error returns of failpoint.Enable and
// failpoint.Disable were silently ignored; they are now asserted so a
// misconfigured failpoint path fails the test instead of skipping coverage.
func TestKafkaSink(t *testing.T) {
	ks := &KafkaSink{}
	// Each entry is an invalid configuration that must make Provision fail.
	testcases := []struct {
		configs map[string]any
	}{
		{
			configs: map[string]any{},
		},
		{
			configs: map[string]any{
				"topic": "t",
			},
		},
		{
			configs: map[string]any{
				"topic":            "t",
				"brokers":          "localhost:9092",
				"certificationRaw": "mockErr",
			},
		},
		{
			configs: map[string]any{
				"datasource":   "t",
				"brokers":      "localhost:9092",
				"saslAuthType": "mockErr",
			},
		},
		{
			configs: map[string]any{
				"datasource":   "t",
				"brokers":      "localhost:9092",
				"saslAuthType": "plain",
			},
		},
		{
			configs: map[string]any{
				"topic":   "t",
				"brokers": "localhost:9092",
				"headers": 1,
			},
		},
	}
	ctx := mockContext.NewMockContext("1", "2")
	for index, tc := range testcases {
		require.Error(t, ks.Provision(ctx, tc.configs), index)
	}
	configs := map[string]any{
		"topic":   "t",
		"brokers": "localhost:9092",
	}
	require.NoError(t, ks.Provision(ctx, configs))
	// No broker is listening in unit tests, so connect and send must fail.
	require.Error(t, ks.Connect(ctx))
	require.Error(t, ks.collect(ctx, map[string]any{"1": 1}))
	require.NoError(t, ks.Close(ctx))

	// Drive each mocked failure kind through Provision via the failpoint hook.
	for i := mockErrStart + 1; i < offsetErr; i++ {
		require.NoError(t, failpoint.Enable("github.com/lf-edge/ekuiper/v2/extensions/impl/kafka/kafkaErr", fmt.Sprintf("return(%v)", i)))
		require.Error(t, ks.Provision(ctx, configs), i)
	}
	require.NoError(t, failpoint.Disable("github.com/lf-edge/ekuiper/v2/extensions/impl/kafka/kafkaErr"))
}

// TestKafkaSinkBuildMsg verifies that a map-configured header template is
// rendered against the message data when building a kafka message.
//
// Fixes over the original: the json.Marshal error is asserted instead of
// discarded, and the expected header value is built with strconv.Itoa
// rather than a hand-rolled make([]uint8, 0, 8) + strconv.AppendInt dance.
func TestKafkaSinkBuildMsg(t *testing.T) {
	configs := map[string]any{
		"topic":   "t",
		"brokers": "localhost:9092",
		"headers": map[string]any{
			"a": "{{.a}}",
		},
	}
	ks := &KafkaSink{}
	ctx := mockContext.NewMockContext("1", "2")
	require.NoError(t, ks.Provision(ctx, configs))
	item := map[string]any{
		"a": 1,
	}
	d, err := json.Marshal(item)
	require.NoError(t, err)
	msg, err := ks.buildMsg(ctx, item, d)
	require.NoError(t, err)
	require.Equal(t, "a", msg.Headers[0].Key)
	// the template {{.a}} renders the int value 1 as the string "1"
	require.Equal(t, strconv.Itoa(1), string(msg.Headers[0].Value))
}
Loading

0 comments on commit 1c269ea

Please sign in to comment.