diff --git a/docs/en_US/api/restapi/connection.md b/docs/en_US/api/restapi/connection.md new file mode 100644 index 0000000000..3c2ec6319d --- /dev/null +++ b/docs/en_US/api/restapi/connection.md @@ -0,0 +1,45 @@ +# Manage websocket connection + +Manage websocket endpoint connection in eKuiper through REST API + +## create websocket endpoint + +```shell +POST http://localhost:9081/connection/websocket +``` + +example: + +```json +{ + "endpoint": "/xxxx" +} +``` + +## delete websocket endpoint + +```shell +DELETE http://localhost:9081/connection/websocket +``` + +example: + +```json +{ + "endpoint": "/xxxx" +} +``` + +## query websocket endpoint + +```shell +GET http://localhost:9081/connection/websocket +``` + +example: + +```json +{ + "endpoint": "/xxxx" +} +``` diff --git a/docs/en_US/guide/sinks/builtin/websocket.md b/docs/en_US/guide/sinks/builtin/websocket.md index faff4f11ff..69452116b7 100644 --- a/docs/en_US/guide/sinks/builtin/websocket.md +++ b/docs/en_US/guide/sinks/builtin/websocket.md @@ -6,11 +6,12 @@ The action is used for publishing output message into websocket channel. | Property name | Optional | Description | | addr | false | The address of the websocket sink server, like: 127.0.0.1:8080 | -| path | false | The url path of the websocket sink server, like: /api/data | -| insecureSkipVerify | true | whether to ignore SSL verification | -| certificationPath | true | websocket client ssl verification crt file path | -| privateKeyPath | true | Key file path for websocket client SSL verification | -| rootCaPath | true | websocket client ssl verified ca certificate file path | +| path | true | The url path of the websocket sink server, like: /api/data | +| insecureSkipVerify | false | whether to ignore SSL verification | +| certificationPath | false | websocket client ssl verification crt file path | +| privateKeyPath | false | Key file path for websocket client SSL verification | +| rootCaPath | false | websocket client ssl verified ca certificate file path | +| checkConnection | false | check wehther websocket connection exists | Other common sink properties are supported. Please refer to the [sink common properties](../overview.md#common-properties) for more information. @@ -22,6 +23,8 @@ When the websocket sink defines both addr and path, eKuiper will act as a websoc When the websocket sink only defines path and addr is empty, eKuiper will serve as the websocket server and wait for the remote websocket connection to be established and push the message through the connection. +When `checkConnection` is true, when creating a rule we need to ensure that the corresponding websocket enpoint has been established and the websocket connection has been established in order to successfully create the rule. We can [manage websocket endpoint](../../../api/restapi/connection.md/#manage-websocket-connection) in eKuiper through REST API. + ### Server Configuration To set up eKuiper as an Websocket endpoint, configure the server settings in `etc/sources/websocket.yaml`. diff --git a/docs/zh_CN/api/restapi/connection.md b/docs/zh_CN/api/restapi/connection.md new file mode 100644 index 0000000000..3367904439 --- /dev/null +++ b/docs/zh_CN/api/restapi/connection.md @@ -0,0 +1,45 @@ +# websocket 连接管理 + +通过 API 管理 eKuiper websocket 的连接 + +## 创建 websocket endpoint + +```shell +POST http://localhost:9081/connection/websocket +``` + +请求示例: + +```json +{ + "endpoint": "/xxxx" +} +``` + +## 删除 websocket endpoint + +```shell +DELETE http://localhost:9081/connection/websocket +``` + +请求示例: + +```json +{ + "endpoint": "/xxxx" +} +``` + +## 查看 websocket endpoint + +```shell +GET http://localhost:9081/connection/websocket +``` + +请求示例: + +```json +{ + "endpoint": "/xxxx" +} +``` diff --git a/docs/zh_CN/guide/sinks/builtin/websocket.md b/docs/zh_CN/guide/sinks/builtin/websocket.md index f455a866d6..6d7dea239d 100644 --- a/docs/zh_CN/guide/sinks/builtin/websocket.md +++ b/docs/zh_CN/guide/sinks/builtin/websocket.md @@ -4,12 +4,13 @@ | 属性名称 | 是否必填 | 说明 | |--------------|------|------------------------------------------| -| addr | 是 | websocket server 的地址,如: 127.0.0.1:8080 | +| addr | 否 | websocket server 的地址,如: 127.0.0.1:8080 | | path | 是 | websocket server 的 url path,如: /api/data | -| insecureSkipVerify | 是 | 是否忽略 SSL 验证 | -| certificationPath | 是 | websocket 客户端 ssl 验证的 crt 文件路径 | -| privateKeyPath | 是 | websocket 客户端 ssl 验证的 key 文件路径 | -| rootCaPath | 是 | websocket 客户端 ssl 验证的 ca 证书文件路径 | +| insecureSkipVerify | 否 | 是否忽略 SSL 验证 | +| certificationPath | 否 | websocket 客户端 ssl 验证的 crt 文件路径 | +| privateKeyPath | 否 | websocket 客户端 ssl 验证的 key 文件路径 | +| rootCaPath | 否 | websocket 客户端 ssl 验证的 ca 证书文件路径 | +| checkConnection | 否 | 是否检查 websocket endpoint 已经存在连接 | 其他通用的 sink 属性也支持,请参阅[公共属性](../overview.md#公共属性)。 @@ -21,6 +22,8 @@ 当 websocket sink 只定义了 path 且 addr 为空时,eKuiper 将作为 websocket 服务端等待远方建立 websocket 连接,并将消息通过该连接推送。 +当 `checkConnection` 为 true 时,建立规则时我们需要保证对应的 websocket enpoint 已经建立,并且已经建立起了 websocket 连接,才能成功创建规则。我们可以通过 REST API 的方式在 eKuiper 中[管理 websocket endpoint](../../../api/restapi/connection.md#websocket-连接管理)。 + ### 服务器配置 服务器配置在 `etc/kuiper.yaml` 中的 `source` 部分。 diff --git a/internal/io/http/httpserver/data_server.go b/internal/io/http/httpserver/data_server.go index 81fc454326..953de1e471 100644 --- a/internal/io/http/httpserver/data_server.go +++ b/internal/io/http/httpserver/data_server.go @@ -196,6 +196,16 @@ func sendProcess(ctx api.StreamContext, c *websocket.Conn, endpoint string) { } } +func CheckWebsocketEndpoint(endpoint string) bool { + lock.Lock() + defer lock.Unlock() + if server == nil { + return false + } + _, ok := wsEndpointCtx[endpoint] + return ok +} + func RegisterWebSocketEndpoint(ctx api.StreamContext, endpoint string) (string, string, chan struct{}, error) { lock.Lock() defer lock.Unlock() diff --git a/internal/server/connection.go b/internal/server/connection.go index 5e1dd23aae..1078eb56d5 100644 --- a/internal/server/connection.go +++ b/internal/server/connection.go @@ -19,6 +19,7 @@ import ( "errors" "io" "net/http" + "strconv" "github.com/lf-edge/ekuiper/internal/io/http/httpserver" "github.com/lf-edge/ekuiper/internal/topo/context" @@ -45,6 +46,10 @@ func connectionHandler(w http.ResponseWriter, r *http.Request) { return } switch r.Method { + case http.MethodGet: + exists := httpserver.CheckWebsocketEndpoint(cb.Endpoint) + w.WriteHeader(http.StatusOK) + w.Write([]byte(strconv.FormatBool(exists))) case http.MethodPost: _, _, _, err := httpserver.RegisterWebSocketEndpoint(context.Background(), cb.Endpoint) if err != nil { diff --git a/internal/server/rest.go b/internal/server/rest.go index a34301ef40..3afbcea6f7 100644 --- a/internal/server/rest.go +++ b/internal/server/rest.go @@ -166,7 +166,7 @@ func createRestServer(ip string, port int, needToken bool) *http.Server { r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost) r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost) r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet) - r.HandleFunc("/connection/websocket", connectionHandler).Methods(http.MethodPost, http.MethodDelete) + r.HandleFunc("/connection/websocket", connectionHandler).Methods(http.MethodGet, http.MethodPost, http.MethodDelete) // Register extended routes for k, v := range components { logger.Infof("register rest endpoint for component %s", k) diff --git a/internal/server/rest_test.go b/internal/server/rest_test.go index a4b6852e0a..4dd0691037 100644 --- a/internal/server/rest_test.go +++ b/internal/server/rest_test.go @@ -87,7 +87,7 @@ func (suite *RestTestSuite) SetupTest() { r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost) r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost) r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet) - r.HandleFunc("/connection/websocket", connectionHandler).Methods(http.MethodPost, http.MethodDelete) + r.HandleFunc("/connection/websocket", connectionHandler).Methods(http.MethodGet, http.MethodPost, http.MethodDelete) suite.r = r } diff --git a/internal/topo/connection/clients/websocket/websocket.go b/internal/topo/connection/clients/websocket/websocket.go index e120380e5a..e82e9db9ad 100644 --- a/internal/topo/connection/clients/websocket/websocket.go +++ b/internal/topo/connection/clients/websocket/websocket.go @@ -28,10 +28,11 @@ import ( ) type WebSocketConnectionConfig struct { - Addr string `json:"addr"` - Path string `json:"path"` - MaxConnRetry int `json:"maxConnRetry"` - tlsConfig *tls.Config + Addr string `json:"addr"` + Path string `json:"path"` + MaxConnRetry int `json:"maxConnRetry"` + CheckConnection bool `json:"checkConnection"` + tlsConfig *tls.Config } type tlsConf struct { diff --git a/internal/topo/connection/clients/websocket/websocket_server.go b/internal/topo/connection/clients/websocket/websocket_server.go index 20f101d12f..70b28f9402 100644 --- a/internal/topo/connection/clients/websocket/websocket_server.go +++ b/internal/topo/connection/clients/websocket/websocket_server.go @@ -22,15 +22,17 @@ import ( "github.com/lf-edge/ekuiper/internal/io/http/httpserver" "github.com/lf-edge/ekuiper/internal/io/memory/pubsub" "github.com/lf-edge/ekuiper/internal/topo/connection/clients" + "github.com/lf-edge/ekuiper/internal/topo/context" "github.com/lf-edge/ekuiper/pkg/api" ) type websocketServerConnWrapper struct { - endpoint string - recvTopic string - sendTopic string - connSelector string - done chan struct{} + endpoint string + recvTopic string + sendTopic string + connSelector string + done chan struct{} + checkConnection bool sync.RWMutex isFinished bool @@ -44,12 +46,21 @@ func (wsw *websocketServerConnWrapper) isFinish() bool { } func newWebsocketServerConnWrapper(config *WebSocketConnectionConfig) (clients.ClientWrapper, error) { - recvTopic, sendTopic, done, err := httpserver.GetWebsocketEndpointCh(config.Path) - if err != nil { - return nil, err + if config.CheckConnection { + recvTopic, sendTopic, done, err := httpserver.GetWebsocketEndpointCh(config.Path) + if err != nil { + return nil, err + } + wsw := &websocketServerConnWrapper{endpoint: config.Path, recvTopic: recvTopic, sendTopic: sendTopic, done: done, refCount: 1, checkConnection: true} + return wsw, nil + } else { + recvTopic, sendTopic, done, err := httpserver.RegisterWebSocketEndpoint(context.Background(), config.Path) + if err != nil { + return nil, err + } + wsw := &websocketServerConnWrapper{endpoint: config.Path, recvTopic: recvTopic, sendTopic: sendTopic, done: done, refCount: 1, checkConnection: false} + return wsw, nil } - wsw := &websocketServerConnWrapper{endpoint: config.Path, recvTopic: recvTopic, sendTopic: sendTopic, done: done, refCount: 1} - return wsw, nil } func (wsw *websocketServerConnWrapper) process(ctx api.StreamContext, subChan []api.TopicChannel, messageErrors chan error) { diff --git a/internal/topo/connection/clients/websocket/websocket_server_test.go b/internal/topo/connection/clients/websocket/websocket_server_test.go index d7d4e8c23c..1e7c1cf562 100644 --- a/internal/topo/connection/clients/websocket/websocket_server_test.go +++ b/internal/topo/connection/clients/websocket/websocket_server_test.go @@ -34,17 +34,21 @@ var ( ) func TestWebsocketServerConn(t *testing.T) { + conf.InitConf() // no endpoint, create client failed - _, err := newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3"}) + _, err := newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3", CheckConnection: true}) require.Error(t, err) - conf.InitConf() + // no endpoint, create client success + _, err = newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3", CheckConnection: false}) + require.NoError(t, err) + ctx := context.NewMockContext("123", "123") _, _, _, err = httpserver.RegisterWebSocketEndpoint(ctx, "/ws3") require.NoError(t, err) // no connection, create client failed - _, err = newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3"}) + _, err = newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3", CheckConnection: true}) require.Error(t, err) c, err := createOneConn() @@ -54,7 +58,7 @@ func TestWebsocketServerConn(t *testing.T) { serverPubCh = make(chan map[string]interface{}) conf.InitConf() - cli, err := newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3"}) + cli, err := newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3", CheckConnection: true}) require.NoError(t, err) require.NotNil(t, cli)