From ca1c246334cac63f8b892b94e25b638abdb8e6d8 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Fri, 15 Mar 2024 17:42:04 +0800 Subject: [PATCH] fix: revise mqtt source on msg behavior (#2712) Signed-off-by: yisaer --- internal/io/mqtt/connection.go | 1 + internal/io/mqtt/mqtt_source_connector.go | 27 ++++++++- .../io/mqtt/mqtt_source_connector_test.go | 55 +++++++++++++++++++ internal/topo/operator/join_operator.go | 9 ++- 4 files changed, 89 insertions(+), 3 deletions(-) diff --git a/internal/io/mqtt/connection.go b/internal/io/mqtt/connection.go index 05e31387a1..2744764534 100644 --- a/internal/io/mqtt/connection.go +++ b/internal/io/mqtt/connection.go @@ -89,6 +89,7 @@ func (conn *Connection) attach() { // Do not call this directly. Call connection pool Detach method to release the connection func (conn *Connection) detach(topic string) bool { delete(conn.subscriptions, topic) + conn.Client.Unsubscribe(topic) if conn.refCount.Add(-1) == 0 { go conn.Close() return true diff --git a/internal/io/mqtt/mqtt_source_connector.go b/internal/io/mqtt/mqtt_source_connector.go index 3673cbb42f..2c160ba380 100644 --- a/internal/io/mqtt/mqtt_source_connector.go +++ b/internal/io/mqtt/mqtt_source_connector.go @@ -18,6 +18,7 @@ import ( "fmt" pahoMqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/pingcap/failpoint" "github.com/lf-edge/ekuiper/internal/conf" "github.com/lf-edge/ekuiper/internal/topo/context" @@ -103,12 +104,25 @@ func (ms *SourceConnector) Subscribe(ctx api.StreamContext) error { } func (ms *SourceConnector) onMessage(ctx api.StreamContext, msg pahoMqtt.Message) { + select { + case <-ctx.Done(): + return + default: + } + failpoint.Inject("ctxCancel", func(val failpoint.Value) { + if val.(bool) { + panic("shouldn't run") + } + }) + if ms.consumer == nil { // The consumer is closed, no need to process the message ctx.GetLogger().Debugf("The consumer is closed, skip to process the message %s from topic %s", string(msg.Payload()), msg.Topic()) return } - ctx.GetLogger().Debugf("Received message %s from topic %s", string(msg.Payload()), msg.Topic()) + if msg != nil { + ctx.GetLogger().Debugf("Received message %s from topic %s", string(msg.Payload()), msg.Topic()) + } rcvTime := conf.GetNow() select { case ms.consumer <- api.NewDefaultRawTuple(msg.Payload(), map[string]interface{}{ @@ -122,6 +136,17 @@ func (ms *SourceConnector) onMessage(ctx api.StreamContext, msg pahoMqtt.Message } func (ms *SourceConnector) onError(ctx api.StreamContext, err error) { + select { + case <-ctx.Done(): + return + default: + } + failpoint.Inject("ctxCancel", func(val failpoint.Value) { + if val.(bool) { + panic("shouldn't run") + } + }) + if ms.consumer == nil { // The consumer is closed, no need to process the message ctx.GetLogger().Debugf("The consumer is closed, skip to send the error") diff --git a/internal/io/mqtt/mqtt_source_connector_test.go b/internal/io/mqtt/mqtt_source_connector_test.go index ef18aed728..02440515a5 100644 --- a/internal/io/mqtt/mqtt_source_connector_test.go +++ b/internal/io/mqtt/mqtt_source_connector_test.go @@ -22,13 +22,16 @@ import ( "github.com/benbjohnson/clock" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/pingcap/failpoint" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/lf-edge/ekuiper/internal/conf" "github.com/lf-edge/ekuiper/internal/io/mock" + mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context" "github.com/lf-edge/ekuiper/internal/testx" "github.com/lf-edge/ekuiper/internal/topo/connection/factory" + "github.com/lf-edge/ekuiper/internal/topo/context" "github.com/lf-edge/ekuiper/pkg/api" ) @@ -118,3 +121,55 @@ func TestOpen(t *testing.T) { } }) } + +func TestOnMsgCancel(t *testing.T) { + sc := &SourceConnector{} + sc.consumer = make(chan<- api.SourceTuple, 10) + sc.onMessage(mockContext.NewMockContext("1", "1"), MockMessage{}) + sc.onError(mockContext.NewMockContext("1", "1"), nil) + + require.NoError(t, failpoint.Enable("github.com/lf-edge/ekuiper/internal/io/mqtt/ctxCancel", "return(ture)")) + defer func() { + failpoint.Disable("github.com/lf-edge/ekuiper/internal/io/mqtt/ctxCancel") + }() + ctx, cancel := context.Background().WithCancel() + cancel() + time.Sleep(100 * time.Millisecond) + sc.onMessage(ctx, nil) + sc.onError(ctx, nil) +} + +// MockMessage implements the Message interface and allows for control over the returned data when a MessageHandler is +// invoked. +type MockMessage struct { + payload []byte + topic string +} + +func (mm MockMessage) Payload() []byte { + return mm.payload +} + +func (MockMessage) Duplicate() bool { + panic("function not expected to be invoked") +} + +func (MockMessage) Qos() byte { + return 0 +} + +func (MockMessage) Retained() bool { + panic("function not expected to be invoked") +} + +func (mm MockMessage) Topic() string { + return mm.topic +} + +func (MockMessage) MessageID() uint16 { + return 0 +} + +func (MockMessage) Ack() { + panic("function not expected to be invoked") +} diff --git a/internal/topo/operator/join_operator.go b/internal/topo/operator/join_operator.go index 9684600d38..6b7aafc1f0 100644 --- a/internal/topo/operator/join_operator.go +++ b/internal/topo/operator/join_operator.go @@ -51,7 +51,7 @@ func (jp *JoinOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Functi default: } if i == 0 { - v, err := jp.evalSet(input, join, fv) + v, err := jp.evalSet(ctx, input, join, fv) if err != nil { return fmt.Errorf("run Join error: %s", err) } @@ -108,7 +108,7 @@ func (jp *JoinOp) getStreamNames(join *ast.Join) ([]string, error) { return srcs, nil } -func (jp *JoinOp) evalSet(input xsql.MergedCollection, join ast.Join, fv *xsql.FunctionValuer) (*xsql.JoinTuples, error) { +func (jp *JoinOp) evalSet(ctx api.StreamContext, input xsql.MergedCollection, join ast.Join, fv *xsql.FunctionValuer) (*xsql.JoinTuples, error) { var leftStream, rightStream string if join.JoinType != ast.CROSS_JOIN { @@ -143,6 +143,11 @@ func (jp *JoinOp) evalSet(input xsql.MergedCollection, join ast.Join, fv *xsql.F return jp.evalSetWithRightJoin(input, join, false, fv) } for _, left := range lefts { + select { + case <-ctx.Done(): + return nil, nil + default: + } leftJoined := false for index, right := range rights { tupleJoined := false