From 4803ca59fe67693a103dd036b6c6ab842d06cb27 Mon Sep 17 00:00:00 2001
From: Song Gao
Date: Mon, 20 Nov 2023 14:31:22 +0800
Subject: [PATCH] feat: Kafka sink send out array in batch (#2425)

Signed-off-by: yisaer
---
 docs/en_US/guide/sinks/plugin/kafka.md |  1 +
 docs/zh_CN/guide/sinks/plugin/kafka.md |  1 +
 extensions/sinks/kafka/ext/kafka.go    | 48 +++++++++++++++++---------
 extensions/sinks/kafka/kafka.json      | 17 ++++++++-
 4 files changed, 49 insertions(+), 18 deletions(-)

diff --git a/docs/en_US/guide/sinks/plugin/kafka.md b/docs/en_US/guide/sinks/plugin/kafka.md
index 2f3fa87ed3..9467a08e15 100644
--- a/docs/en_US/guide/sinks/plugin/kafka.md
+++ b/docs/en_US/guide/sinks/plugin/kafka.md
@@ -66,6 +66,7 @@ Restart the eKuiper server to activate the plugin.
 | privateKeyPath | true | Key file path for Kafka client SSL verification |
 | rootCaPath | true | Kafka client ssl verified ca certificate file path |
 | maxAttempts | true | The number of retries the Kafka client sends messages to the server, the default is 1 |
+| requiredACKs | true | The acknowledgement mechanism for messages sent by the Kafka client: -1 means wait for acknowledgement from all ISR replicas, 1 means wait for the leader's acknowledgement only, 0 means do not wait for acknowledgement; default -1 |
 | key | true | Key information carried by the Kafka client in messages sent to the server |
 | headers | true | The header information carried by the Kafka client in the message sent to the server |

diff --git a/docs/zh_CN/guide/sinks/plugin/kafka.md b/docs/zh_CN/guide/sinks/plugin/kafka.md
index 825a6f14b5..bf3961845f 100644
--- a/docs/zh_CN/guide/sinks/plugin/kafka.md
+++ b/docs/zh_CN/guide/sinks/plugin/kafka.md
@@ -66,6 +66,7 @@ $(PLUGINS_CUSTOM):
 | privateKeyPath | 是 | Kafka 客户端 ssl 验证的 key 文件路径 |
 | rootCaPath | 是 | Kafka 客户端 ssl 验证的 ca 证书文件路径 |
 | maxAttempts | 是 | Kafka 客户端向 server 发送消息的重试次数,默认为1 |
+| requiredACKs | 是 | Kafka 客户端确认消息的机制,-1 代表等待所有 ISR 副本确认,1 代表仅等待 leader 确认,0 代表不等待确认,默认为 -1 |
 | key | 是 | Kafka 客户端向 server 发送消息所携带的 Key 信息 |
 | headers | 是 | Kafka 客户端向 server 发送消息所携带的 headers 信息 |

diff --git a/extensions/sinks/kafka/ext/kafka.go b/extensions/sinks/kafka/ext/kafka.go
index 6255ca8a89..8d3c8d2a24 100644
--- a/extensions/sinks/kafka/ext/kafka.go
+++ b/extensions/sinks/kafka/ext/kafka.go
@@ -15,6 +15,7 @@
 package kafka

 import (
+    "context"
     "encoding/json"
     "fmt"
     "strings"
@@ -44,9 +45,10 @@ type sinkConf struct {
 }

 type kafkaConf struct {
-    MaxAttempts int         `json:"maxAttempts"`
-    Key         string      `json:"key"`
-    Headers     interface{} `json:"headers"`
+    MaxAttempts  int         `json:"maxAttempts"`
+    RequiredACKs int         `json:"requiredACKs"`
+    Key          string      `json:"key"`
+    Headers      interface{} `json:"headers"`
 }

 func (m *kafkaSink) Configure(props map[string]interface{}) error {
@@ -80,7 +82,8 @@ func (m *kafkaSink) Configure(props map[string]interface{}) error {
         return err
     }
     kc := &kafkaConf{
-        MaxAttempts: 1,
+        RequiredACKs: -1,
+        MaxAttempts:  1,
     }
     if err := cast.MapToStruct(props, kc); err != nil {
         return err
@@ -92,11 +95,10 @@ func (m *kafkaSink) Configure(props map[string]interface{}) error {
         return fmt.Errorf("set kafka header failed, err:%v", err)
     }
     m.tc.TlsConfigLog("sink")
-    return nil
+    return m.buildKafkaWriter()
 }

-func (m *kafkaSink) Open(ctx api.StreamContext) error {
-    ctx.GetLogger().Debug("Opening kafka sink")
+func (m *kafkaSink) buildKafkaWriter() error {
     mechanism, err := m.sc.GetMechanism()
     if err != nil {
         return err
@@ -114,33 +116,45 @@ func (m *kafkaSink) Open(ctx api.StreamContext) error {
         Async:                  false,
         AllowAutoTopicCreation: true,
         MaxAttempts:            m.kc.MaxAttempts,
-        RequiredAcks:           -1,
+        RequiredAcks:           kafkago.RequiredAcks(m.kc.RequiredACKs),
         BatchSize:              1,
         Transport: &kafkago.Transport{
             SASL: mechanism,
             TLS:  tlsConfig,
         },
     }
+    // ping message
+    err = w.WriteMessages(context.Background(), kafkago.Message{})
+    if err != nil {
+        return err
+    }
     m.writer = w
     return nil
 }

+func (m *kafkaSink) Open(ctx api.StreamContext) error {
+    ctx.GetLogger().Debug("Opening kafka sink")
+    return nil
+}
+
 func (m *kafkaSink) Collect(ctx api.StreamContext, item interface{}) error {
     logger := ctx.GetLogger()
     logger.Debugf("kafka sink receive %s", item)
     var messages []kafkago.Message
     switch d := item.(type) {
     case []map[string]interface{}:
-        decodedBytes, _, err := ctx.TransformOutput(d)
-        if err != nil {
-            return fmt.Errorf("kafka sink transform data error: %v", err)
-        }
-        msg, err := m.buildMsg(ctx, item, decodedBytes)
-        if err != nil {
-            conf.Log.Errorf("build kafka msg failed, err:%v", err)
-            return err
+        for _, msg := range d {
+            decodedBytes, _, err := ctx.TransformOutput(msg)
+            if err != nil {
+                return fmt.Errorf("kafka sink transform data error: %v", err)
+            }
+            kafkaMsg, err := m.buildMsg(ctx, item, decodedBytes)
+            if err != nil {
+                conf.Log.Errorf("build kafka msg failed, err:%v", err)
+                return err
+            }
+            messages = append(messages, kafkaMsg)
         }
-        messages = append(messages, msg)
     case map[string]interface{}:
         decodedBytes, _, err := ctx.TransformOutput(d)
         if err != nil {
diff --git a/extensions/sinks/kafka/kafka.json b/extensions/sinks/kafka/kafka.json
index 59fd71ec20..84e6fa0a3a 100644
--- a/extensions/sinks/kafka/kafka.json
+++ b/extensions/sinks/kafka/kafka.json
@@ -165,7 +165,7 @@
     },
     {
       "name": "maxAttempts",
-      "default": true,
+      "default": 1,
       "optional": true,
       "control": "text",
       "type": "int",
@@ -178,6 +178,21 @@
         "zh_CN": "消息发送重发次数"
       }
     },
+    {
+      "name": "requiredACKs",
+      "default": -1,
+      "optional": true,
+      "control": "text",
+      "type": "int",
+      "hint": {
+        "en_US": "Number of acknowledgements from partition replicas required before receiving a response to a produce request: -1 means wait for the full ISR to acknowledge the writes, 1 means wait for the leader to acknowledge the writes, 0 means fire-and-forget",
+        "zh_CN": "Kafka 客户端发送消息确认回复的行为控制:-1 代表等待所有 ISR 副本确认,1 代表仅等待 leader 确认,0 代表不等待确认"
+      },
+      "label": {
+        "en_US": "requiredACKs",
+        "zh_CN": "消息发送确认机制"
+      }
+    },
     {
       "name": "headers",
       "default": {},
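
Note: the batched send path introduced by this patch can be exercised directly against segmentio/kafka-go. The snippet below is a minimal standalone sketch, not the plugin code itself: the broker address, topic name, and sample rows are made-up placeholders, and the requiredACKs value is converted to kafkago.RequiredAcks the same way the sink does.

package main

import (
    "context"
    "encoding/json"
    "fmt"

    kafkago "github.com/segmentio/kafka-go"
)

func main() {
    // Hypothetical value; in the sink it comes from the rule's kafka action properties.
    requiredACKs := -1 // -1: wait for all ISR replicas, 1: leader only, 0: fire-and-forget

    w := &kafkago.Writer{
        Addr:                   kafkago.TCP("localhost:9092"), // placeholder broker
        Topic:                  "test-topic",                  // placeholder topic
        AllowAutoTopicCreation: true,
        MaxAttempts:            1,
        RequiredAcks:           kafkago.RequiredAcks(requiredACKs),
        BatchSize:              1,
    }
    defer w.Close()

    // An array result, as produced by a rule whose output is []map[string]interface{}.
    rows := []map[string]interface{}{
        {"device": "d1", "temp": 21.5},
        {"device": "d2", "temp": 22.0},
    }

    // Build one Kafka message per array element, mirroring the batched Collect path.
    var msgs []kafkago.Message
    for _, row := range rows {
        payload, err := json.Marshal(row)
        if err != nil {
            fmt.Println("encode failed:", err)
            return
        }
        msgs = append(msgs, kafkago.Message{Value: payload})
    }

    // Send the whole batch in a single WriteMessages call.
    if err := w.WriteMessages(context.Background(), msgs...); err != nil {
        fmt.Println("write failed:", err)
        return
    }
    fmt.Printf("wrote %d messages\n", len(msgs))
}

With this change, a rule result that is an array yields one Kafka message per element instead of a single message for the whole array.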