Skip to content

Commit

Permalink
address the comment
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer committed Jul 22, 2024
1 parent 044590f commit b2ee8c5
Show file tree
Hide file tree
Showing 8 changed files with 1 addition and 40 deletions.
4 changes: 0 additions & 4 deletions extensions/impl/sql/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ func (s *SQLSinkConnector) Close(ctx api.StreamContext) error {
ctx.GetLogger().Infof("Closing sql sink connector url:%v", s.config.DBUrl)
id := s.config.DBUrl
connection.DetachConnection(ctx, id, s.props)
conn, _ := s.cw.Wait()
if conn != nil {
conn.DetachSub(ctx, s.props)
}
return nil
}

Expand Down
4 changes: 0 additions & 4 deletions internal/io/edgex/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,10 +528,6 @@ func (ems *EdgexMsgBusSink) Close(ctx api.StreamContext) error {
logger := ctx.GetLogger()
logger.Infof("Closing edgex sink")
connection.DetachConnection(ctx, ems.id, ems.config)
conn, _ := ems.cw.Wait()
if conn != nil {
conn.DetachSub(ctx, ems.config)
}
return nil
}

Expand Down
4 changes: 0 additions & 4 deletions internal/io/edgex/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ var (
ctx = context.WithValue(context.Background(), context.LoggerKey, contextLogger)
)

func init() {
conf.InitConf()
}

func compareEvent(expected, actual *dtos.Event) bool {
if (expected.Id == actual.Id || (expected.Id == "" && actual.Id != "")) && expected.ProfileName == actual.ProfileName && expected.DeviceName == actual.DeviceName && (expected.Origin == actual.Origin || (expected.Origin == 0 && actual.Origin > 0)) && reflect.DeepEqual(expected.Tags, actual.Tags) && expected.SourceName == actual.SourceName && len(expected.Readings) == len(actual.Readings) {
for _, r := range expected.Readings {
Expand Down
5 changes: 0 additions & 5 deletions internal/io/http/rest_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,11 @@ import (
"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/errorx"
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
)

func init() {
conf.InitConf()
}

func TestRestSinkCollect(t *testing.T) {
server := createServer()
defer func() {
Expand Down
4 changes: 0 additions & 4 deletions internal/io/mqtt/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ import (
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
)

func init() {
conf.InitConf()
}

func TestSinkConfigure(t *testing.T) {
dataDir, err := conf.GetDataLoc()
require.NoError(t, err)
Expand Down
13 changes: 1 addition & 12 deletions internal/io/neuron/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,7 @@ func (s *sink) Ping(ctx api.StreamContext, props map[string]interface{}) error {
}

func (s *sink) Connect(ctx api.StreamContext) error {
ctx.GetLogger().Infof("Connecting to neuron")
connId := PROTOCOL + s.cc.Url
cw, err := connection.FetchConnection(ctx, connId, "nng", s.props)
if err != nil {
return err
}
s.cw = cw
cli, err := s.cw.Wait()
cli, err := connect(ctx, s.cc.Url, s.props)
if err != nil {
return err
}
Expand Down Expand Up @@ -127,10 +120,6 @@ func (s *sink) CollectList(ctx api.StreamContext, data api.MessageTupleList) err

func (s *sink) Close(ctx api.StreamContext) error {
ctx.GetLogger().Debugf("closing neuron sink")
conn, _ := s.cw.Wait()
if conn != nil {
conn.DetachSub(ctx, s.props)
}
close(ctx, s.cli, s.cc.Url, s.props)
s.cli = nil
return nil
Expand Down
5 changes: 0 additions & 5 deletions internal/io/neuron/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,11 @@ import (
"github.com/stretchr/testify/assert"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/mock"
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
)

func init() {
conf.InitConf()
}

func TestSink(t *testing.T) {
server, ch := mockNeuron(false, true, DefaultNeuronUrl)
defer server.Close()
Expand Down
2 changes: 0 additions & 2 deletions internal/io/websocket/websocket_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/io/http/httpserver"
"github.com/lf-edge/ekuiper/v2/internal/testx"
"github.com/lf-edge/ekuiper/v2/pkg/connection"
Expand All @@ -30,7 +29,6 @@ import (
)

func init() {
conf.InitConf()
modules.RegisterConnection("websocket", httpserver.CreateWebsocketConnection)
}

Expand Down

0 comments on commit b2ee8c5

Please sign in to comment.