Skip to content

Commit

Permalink
fix: scan table join checkpoint (#2154)
Browse files Browse the repository at this point in the history
Scan table saves state data with an interface type, which cannot be encoded/decoded by gob

Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying committed Aug 4, 2023
1 parent c72afec commit 94283f3
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 35 deletions.
5 changes: 1 addition & 4 deletions internal/pkg/store/encoding/encoding.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 EMQ Technologies Co., Ltd.
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,13 +17,10 @@ package encoding
import (
"bytes"
"encoding/gob"
"time"
)

func Encode(value interface{}) ([]byte, error) {
var buff bytes.Buffer
gob.Register(time.Time{})
gob.Register(value)
enc := gob.NewEncoder(&buff)
if err := enc.Encode(value); err != nil {
return nil, err
Expand Down
4 changes: 0 additions & 4 deletions internal/pkg/store/redis/redisTs.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ type ts struct {
key string
}

// init registers map[string]interface{} with encoding/gob so that values of
// that type can be encoded and decoded when they are carried inside an
// interface value.
func init() {
gob.Register(make(map[string]interface{}))
}

func createRedisTs(redis *redis.Client, table string) (*ts, error) {
key := fmt.Sprintf("%s:%s", TsPrefix, table)
lastTs, err := getLast(redis, key, nil)
Expand Down
6 changes: 1 addition & 5 deletions internal/pkg/store/sql/sqlTs.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2022 EMQ Technologies Co., Ltd.
// Copyright 2022-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,10 +29,6 @@ type ts struct {
last int64
}

// init registers map[string]interface{} with encoding/gob so that values of
// that type can be encoded and decoded when they are carried inside an
// interface value.
func init() {
gob.Register(make(map[string]interface{}))
}

func createSqlTs(database Database, table string) (*ts, error) {
store := &ts{
database: database,
Expand Down
45 changes: 30 additions & 15 deletions internal/topo/node/join_align_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ type JoinAlignNode struct {
*defaultSinkNode
statManager metric.StatManager
// states
batch map[string][]xsql.TupleRow
batch map[string][]*xsql.Tuple
}

// BatchKey is the state key under which JoinAlignNode saves and restores its
// batch map via the stream context (PutState / GetState).
const BatchKey = "$$batchInputs"

func NewJoinAlignNode(name string, emitters []string, options *api.RuleOption) (*JoinAlignNode, error) {
batch := make(map[string][]xsql.TupleRow, len(emitters))
batch := make(map[string][]*xsql.Tuple, len(emitters))
for _, e := range emitters {
batch[e] = nil
}
Expand Down Expand Up @@ -73,7 +73,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
// restore batch state
if s, err := ctx.GetState(BatchKey); err == nil {
switch st := s.(type) {
case map[string][]xsql.TupleRow:
case map[string][]*xsql.Tuple:
n.batch = st
log.Infof("Restore batch state %+v", st)
case nil:
Expand All @@ -85,7 +85,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
log.Warnf("Restore batch state fails: %s", err)
}
if n.batch == nil {
n.batch = make(map[string][]xsql.TupleRow)
n.batch = make(map[string][]*xsql.Tuple)
}

for {
Expand All @@ -111,11 +111,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
_ = n.Broadcast(d)
case *xsql.Tuple:
log.Debugf("JoinAlignNode receive tuple input %s", d)
temp := &xsql.WindowTuples{
Content: make([]xsql.TupleRow, 0),
}
temp = temp.AddTuple(d)
n.alignBatch(ctx, temp)
n.alignBatch(ctx, d)
case *xsql.WindowTuples:
if d.WindowRange != nil { // real window
log.Debugf("JoinAlignNode receive window input %s", d)
Expand All @@ -131,7 +127,7 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
n.statManager.IncTotalExceptions(e.Error())
break
}
n.batch[emitter] = d.Content
n.batch[emitter] = convertToTupleSlice(d.Content)
_ = ctx.PutState(BatchKey, n.batch)
}
default:
Expand All @@ -151,14 +147,33 @@ func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error) {
}()
}

func (n *JoinAlignNode) alignBatch(_ api.StreamContext, w *xsql.WindowTuples) {
// convertToTupleSlice narrows a slice of xsql.TupleRow to the concrete
// []*xsql.Tuple form that JoinAlignNode keeps as its batch state.
//
// Rows whose dynamic type is not *xsql.Tuple (including nil interface values)
// are skipped rather than triggering a type-assertion panic, so the returned
// slice may be shorter than content. The result is never nil.
func convertToTupleSlice(content []xsql.TupleRow) []*xsql.Tuple {
	tuples := make([]*xsql.Tuple, 0, len(content))
	for _, row := range content {
		// Checked assertion: the unchecked form panics on any other row type.
		if tuple, ok := row.(*xsql.Tuple); ok {
			tuples = append(tuples, tuple)
		}
	}
	return tuples
}

func (n *JoinAlignNode) alignBatch(_ api.StreamContext, input any) {
n.statManager.ProcessTimeStart()
for _, v := range n.batch {
if v != nil {
w.Content = append(w.Content, v...)
var w *xsql.WindowTuples
switch t := input.(type) {
case *xsql.Tuple:
w = &xsql.WindowTuples{
Content: make([]xsql.TupleRow, 0),
}
w.AddTuple(t)
case *xsql.WindowTuples:
w = t
}
for _, contents := range n.batch {
if contents != nil {
for _, v := range contents {
w = w.AddTuple(v)
}
}
}

_ = n.Broadcast(w)
n.statManager.ProcessTimeEnd()
n.statManager.IncTotalRecordsOut()
Expand Down
6 changes: 3 additions & 3 deletions internal/topo/operator/table_processor.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2022 EMQ Technologies Co., Ltd.
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,7 +32,7 @@ type TableProcessor struct {
emitterName string
// States
output *xsql.WindowTuples // current batched message collection
batchEmitted bool // if batch input, this is the signal for whether the last batch has emitted. If true, reinitialize.
batchEmitted bool // If batch input, this is the signal for whether the last batch has emitted. If true, reinitialize.
}

func NewTableProcessor(isSchemaless bool, name string, fields map[string]*ast.JsonStreamField, options *ast.Options) (*TableProcessor, error) {
Expand All @@ -57,7 +57,7 @@ func NewTableProcessor(isSchemaless bool, name string, fields map[string]*ast.Js
//
// input: *xsql.Tuple or BatchCount
// output: WindowTuples
func (p *TableProcessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
func (p *TableProcessor) Apply(ctx api.StreamContext, data interface{}, _ *xsql.FunctionValuer, _ *xsql.AggregateFunctionValuer) interface{} {
logger := ctx.GetLogger()
tuple, ok := data.(*xsql.Tuple)
if !ok {
Expand Down
97 changes: 96 additions & 1 deletion internal/topo/topotest/checkpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2022 EMQ Technologies Co., Ltd.
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -123,3 +123,98 @@ func TestCheckpoint(t *testing.T) {
DoCheckpointRuleTest(t, tests, j, opt)
}
}

// TestTableJoinCheckpoint declares a checkpoint rule test for the query
// "demo INNER JOIN table1" (expected results, final metrics and the metrics
// expected at the pause point) and hands it to DoCheckpointRuleTest once per
// rule option.
func TestTableJoinCheckpoint(t *testing.T) {
	conf.IsTesting = true
	streamList := []string{"demo", "table1"}
	HandleStream(false, streamList, t)

	// joined builds a single-record result batch; "id" and "ts" carry the same value.
	joined := func(ts float64, name, color string, size float64) []map[string]interface{} {
		return []map[string]interface{}{{
			"id":    ts,
			"name":  name,
			"color": color,
			"size":  size,
			"ts":    ts,
		}}
	}

	expectedResults := [][]map[string]interface{}{
		joined(1541152486013, "name1", "red", 3),
		joined(1541152487632, "name2", "blue", 2),
		joined(1541152487632, "name2", "blue", 2),
		joined(1541152489252, "name3", "red", 1),
	}

	finalMetrics := map[string]interface{}{
		"op_3_join_aligner_0_records_in_total":  int64(4),
		"op_3_join_aligner_0_records_out_total": int64(3),

		"op_4_join_0_exceptions_total":  int64(0),
		"op_4_join_0_records_in_total":  int64(3),
		"op_4_join_0_records_out_total": int64(2),

		"op_5_project_0_exceptions_total":  int64(0),
		"op_5_project_0_records_in_total":  int64(2),
		"op_5_project_0_records_out_total": int64(2),

		"sink_mockSink_0_exceptions_total":  int64(0),
		"sink_mockSink_0_records_in_total":  int64(2),
		"sink_mockSink_0_records_out_total": int64(2),

		"source_demo_0_exceptions_total":  int64(0),
		"source_demo_0_records_in_total":  int64(3),
		"source_demo_0_records_out_total": int64(3),

		"source_table1_0_exceptions_total":  int64(0),
		"source_table1_0_records_in_total":  int64(4),
		"source_table1_0_records_out_total": int64(1),
	}

	pauseMetrics := map[string]interface{}{
		"sink_mockSink_0_exceptions_total":  int64(0),
		"sink_mockSink_0_records_in_total":  int64(2),
		"sink_mockSink_0_records_out_total": int64(2),

		"source_demo_0_exceptions_total":  int64(0),
		"source_demo_0_records_in_total":  int64(3),
		"source_demo_0_records_out_total": int64(3),

		"source_table1_0_exceptions_total":  int64(0),
		"source_table1_0_records_in_total":  int64(4),
		"source_table1_0_records_out_total": int64(1),
	}

	tests := []RuleCheckpointTest{
		{
			RuleTest: RuleTest{
				Name: `TestCheckpointRule2`,
				Sql:  `SELECT * FROM demo INNER JOIN table1 on demo.ts = table1.id`,
				R:    expectedResults,
				M:    finalMetrics,
			},
			PauseSize:   3,
			Cc:          2,
			PauseMetric: pauseMetrics,
		},
	}

	HandleStream(true, streamList, t)

	options := []*api.RuleOption{
		{
			BufferLength:       100,
			Qos:                api.AtLeastOnce,
			CheckpointInterval: 600,
			SendError:          true,
		},
	}
	for idx := range options {
		DoCheckpointRuleTest(t, tests, idx, options[idx])
	}
}
27 changes: 27 additions & 0 deletions internal/xsql/gob_register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package xsql

import (
"encoding/gob"
"time"
)

// init registers with encoding/gob the concrete types that are carried inside
// interface values, so that gob can encode and decode them: time.Time,
// map[string]interface{}, map[string][]*Tuple and Tuple.
func init() {
	samples := []interface{}{
		time.Time{},
		map[string]interface{}{},
		map[string][]*Tuple{},
		Tuple{},
	}
	// gob.Register only inspects the dynamic type of each sample value.
	for _, sample := range samples {
		gob.Register(sample)
	}
}
2 changes: 1 addition & 1 deletion internal/xsql/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// The tuple clone should be cheap.

/*
* Interfaces definition
* Interfaces definition
*/

type Wildcarder interface {
Expand Down
4 changes: 2 additions & 2 deletions sdk/go/context/default.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 EMQ Technologies Co., Ltd.
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,7 +46,7 @@ func WithValue(parent *DefaultContext, key, val interface{}) *DefaultContext {
return parent
}

// Implement context interface
// Deadline implements the context interface by forwarding the call to
// c.ctx.Deadline and returning its results unchanged.
func (c *DefaultContext) Deadline() (deadline time.Time, ok bool) {
	deadline, ok = c.ctx.Deadline()
	return deadline, ok
}
Expand Down

0 comments on commit 94283f3

Please sign in to comment.