diff --git a/pkg/mock/test_source.go b/pkg/mock/test_source.go index b311d91101..22c33839b7 100644 --- a/pkg/mock/test_source.go +++ b/pkg/mock/test_source.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/lf-edge/ekuiper/contract/v2/api" - "github.com/lf-edge/ekuiper/v2/internal/xsql" mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" "github.com/lf-edge/ekuiper/v2/pkg/model" ) @@ -58,7 +57,7 @@ func TestSourceConnector(t *testing.T, r api.Source, props map[string]any, expec switch ss := r.(type) { case api.BytesSource: err = ss.Subscribe(ctx, func(ctx api.StreamContext, payload []byte, meta map[string]any, ts time.Time) { - result = append(result, model.NewDefaultRawTuple(payload, xsql.Message(meta), ts)) + result = append(result, model.NewDefaultRawTuple(payload, model.DefaultMessage(meta), ts)) limit-- if limit <= 0 { wg.Done() @@ -79,7 +78,7 @@ func TestSourceConnector(t *testing.T, r api.Source, props map[string]any, expec sender() }() - ticker := time.After(2 * time.Second) + ticker := time.After(60 * time.Second) finished := make(chan struct{}) go func() { wg.Wait()