diff --git a/internal/pkg/store/encoding/encoding.go b/internal/pkg/store/encoding/encoding.go index 22b8eb9a54..850d90aaed 100644 --- a/internal/pkg/store/encoding/encoding.go +++ b/internal/pkg/store/encoding/encoding.go @@ -1,4 +1,4 @@ -// Copyright 2021 EMQ Technologies Co., Ltd. +// Copyright 2021-2023 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,13 +17,10 @@ package encoding import ( "bytes" "encoding/gob" - "time" ) func Encode(value interface{}) ([]byte, error) { var buff bytes.Buffer - gob.Register(time.Time{}) - gob.Register(value) enc := gob.NewEncoder(&buff) if err := enc.Encode(value); err != nil { return nil, err diff --git a/internal/pkg/store/redis/redisTs.go b/internal/pkg/store/redis/redisTs.go index 284ab35599..ddc7f87a7d 100644 --- a/internal/pkg/store/redis/redisTs.go +++ b/internal/pkg/store/redis/redisTs.go @@ -45,10 +45,6 @@ type ts struct { key string } -func init() { - gob.Register(make(map[string]interface{})) -} - func createRedisTs(redis *redis.Client, table string) (*ts, error) { key := fmt.Sprintf("%s:%s", TsPrefix, table) lastTs, err := getLast(redis, key, nil) diff --git a/internal/pkg/store/sql/sqlTs.go b/internal/pkg/store/sql/sqlTs.go index 93e6c77a02..2267a22c8a 100644 --- a/internal/pkg/store/sql/sqlTs.go +++ b/internal/pkg/store/sql/sqlTs.go @@ -1,4 +1,4 @@ -// Copyright 2022-2022 EMQ Technologies Co., Ltd. +// Copyright 2022-2023 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -29,10 +29,6 @@ type ts struct { last int64 } -func init() { - gob.Register(make(map[string]interface{})) -} - func createSqlTs(database Database, table string) (*ts, error) { store := &ts{ database: database, diff --git a/internal/topo/node/join_align_node.go b/internal/topo/node/join_align_node.go index 38dfa7df0c..b5fd5493a7 100644 --- a/internal/topo/node/join_align_node.go +++ b/internal/topo/node/join_align_node.go @@ -29,13 +29,13 @@ type JoinAlignNode struct { *defaultSinkNode statManager metric.StatManager // states - batch map[string][]xsql.TupleRow + batch map[string][]*xsql.Tuple } const BatchKey = "$$batchInputs" func NewJoinAlignNode(name string, emitters []string, options *api.RuleOption) (*JoinAlignNode, error) { - batch := make(map[string][]xsql.TupleRow, len(emitters)) + batch := make(map[string][]*xsql.Tuple, len(emitters)) for _, e := range emitters { batch[e] = nil } @@ -73,7 +73,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) { // restore batch state if s, err := ctx.GetState(BatchKey); err == nil { switch st := s.(type) { - case map[string][]xsql.TupleRow: + case map[string][]*xsql.Tuple: n.batch = st log.Infof("Restore batch state %+v", st) case nil: @@ -85,7 +85,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) { log.Warnf("Restore batch state fails: %s", err) } if n.batch == nil { - n.batch = make(map[string][]xsql.TupleRow) + n.batch = make(map[string][]*xsql.Tuple) } for { @@ -111,11 +111,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) { _ = n.Broadcast(d) case *xsql.Tuple: log.Debugf("JoinAlignNode receive tuple input %s", d) - temp := &xsql.WindowTuples{ - Content: make([]xsql.TupleRow, 0), - } - temp = temp.AddTuple(d) - n.alignBatch(ctx, temp) + n.alignBatch(ctx, d) case *xsql.WindowTuples: if d.WindowRange != nil { // real window log.Debugf("JoinAlignNode receive window input %s", d) @@ -131,7 +127,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) { n.statManager.IncTotalExceptions(e.Error()) break } - n.batch[emitter] = d.Content + n.batch[emitter] = convertToTupleSlice(d.Content) _ = ctx.PutState(BatchKey, n.batch) } default: @@ -151,14 +147,33 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) { }() } -func (n *JoinAlignNode) alignBatch(_ api.StreamContext, w *xsql.WindowTuples) { +func convertToTupleSlice(content []xsql.TupleRow) []*xsql.Tuple { + tuples := make([]*xsql.Tuple, len(content)) + for i, v := range content { + tuples[i] = v.(*xsql.Tuple) + } + return tuples +} + +func (n *JoinAlignNode) alignBatch(_ api.StreamContext, input any) { n.statManager.ProcessTimeStart() - for _, v := range n.batch { - if v != nil { - w.Content = append(w.Content, v...) + var w *xsql.WindowTuples + switch t := input.(type) { + case *xsql.Tuple: + w = &xsql.WindowTuples{ + Content: make([]xsql.TupleRow, 0), + } + w.AddTuple(t) + case *xsql.WindowTuples: + w = t + } + for _, contents := range n.batch { + if contents != nil { + for _, v := range contents { + w = w.AddTuple(v) + } } } - _ = n.Broadcast(w) n.statManager.ProcessTimeEnd() n.statManager.IncTotalRecordsOut() diff --git a/internal/topo/operator/table_processor.go b/internal/topo/operator/table_processor.go index 1a001cee4f..3555cbfe8d 100644 --- a/internal/topo/operator/table_processor.go +++ b/internal/topo/operator/table_processor.go @@ -1,4 +1,4 @@ -// Copyright 2021-2022 EMQ Technologies Co., Ltd. +// Copyright 2021-2023 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -32,7 +32,7 @@ type TableProcessor struct { emitterName string // States output *xsql.WindowTuples // current batched message collection - batchEmitted bool // if batch input, this is the signal for whether the last batch has emitted. If true, reinitialize. + batchEmitted bool // If batch input, this is the signal for whether the last batch has emitted. If true, reinitialize. } func NewTableProcessor(isSchemaless bool, name string, fields map[string]*ast.JsonStreamField, options *ast.Options) (*TableProcessor, error) { @@ -57,7 +57,7 @@ func NewTableProcessor(isSchemaless bool, name string, fields map[string]*ast.Js // // input: *xsql.Tuple or BatchCount // output: WindowTuples -func (p *TableProcessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} { +func (p *TableProcessor) Apply(ctx api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} { logger := ctx.GetLogger() tuple, ok := data.(*xsql.Tuple) if !ok { diff --git a/internal/topo/topotest/checkpoint_test.go b/internal/topo/topotest/checkpoint_test.go index 7d48b1e013..e87baeb66b 100644 --- a/internal/topo/topotest/checkpoint_test.go +++ b/internal/topo/topotest/checkpoint_test.go @@ -1,4 +1,4 @@ -// Copyright 2021-2022 EMQ Technologies Co., Ltd. +// Copyright 2021-2023 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -123,3 +123,98 @@ func TestCheckpoint(t *testing.T) { DoCheckpointRuleTest(t, tests, j, opt) } } + +func TestTableJoinCheckpoint(t *testing.T) { + conf.IsTesting = true + streamList := []string{"demo", "table1"} + HandleStream(false, streamList, t) + tests := []RuleCheckpointTest{ + { + RuleTest: RuleTest{ + Name: `TestCheckpointRule2`, + Sql: `SELECT * FROM demo INNER JOIN table1 on demo.ts = table1.id`, + R: [][]map[string]interface{}{ + {{ + "id": float64(1541152486013), + "name": "name1", + "color": "red", + "size": float64(3), + "ts": float64(1541152486013), + }}, + {{ + "id": float64(1541152487632), + "name": "name2", + "color": "blue", + "size": float64(2), + "ts": float64(1541152487632), + }}, + {{ + "id": float64(1541152487632), + "name": "name2", + "color": "blue", + "size": float64(2), + "ts": float64(1541152487632), + }}, + {{ + "id": float64(1541152489252), + "name": "name3", + "color": "red", + "size": float64(1), + "ts": float64(1541152489252), + }}, + }, + M: map[string]interface{}{ + "op_3_join_aligner_0_records_in_total": int64(4), + "op_3_join_aligner_0_records_out_total": int64(3), + + "op_4_join_0_exceptions_total": int64(0), + "op_4_join_0_records_in_total": int64(3), + "op_4_join_0_records_out_total": int64(2), + + "op_5_project_0_exceptions_total": int64(0), + "op_5_project_0_records_in_total": int64(2), + "op_5_project_0_records_out_total": int64(2), + + "sink_mockSink_0_exceptions_total": int64(0), + "sink_mockSink_0_records_in_total": int64(2), + "sink_mockSink_0_records_out_total": int64(2), + + "source_demo_0_exceptions_total": int64(0), + "source_demo_0_records_in_total": int64(3), + "source_demo_0_records_out_total": int64(3), + + "source_table1_0_exceptions_total": int64(0), + "source_table1_0_records_in_total": int64(4), + "source_table1_0_records_out_total": int64(1), + }, + }, + PauseSize: 3, + Cc: 2, + PauseMetric: map[string]interface{}{ + "sink_mockSink_0_exceptions_total": int64(0), + "sink_mockSink_0_records_in_total": int64(2), + "sink_mockSink_0_records_out_total": int64(2), + + "source_demo_0_exceptions_total": int64(0), + "source_demo_0_records_in_total": int64(3), + "source_demo_0_records_out_total": int64(3), + + "source_table1_0_exceptions_total": int64(0), + "source_table1_0_records_in_total": int64(4), + "source_table1_0_records_out_total": int64(1), + }, + }, + } + HandleStream(true, streamList, t) + options := []*api.RuleOption{ + { + BufferLength: 100, + Qos: api.AtLeastOnce, + CheckpointInterval: 600, + SendError: true, + }, + } + for j, opt := range options { + DoCheckpointRuleTest(t, tests, j, opt) + } +} diff --git a/internal/xsql/gob_register.go b/internal/xsql/gob_register.go new file mode 100644 index 0000000000..9cdf43395c --- /dev/null +++ b/internal/xsql/gob_register.go @@ -0,0 +1,27 @@ +// Copyright 2023 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xsql + +import ( + "encoding/gob" + "time" +) + +func init() { + gob.Register(time.Time{}) + gob.Register(make(map[string]interface{})) + gob.Register(make(map[string][]*Tuple)) + gob.Register(Tuple{}) +} diff --git a/internal/xsql/row.go b/internal/xsql/row.go index 47a7edccba..61bf2a0c01 100644 --- a/internal/xsql/row.go +++ b/internal/xsql/row.go @@ -26,7 +26,7 @@ import ( // The tuple clone should be cheap. /* - * Interfaces definition + * Interfaces definition */ type Wildcarder interface { diff --git a/sdk/go/context/default.go b/sdk/go/context/default.go index bd5fc65c48..3c72f7afa1 100644 --- a/sdk/go/context/default.go +++ b/sdk/go/context/default.go @@ -1,4 +1,4 @@ -// Copyright 2021 EMQ Technologies Co., Ltd. +// Copyright 2021-2023 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -46,7 +46,7 @@ func WithValue(parent *DefaultContext, key, val interface{}) *DefaultContext { return parent } -// Implement context interface +// Deadline Implement context interface func (c *DefaultContext) Deadline() (deadline time.Time, ok bool) { return c.ctx.Deadline() }