feat: restore kafka #3033

Closed
wants to merge 6 commits
262 changes: 262 additions & 0 deletions extensions/impl/kafka/sink.go
@@ -0,0 +1,262 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
    "crypto/tls"
    "encoding/json"
    "fmt"
    "strings"

    "github.com/pingcap/failpoint"
    kafkago "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl"

    "github.com/lf-edge/ekuiper/contract/v2/api"
    "github.com/lf-edge/ekuiper/v2/pkg/cast"
    "github.com/lf-edge/ekuiper/v2/pkg/cert"
)

type KafkaSink struct {
    writer         *kafkago.Writer
    kc             *kafkaConf
    tlsConfig      *tls.Config
    headersMap     map[string]string
    headerTemplate string
    saslConf       *saslConf
    mechanism      sasl.Mechanism
}

type kafkaConf struct {
    Brokers      string      `json:"brokers"`
    Topic        string      `json:"topic"`
    MaxAttempts  int         `json:"maxAttempts"`
    RequiredACKs int         `json:"requiredACKs"`
    Key          string      `json:"key"`
    Headers      interface{} `json:"headers"`
}

func (c *kafkaConf) validate() error {
    if c.Topic == "" {
        return fmt.Errorf("topic can not be empty")
    }
    if len(c.Brokers) < 1 {
        return fmt.Errorf("brokers can not be empty")
    }
    return nil
}

// Provision parses the sink configuration and prepares SASL, TLS and header settings.
func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) error {
    c := &kafkaConf{
        RequiredACKs: -1,
        MaxAttempts:  1,
    }
    err := cast.MapToStruct(configs, c)
    failpoint.Inject("kafkaErr", func(val failpoint.Value) {
        err = mockKakfaSourceErr(val.(int), castConfErr)
    })
    if err != nil {
        return err
    }
    err = c.validate()
    if err != nil {
        return err
    }
    sc, err := getSaslConf(configs)
    failpoint.Inject("kafkaErr", func(val failpoint.Value) {
        err = mockKakfaSourceErr(val.(int), saslConfErr)
    })
    if err != nil {
        return err
    }
    if err := sc.Validate(); err != nil {
        return err
    }
    k.saslConf = sc
    tlsConfig, err := cert.GenTLSConfig(configs, "kafka-sink")
    if err != nil {
        return err
    }
    mechanism, err := k.saslConf.GetMechanism()
    failpoint.Inject("kafkaErr", func(val failpoint.Value) {
        err = mockKakfaSourceErr(val.(int), mechanismErr)
    })
    if err != nil {
        return err
    }
    k.mechanism = mechanism
    k.tlsConfig = tlsConfig
    k.kc = c
    err = k.setHeaders()
    if err != nil {
        return err
    }
    return nil
}

func (k *KafkaSink) buildKafkaWriter() error {
    brokers := strings.Split(k.kc.Brokers, ",")
    w := &kafkago.Writer{
        Addr:  kafkago.TCP(brokers...),
        Topic: k.kc.Topic,
        // kafka java-client default balancer
        Balancer:               &kafkago.Murmur2Balancer{},
        Async:                  false,
        AllowAutoTopicCreation: true,
        MaxAttempts:            k.kc.MaxAttempts,
        RequiredAcks:           kafkago.RequiredAcks(k.kc.RequiredACKs),
        BatchSize:              1,
        Transport: &kafkago.Transport{
            SASL: k.mechanism,
            TLS:  k.tlsConfig,
        },
    }
    k.writer = w
    return nil
}

func (k *KafkaSink) Close(ctx api.StreamContext) error {
    return k.writer.Close()
}

func (k *KafkaSink) Connect(ctx api.StreamContext) error {
    return k.buildKafkaWriter()
}

// Collect encodes a single tuple and writes it to Kafka.
func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) error {
    msgs, err := k.collect(ctx, item)
    if err != nil {
        return err
    }
    return k.writer.WriteMessages(ctx, msgs...)
}

// CollectList encodes a batch of tuples and writes them to Kafka in one call.
func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) error {
    allMsgs := make([]kafkago.Message, 0)
    items.RangeOfTuples(func(index int, tuple api.MessageTuple) bool {
        msgs, err := k.collect(ctx, tuple)
        if err != nil {
            return false
        }
        allMsgs = append(allMsgs, msgs...)
        return true
    })
    return k.writer.WriteMessages(ctx, allMsgs...)
}

func (k *KafkaSink) collect(ctx api.StreamContext, item api.MessageTuple) ([]kafkago.Message, error) {
    ds, err := json.Marshal(item.ToMap())
    if err != nil {
        return nil, err
    }
    var messages []kafkago.Message
    msg, err := k.buildMsg(ctx, item, ds)
    if err != nil {
        return nil, err
    }
    messages = append(messages, msg)
    return messages, nil
}

func (k *KafkaSink) buildMsg(ctx api.StreamContext, item api.MessageTuple, decodedBytes []byte) (kafkago.Message, error) {
    msg := kafkago.Message{Value: decodedBytes}
    if len(k.kc.Key) > 0 {
        newKey := k.kc.Key
        if dp, ok := item.(api.HasDynamicProps); ok {
            key, ok := dp.DynamicProps("key")
            if ok {
                newKey = key
            }
        }
        msg.Key = []byte(newKey)
    }
    headers, err := k.parseHeaders(ctx, item)
    if err != nil {
        return kafkago.Message{}, fmt.Errorf("parse kafka headers error: %v", err)
    }
    msg.Headers = headers
    return msg, nil
}

// setHeaders caches the configured headers, either as a static map or as a string template.
func (k *KafkaSink) setHeaders() error {
    if k.kc.Headers == nil {
        return nil
    }
    switch h := k.kc.Headers.(type) {
    case map[string]interface{}:
        kafkaHeaders := make(map[string]string)
        for key, value := range h {
            if sv, ok := value.(string); ok {
                kafkaHeaders[key] = sv
            }
        }
        k.headersMap = kafkaHeaders
        return nil
    case string:
        k.headerTemplate = h
        return nil
    default:
        return fmt.Errorf("kafka headers must be a map[string]string or a string")
    }
}

// parseHeaders resolves the headers for one message, either from the static header map
// or by rendering the header template into a JSON object of string key/value pairs.
func (k *KafkaSink) parseHeaders(ctx api.StreamContext, item api.MessageTuple) ([]kafkago.Header, error) {
    if len(k.headersMap) > 0 {
        var kafkaHeaders []kafkago.Header
        for k, v := range k.headersMap {
            value := v
            dp, ok := item.(api.HasDynamicProps)
            if ok {
                nv, ok := dp.DynamicProps(k)
                if ok {
                    value = nv
                }
            }
            kafkaHeaders = append(kafkaHeaders, kafkago.Header{
                Key:   k,
                Value: []byte(value),
            })
        }
        return kafkaHeaders, nil
    } else if len(k.headerTemplate) > 0 {
        raw := k.headerTemplate
        dp, ok := item.(api.HasDynamicProps)
        if ok {
            nv, ok := dp.DynamicProps("headers")
            if ok {
                raw = nv
            }
        }
        headers := make(map[string]string)
        if err := json.Unmarshal([]byte(raw), &headers); err != nil {
            return nil, err
        }
        var kafkaHeaders []kafkago.Header
        for key, value := range headers {
            kafkaHeaders = append(kafkaHeaders, kafkago.Header{
                Key:   key,
                Value: []byte(value),
            })
        }
        return kafkaHeaders, nil
    }
    return nil, nil
}

func GetSink() api.Sink {
    return &KafkaSink{}
}

var _ api.TupleCollector = &KafkaSink{}
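
The "headers" option above accepts two shapes, handled by setHeaders and parseHeaders: a map of string values, or a string template that must render to a JSON object of string key/value pairs. Below is a minimal standalone sketch of both forms; the broker address, topic, and template values are placeholders, not taken from this PR.

// headers_sketch.go - illustrative only, not part of this PR.
package main

import "fmt"

func main() {
    // Map form: each header value is a static string that parseHeaders may
    // override per message through api.HasDynamicProps.
    mapForm := map[string]any{
        "topic":   "t",
        "brokers": "localhost:9092",
        "headers": map[string]any{"a": "{{.a}}"},
    }

    // String form: a template that must render to a JSON object of string
    // key/value pairs; parseHeaders unmarshals it into kafka headers.
    stringForm := map[string]any{
        "topic":   "t",
        "brokers": "localhost:9092",
        "headers": `{"a": "{{.a}}"}`,
    }

    fmt.Println(mapForm)
    fmt.Println(stringForm)
}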
126 changes: 126 additions & 0 deletions extensions/impl/kafka/sink_test.go
@@ -0,0 +1,126 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
    "encoding/json"
    "fmt"
    "strconv"
    "testing"

    "github.com/pingcap/failpoint"
    "github.com/stretchr/testify/require"

    "github.com/lf-edge/ekuiper/v2/internal/testx"
    mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
)

func TestKafkaSink(t *testing.T) {
    ks := &KafkaSink{}
    testcases := []struct {
        configs map[string]any
    }{
        {
            configs: map[string]any{},
        },
        {
            configs: map[string]any{
                "topic": "t",
            },
        },
        {
            configs: map[string]any{
                "topic":            "t",
                "brokers":          "localhost:9092",
                "certificationRaw": "mockErr",
            },
        },
        {
            configs: map[string]any{
                "datasource":   "t",
                "brokers":      "localhost:9092",
                "saslAuthType": "mockErr",
            },
        },
        {
            configs: map[string]any{
                "datasource":   "t",
                "brokers":      "localhost:9092",
                "saslAuthType": "plain",
            },
        },
        {
            configs: map[string]any{
                "topic":   "t",
                "brokers": "localhost:9092",
                "headers": 1,
            },
        },
    }
    ctx := mockContext.NewMockContext("1", "2")
    for index, tc := range testcases {
        require.Error(t, ks.Provision(ctx, tc.configs), index)
    }
    configs := map[string]any{
        "topic":   "t",
        "brokers": "localhost:9092",
    }
    require.NoError(t, ks.Provision(ctx, configs))
    require.NoError(t, ks.Connect(ctx))
    mockT := testx.MockTuple{
        Map: map[string]any{"1": 1},
    }
    msgs, err := ks.collect(ctx, mockT)
    require.Len(t, msgs, 1)
    require.NoError(t, err)
    require.NoError(t, ks.Close(ctx))

    for i := mockErrStart + 1; i < mockErrEnd; i++ {
        failpoint.Enable("github.com/lf-edge/ekuiper/v2/extensions/impl/kafka/kafkaErr", fmt.Sprintf("return(%v)", i))
        require.Error(t, ks.Provision(ctx, configs), i)
    }
    failpoint.Disable("github.com/lf-edge/ekuiper/v2/extensions/impl/kafka/kafkaErr")
}

func TestKafkaSinkBuildMsg(t *testing.T) {
    configs := map[string]any{
        "topic":   "t",
        "brokers": "localhost:9092",
        "headers": map[string]any{
            "a": "{{.a}}",
        },
        "key": "{{.a}}",
    }
    ks := &KafkaSink{}
    ctx := mockContext.NewMockContext("1", "2")
    require.NoError(t, ks.Provision(ctx, configs))
    require.NoError(t, ks.Connect(ctx))
    item := map[string]any{
        "a": 1,
    }
    d, _ := json.Marshal(item)
    mockT := testx.MockTuple{
        Map:      item,
        Template: map[string]string{"a": "1", "key": "1"},
    }
    msg, err := ks.buildMsg(ctx, mockT, d)
    require.NoError(t, err)
    require.Equal(t, "a", msg.Headers[0].Key)
    b := make([]uint8, 0, 8)
    b = strconv.AppendInt(b, int64(1), 10)
    require.Equal(t, b, msg.Headers[0].Value)
    require.Equal(t, []byte("1"), msg.Key)
    require.NoError(t, ks.Close(ctx))
}
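
Taken together, the tests exercise the sink's intended call order: Provision, Connect, Collect, Close. Below is a minimal sketch of that flow reusing the same mock helpers the tests import; the function name, rule/operator ids, and tuple contents are illustrative, and the Collect step assumes a Kafka broker is reachable at localhost:9092.

// lifecycle_sketch.go - illustrative only, not part of this PR.
package kafka

import (
    "testing"

    "github.com/stretchr/testify/require"

    "github.com/lf-edge/ekuiper/v2/internal/testx"
    mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
)

func provisionAndSendSketch(t *testing.T) {
    ctx := mockContext.NewMockContext("demoRule", "demoOp")
    ks := &KafkaSink{}
    // Provision parses and validates the configuration (brokers, topic, SASL, TLS, headers).
    require.NoError(t, ks.Provision(ctx, map[string]any{
        "topic":   "t",
        "brokers": "localhost:9092",
    }))
    // Connect only builds the kafka-go writer; no broker I/O happens yet.
    require.NoError(t, ks.Connect(ctx))
    // Collect JSON-marshals the tuple into the message value and writes it,
    // so this step does require a reachable broker.
    tuple := testx.MockTuple{Map: map[string]any{"temperature": 20}}
    require.NoError(t, ks.Collect(ctx, tuple))
    require.NoError(t, ks.Close(ctx))
}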