diff --git a/extensions/impl/sql/sink.go b/extensions/impl/sql/sink.go index aba0557929..e961bc4caf 100644 --- a/extensions/impl/sql/sink.go +++ b/extensions/impl/sql/sink.go @@ -126,10 +126,6 @@ func (s *SQLSinkConnector) Close(ctx api.StreamContext) error { ctx.GetLogger().Infof("Closing sql sink connector url:%v", s.config.DBUrl) id := s.config.DBUrl connection.DetachConnection(ctx, id, s.props) - conn, _ := s.cw.Wait() - if conn != nil { - conn.DetachSub(ctx, s.props) - } return nil } diff --git a/internal/io/edgex/sink.go b/internal/io/edgex/sink.go index ca38b7cf20..971dd2ee80 100644 --- a/internal/io/edgex/sink.go +++ b/internal/io/edgex/sink.go @@ -528,10 +528,6 @@ func (ems *EdgexMsgBusSink) Close(ctx api.StreamContext) error { logger := ctx.GetLogger() logger.Infof("Closing edgex sink") connection.DetachConnection(ctx, ems.id, ems.config) - conn, _ := ems.cw.Wait() - if conn != nil { - conn.DetachSub(ctx, ems.config) - } return nil } diff --git a/internal/io/edgex/sink_test.go b/internal/io/edgex/sink_test.go index c9f071cb3c..eb5ed38cb2 100644 --- a/internal/io/edgex/sink_test.go +++ b/internal/io/edgex/sink_test.go @@ -34,10 +34,6 @@ var ( ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger) ) -func init() { - conf.InitConf() -} - func compareEvent(expected, actual *dtos.Event) bool { if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && reflect.DeepEqual(expected.Tags, actual.Tags) && expected.SourceName == actual.SourceName && len(expected.Readings) == len(actual.Readings) { for _, r := range expected.Readings { diff --git a/internal/io/http/rest_sink_test.go b/internal/io/http/rest_sink_test.go index e24c0f64f7..1191958f27 100644 --- a/internal/io/http/rest_sink_test.go +++ b/internal/io/http/rest_sink_test.go @@ -21,16 +21,11 @@ import ( "github.com/pingcap/failpoint" "github.com/stretchr/testify/require" - "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/xsql" "github.com/lf-edge/ekuiper/v2/pkg/errorx" mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" ) -func init() { - conf.InitConf() -} - func TestRestSinkCollect(t *testing.T) { server := createServer() defer func() { diff --git a/internal/io/mqtt/sink_test.go b/internal/io/mqtt/sink_test.go index 63f6df7ca4..3669404372 100644 --- a/internal/io/mqtt/sink_test.go +++ b/internal/io/mqtt/sink_test.go @@ -27,10 +27,6 @@ import ( mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" ) -func init() { - conf.InitConf() -} - func TestSinkConfigure(t *testing.T) { dataDir, err := conf.GetDataLoc() require.NoError(t, err) diff --git a/internal/io/neuron/sink.go b/internal/io/neuron/sink.go index 318e8e2c90..fd3b10b695 100644 --- a/internal/io/neuron/sink.go +++ b/internal/io/neuron/sink.go @@ -83,14 +83,7 @@ func (s *sink) Ping(ctx api.StreamContext, props map[string]interface{}) error { } func (s *sink) Connect(ctx api.StreamContext) error { - ctx.GetLogger().Infof("Connecting to neuron") - connId := PROTOCOL + s.cc.Url - cw, err := connection.FetchConnection(ctx, connId, "nng", s.props) - if err != nil { - return err - } - s.cw = cw - cli, err := s.cw.Wait() + cli, err := connect(ctx, s.cc.Url, s.props) if err != nil { return err } @@ -127,10 +120,6 @@ func (s *sink) CollectList(ctx api.StreamContext, data api.MessageTupleList) err func (s *sink) Close(ctx api.StreamContext) error { ctx.GetLogger().Debugf("closing neuron sink") - conn, _ := s.cw.Wait() - if conn != nil { - conn.DetachSub(ctx, s.props) - } close(ctx, s.cli, s.cc.Url, s.props) s.cli = nil return nil diff --git a/internal/io/neuron/sink_test.go b/internal/io/neuron/sink_test.go index 899f3ca7d5..70f89cca56 100644 --- a/internal/io/neuron/sink_test.go +++ b/internal/io/neuron/sink_test.go @@ -21,16 +21,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/lf-edge/ekuiper/contract/v2/api" - "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/xsql" "github.com/lf-edge/ekuiper/v2/pkg/mock" mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" ) -func init() { - conf.InitConf() -} - func TestSink(t *testing.T) { server, ch := mockNeuron(false, true, DefaultNeuronUrl) defer server.Close() diff --git a/internal/io/websocket/websocket_sink_test.go b/internal/io/websocket/websocket_sink_test.go index dc9a2bb8b5..55b9cd5ed0 100644 --- a/internal/io/websocket/websocket_sink_test.go +++ b/internal/io/websocket/websocket_sink_test.go @@ -21,7 +21,6 @@ import ( "github.com/gorilla/websocket" "github.com/stretchr/testify/require" - "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/io/http/httpserver" "github.com/lf-edge/ekuiper/v2/internal/testx" "github.com/lf-edge/ekuiper/v2/pkg/connection" @@ -30,7 +29,6 @@ import ( ) func init() { - conf.InitConf() modules.RegisterConnection("websocket", httpserver.CreateWebsocketConnection) }