feat: Kafka sink send out array in batch (#2425)
Signed-off-by: yisaer <[email protected]>
Yisaer committed Nov 20, 2023
1 parent a1e303b commit 4803ca5
Showing 4 changed files with 49 additions and 18 deletions.
1 change: 1 addition & 0 deletions docs/en_US/guide/sinks/plugin/kafka.md
@@ -66,6 +66,7 @@ Restart the eKuiper server to activate the plugin.
| privateKeyPath | true | Key file path for Kafka client SSL verification |
| rootCaPath | true | CA certificate file path for Kafka client SSL verification |
| maxAttempts | true | Number of attempts the Kafka client makes to send a message to the server; default is 1 |
+| requiredACKs | true | Acknowledgment mechanism for the Kafka client: -1 means wait for all in-sync replicas to acknowledge, 1 means wait for the leader only, 0 means do not wait for acknowledgment; default is -1 |
| key | true | Key carried by the Kafka client in messages sent to the server |
| headers | true | Headers carried by the Kafka client in messages sent to the server |

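For context, a rule action that exercises the new property might look like the following sketch; the rule id, SQL, broker address, topic, and property values are illustrative placeholders, not taken from this commit:

```json
{
  "id": "demoRule",
  "sql": "SELECT * FROM demo",
  "actions": [
    {
      "kafka": {
        "brokers": "localhost:9092",
        "topic": "test-topic",
        "maxAttempts": 3,
        "requiredACKs": 1
      }
    }
  ]
}
```

Setting requiredACKs to 1 trades durability for latency: the partition leader acknowledges the write without waiting for followers, whereas the default of -1 waits for the full in-sync replica set.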
1 change: 1 addition & 0 deletions docs/zh_CN/guide/sinks/plugin/kafka.md
@@ -66,6 +66,7 @@ $(PLUGINS_CUSTOM):
| privateKeyPath || Kafka 客户端 ssl 验证的 key 文件路径 |
| rootCaPath || Kafka 客户端 ssl 验证的 ca 证书文件路径 |
| maxAttempts || Kafka 客户端向 server 发送消息的重试次数,默认为 1 |
+| requiredACKs || Kafka 客户端确认消息的机制,-1 代表等待所有副本确认,1 代表仅等待 leader 确认,0 代表不等待确认,默认为 -1 |
| key || Kafka 客户端向 server 发送消息所携带的 Key 信息 |
| headers || Kafka 客户端向 server 发送消息所携带的 headers 信息 |

48 changes: 31 additions & 17 deletions extensions/sinks/kafka/ext/kafka.go
@@ -15,6 +15,7 @@
package kafka

import (
+	"context"
	"encoding/json"
	"fmt"
	"strings"
@@ -44,9 +45,10 @@ type sinkConf struct {
}

type kafkaConf struct {
-	MaxAttempts int         `json:"maxAttempts"`
-	Key         string      `json:"key"`
-	Headers     interface{} `json:"headers"`
+	MaxAttempts  int         `json:"maxAttempts"`
+	RequiredACKs int         `json:"requiredACKs"`
+	Key          string      `json:"key"`
+	Headers      interface{} `json:"headers"`
}

func (m *kafkaSink) Configure(props map[string]interface{}) error {
@@ -80,7 +82,8 @@ func (m *kafkaSink) Configure(props map[string]interface{}) error {
		return err
	}
	kc := &kafkaConf{
-		MaxAttempts: 1,
+		RequiredACKs: -1,
+		MaxAttempts:  1,
	}
	if err := cast.MapToStruct(props, kc); err != nil {
		return err
@@ -92,11 +95,10 @@ func (m *kafkaSink) Configure(props map[string]interface{}) error {
return fmt.Errorf("set kafka header failed, err:%v", err)
}
m.tc.TlsConfigLog("sink")
return nil
return m.buildKafkaWriter()
}

-func (m *kafkaSink) Open(ctx api.StreamContext) error {
-	ctx.GetLogger().Debug("Opening kafka sink")
+func (m *kafkaSink) buildKafkaWriter() error {
	mechanism, err := m.sc.GetMechanism()
	if err != nil {
		return err
@@ -114,33 +116,45 @@ func (m *kafkaSink) Open(ctx api.StreamContext) error {
		Async:                  false,
		AllowAutoTopicCreation: true,
		MaxAttempts:            m.kc.MaxAttempts,
-		RequiredAcks:           -1,
+		RequiredAcks:           kafkago.RequiredAcks(m.kc.RequiredACKs),
		BatchSize:              1,
		Transport: &kafkago.Transport{
			SASL: mechanism,
			TLS:  tlsConfig,
		},
	}
+	// ping message
+	err = w.WriteMessages(context.Background(), kafkago.Message{})
+	if err != nil {
+		return err
+	}
	m.writer = w
	return nil
}

+func (m *kafkaSink) Open(ctx api.StreamContext) error {
+	ctx.GetLogger().Debug("Opening kafka sink")
+	return nil
+}

func (m *kafkaSink) Collect(ctx api.StreamContext, item interface{}) error {
	logger := ctx.GetLogger()
	logger.Debugf("kafka sink receive %s", item)
	var messages []kafkago.Message
	switch d := item.(type) {
	case []map[string]interface{}:
-		decodedBytes, _, err := ctx.TransformOutput(d)
-		if err != nil {
-			return fmt.Errorf("kafka sink transform data error: %v", err)
-		}
-		msg, err := m.buildMsg(ctx, item, decodedBytes)
-		if err != nil {
-			conf.Log.Errorf("build kafka msg failed, err:%v", err)
-			return err
-		}
-		messages = append(messages, msg)
+		for _, msg := range d {
+			decodedBytes, _, err := ctx.TransformOutput(msg)
+			if err != nil {
+				return fmt.Errorf("kafka sink transform data error: %v", err)
+			}
+			kafkaMsg, err := m.buildMsg(ctx, item, decodedBytes)
+			if err != nil {
+				conf.Log.Errorf("build kafka msg failed, err:%v", err)
+				return err
+			}
+			messages = append(messages, kafkaMsg)
+		}
	case map[string]interface{}:
		decodedBytes, _, err := ctx.TransformOutput(d)
		if err != nil {
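The array branch above now builds one kafkago.Message per element and hands the whole slice to the writer in a single call. A minimal standalone sketch of that pattern with segmentio/kafka-go; the broker and topic values are assumptions for illustration, not from this commit:

```go
package main

import (
	"context"
	"log"

	kafkago "github.com/segmentio/kafka-go"
)

func main() {
	w := &kafkago.Writer{
		Addr:                   kafkago.TCP("localhost:9092"), // placeholder broker
		Topic:                  "test-topic",                  // placeholder topic
		AllowAutoTopicCreation: true,
		RequiredAcks:           kafkago.RequireAll, // -1, the sink's default
		BatchSize:              1,                  // matches the sink's writer setting
	}
	defer w.Close()

	// One message per array element, accumulated and then written in a
	// single variadic call, mirroring the new Collect branch.
	batch := []kafkago.Message{
		{Key: []byte("k1"), Value: []byte(`{"temperature": 20}`)},
		{Key: []byte("k2"), Value: []byte(`{"temperature": 21}`)},
	}
	if err := w.WriteMessages(context.Background(), batch...); err != nil {
		log.Fatal(err)
	}
}
```

WriteMessages is variadic and blocks until every message is written, so the whole array goes out in one call; how the messages are grouped on the wire is still governed by the writer's BatchSize and batch-timeout settings.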
17 changes: 16 additions & 1 deletion extensions/sinks/kafka/kafka.json
@@ -165,7 +165,7 @@
	},
	{
		"name": "maxAttempts",
-		"default": true,
+		"default": 1,
		"optional": true,
		"control": "text",
		"type": "int",
@@ -178,6 +178,21 @@
"zh_CN": "消息发送重发次数"
}
},
{
"name": "requiredACKs",
"default": -1,
"optional": true,
"control": "text",
"type": "int",
"hint": {
"en_US": "Number of acknowledges from partition replicas required before receiving a response to a produce request, -1 means wait for the full ISR to acknowledge the writes,1 means wait for the leader to acknowledge the writes,0 means fire-and-forget",
"zh_CN": "kafka 客户端发送消息确认回复的行为控制,-1 代表等待仅 leader 确认,1 代表等待所有副本确认,0 代表等待无需确认"
},
"label": {
"en_US": "requiredACKs",
"zh_CN": "消息发送确认机制"
}
},
	{
		"name": "headers",
		"default": {},
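Because the configured integer is cast directly to kafka-go's RequiredAcks type in buildKafkaWriter, the documented values line up with the library's named constants. A quick self-contained check, assuming segmentio/kafka-go as the client library (which the plugin uses):

```go
package main

import (
	"fmt"

	kafkago "github.com/segmentio/kafka-go"
)

func main() {
	// Each line prints true: the raw config ints map onto kafka-go's constants.
	fmt.Println(kafkago.RequiredAcks(-1) == kafkago.RequireAll) // wait for the full ISR (default)
	fmt.Println(kafkago.RequiredAcks(1) == kafkago.RequireOne)  // wait for the partition leader only
	fmt.Println(kafkago.RequiredAcks(0) == kafkago.RequireNone) // fire-and-forget
}
```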
