From 06b618ed5984ec9c16788e28fd66db33dca62c57 Mon Sep 17 00:00:00 2001 From: Jiyong Huang Date: Thu, 28 Mar 2024 15:01:16 +0800 Subject: [PATCH] feat(planner): plan sink batch as an op Signed-off-by: Jiyong Huang --- internal/io/sink/send_manager.go | 139 ----------------- internal/io/sink/send_manager_test.go | 165 -------------------- internal/topo/node/batch_op_test.go | 1 - internal/topo/node/sink_node.go | 73 +++------ internal/topo/node/sink_node_test.go | 53 +------ internal/topo/planner/planner.go | 21 +-- internal/topo/planner/sink_planner.go | 77 ++++++++++ internal/topo/planner/sink_planner_test.go | 168 +++++++++++++++++++++ 8 files changed, 273 insertions(+), 424 deletions(-) delete mode 100644 internal/io/sink/send_manager.go delete mode 100644 internal/io/sink/send_manager_test.go create mode 100644 internal/topo/planner/sink_planner.go create mode 100644 internal/topo/planner/sink_planner_test.go diff --git a/internal/io/sink/send_manager.go b/internal/io/sink/send_manager.go deleted file mode 100644 index a065cf350c..0000000000 --- a/internal/io/sink/send_manager.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2023 EMQ Technologies Co., Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sink - -import ( - "context" - "fmt" - - "github.com/lf-edge/ekuiper/internal/conf" -) - -type SendManager struct { - lingerInterval int - batchSize int - bufferCh chan map[string]interface{} - buffer []map[string]interface{} - outputCh chan []map[string]interface{} - currIndex int - finished bool -} - -func NewSendManager(batchSize, lingerInterval int) (*SendManager, error) { - if batchSize < 1 && lingerInterval < 1 { - return nil, fmt.Errorf("either batchSize or lingerInterval should be larger than 0") - } - sm := &SendManager{ - batchSize: batchSize, - lingerInterval: lingerInterval, - } - if batchSize == 0 { - batchSize = 1024 - } - sm.buffer = make([]map[string]interface{}, batchSize) - sm.bufferCh = make(chan map[string]interface{}) - sm.outputCh = make(chan []map[string]interface{}, 16) - return sm, nil -} - -func (sm *SendManager) RecvData(d map[string]interface{}) { - sm.bufferCh <- d -} - -func (sm *SendManager) Run(ctx context.Context) { - defer sm.finish() - switch { - case sm.batchSize > 0 && sm.lingerInterval > 0: - sm.runWithTickerAndBatchSize(ctx) - case sm.batchSize > 0 && sm.lingerInterval == 0: - sm.runWithBatchSize(ctx) - case sm.batchSize == 0 && sm.lingerInterval > 0: - sm.runWithTicker(ctx) - } -} - -func (sm *SendManager) runWithTicker(ctx context.Context) { - ticker := conf.GetTicker(int64(sm.lingerInterval)) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case d := <-sm.bufferCh: - sm.appendDataInBuffer(d, false) - case <-ticker.C: - sm.send() - } - } -} - -func (sm *SendManager) runWithBatchSize(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case d := <-sm.bufferCh: - sm.appendDataInBuffer(d, true) - } - } -} - -func (sm *SendManager) runWithTickerAndBatchSize(ctx context.Context) { - ticker := conf.GetTicker(int64(sm.lingerInterval)) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case d := <-sm.bufferCh: - sm.appendDataInBuffer(d, true) - case <-ticker.C: - sm.send() - } - } -} - -func (sm *SendManager) send() { - if sm.currIndex < 1 { - return - } - list := make([]map[string]interface{}, sm.currIndex) - for i := 0; i < sm.currIndex; i++ { - list[i] = sm.buffer[i] - } - sm.currIndex = 0 - sm.outputCh <- list -} - -func (sm *SendManager) appendDataInBuffer(d map[string]interface{}, sendData bool) { - if sm.currIndex >= len(sm.buffer) { - // The buffer should be enlarged if the data length is larger than capacity during runWithTicker - sm.buffer = append(sm.buffer, d) - } else { - sm.buffer[sm.currIndex] = d - } - sm.currIndex++ - if sendData && sm.currIndex >= sm.batchSize { - sm.send() - } -} - -func (sm *SendManager) GetOutputChan() <-chan []map[string]interface{} { - return sm.outputCh -} - -func (sm *SendManager) finish() { - sm.finished = true -} diff --git a/internal/io/sink/send_manager_test.go b/internal/io/sink/send_manager_test.go deleted file mode 100644 index bbbee08aea..0000000000 --- a/internal/io/sink/send_manager_test.go +++ /dev/null @@ -1,165 +0,0 @@ -// Copyright 2023 EMQ Technologies Co., Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sink - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/benbjohnson/clock" - - "github.com/lf-edge/ekuiper/internal/conf" -) - -func TestSendManager(t *testing.T) { - testcases := []struct { - sendCount int - batchSize int - lingerInterval int - err string - expectItems int - }{ - { - batchSize: 0, - lingerInterval: 0, - err: "either batchSize or lingerInterval should be larger than 0", - }, - { - sendCount: 3, - batchSize: 3, - lingerInterval: 0, - expectItems: 3, - }, - { - sendCount: 4, - batchSize: 10, - lingerInterval: 100, - expectItems: 4, - }, - { - sendCount: 4, - batchSize: 0, - lingerInterval: 100, - expectItems: 4, - }, - { - sendCount: 6, - batchSize: 3, - lingerInterval: 3000, - expectItems: 3, - }, - } - mc := conf.Clock.(*clock.Mock) - for i, tc := range testcases { - mc.Set(mc.Now()) - testF := func() error { - sm, err := NewSendManager(tc.batchSize, tc.lingerInterval) - if len(tc.err) > 0 { - if err == nil || err.Error() != tc.err { - return fmt.Errorf("expect err:%v, actual: %v", tc.err, err) - } - return nil - } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go sm.Run(ctx) - for i := 0; i < tc.sendCount; i++ { - sm.RecvData(map[string]interface{}{}) - mc.Add(30 * time.Millisecond) - } - r := <-sm.GetOutputChan() - if len(r) != tc.expectItems { - return fmt.Errorf("testcase %v expect %v output data, actual %v", i, tc.expectItems, len(r)) - } - return nil - } - if err := testF(); err != nil { - t.Fatal(err) - } - } -} - -func TestSendEmpty(t *testing.T) { - sm, err := NewSendManager(1, 1) - if err != nil { - t.Fatal(err) - } - sm.outputCh = make(chan []map[string]interface{}) - // test shouldn't be blocked - sm.send() -} - -func TestCancelRun(t *testing.T) { - testcases := []struct { - batchSize int - lingerInterval int - }{ - { - batchSize: 0, - lingerInterval: 1, - }, - { - batchSize: 3, - lingerInterval: 0, - }, - { - batchSize: 10, - lingerInterval: 100, - }, - } - for _, tc := range testcases { - sm, err := NewSendManager(tc.batchSize, tc.lingerInterval) - if err != nil { - t.Fatal(err) - } - ctx, cancel := context.WithCancel(context.Background()) - c := make(chan struct{}) - go func() { - sm.Run(ctx) - c <- struct{}{} - }() - cancel() - <-c - if !sm.finished { - t.Fatal("send manager should be finished") - } - } -} - -func TestEnlargeSendManagerCap(t *testing.T) { - sm, err := NewSendManager(0, 1000) - if err != nil { - t.Fatal(err) - } - count := 1025 - for i := 0; i < count; i++ { - go sm.RecvData(map[string]interface{}{}) - sm.appendDataInBuffer(<-sm.bufferCh, false) - } - if len(sm.buffer) != count { - t.Fatal(fmt.Sprintf("sm buffer should be %v", count)) - } - if sm.currIndex != count { - t.Fatal(fmt.Sprintf("sm index should be %v", count)) - } - originCap := cap(sm.buffer) - originLen := len(sm.buffer) - sm.send() - if sm.currIndex != 0 || originCap != cap(sm.buffer) || originLen != len(sm.buffer) { - t.Fatal("sm buffer capacity shouldn't be changed after send") - } -} diff --git a/internal/topo/node/batch_op_test.go b/internal/topo/node/batch_op_test.go index c6849d9b63..a2a587793c 100644 --- a/internal/topo/node/batch_op_test.go +++ b/internal/topo/node/batch_op_test.go @@ -68,7 +68,6 @@ func TestRun(t *testing.T) { } mc := conf.Clock.(*clock.Mock) for i, tc := range testcases { - mc.Set(mc.Now()) t.Run(fmt.Sprintf("testcase %d", i), func(t *testing.T) { op, err := NewBatchOp("test", &api.RuleOption{BufferLength: 10, SendError: true}, tc.batchSize, tc.lingerInterval) if len(tc.err) > 0 { diff --git a/internal/topo/node/sink_node.go b/internal/topo/node/sink_node.go index 382b6044ed..73f52c2e07 100644 --- a/internal/topo/node/sink_node.go +++ b/internal/topo/node/sink_node.go @@ -19,7 +19,6 @@ import ( "github.com/lf-edge/ekuiper/internal/binder/io" "github.com/lf-edge/ekuiper/internal/conf" - sinkUtil "github.com/lf-edge/ekuiper/internal/io/sink" "github.com/lf-edge/ekuiper/internal/topo/context" "github.com/lf-edge/ekuiper/internal/topo/node/cache" nodeConf "github.com/lf-edge/ekuiper/internal/topo/node/conf" @@ -49,13 +48,6 @@ type SinkConf struct { conf.SinkConf } -func (sc *SinkConf) isBatchSinkEnabled() bool { - if sc.BatchSize > 0 || sc.LingerInterval > 0 { - return true - } - return false -} - type SinkNode struct { *defaultSinkNode // static @@ -91,7 +83,8 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) { logger.Debugf("open sink node %s", m.name) go func() { err := infra.SafeRun(func() error { - sconf, err := m.parseConf(logger) + sconf, err := ParseConf(logger, m.options) + m.concurrency = sconf.Concurrency if err != nil { return err } @@ -112,7 +105,7 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) { ctx = context.WithValue(ctx.(*context.DefaultContext), context.TransKey, tf) m.reset() - logger.Infof("open sink node %d instances with batchSize", m.concurrency, sconf.BatchSize) + logger.Infof("open sink node %d instances with batchSize %d", m.concurrency, sconf.BatchSize) go func(instance int) { panicOrError := infra.SafeRun(func() error { @@ -147,32 +140,15 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) { dataOutCh <-chan []map[string]interface{} resendCh chan []map[string]interface{} - sendManager *sinkUtil.SendManager - c *cache.SyncCache - rq *cache.SyncCache + c *cache.SyncCache + rq *cache.SyncCache ) logger.Infof("sink node %s instance %d starts with conf %+v", m.name, instance, *sconf) - if sconf.isBatchSinkEnabled() { - sendManager, err = sinkUtil.NewSendManager(sconf.BatchSize, sconf.LingerInterval) - if err != nil { - return err - } - go sendManager.Run(ctx) - } - if !sconf.EnableCache { - if sendManager != nil { - dataOutCh = sendManager.GetOutputChan() - } else { - dataOutCh = dataCh - } + dataOutCh = dataCh } else { - if sendManager != nil { - c = cache.NewSyncCache(ctx, sendManager.GetOutputChan(), result, &sconf.SinkConf, sconf.BufferLength) - } else { - c = cache.NewSyncCache(ctx, dataCh, result, &sconf.SinkConf, sconf.BufferLength) - } + c = cache.NewSyncCache(ctx, dataCh, result, &sconf.SinkConf, sconf.BufferLength) if sconf.ResendAlterQueue { resendCh = make(chan []map[string]interface{}, sconf.BufferLength) rq = cache.NewSyncCache(ctx, resendCh, result, &sconf.SinkConf, sconf.BufferLength) @@ -192,16 +168,10 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) { ctx.GetLogger().Debugf("receive empty in sink") return } - if sconf.isBatchSinkEnabled() { - for _, out := range outs { - sendManager.RecvData(out) - } - } else { - select { - case dataCh <- outs: - default: - ctx.GetLogger().Warnf("sink node %s instance %d buffer is full, drop data %v", m.name, instance, outs) - } + select { + case dataCh <- outs: + default: + ctx.GetLogger().Warnf("sink node %s instance %d buffer is full, drop data %v", m.name, instance, outs) } if resendCh != nil { select { @@ -383,7 +353,7 @@ func checkAck(ctx api.StreamContext, data interface{}, err error) bool { return true } -func (m *SinkNode) parseConf(logger api.Logger) (*SinkConf, error) { +func ParseConf(logger api.Logger, props map[string]any) (*SinkConf, error) { sconf := &SinkConf{ Concurrency: 1, Omitempty: false, @@ -392,30 +362,35 @@ func (m *SinkNode) parseConf(logger api.Logger) (*SinkConf, error) { SinkConf: *conf.Config.Sink, BufferLength: 1024, } - err := cast.MapToStruct(m.options, sconf) + err := cast.MapToStruct(props, sconf) if err != nil { - return nil, fmt.Errorf("read properties %v fail with error: %v", m.options, err) + return nil, fmt.Errorf("read properties %v fail with error: %v", props, err) } if sconf.Concurrency <= 0 { - logger.Warnf("invalid type for concurrency property, should be positive integer but found %t", sconf.Concurrency) + logger.Warnf("invalid type for concurrency property, should be positive integer but found %d", sconf.Concurrency) sconf.Concurrency = 1 } - m.concurrency = sconf.Concurrency if sconf.Format == "" { sconf.Format = "json" } else if sconf.Format != message.FormatJson && sconf.Format != message.FormatProtobuf && sconf.Format != message.FormatBinary && sconf.Format != message.FormatCustom && sconf.Format != message.FormatDelimited { logger.Warnf("invalid type for format property, should be json protobuf or binary but found %s", sconf.Format) sconf.Format = "json" } - err = cast.MapToStruct(m.options, &sconf.SinkConf) + err = cast.MapToStruct(props, &sconf.SinkConf) if err != nil { - return nil, fmt.Errorf("read properties %v to cache conf fail with error: %v", m.options, err) + return nil, fmt.Errorf("read properties %v to cache conf fail with error: %v", props, err) } if sconf.DataField == "" { - if v, ok := m.options["tableDataField"]; ok { + if v, ok := props["tableDataField"]; ok { sconf.DataField = v.(string) } } + if sconf.BatchSize < 0 { + return nil, fmt.Errorf("invalid batchSize %d", sconf.BatchSize) + } + if sconf.LingerInterval < 0 { + return nil, fmt.Errorf("invalid lingerInterval %d", sconf.LingerInterval) + } err = sconf.SinkConf.Validate() if err != nil { return nil, fmt.Errorf("invalid cache properties: %v", err) diff --git a/internal/topo/node/sink_node_test.go b/internal/topo/node/sink_node_test.go index f9baa78fc6..fe3ab900fe 100644 --- a/internal/topo/node/sink_node_test.go +++ b/internal/topo/node/sink_node_test.go @@ -25,7 +25,6 @@ import ( "testing" "time" - "github.com/benbjohnson/clock" "github.com/stretchr/testify/assert" "github.com/lf-edge/ekuiper/internal/conf" @@ -41,55 +40,6 @@ func init() { testx.InitEnv("node") } -func TestBatchSink(t *testing.T) { - mc := conf.Clock.(*clock.Mock) - conf.InitConf() - transform.RegisterAdditionalFuncs() - tests := []struct { - config map[string]interface{} - data []map[string]interface{} - result [][]byte - }{ - { - config: map[string]interface{}{ - "batchSize": 2, - }, - data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}, {"ab": "hello3"}}, - result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"}]`)}, - }, - { - config: map[string]interface{}{ - "lingerInterval": 1000, - }, - data: []map[string]interface{}{{"ab": "hello1"}, {"ab": "hello2"}, {"ab": "hello3"}}, - result: [][]byte{[]byte(`[{"ab":"hello1"},{"ab":"hello2"},{"ab":"hello3"}]`)}, - }, - } - fmt.Printf("The test bucket size is %d.\n\n", len(tests)) - contextLogger := conf.Log.WithField("rule", "TestBatchSink") - ctx := context.WithValue(context.Background(), context.LoggerKey, contextLogger) - - for i, tt := range tests { - mc.Set(mc.Now()) - mockSink := mocknode.NewMockSink() - s := NewSinkNodeWithSink("mockSink", mockSink, tt.config) - s.Open(ctx, make(chan error)) - s.input <- tt.data - for i := 0; i < 10; i++ { - mc.Add(1 * time.Second) - time.Sleep(10 * time.Millisecond) - // wait until mockSink get results - if len(mockSink.GetResults()) > 0 { - break - } - } - results := mockSink.GetResults() - if !reflect.DeepEqual(tt.result, results) { - t.Errorf("%d \tresult mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.result, results) - } - } -} - func TestSinkTemplate_Apply(t *testing.T) { conf.InitConf() transform.RegisterAdditionalFuncs() @@ -449,8 +399,7 @@ func TestConfig(t *testing.T) { contextLogger := conf.Log.WithField("rule", "TestConfig") conf.InitConf() for i, tt := range tests { - mockSink := NewSinkNode(fmt.Sprintf("test_%d", i), "mockSink", tt.config) - sconf, err := mockSink.parseConf(contextLogger) + sconf, err := ParseConf(contextLogger, tt.config) if !reflect.DeepEqual(tt.err, err) { t.Errorf("%d \terror mismatch:\n\nexp=%s\n\ngot=%s\n\n", i, tt.err, err) } else if !reflect.DeepEqual(tt.sconf, sconf) { diff --git a/internal/topo/planner/planner.go b/internal/topo/planner/planner.go index 469a534edf..ce0fc585dc 100644 --- a/internal/topo/planner/planner.go +++ b/internal/topo/planner/planner.go @@ -116,24 +116,9 @@ func createTopo(rule *api.Rule, lp LogicalPlan, mockSourcesProp map[string]map[s tp.AddSink(inputs, sink) } } else { - manager := io.GetManager() - for i, m := range rule.Actions { - for name, action := range m { - props, ok := action.(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("expect map[string]interface{} type for the action properties, but found %v", action) - } - s, err := manager.Sink(name) - if err != nil { - return nil, err - } - if s != nil { - if err := s.Configure(props); err != nil { - return nil, err - } - } - tp.AddSink(inputs, node.NewSinkNode(fmt.Sprintf("%s_%d", name, i), name, props)) - } + err = buildActions(tp, rule, inputs) + if err != nil { + return nil, err } } diff --git a/internal/topo/planner/sink_planner.go b/internal/topo/planner/sink_planner.go new file mode 100644 index 0000000000..829a1614a0 --- /dev/null +++ b/internal/topo/planner/sink_planner.go @@ -0,0 +1,77 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package planner + +import ( + "fmt" + + "github.com/lf-edge/ekuiper/internal/binder/io" + "github.com/lf-edge/ekuiper/internal/conf" + "github.com/lf-edge/ekuiper/internal/topo" + "github.com/lf-edge/ekuiper/internal/topo/node" + "github.com/lf-edge/ekuiper/pkg/api" +) + +// SinkPlanner is the planner for sink node. It transforms logical sink plan to multiple physical nodes. +// It will split the sink plan into multiple sink nodes according to its sink configurations. + +func buildActions(tp *topo.Topo, rule *api.Rule, inputs []api.Emitter) error { + for i, m := range rule.Actions { + for name, action := range m { + s, _ := io.Sink(name) + if s == nil { + return fmt.Errorf("sink %s is not defined", name) + } + props, ok := action.(map[string]any) + if !ok { + return fmt.Errorf("expect map[string]interface{} type for the action properties, but found %v", action) + } + commonConf, err := node.ParseConf(conf.Log, props) + if err != nil { + return fmt.Errorf("fail to parse sink configuration: %v", err) + } + // Split sink node + sinkName := fmt.Sprintf("%s_%d", name, i) + newInputs, err := splitSink(tp, inputs, sinkName, rule.Options, commonConf) + if err != nil { + return err + } + if s != nil { + if err = s.Configure(props); err != nil { + return err + } + } + tp.AddSink(newInputs, node.NewSinkNode(sinkName, name, props)) + } + } + return nil +} + +// Split sink node according to the sink configuration. Return the new input emitters. +func splitSink(tp *topo.Topo, inputs []api.Emitter, sinkName string, options *api.RuleOption, sc *node.SinkConf) ([]api.Emitter, error) { + index := 0 + newInputs := inputs + // Batch enabled + if sc.BatchSize > 0 || sc.LingerInterval > 0 { + batchOp, err := node.NewBatchOp(fmt.Sprintf("%s_%d_batch", sinkName, index), options, sc.BatchSize, sc.LingerInterval) + if err != nil { + return nil, err + } + index++ + tp.AddOperator(newInputs, batchOp) + newInputs = []api.Emitter{batchOp} + } + return newInputs, nil +} diff --git a/internal/topo/planner/sink_planner_test.go b/internal/topo/planner/sink_planner_test.go new file mode 100644 index 0000000000..169d1ff864 --- /dev/null +++ b/internal/topo/planner/sink_planner_test.go @@ -0,0 +1,168 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package planner + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/lf-edge/ekuiper/internal/topo" + "github.com/lf-edge/ekuiper/internal/topo/node" + "github.com/lf-edge/ekuiper/pkg/api" + "github.com/lf-edge/ekuiper/pkg/ast" +) + +func TestSinkPlan(t *testing.T) { + tc := []struct { + name string + rule *api.Rule + topo *api.PrintableTopo + }{ + { + name: "normal sink plan", + rule: &api.Rule{ + Actions: []map[string]any{ + { + "log": map[string]any{}, + }, + }, + Options: defaultOption, + }, + topo: &api.PrintableTopo{ + Sources: []string{"source_src1"}, + Edges: map[string][]any{ + "source_src1": { + "sink_log_0", + }, + }, + }, + }, + { + name: "batch sink plan", + rule: &api.Rule{ + Actions: []map[string]any{ + { + "log": map[string]any{ + "batchSize": 10, + }, + }, + }, + Options: defaultOption, + }, + topo: &api.PrintableTopo{ + Sources: []string{"source_src1"}, + Edges: map[string][]any{ + "source_src1": { + "op_log_0_0_batch", + }, + "op_log_0_0_batch": { + "sink_log_0", + }, + }, + }, + }, + } + for _, c := range tc { + tp, err := topo.NewWithNameAndOptions("test", c.rule.Options) + assert.NoError(t, err) + n := node.NewSourceNode("src1", ast.TypeStream, nil, &ast.Options{ + DATASOURCE: "/feed", + TYPE: "httppull", + }, &api.RuleOption{SendError: false}, false, false, nil) + tp.AddSrc(n) + inputs := []api.Emitter{n} + err = buildActions(tp, c.rule, inputs) + assert.NoError(t, err) + assert.Equal(t, c.topo, tp.GetTopo()) + } +} + +func TestSinkPlanError(t *testing.T) { + tc := []struct { + name string + rule *api.Rule + err string + }{ + { + name: "invalid sink", + rule: &api.Rule{ + Actions: []map[string]any{ + { + "noexist": map[string]any{}, + }, + }, + Options: defaultOption, + }, + err: "sink noexist is not defined", + }, + { + name: "invalid action format", + rule: &api.Rule{ + Actions: []map[string]any{ + { + "log": 12, + }, + }, + Options: defaultOption, + }, + err: "expect map[string]interface{} type for the action properties, but found 12", + }, + { + name: "invalid batchSize", + rule: &api.Rule{ + Actions: []map[string]any{ + { + "log": map[string]any{ + "batchSize": -1, + }, + }, + }, + Options: defaultOption, + }, + err: "fail to parse sink configuration: invalid batchSize -1", + }, + { + name: "invalid lingerInterval", + rule: &api.Rule{ + Actions: []map[string]any{ + { + "log": map[string]any{ + "batchSize": 10, + "lingerInterval": -1, + }, + }, + }, + Options: defaultOption, + }, + err: "fail to parse sink configuration: invalid lingerInterval -1", + }, + } + for _, c := range tc { + t.Run(c.name, func(t *testing.T) { + tp, err := topo.NewWithNameAndOptions("test", c.rule.Options) + assert.NoError(t, err) + n := node.NewSourceNode("src1", ast.TypeStream, nil, &ast.Options{ + DATASOURCE: "/feed", + TYPE: "httppull", + }, &api.RuleOption{SendError: false}, false, false, nil) + tp.AddSrc(n) + inputs := []api.Emitter{n} + err = buildActions(tp, c.rule, inputs) + assert.Error(t, err) + assert.Equal(t, c.err, err.Error()) + }) + } +}