From f94d404d098d1d6a4a1619642efeb1ee5719c677 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Thu, 16 Nov 2023 20:22:40 +0800 Subject: [PATCH] feat: support kafka source rewindable (#2401) Signed-off-by: yisaer --- docs/en_US/guide/sources/plugin/kafka.md | 5 ---- docs/zh_CN/guide/sources/plugin/kafka.md | 5 ---- etc/sources/kafka.yaml | 1 - extensions/sources/kafka/ext/kafka.go | 36 +++++++++++++++++++----- extensions/sources/kafka/kafka.json | 15 ---------- 5 files changed, 29 insertions(+), 33 deletions(-) diff --git a/docs/en_US/guide/sources/plugin/kafka.md b/docs/en_US/guide/sources/plugin/kafka.md index 13ce1ad568..fcd6a83203 100644 --- a/docs/en_US/guide/sources/plugin/kafka.md +++ b/docs/en_US/guide/sources/plugin/kafka.md @@ -22,7 +22,6 @@ default: groupID: "" partition: 0 maxBytes: 1000000 - offset: 0 ``` ### Global configurations @@ -44,7 +43,3 @@ The partition specified when eKuiper consumes kafka messages ### maxBytes The maximum number of bytes that a single Kafka message batch can carry, the default is 1MB - -### offset - -The offset specified when eKuiper starts consuming messages from kafka, -1 represents lastOffset, and -2 represents firstOffset. diff --git a/docs/zh_CN/guide/sources/plugin/kafka.md b/docs/zh_CN/guide/sources/plugin/kafka.md index 53f4d8da9f..5235eed546 100644 --- a/docs/zh_CN/guide/sources/plugin/kafka.md +++ b/docs/zh_CN/guide/sources/plugin/kafka.md @@ -22,7 +22,6 @@ default: groupID: "" partition: 0 maxBytes: 1000000 - offset: 0 ``` ### 全局配置 @@ -44,7 +43,3 @@ eKuiper 消费 kafka 消息时所指定的 partition ### maxBytes 单个 kafka 消息批次最大所能携带的 bytes 数,默认为 1MB - -### offset - -eKuiper 启动向 kafka 进行消息消费时所指定的 offset, -1 代表 lastOffset,-2 代表 firstOffset。 diff --git a/etc/sources/kafka.yaml b/etc/sources/kafka.yaml index 95d476c580..366620612d 100644 --- a/etc/sources/kafka.yaml +++ b/etc/sources/kafka.yaml @@ -3,4 +3,3 @@ default: groupID: "" partition: 0 maxBytes: 1000000 - offset: 0 diff --git a/extensions/sources/kafka/ext/kafka.go b/extensions/sources/kafka/ext/kafka.go index f0ad8ec2df..b9e3152deb 100644 --- a/extensions/sources/kafka/ext/kafka.go +++ b/extensions/sources/kafka/ext/kafka.go @@ -29,6 +29,7 @@ import ( type KafkaSource struct { reader *kafkago.Reader + offset int64 } type kafkaSourceConf struct { @@ -38,7 +39,6 @@ type kafkaSourceConf struct { Partition int `json:"partition"` MaxAttempts int `json:"maxAttempts"` MaxBytes int `json:"maxBytes"` - Offset int64 `json:"offset"` } func (c *kafkaSourceConf) validate() error { @@ -118,13 +118,10 @@ func (s *KafkaSource) Configure(topic string, props map[string]interface{}) erro SASLMechanism: mechanism, } reader := kafkago.NewReader(readerConfig) - if kConf.Offset != 0 { - if err := reader.SetOffset(kConf.Offset); err != nil { - conf.Log.Errorf("kafka offset error: %v", err) - return fmt.Errorf("set kafka offset failed, err:%v", err) - } - } s.reader = reader + if err := s.reader.SetOffset(kafkago.LastOffset); err != nil { + return err + } conf.Log.Infof("kafka source got configured.") return nil } @@ -144,6 +141,7 @@ func (s *KafkaSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTupl errCh <- err return } + s.offset = msg.Offset dataList, err := ctx.DecodeIntoList(msg.Value) if err != nil { logger.Errorf("unmarshal kafka message value err: %v", err) @@ -160,3 +158,27 @@ func (s *KafkaSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTupl func (s *KafkaSource) Close(_ api.StreamContext) error { return nil } + +func (s *KafkaSource) Rewind(offset interface{}) error { + conf.Log.Infof("set kafka source offset: %v", offset) + offsetV := s.offset //nolint:staticcheck + switch v := offset.(type) { + case int64: + offsetV = v + case int: + offsetV = int64(v) + case float64: + offsetV = int64(v) + default: + return fmt.Errorf("%v can't be set as offset", offset) + } + if err := s.reader.SetOffset(offsetV); err != nil { + conf.Log.Errorf("kafka offset error: %v", err) + return fmt.Errorf("set kafka offset failed, err:%v", err) + } + return nil +} + +func (s *KafkaSource) GetOffset() (interface{}, error) { + return s.offset, nil +} diff --git a/extensions/sources/kafka/kafka.json b/extensions/sources/kafka/kafka.json index 04a95432fe..dc6a378490 100644 --- a/extensions/sources/kafka/kafka.json +++ b/extensions/sources/kafka/kafka.json @@ -95,21 +95,6 @@ "zh_CN": "建立连接的尝试次数" } }, - { - "name": "offset", - "default": "0", - "optional": true, - "control": "text", - "type": "int", - "hint": { - "en_US": "the offset from which the next batch of messages will be read. -1 indicates latest offset and -2 indicates first offset.", - "zh_CN": "kafka consumer 的初始 offset, -1 默认为最新 offset, -2 默认为最旧 offset" - }, - "label": { - "en_US": "offset", - "zh_CN": "offset" - } - }, { "name": "saslAuthType", "default": "none",