Skip to content

Commit

Permalink
fix: revise mqtt source on msg behavior (#2712)
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <[email protected]>
  • Loading branch information
Yisaer committed Mar 15, 2024
1 parent bff0475 commit ca1c246
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 3 deletions.
1 change: 1 addition & 0 deletions internal/io/mqtt/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (conn *Connection) attach() {
// Do not call this directly. Call connection pool Detach method to release the connection
func (conn *Connection) detach(topic string) bool {
delete(conn.subscriptions, topic)
conn.Client.Unsubscribe(topic)
if conn.refCount.Add(-1) == 0 {
go conn.Close()
return true
Expand Down
27 changes: 26 additions & 1 deletion internal/io/mqtt/mqtt_source_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

pahoMqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/pingcap/failpoint"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/topo/context"
Expand Down Expand Up @@ -103,12 +104,25 @@ func (ms *SourceConnector) Subscribe(ctx api.StreamContext) error {
}

func (ms *SourceConnector) onMessage(ctx api.StreamContext, msg pahoMqtt.Message) {
select {
case <-ctx.Done():
return
default:
}
failpoint.Inject("ctxCancel", func(val failpoint.Value) {
if val.(bool) {
panic("shouldn't run")
}
})

if ms.consumer == nil {
// The consumer is closed, no need to process the message
ctx.GetLogger().Debugf("The consumer is closed, skip to process the message %s from topic %s", string(msg.Payload()), msg.Topic())
return
}
ctx.GetLogger().Debugf("Received message %s from topic %s", string(msg.Payload()), msg.Topic())
if msg != nil {
ctx.GetLogger().Debugf("Received message %s from topic %s", string(msg.Payload()), msg.Topic())
}
rcvTime := conf.GetNow()
select {
case ms.consumer <- api.NewDefaultRawTuple(msg.Payload(), map[string]interface{}{
Expand All @@ -122,6 +136,17 @@ func (ms *SourceConnector) onMessage(ctx api.StreamContext, msg pahoMqtt.Message
}

func (ms *SourceConnector) onError(ctx api.StreamContext, err error) {
select {
case <-ctx.Done():
return
default:
}
failpoint.Inject("ctxCancel", func(val failpoint.Value) {
if val.(bool) {
panic("shouldn't run")
}
})

if ms.consumer == nil {
// The consumer is closed, no need to process the message
ctx.GetLogger().Debugf("The consumer is closed, skip to send the error")
Expand Down
55 changes: 55 additions & 0 deletions internal/io/mqtt/mqtt_source_connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (

"github.com/benbjohnson/clock"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/pingcap/failpoint"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/io/mock"
mockContext "github.com/lf-edge/ekuiper/internal/io/mock/context"
"github.com/lf-edge/ekuiper/internal/testx"
"github.com/lf-edge/ekuiper/internal/topo/connection/factory"
"github.com/lf-edge/ekuiper/internal/topo/context"
"github.com/lf-edge/ekuiper/pkg/api"
)

Expand Down Expand Up @@ -118,3 +121,55 @@ func TestOpen(t *testing.T) {
}
})
}

func TestOnMsgCancel(t *testing.T) {
sc := &SourceConnector{}
sc.consumer = make(chan<- api.SourceTuple, 10)
sc.onMessage(mockContext.NewMockContext("1", "1"), MockMessage{})
sc.onError(mockContext.NewMockContext("1", "1"), nil)

require.NoError(t, failpoint.Enable("github.com/lf-edge/ekuiper/internal/io/mqtt/ctxCancel", "return(ture)"))
defer func() {
failpoint.Disable("github.com/lf-edge/ekuiper/internal/io/mqtt/ctxCancel")
}()
ctx, cancel := context.Background().WithCancel()
cancel()
time.Sleep(100 * time.Millisecond)
sc.onMessage(ctx, nil)
sc.onError(ctx, nil)
}

// MockMessage implements the Message interface and allows for control over the returned data when a MessageHandler is
// invoked.
type MockMessage struct {
payload []byte
topic string
}

func (mm MockMessage) Payload() []byte {
return mm.payload
}

func (MockMessage) Duplicate() bool {
panic("function not expected to be invoked")
}

func (MockMessage) Qos() byte {
return 0
}

func (MockMessage) Retained() bool {
panic("function not expected to be invoked")
}

func (mm MockMessage) Topic() string {
return mm.topic
}

func (MockMessage) MessageID() uint16 {
return 0
}

func (MockMessage) Ack() {
panic("function not expected to be invoked")
}
9 changes: 7 additions & 2 deletions internal/topo/operator/join_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (jp *JoinOp) Apply(ctx api.StreamContext, data interface{}, fv *xsql.Functi
default:
}
if i == 0 {
v, err := jp.evalSet(input, join, fv)
v, err := jp.evalSet(ctx, input, join, fv)
if err != nil {
return fmt.Errorf("run Join error: %s", err)
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func (jp *JoinOp) getStreamNames(join *ast.Join) ([]string, error) {
return srcs, nil
}

func (jp *JoinOp) evalSet(input xsql.MergedCollection, join ast.Join, fv *xsql.FunctionValuer) (*xsql.JoinTuples, error) {
func (jp *JoinOp) evalSet(ctx api.StreamContext, input xsql.MergedCollection, join ast.Join, fv *xsql.FunctionValuer) (*xsql.JoinTuples, error) {
var leftStream, rightStream string

if join.JoinType != ast.CROSS_JOIN {
Expand Down Expand Up @@ -143,6 +143,11 @@ func (jp *JoinOp) evalSet(input xsql.MergedCollection, join ast.Join, fv *xsql.F
return jp.evalSetWithRightJoin(input, join, false, fv)
}
for _, left := range lefts {
select {
case <-ctx.Done():
return nil, nil
default:
}
leftJoined := false
for index, right := range rights {
tupleJoined := false
Expand Down

0 comments on commit ca1c246

Please sign in to comment.