Skip to content

Commit

Permalink
feat(module): modularize io (#2721)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying authored Mar 22, 2024
1 parent 1cc0815 commit 264bfef
Show file tree
Hide file tree
Showing 40 changed files with 235 additions and 174 deletions.
26 changes: 13 additions & 13 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ build_prepare:

.PHONY: build_without_edgex
build_without_edgex: build_prepare
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiperd cmd/kuiperd/main.go
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -o kuiperd cmd/kuiperd/main.go
@if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi
@mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
@echo "Build successfully"
Expand All @@ -54,16 +54,16 @@ pkg_without_edgex: build_without_edgex

.PHONY: build_with_edgex
build_with_edgex: build_prepare
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "edgex include_nats_messaging" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "edgex include_nats_messaging" -o kuiperd cmd/kuiperd/main.go
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "edgex include_nats_messaging" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "edgex include_nats_messaging" -o kuiperd cmd/kuiperd/main.go
@if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi
@mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
@echo "Build successfully"

.PHONY: build_with_edgex_and_script
build_with_edgex_and_script: build_prepare
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "edgex include_nats_messaging" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "edgex include_nats_messaging script" -o kuiperd cmd/kuiperd/main.go
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "edgex include_nats_messaging" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "edgex include_nats_messaging script" -o kuiperd cmd/kuiperd/main.go
@if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi
@mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
@echo "Build successfully"
Expand All @@ -74,8 +74,8 @@ pkg_with_edgex: build_with_edgex

.PHONY: build_with_fdb
build_with_fdb: build_prepare
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "fdb" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "fdb" -o kuiperd cmd/kuiperd/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "fdb" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "fdb" -o kuiperd cmd/kuiperd/main.go
@if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi
@mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
@echo "Build successfully"
Expand All @@ -86,7 +86,7 @@ pkg_with_fdb: build_with_fdb

.PHONY: build_core
build_core: build_prepare
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags core -o kuiperd cmd/kuiperd/main.go
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags core -o kuiperd cmd/kuiperd/main.go
@if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiperd; fi
@mv ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
@echo "Build successfully"
Expand All @@ -105,8 +105,8 @@ PLUGINS_IN_FULL := \
.PHONY: build_full
build_full: SHELL:=/bin/bash -euo pipefail
build_full: build_prepare
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "full include_nats_messaging" -o kuiperd cmd/kuiperd/main.go
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "full include_nats_messaging" -o kuiperd cmd/kuiperd/main.go
@if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi
@mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
@while read plugin; do \
Expand Down Expand Up @@ -143,8 +143,8 @@ real_pkg:

.PHONY: build_with_wasm
build_with_wasm: build_prepare
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -tags "wasmedge" -o kuiperd cmd/kuiperd/main.go
GO111MODULE=on CGO_ENABLED=0 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -tags "wasmedge" -o kuiperd cmd/kuiperd/main.go
@if [ "$$(uname -s)" = "Linux" ] && [ ! -z $$(which upx) ]; then upx ./kuiper; upx ./kuiperd; fi
@mv ./kuiper ./kuiperd $(BUILD_PATH)/$(PACKAGE_NAME)/bin
@echo "Build successfully"
Expand Down
52 changes: 3 additions & 49 deletions cmd/kuiperd/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,54 +14,8 @@

package main

import (
"flag"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/server"
)

var (
Version = "unknown"
LoadFileType = "relative"
)

var (
loadFileType string
etcPath string
dataPath string
logPath string
pluginsPath string
)

func init() {
flag.StringVar(&loadFileType, "loadFileType", "", "loadFileType indicates the how to load path")
flag.StringVar(&etcPath, "etc", "", "etc indicates the path of etc dir")
flag.StringVar(&dataPath, "data", "", "data indicates the path of data dir")
flag.StringVar(&logPath, "log", "", "log indicates the path of log dir")
flag.StringVar(&pluginsPath, "plugins", "", "plugins indicates the path of plugins dir")

flag.Parse()

if len(loadFileType) > 0 {
conf.PathConfig.LoadFileType = loadFileType
} else {
conf.PathConfig.LoadFileType = LoadFileType
}
if len(etcPath) > 0 {
conf.PathConfig.Dirs["etc"] = etcPath
}
if len(dataPath) > 0 {
conf.PathConfig.Dirs["data"] = dataPath
}
if len(logPath) > 0 {
conf.PathConfig.Dirs["log"] = logPath
}
if len(pluginsPath) > 0 {
conf.PathConfig.Dirs["plugins"] = pluginsPath
}
}
import "github.com/lf-edge/ekuiper/cmd"

func main() {
server.StartUp(Version)
cmd.Main()
}
68 changes: 68 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"flag"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/server"
)

// The compile time variable
var (
Version = "unknown"
LoadFileType = "relative"
)

var (
loadFileType string
etcPath string
dataPath string
logPath string
pluginsPath string
)

func init() {
flag.StringVar(&loadFileType, "loadFileType", "", "loadFileType indicates the how to load path")
flag.StringVar(&etcPath, "etc", "", "etc indicates the path of etc dir")
flag.StringVar(&dataPath, "data", "", "data indicates the path of data dir")
flag.StringVar(&logPath, "log", "", "log indicates the path of log dir")
flag.StringVar(&pluginsPath, "plugins", "", "plugins indicates the path of plugins dir")

flag.Parse()

if len(loadFileType) > 0 {
conf.PathConfig.LoadFileType = loadFileType
} else {
conf.PathConfig.LoadFileType = LoadFileType
}
if len(etcPath) > 0 {
conf.PathConfig.Dirs["etc"] = etcPath
}
if len(dataPath) > 0 {
conf.PathConfig.Dirs["data"] = dataPath
}
if len(logPath) > 0 {
conf.PathConfig.Dirs["log"] = logPath
}
if len(pluginsPath) > 0 {
conf.PathConfig.Dirs["plugins"] = pluginsPath
}
}

func Main() {
server.StartUp(Version)
}
4 changes: 2 additions & 2 deletions deploy/packages/deb/debian/rules
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ PKG_VSN ?= develop
## note that it is necessary to use overlay_vars relative to .. as
## the generate command EXECUTES in rel/
build:
GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X main.Version=$(PKG_VSN) -X main.LoadFileType=absolute" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=$(PKG_VSN) -X main.LoadFileType=absolute" -o kuiperd cmd/kuiperd/main.go
GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(PKG_VSN) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=absolute" -o kuiper cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(PKG_VSN) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=absolute" -o kuiperd cmd/kuiperd/main.go

clean:
dh_clean
Expand Down
4 changes: 2 additions & 2 deletions deploy/packages/rpm/kuiper.spec
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ A lightweight IoT edge analytics software

%build
cd %{_code_source}
GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X main.Version=%{_version}-%{_release} -X main.LoadFileType=absolute" -o %{_code_source}/kuiper %{_code_source}/cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X main.Version=%{_version}-%{_release} -X main.LoadFileType=absolute" -o %{_code_source}/kuiperd %{_code_source}/cmd/kuiperd/main.go
GO111MODULE=on CGO_ENABLED=1 go build -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=%{_version}-%{_release} -X github.com/lf-edge/ekuiper/cmd.LoadFileType=absolute" -o %{_code_source}/kuiper %{_code_source}/cmd/kuiper/main.go
GO111MODULE=on CGO_ENABLED=1 go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=%{_version}-%{_release} -X github.com/lf-edge/ekuiper/cmd.LoadFileType=absolute" -o %{_code_source}/kuiperd %{_code_source}/cmd/kuiperd/main.go
cd -

%install
Expand Down
2 changes: 1 addition & 1 deletion docs/en_US/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ For example, to cross build ARM64 binaries in AMD64 ubuntu/debian machine, do th
2. Update the Makefile in the build command. Examples:

```shell
GO111MODULE=on CGO_ENABLED=1 GOOS=linux GOARCH=arm64 CC=aarch64-linux-gnu-gcc go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiperd cmd/kuiperd/main.go
GO111MODULE=on CGO_ENABLED=1 GOOS=linux GOARCH=arm64 CC=aarch64-linux-gnu-gcc go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -o kuiperd cmd/kuiperd/main.go
```

3. Run `make`
Expand Down
2 changes: 1 addition & 1 deletion docs/zh_CN/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ sqlite,因此 `CGO_ENABLE` 必须设置为1。在交叉编译时,必须安
2. 更新 Makefile 里的编译相关参数如下:

```shell
GO111MODULE=on CGO_ENABLED=1 GOOS=linux GOARCH=arm64 CC=aarch64-linux-gnu-gcc go build -trimpath -ldflags="-s -w -X main.Version=$(VERSION) -X main.LoadFileType=relative" -o kuiperd cmd/kuiperd/main.go
GO111MODULE=on CGO_ENABLED=1 GOOS=linux GOARCH=arm64 CC=aarch64-linux-gnu-gcc go build -trimpath -ldflags="-s -w -X github.com/lf-edge/ekuiper/cmd.Version=$(VERSION) -X github.com/lf-edge/ekuiper/cmd.LoadFileType=relative" -o kuiperd cmd/kuiperd/main.go
```

3. 运行 `make`
Expand Down
65 changes: 28 additions & 37 deletions internal/binder/io/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,75 +25,66 @@ import (
"github.com/lf-edge/ekuiper/internal/io/websocket"
plugin2 "github.com/lf-edge/ekuiper/internal/plugin"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/modules"
)

type (
NewSourceFunc func() api.Source
NewLookupSourceFunc func() api.LookupSource
NewSinkFunc func() api.Sink
)
func init() {
modules.RegisterSource("mqtt", func() api.Source { return &mqtt.SourceConnector{} })
modules.RegisterSource("httppull", func() api.Source { return &http.PullSource{} })
modules.RegisterSource("httppush", func() api.Source { return &http.PushSource{} })
modules.RegisterSource("file", func() api.Source { return &file.FileSource{} })
modules.RegisterSource("memory", func() api.Source { return memory.GetSource() })
modules.RegisterSource("neuron", func() api.Source { return neuron.GetSource() })
modules.RegisterSource("websocket", func() api.Source { return &websocket.WebsocketSource{} })
modules.RegisterSource("simulator", func() api.Source { return &simulator.Source{} })

var (
sources = map[string]NewSourceFunc{
"mqtt": func() api.Source { return &mqtt.SourceConnector{} },
"httppull": func() api.Source { return &http.PullSource{} },
"httppush": func() api.Source { return &http.PushSource{} },
"file": func() api.Source { return &file.FileSource{} },
"memory": func() api.Source { return memory.GetSource() },
"neuron": func() api.Source { return neuron.GetSource() },
"websocket": func() api.Source { return &websocket.WebsocketSource{} },
"simulator": func() api.Source { return &simulator.Source{} },
}
sinks = map[string]NewSinkFunc{
"log": sink.NewLogSink,
"logToMemory": sink.NewLogSinkToMemory,
"mqtt": func() api.Sink { return &mqtt.MQTTSink{} },
"rest": func() api.Sink { return &http.RestSink{} },
"nop": func() api.Sink { return &sink.NopSink{} },
"memory": func() api.Sink { return memory.GetSink() },
"neuron": func() api.Sink { return neuron.GetSink() },
"file": func() api.Sink { return file.File() },
"websocket": func() api.Sink { return &websocket.WebSocketSink{} },
}
lookupSources = map[string]NewLookupSourceFunc{
"memory": func() api.LookupSource { return memory.GetLookupSource() },
"httppull": func() api.LookupSource { return http.GetLookUpSource() },
}
)
modules.RegisterSink("log", sink.NewLogSink)
modules.RegisterSink("logToMemory", sink.NewLogSinkToMemory)
modules.RegisterSink("mqtt", func() api.Sink { return &mqtt.MQTTSink{} })
modules.RegisterSink("rest", func() api.Sink { return &http.RestSink{} })
modules.RegisterSink("nop", func() api.Sink { return &sink.NopSink{} })
modules.RegisterSink("memory", func() api.Sink { return memory.GetSink() })
modules.RegisterSink("neuron", func() api.Sink { return neuron.GetSink() })
modules.RegisterSink("file", func() api.Sink { return file.File() })
modules.RegisterSink("websocket", func() api.Sink { return &websocket.WebSocketSink{} })

modules.RegisterLookupSource("memory", func() api.LookupSource { return memory.GetLookupSource() })
modules.RegisterLookupSource("httppull", func() api.LookupSource { return http.GetLookUpSource() })
}

type Manager struct{}

func (m *Manager) Source(name string) (api.Source, error) {
if s, ok := sources[name]; ok {
if s, ok := modules.Sources[name]; ok {
return s(), nil
}
return nil, nil
}

func (m *Manager) SourcePluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
if _, ok := sources[name]; ok {
if _, ok := modules.Sources[name]; ok {
return plugin2.INTERNAL, "", ""
} else {
return plugin2.NONE_EXTENSION, "", ""
}
}

func (m *Manager) LookupSource(name string) (api.LookupSource, error) {
if s, ok := lookupSources[name]; ok {
if s, ok := modules.LookupSources[name]; ok {
return s(), nil
}
return nil, nil
}

func (m *Manager) Sink(name string) (api.Sink, error) {
if s, ok := sinks[name]; ok {
if s, ok := modules.Sinks[name]; ok {
return s(), nil
}
return nil, nil
}

func (m *Manager) SinkPluginInfo(name string) (plugin2.EXTENSION_TYPE, string, string) {
if _, ok := sinks[name]; ok {
if _, ok := modules.Sinks[name]; ok {
return plugin2.INTERNAL, "", ""
} else {
return plugin2.NONE_EXTENSION, "", ""
Expand Down
8 changes: 5 additions & 3 deletions internal/binder/io/builtin_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 EMQ Technologies Co., Ltd.
// Copyright 2023-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,11 +18,13 @@ import (
"testing"

"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/pkg/modules"
)

func TestLookupSources(t *testing.T) {
_, ok := lookupSources["memory"]
_, ok := modules.LookupSources["memory"]
require.True(t, ok)
_, ok = lookupSources["httppull"]
_, ok = modules.LookupSources["httppull"]
require.True(t, ok)
}
7 changes: 4 additions & 3 deletions internal/binder/io/ext_edgex.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,9 +19,10 @@ package io
import (
"github.com/lf-edge/ekuiper/internal/io/edgex"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/modules"
)

func init() {
sources["edgex"] = func() api.Source { return &edgex.EdgexSource{} }
sinks["edgex"] = func() api.Sink { return &edgex.EdgexMsgBusSink{} }
modules.RegisterSource("edgex", func() api.Source { return &edgex.EdgexSource{} })
modules.RegisterSink("edgex", func() api.Sink { return &edgex.EdgexMsgBusSink{} })
}
Loading

0 comments on commit 264bfef

Please sign in to comment.