refactor(op): decode op schema more logs
Signed-off-by: Jiyong Huang <[email protected]>
ngjaying committed May 11, 2024
1 parent 9382d71 commit 906b97a
Showing 6 changed files with 28 additions and 22 deletions.
4 changes: 2 additions & 2 deletions internal/topo/node/contract.go
@@ -76,12 +76,12 @@ type MergeableTopo interface {
     // SubMetrics return the metrics of the sub nodes
     SubMetrics() ([]string, []any)
     // Close notifies subtopo to deref
-    Close(ruleId string)
+    Close(ctx api.StreamContext, ruleId string)
 }
 
 type SchemaNode interface {
     // AttachSchema attach the schema to the node. The parameters are ruleId, sourceName, schema, whether is wildcard
     AttachSchema(api.StreamContext, string, map[string]*ast.JsonStreamField, bool)
     // DetachSchema detach the schema from the node. The parameters are ruleId
-    DetachSchema(string)
+    DetachSchema(api.StreamContext, string)
 }
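The interface change above means every schema attach/detach now receives the caller's stream context. Below is a minimal sketch of a node satisfying the updated SchemaNode contract; it is not part of this commit, it simply mirrors the mockOp used in subtopo_test.go further down, and the ast import path is an assumption.

package example

import (
    "github.com/lf-edge/ekuiper/contract/v2/api"
    "github.com/lf-edge/ekuiper/v2/pkg/ast" // assumed path for ast.JsonStreamField
)

// countingNode records how many rules currently have a schema attached.
type countingNode struct {
    schemaCount int
}

// AttachSchema logs through the rule-scoped context rather than a global logger.
func (n *countingNode) AttachSchema(ctx api.StreamContext, dataSource string, schema map[string]*ast.JsonStreamField, isWildcard bool) {
    ctx.GetLogger().Infof("attach schema for rule %s, source %s, wildcard %v", ctx.GetRuleId(), dataSource, isWildcard)
    n.schemaCount++
}

// DetachSchema receives the closing rule's context so the log carries that rule's fields.
func (n *countingNode) DetachSchema(ctx api.StreamContext, ruleId string) {
    ctx.GetLogger().Infof("detach schema for rule %s", ruleId)
    n.schemaCount--
}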
11 changes: 6 additions & 5 deletions internal/topo/node/decode_op.go
@@ -18,7 +18,6 @@ import (
     "fmt"
 
     "github.com/lf-edge/ekuiper/contract/v2/api"
-    "github.com/lf-edge/ekuiper/v2/internal/conf"
     "github.com/lf-edge/ekuiper/v2/internal/converter"
     schemaLayer "github.com/lf-edge/ekuiper/v2/internal/converter/schema"
     "github.com/lf-edge/ekuiper/v2/internal/pkg/def"
@@ -35,23 +34,25 @@ type DecodeOp struct {
 }
 
 func (o *DecodeOp) AttachSchema(ctx api.StreamContext, dataSource string, schema map[string]*ast.JsonStreamField, isWildcard bool) {
-    ctx.GetLogger().Infof("attach schema to shared stream")
     if fastDecoder, ok := o.converter.(message.SchemaResetAbleConverter); ok {
+        ctx.GetLogger().Infof("attach schema to shared stream")
         if err := o.sLayer.MergeSchema(ctx.GetRuleId(), dataSource, schema, isWildcard); err != nil {
             ctx.GetLogger().Warnf("merge schema to shared stream failed, err: %v", err)
         } else {
+            ctx.GetLogger().Infof("attach schema become %+v", o.sLayer.GetSchema())
             fastDecoder.ResetSchema(o.sLayer.GetSchema())
         }
     }
 }
 
-func (o *DecodeOp) DetachSchema(ruleId string) {
-    conf.Log.Infof("detach schema for shared stream rule %v", ruleId)
+func (o *DecodeOp) DetachSchema(ctx api.StreamContext, ruleId string) {
     if fastDecoder, ok := o.converter.(message.SchemaResetAbleConverter); ok {
+        ctx.GetLogger().Infof("detach schema for shared stream rule %v", ruleId)
         if err := o.sLayer.DetachSchema(ruleId); err != nil {
-            conf.Log.Warnf("detach schema for shared stream rule %v failed, err:%v", ruleId, err)
+            ctx.GetLogger().Infof("detach schema for shared stream rule %v failed, err:%v", ruleId, err)
         } else {
             fastDecoder.ResetSchema(o.sLayer.GetSchema())
+            ctx.GetLogger().Infof("detach schema become %+v", o.sLayer.GetSchema())
         }
     }
 }
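Both methods above gate their work on a runtime capability check: the merged schema is only pushed down when the converter implements message.SchemaResetAbleConverter. The pattern in isolation looks roughly like the sketch below; the interface and function names are illustrative stand-ins, not the real message package definitions.

package example

import "github.com/lf-edge/ekuiper/contract/v2/api"

// schemaResettable stands in for message.SchemaResetAbleConverter;
// the real method signature may differ.
type schemaResettable interface {
    ResetSchema(schema map[string]any)
}

// applySchema pushes a merged schema to the converter only when it opts in.
func applySchema(ctx api.StreamContext, converter any, merged map[string]any) {
    fast, ok := converter.(schemaResettable)
    if !ok {
        // Converter cannot be narrowed at runtime; decoding stays schemaless.
        ctx.GetLogger().Infof("converter does not support schema reset, skip")
        return
    }
    // Rule-scoped logger: the entry is attributed to the calling rule,
    // unlike the global conf.Log that this commit removes.
    ctx.GetLogger().Infof("apply merged schema %+v", merged)
    fast.ResetSchema(merged)
}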
2 changes: 1 addition & 1 deletion internal/topo/node/decode_op_test.go
@@ -190,7 +190,7 @@ func TestJSONWithSchema(t *testing.T) {
         }
     }
 
-    op.DetachSchema(ctx.GetRuleId())
+    op.DetachSchema(ctx, ctx.GetRuleId())
     cases = []any{
         &xsql.RawTuple{Emitter: "test", Rawdata: []byte("{\"a\":1,\"b\":2}"), Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
         &xsql.RawTuple{Emitter: "test", Rawdata: []byte("[{\"a\":1,\"b\":2},{\"a\":3,\"b\":4,\"c\":\"hello\"}]"), Timestamp: 111, Metadata: map[string]any{"topic": "demo", "qos": 1}},
4 changes: 2 additions & 2 deletions internal/topo/subtopo.go
@@ -171,7 +171,7 @@ func (s *SrcSubTopo) StoreSchema(ruleID, dataSource string, schema map[string]*a
     }
 }
 
-func (s *SrcSubTopo) Close(ruleId string) {
+func (s *SrcSubTopo) Close(ctx api.StreamContext, ruleId string) {
     if _, ok := s.refRules.LoadAndDelete(ruleId); ok {
         s.refCount.Add(-1)
         if s.refCount.Load() == 0 {
@@ -182,7 +182,7 @@ func (s *SrcSubTopo) Close(ruleId string) {
         }
         for _, op := range s.ops {
             if so, ok := op.(node.SchemaNode); ok {
-                so.DetachSchema(ruleId)
+                so.DetachSchema(ctx, ruleId)
             }
         }
     }
27 changes: 16 additions & 11 deletions internal/topo/subtopo_test.go
@@ -63,7 +63,8 @@ func TestSubtopoLC(t *testing.T) {
     }
     assert.Equal(t, ptopo, subTopo.topo)
     // Test run
-    subTopo.Open(mockContext.NewMockContext("rule1", "abc"), make(chan error))
+    ctx1 := mockContext.NewMockContext("rule1", "abc")
+    subTopo.Open(ctx1, make(chan error))
     assert.Equal(t, int32(1), subTopo.refCount.Load())
     assert.Equal(t, 1, opNode.schemaCount)
     // Run another
@@ -73,7 +74,8 @@ func TestSubtopoLC(t *testing.T) {
     subTopo.StoreSchema("rule2", "shared", map[string]*ast.JsonStreamField{
         "field2": {Type: "string"},
     }, false)
-    subTopo2.Open(mockContext.NewMockContext("rule2", "abc"), make(chan error))
+    ctx2 := mockContext.NewMockContext("rule2", "abc")
+    subTopo2.Open(ctx2, make(chan error))
     assert.Equal(t, int32(2), subTopo.refCount.Load())
     assert.Equal(t, 2, opNode.schemaCount)
     // Metrics test
@@ -106,10 +108,10 @@ func TestSubtopoLC(t *testing.T) {
     assert.Equal(t, []checkpoint.StreamTask{srcNode}, sources)
     assert.Equal(t, []checkpoint.NonSourceTask{opNode}, ops)
     // Stop
-    subTopo.Close("rule1")
+    subTopo.Close(ctx1, "rule1")
     assert.Equal(t, int32(1), subTopo.refCount.Load())
     assert.Equal(t, 1, mlen(&subTopoPool))
-    subTopo2.Close("rule2")
+    subTopo2.Close(ctx2, "rule2")
     assert.Equal(t, int32(0), subTopo.refCount.Load())
     assert.Equal(t, 0, mlen(&subTopoPool))
     assert.Equal(t, 2, len(subTopo.schemaReg))
@@ -132,33 +134,36 @@ func TestSubtopoRunError(t *testing.T) {
     assert.Equal(t, 1, mlen(&subTopoPool))
     // Test run firstly, successfully
     assert.Equal(t, false, subTopo.opened.Load())
-    subTopo.Open(mockContext.NewMockContext("rule1", "abc"), make(chan error))
+    ctx1 := mockContext.NewMockContext("rule1", "abc")
+    subTopo.Open(ctx1, make(chan error))
     assert.Equal(t, int32(1), subTopo.refCount.Load())
     assert.Equal(t, true, subTopo.opened.Load())
-    subTopo.Close("rule1")
+    subTopo.Close(ctx1, "rule1")
     assert.Equal(t, int32(0), subTopo.refCount.Load())
     assert.Equal(t, 0, mlen(&subTopoPool))
     time.Sleep(10 * time.Millisecond)
     assert.Equal(t, false, subTopo.opened.Load())
     // Test run secondly and thirdly, should fail
     errCh1 := make(chan error, 1)
-    subTopo.Open(mockContext.NewMockContext("rule1", "abc"), errCh1)
+    ctx1 = mockContext.NewMockContext("rule1", "abc")
+    subTopo.Open(ctx1, errCh1)
     assert.Equal(t, int32(1), subTopo.refCount.Load())
     errCh2 := make(chan error, 1)
     assert.Equal(t, true, subTopo.opened.Load())
-    subTopo.Open(mockContext.NewMockContext("rule2", "abc"), errCh2)
+    ctx2 := mockContext.NewMockContext("rule2", "abc")
+    subTopo.Open(ctx2, errCh2)
     assert.Equal(t, int32(2), subTopo.refCount.Load())
     select {
     case err := <-errCh1:
         assert.Equal(t, assert.AnError, err)
-        subTopo.Close("rule1")
+        subTopo.Close(ctx1, "rule1")
     case <-time.After(1 * time.Second):
         assert.Fail(t, "Should receive error")
     }
     select {
     case err := <-errCh2:
         assert.Equal(t, assert.AnError, err)
-        subTopo2.Close("rule2")
+        subTopo2.Close(ctx2, "rule2")
     case <-time.After(1 * time.Second):
         assert.Fail(t, "Should receive error")
     }
@@ -326,6 +331,6 @@ func (m *mockOp) AttachSchema(ctx api.StreamContext, dataSource string, schema m
     m.schemaCount++
 }
 
-func (m *mockOp) DetachSchema(ruleId string) {
+func (m *mockOp) DetachSchema(ctx api.StreamContext, ruleId string) {
     m.schemaCount--
 }
2 changes: 1 addition & 1 deletion internal/topo/topo.go
@@ -98,7 +98,7 @@ func (s *Topo) Cancel() {
     for _, src := range s.sources {
         switch rt := src.(type) {
         case node.MergeableTopo:
-            rt.Close(s.name)
+            rt.Close(s.ctx, s.name)
         }
     }
 }
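With this last change, cancelling a rule threads its context through the whole shared-source teardown: Topo.Cancel calls MergeableTopo.Close(ctx, name), the sub-topology drops a reference, and each SchemaNode op detaches that rule's schema, all logging through the same context. A compressed, illustrative sketch of that chain follows (types simplified, not the actual SrcSubTopo implementation).

package example

import "github.com/lf-edge/ekuiper/contract/v2/api"

// schemaNode mirrors the detach half of the node.SchemaNode contract.
type schemaNode interface {
    DetachSchema(ctx api.StreamContext, ruleId string)
}

// sharedSub is a simplified stand-in for a shared source sub-topology.
type sharedSub struct {
    refCount int
    ops      []schemaNode
}

// Close is called once per cancelled rule with that rule's context,
// so every detach log is attributed to the rule being stopped.
func (s *sharedSub) Close(ctx api.StreamContext, ruleId string) {
    s.refCount--
    ctx.GetLogger().Infof("close shared source for rule %s, %d refs left", ruleId, s.refCount)
    for _, op := range s.ops {
        op.DetachSchema(ctx, ruleId)
    }
}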