diff --git a/internal/server/rest.go b/internal/server/rest.go index 435dcae47a..a36fa98224 100644 --- a/internal/server/rest.go +++ b/internal/server/rest.go @@ -39,6 +39,7 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/server/middleware" "github.com/lf-edge/ekuiper/v2/internal/topo/planner" "github.com/lf-edge/ekuiper/v2/internal/trial" + "github.com/lf-edge/ekuiper/v2/internal/xsql" "github.com/lf-edge/ekuiper/v2/pkg/ast" "github.com/lf-edge/ekuiper/v2/pkg/cast" "github.com/lf-edge/ekuiper/v2/pkg/errorx" @@ -484,10 +485,23 @@ func checkStreamBeforeDrop(name string) (bool, error) { if !ok { continue } - streams := rs.Topology.GetStreams() - for _, s := range streams { - if name == s { - return true, nil + if rs.Rule.Triggered { + streams := rs.Topology.GetStreams() + for _, s := range streams { + if name == s { + return true, nil + } + } + } else { + stmt, err := xsql.GetStatementFromSql(rs.Rule.Sql) + if err != nil { + continue + } + streams := xsql.GetStreams(stmt) + for _, s := range streams { + if name == s { + return true, nil + } } } } diff --git a/internal/topo/rule/ruleState.go b/internal/topo/rule/ruleState.go index 8ca05a207d..e731ec05bf 100644 --- a/internal/topo/rule/ruleState.go +++ b/internal/topo/rule/ruleState.go @@ -122,18 +122,16 @@ func NewRuleState(rule *def.Rule) (rs *RuleState, err error) { Rule: rule, ActionCh: make(chan ActionSignal), } - err = infra.SafeRun(func() error { - if tp, err := planner.Plan(rule); err != nil { - return err - } else { - rs.Topology = tp - } - if !rs.Rule.Triggered { - // manually force stop rule - rs.Topology.Cancel() - } - return nil - }) + if rs.Rule.Triggered { + err = infra.SafeRun(func() error { + if tp, err := planner.Plan(rule); err != nil { + return err + } else { + rs.Topology = tp + } + return nil + }) + } rs.run() defer func() { if err != nil { @@ -560,6 +558,9 @@ func (rs *RuleState) GetState() (s string, err error) { }() rs.RLock() defer rs.RUnlock() + if !rs.Rule.Triggered { + return rs.getStoppedRuleState(), nil + } result := "" if rs.Topology == nil { result = "Stopped: fail to create the topo." diff --git a/internal/topo/rule/ruleState_test.go b/internal/topo/rule/ruleState_test.go index 596c3e9436..03201396c7 100644 --- a/internal/topo/rule/ruleState_test.go +++ b/internal/topo/rule/ruleState_test.go @@ -66,7 +66,7 @@ func TestCreate(t *testing.T) { }{ { r: &def.Rule{ - Triggered: false, + Triggered: true, Id: "test", Sql: "SELECT ts FROM demo", Actions: []map[string]interface{}{ @@ -80,7 +80,7 @@ func TestCreate(t *testing.T) { }, { r: &def.Rule{ - Triggered: false, + Triggered: true, Id: "test", Sql: "SELECT FROM demo", Actions: []map[string]interface{}{ @@ -95,7 +95,7 @@ func TestCreate(t *testing.T) { }, { r: &def.Rule{ - Triggered: false, + Triggered: true, Id: "test", Sql: "SELECT * FROM demo1", Actions: []map[string]interface{}{ @@ -340,9 +340,9 @@ func TestRuleState_Start(t *testing.T) { sp := processor.NewStreamProcessor() sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="memory", FORMAT="JSON", DATASOURCE="test")`) defer sp.ExecStmt(`DROP STREAM demo`) - // Test rule not triggered + // Test rule triggered r := &def.Rule{ - Triggered: false, + Triggered: true, Id: "test", Sql: "SELECT ts FROM demo", Actions: []map[string]interface{}{ @@ -427,9 +427,8 @@ func TestScheduleRule(t *testing.T) { sp := processor.NewStreamProcessor() sp.ExecStmt(`CREATE STREAM demo () WITH (TYPE="memory", DATASOURCE="test", FORMAT="JSON")`) defer sp.ExecStmt(`DROP STREAM demo`) - // Test rule not triggered r := &def.Rule{ - Triggered: false, + Triggered: true, Id: "test", Sql: "SELECT ts FROM demo", Actions: []map[string]interface{}{ diff --git a/internal/topo/subtopo_test.go b/internal/topo/subtopo_test.go index ec31f20619..47e19fb51c 100644 --- a/internal/topo/subtopo_test.go +++ b/internal/topo/subtopo_test.go @@ -120,15 +120,16 @@ func TestSubtopoLC(t *testing.T) { // Test when connection fails func TestSubtopoRunError(t *testing.T) { + name := "testSharedErr" assert.Equal(t, 0, mlen(&subTopoPool)) - subTopo, existed := GetSubTopo("shared") + subTopo, existed := GetSubTopo(name) assert.False(t, existed) srcNode := &mockSrc{name: "src1"} opNode := &mockOp{name: "op1", ch: make(chan any)} subTopo.AddSrc(srcNode) subTopo.AddOperator([]node.Emitter{srcNode}, opNode) // create another subtopo - subTopo2, existed := GetSubTopo("shared") + subTopo2, existed := GetSubTopo(name) assert.True(t, existed) assert.Equal(t, subTopo, subTopo2) assert.Equal(t, 1, mlen(&subTopoPool)) diff --git a/internal/topo/topotest/mock_topo.go b/internal/topo/topotest/mock_topo.go index 494ff92ce0..d2d179d76c 100644 --- a/internal/topo/topotest/mock_topo.go +++ b/internal/topo/topotest/mock_topo.go @@ -268,8 +268,9 @@ func createTestRule(t *testing.T, id string, tt RuleTest, opt *def.RuleOption) ( } } rule := &def.Rule{ - Id: id, - Sql: tt.Sql, + Triggered: true, + Id: id, + Sql: tt.Sql, Actions: []map[string]any{ { "memory": map[string]any{