Skip to content

Commit

Permalink
refactor: split shared source schema support (#2678)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying authored Mar 7, 2024
1 parent 60780fa commit 0ca7c72
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 16 deletions.
2 changes: 1 addition & 1 deletion internal/converter/converter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 EMQ Technologies Co., Ltd.
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
21 changes: 21 additions & 0 deletions internal/topo/node/decode_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package node
import (
"fmt"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/converter"
"github.com/lf-edge/ekuiper/internal/topo/node/metric"
"github.com/lf-edge/ekuiper/internal/xsql"
Expand All @@ -31,6 +32,24 @@ type DecodeOp struct {
converter message.Converter
}

func (o *DecodeOp) AttachSchema(ctx api.StreamContext, dataSource string, schema map[string]*ast.JsonStreamField, isWildcard bool) {
ctx.GetLogger().Infof("attach schema to shared stream")
if fastDecoder, ok := o.converter.(message.SchemaMergeAbleConverter); ok {
if err := fastDecoder.MergeSchema(ctx.GetRuleId(), dataSource, schema, isWildcard); err != nil {
ctx.GetLogger().Warnf("merge schema to shared stream failed, err: %v", err)
}
}
}

func (o *DecodeOp) DetachSchema(ruleId string) {
conf.Log.Infof("detach schema for shared stream rule %v", ruleId)
if fastDecoder, ok := o.converter.(message.SchemaMergeAbleConverter); ok {
if err := fastDecoder.DetachSchema(ruleId); err != nil {
conf.Log.Warnf("detach schema for shared stream rule %v failed, err:%v", ruleId, err)
}
}
}

func NewDecodeOp(name, StreamName string, ruleId string, rOpt *api.RuleOption, options *ast.Options, isWildcard, isSchemaless bool, schema map[string]*ast.JsonStreamField) (*DecodeOp, error) {
options.Schema = nil
options.IsWildCard = isWildcard
Expand Down Expand Up @@ -116,3 +135,5 @@ func (o *DecodeOp) toTuple(v map[string]any, d *xsql.Tuple) *xsql.Tuple {
Emitter: d.Emitter,
}
}

var _ SchemaNode = &DecodeOp{}
112 changes: 112 additions & 0 deletions internal/topo/node/decode_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,118 @@ func BenchmarkThrougput(b *testing.B) {
}
}

func TestJSONWithSchema(t *testing.T) {
tests := []struct {
name string
concurrency int
schema map[string]*ast.JsonStreamField
}{
{"single", 1, map[string]*ast.JsonStreamField{
"b": {
Type: "float",
},
}},
{"multi", 10, map[string]*ast.JsonStreamField{
"b": {
Type: "float",
},
}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
op, err := NewDecodeOp("test", "streamName", "test1", &api.RuleOption{BufferLength: 10, SendError: true, Concurrency: tt.concurrency}, &ast.Options{FORMAT: "json", SHARED: true}, false, false, map[string]*ast.JsonStreamField{
"a": {
Type: "bigint",
},
})
assert.NoError(t, err)
out := make(chan any, 100)
err = op.AddOutput(out, "test")
assert.NoError(t, err)
ctx := mockContext.NewMockContext("test1", "decode_test")
errCh := make(chan error)
op.Exec(ctx, errCh)

cases := []any{
&xsql.Tuple{Emitter: "test", Raw: []byte("{\"a\":1,\"b\":2}"), Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
&xsql.Tuple{Emitter: "test", Raw: []byte("[{\"a\":1,\"b\":2},{\"a\":3,\"b\":4,\"c\":\"hello\"}]"), Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
}
expects := [][]any{
{&xsql.Tuple{Emitter: "test", Message: map[string]interface{}{"a": int64(1)}, Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}}},
{
&xsql.Tuple{Emitter: "test", Message: map[string]interface{}{"a": int64(1)}, Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
&xsql.Tuple{Emitter: "test", Message: map[string]interface{}{"a": int64(3)}, Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
},
}

for i, c := range cases {
op.input <- c
for _, e := range expects[i] {
r := <-out
switch tr := r.(type) {
case error:
assert.EqualError(t, e.(error), tr.Error())
default:
assert.Equal(t, e, r)
}
}
}

nctx := mockContext.NewMockContext("test2", "decode_test")
op.AttachSchema(nctx, "streamName", tt.schema, false)
cases = []any{
&xsql.Tuple{Emitter: "test", Raw: []byte("{\"a\":1,\"b\":2}"), Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
&xsql.Tuple{Emitter: "test", Raw: []byte("[{\"a\":1,\"b\":2},{\"a\":3,\"b\":4,\"c\":\"hello\"}]"), Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
}
expectsWithSchema := [][]any{
{&xsql.Tuple{Emitter: "test", Message: map[string]interface{}{"a": int64(1), "b": 2.0}, Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}}},
{
&xsql.Tuple{Emitter: "test", Message: map[string]interface{}{"a": int64(1), "b": 2.0}, Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
&xsql.Tuple{Emitter: "test", Message: map[string]interface{}{"a": int64(3), "b": 4.0}, Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
},
}

for i, c := range cases {
op.input <- c
for _, e := range expectsWithSchema[i] {
r := <-out
switch tr := r.(type) {
case error:
assert.EqualError(t, e.(error), tr.Error())
default:
assert.Equal(t, e, r)
}
}
}

op.DetachSchema(ctx.GetRuleId())
cases = []any{
&xsql.Tuple{Emitter: "test", Raw: []byte("{\"a\":1,\"b\":2}"), Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
&xsql.Tuple{Emitter: "test", Raw: []byte("[{\"a\":1,\"b\":2},{\"a\":3,\"b\":4,\"c\":\"hello\"}]"), Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
}
expects = [][]any{
{&xsql.Tuple{Emitter: "test", Message: map[string]interface{}{"b": 2.0}, Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}}},
{
&xsql.Tuple{Emitter: "test", Message: map[string]interface{}{"b": 2.0}, Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
&xsql.Tuple{Emitter: "test", Message: map[string]interface{}{"b": 4.0}, Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
},
}
for i, c := range cases {
op.input <- c
for _, e := range expects[i] {
r := <-out
switch tr := r.(type) {
case error:
assert.EqualError(t, e.(error), tr.Error())
default:
assert.Equal(t, e, r)
}
}
}
})
}
}

func TestValidate(t *testing.T) {
_, err := NewDecodeOp("test", "streamName", "test1", &api.RuleOption{BufferLength: 10, SendError: true}, &ast.Options{FORMAT: "cann"}, false, true, nil)
assert.Error(t, err)
Expand Down
8 changes: 8 additions & 0 deletions internal/topo/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/lf-edge/ekuiper/internal/topo/node/metric"
"github.com/lf-edge/ekuiper/internal/xsql"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/cast"
)

Expand All @@ -39,6 +40,13 @@ type OperatorNode interface {
RemoveMetrics(name string)
}

type SchemaNode interface {
// AttachSchema attach the schema to the node. The parameters are ruleId, sourceName, schema, whether is wildcard
AttachSchema(api.StreamContext, string, map[string]*ast.JsonStreamField, bool)
// DetachSchema detach the schema from the node. The parameters are ruleId
DetachSchema(string)
}

type DataSourceNode interface {
api.Emitter
Open(ctx api.StreamContext, errCh chan<- error)
Expand Down
29 changes: 18 additions & 11 deletions internal/topo/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,18 @@ type SourcePropsForSplit struct {
}

func splitSource(t *DataSourcePlan, ss api.SourceConnector, options *api.RuleOption, index int, ruleId string, pp node.UnOperation) (node.DataSourceNode, []node.OperatorNode, int, error) {
if t.streamStmt.Options.SHARED {
srcSubtopo, existed := topo.GetSubTopo(string(t.name))
if existed {
srcSubtopo.StoreSchema(ruleId, string(t.name), t.streamFields, t.isWildCard)
return srcSubtopo, nil, srcSubtopo.OpsCount(), nil
}
}

// Get all props
props := nodeConf.GetSourceConf(t.streamStmt.Options.TYPE, t.streamStmt.Options)
sp := &SourcePropsForSplit{}
_ = cast.MapToStruct(props, sp)

// Create the connector node as source node
var (
err error
Expand Down Expand Up @@ -441,16 +448,16 @@ func splitSource(t *DataSourcePlan, ss api.SourceConnector, options *api.RuleOpt
}

if t.streamStmt.Options.SHARED && len(ops) > 0 {
srcSubtopo, existed := topo.GetSubTopo(string(t.name))
if !existed {
conf.Log.Infof("Create SubTopo %s", string(t.name))
srcSubtopo.AddSrc(srcConnNode)
subInputs := []api.Emitter{srcSubtopo}
for _, e := range ops {
srcSubtopo.AddOperator(subInputs, e)
subInputs = []api.Emitter{e}
}
}
// should not exist
srcSubtopo, _ := topo.GetSubTopo(string(t.name))
conf.Log.Infof("Create SubTopo %s", string(t.name))
srcSubtopo.AddSrc(srcConnNode)
subInputs := []api.Emitter{srcSubtopo}
for _, e := range ops {
srcSubtopo.AddOperator(subInputs, e)
subInputs = []api.Emitter{e}
}
srcSubtopo.StoreSchema(ruleId, string(t.name), t.streamFields, t.isWildCard)
return srcSubtopo, nil, len(ops), nil
}
return srcConnNode, ops, 0, nil
Expand Down
37 changes: 37 additions & 0 deletions internal/topo/subtopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,16 @@ import (
"github.com/lf-edge/ekuiper/internal/topo/node/metric"
"github.com/lf-edge/ekuiper/internal/topo/state"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/infra"
)

type schemainfo struct {
datasource string
schema map[string]*ast.JsonStreamField
isWildcard bool
}

// SrcSubTopo Implements node.SourceNode
type SrcSubTopo struct {
name string
Expand All @@ -40,6 +47,9 @@ type SrcSubTopo struct {
ops []node.OperatorNode
tail api.Emitter
topo *api.PrintableTopo
// Save the schemainfo for each rule only to use when need to attach schema when the rule is starting.
// Get updated if the rule is updated. Never delete it until the subtopo is deleted.
schemaReg map[string]schemainfo

// runtime state
// Ref state, affect the pool. Update when rule created or stopped
Expand All @@ -60,6 +70,16 @@ func (s *SrcSubTopo) Open(ctx api.StreamContext, parentErrCh chan<- error) {
s.refCount.Add(1)
ctx.GetLogger().Infof("Sub topo %s opened by rule %s with %d ref", s.name, ctx.GetRuleId(), s.refCount.Load())
}
// Attach schemas
for _, op := range s.ops {
if so, ok := op.(node.SchemaNode); ok {
si, hasSchema := s.schemaReg[ctx.GetRuleId()]
if hasSchema {
ctx.GetLogger().Infof("attach schema to op %s", op.GetName())
so.AttachSchema(ctx, si.datasource, si.schema, si.isWildcard)
}
}
}
// If not opened yet, open it. It may be opened before, but failed to open. In this case, try to open it again.
if s.opened.CompareAndSwap(false, true) {
poe := infra.SafeRun(func() error {
Expand Down Expand Up @@ -138,6 +158,18 @@ func (s *SrcSubTopo) GetMetrics() []any {
return result
}

func (s *SrcSubTopo) OpsCount() int {
return len(s.ops)
}

func (s *SrcSubTopo) StoreSchema(ruleID, dataSource string, schema map[string]*ast.JsonStreamField, isWildCard bool) {
s.schemaReg[ruleID] = schemainfo{
datasource: dataSource,
schema: schema,
isWildcard: isWildCard,
}
}

func (s *SrcSubTopo) Close(ruleId string) {
if _, ok := s.refRules.LoadAndDelete(ruleId); ok {
s.refCount.Add(-1)
Expand All @@ -147,6 +179,11 @@ func (s *SrcSubTopo) Close(ruleId string) {
}
RemoveSubTopo(s.name)
}
for _, op := range s.ops {
if so, ok := op.(node.SchemaNode); ok {
so.DetachSchema(ruleId)
}
}
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/topo/subtopo_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func GetSubTopo(name string) (*SrcSubTopo, bool) {
Sources: make([]string, 0),
Edges: make(map[string][]any),
},
schemaReg: make(map[string]schemainfo),
})
return ac.(*SrcSubTopo), ok
}
Expand Down
33 changes: 29 additions & 4 deletions internal/topo/subtopo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/lf-edge/ekuiper/internal/topo/checkpoint"
"github.com/lf-edge/ekuiper/internal/topo/node"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/ast"
)

func TestSubtopoLC(t *testing.T) {
Expand All @@ -37,10 +38,18 @@ func TestSubtopoLC(t *testing.T) {
opNode := &mockOp{name: "op1", ch: make(chan any)}
subTopo.AddSrc(srcNode)
subTopo.AddOperator([]api.Emitter{srcNode}, opNode)
subTopo.StoreSchema("rule1", "shared", map[string]*ast.JsonStreamField{
"field1": {Type: "string"},
}, false)
assert.Equal(t, 1, mlen(&subTopoPool))
assert.Equal(t, srcNode, subTopo.GetSource())
assert.Equal(t, []node.OperatorNode{opNode}, subTopo.ops)
assert.Equal(t, opNode, subTopo.tail)
assert.Equal(t, 1, subTopo.OpsCount())
assert.Equal(t, 1, len(subTopo.schemaReg))
assert.Equal(t, schemainfo{"shared", map[string]*ast.JsonStreamField{
"field1": {Type: "string"},
}, false}, subTopo.schemaReg["rule1"])
// Test linkage
assert.Equal(t, 1, len(srcNode.outputs))
var tch chan<- any = opNode.ch
Expand All @@ -55,12 +64,17 @@ func TestSubtopoLC(t *testing.T) {
// Test run
subTopo.Open(mockContext.NewMockContext("rule1", "abc"), make(chan error))
assert.Equal(t, int32(1), subTopo.refCount.Load())
assert.Equal(t, 1, opNode.schemaCount)
// Run another
subTopo2, existed := GetSubTopo("shared")
assert.True(t, existed)
assert.Equal(t, subTopo, subTopo2)
subTopo.StoreSchema("rule2", "shared", map[string]*ast.JsonStreamField{
"field2": {Type: "string"},
}, false)
subTopo2.Open(mockContext.NewMockContext("rule2", "abc"), make(chan error))
assert.Equal(t, int32(2), subTopo.refCount.Load())
assert.Equal(t, 2, opNode.schemaCount)
// Metrics test
metrics := []any{0, 0, 0, 0, 0, 0, 0, "", 0, 0, 0, 0, 0, 0, 0, 0, "", 0}
assert.Equal(t, metrics, subTopo.GetMetrics())
Expand Down Expand Up @@ -97,6 +111,8 @@ func TestSubtopoLC(t *testing.T) {
subTopo2.Close("rule2")
assert.Equal(t, int32(0), subTopo.refCount.Load())
assert.Equal(t, 0, mlen(&subTopoPool))
assert.Equal(t, 2, len(subTopo.schemaReg))
assert.Equal(t, 0, opNode.schemaCount)
}

// Test when connection fails
Expand Down Expand Up @@ -244,10 +260,11 @@ func (m *mockSrc) RemoveMetrics(ruleId string) {
var _ checkpoint.StreamTask = &mockSrc{}

type mockOp struct {
name string
ch chan any
outputs []chan<- any
inputC int
name string
ch chan any
outputs []chan<- any
inputC int
schemaCount int
}

func (m *mockOp) AddOutput(c chan<- interface{}, s string) error {
Expand Down Expand Up @@ -303,3 +320,11 @@ func (m *mockOp) SetBarrierHandler(handler checkpoint.BarrierHandler) {
func (m *mockOp) RemoveMetrics(name string) {
// do nothing
}

func (m *mockOp) AttachSchema(ctx api.StreamContext, dataSource string, schema map[string]*ast.JsonStreamField, isWildcard bool) {
m.schemaCount++
}

func (m *mockOp) DetachSchema(ruleId string) {
m.schemaCount--
}

0 comments on commit 0ca7c72

Please sign in to comment.