Skip to content

Commit

Permalink
feat: revise websocket connection endpoint (#2420)
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <[email protected]>
  • Loading branch information
Yisaer committed Nov 14, 2023
1 parent 4a0bd4e commit dcbd01b
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 30 deletions.
45 changes: 45 additions & 0 deletions docs/en_US/api/restapi/connection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Manage websocket connection

Manage websocket endpoint connection in eKuiper through REST API

## create websocket endpoint

```shell
POST http://localhost:9081/connection/websocket
```

example:

```json
{
"endpoint": "/xxxx"
}
```

## delete websocket endpoint

```shell
DELETE http://localhost:9081/connection/websocket
```

example:

```json
{
"endpoint": "/xxxx"
}
```

## query websocket endpoint

```shell
GET http://localhost:9081/connection/websocket
```

example:

```json
{
"endpoint": "/xxxx"
}
```
13 changes: 8 additions & 5 deletions docs/en_US/guide/sinks/builtin/websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ The action is used for publishing output message into websocket channel.

| Property name | Optional | Description |
| addr | false | The address of the websocket sink server, like: 127.0.0.1:8080 |
| path | false | The url path of the websocket sink server, like: /api/data |
| insecureSkipVerify | true | whether to ignore SSL verification |
| certificationPath | true | websocket client ssl verification crt file path |
| privateKeyPath | true | Key file path for websocket client SSL verification |
| rootCaPath | true | websocket client ssl verified ca certificate file path |
| path | true | The url path of the websocket sink server, like: /api/data |
| insecureSkipVerify | false | whether to ignore SSL verification |
| certificationPath | false | websocket client ssl verification crt file path |
| privateKeyPath | false | Key file path for websocket client SSL verification |
| rootCaPath | false | websocket client ssl verified ca certificate file path |
| checkConnection | false | check wehther websocket connection exists |

Other common sink properties are supported. Please refer to the [sink common properties](../overview.md#common-properties) for more information.

Expand All @@ -22,6 +23,8 @@ When the websocket sink defines both addr and path, eKuiper will act as a websoc

When the websocket sink only defines path and addr is empty, eKuiper will serve as the websocket server and wait for the remote websocket connection to be established and push the message through the connection.

When `checkConnection` is true, when creating a rule we need to ensure that the corresponding websocket enpoint has been established and the websocket connection has been established in order to successfully create the rule. We can [manage websocket endpoint](../../../api/restapi/connection.md/#manage-websocket-connection) in eKuiper through REST API.

### Server Configuration

To set up eKuiper as an Websocket endpoint, configure the server settings in `etc/sources/websocket.yaml`.
Expand Down
45 changes: 45 additions & 0 deletions docs/zh_CN/api/restapi/connection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# websocket 连接管理

通过 API 管理 eKuiper websocket 的连接

## 创建 websocket endpoint

```shell
POST http://localhost:9081/connection/websocket
```

请求示例:

```json
{
"endpoint": "/xxxx"
}
```

## 删除 websocket endpoint

```shell
DELETE http://localhost:9081/connection/websocket
```

请求示例:

```json
{
"endpoint": "/xxxx"
}
```

## 查看 websocket endpoint

```shell
GET http://localhost:9081/connection/websocket
```

请求示例:

```json
{
"endpoint": "/xxxx"
}
```
13 changes: 8 additions & 5 deletions docs/zh_CN/guide/sinks/builtin/websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

| 属性名称 | 是否必填 | 说明 |
|--------------|------|------------------------------------------|
| addr | | websocket server 的地址,如: 127.0.0.1:8080 |
| addr | | websocket server 的地址,如: 127.0.0.1:8080 |
| path || websocket server 的 url path,如: /api/data |
| insecureSkipVerify || 是否忽略 SSL 验证 |
| certificationPath || websocket 客户端 ssl 验证的 crt 文件路径 |
| privateKeyPath || websocket 客户端 ssl 验证的 key 文件路径 |
| rootCaPath || websocket 客户端 ssl 验证的 ca 证书文件路径 |
| insecureSkipVerify || 是否忽略 SSL 验证 |
| certificationPath || websocket 客户端 ssl 验证的 crt 文件路径 |
| privateKeyPath || websocket 客户端 ssl 验证的 key 文件路径 |
| rootCaPath || websocket 客户端 ssl 验证的 ca 证书文件路径 |
| checkConnection || 是否检查 websocket endpoint 已经存在连接 |

其他通用的 sink 属性也支持,请参阅[公共属性](../overview.md#公共属性)

Expand All @@ -21,6 +22,8 @@

当 websocket sink 只定义了 path 且 addr 为空时,eKuiper 将作为 websocket 服务端等待远方建立 websocket 连接,并将消息通过该连接推送。

`checkConnection` 为 true 时,建立规则时我们需要保证对应的 websocket enpoint 已经建立,并且已经建立起了 websocket 连接,才能成功创建规则。我们可以通过 REST API 的方式在 eKuiper 中[管理 websocket endpoint](../../../api/restapi/connection.md#websocket-连接管理)

### 服务器配置

服务器配置在 `etc/kuiper.yaml` 中的 `source` 部分。
Expand Down
10 changes: 10 additions & 0 deletions internal/io/http/httpserver/data_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,16 @@ func sendProcess(ctx api.StreamContext, c *websocket.Conn, endpoint string) {
}
}

func CheckWebsocketEndpoint(endpoint string) bool {
lock.Lock()
defer lock.Unlock()
if server == nil {
return false
}
_, ok := wsEndpointCtx[endpoint]
return ok
}

func RegisterWebSocketEndpoint(ctx api.StreamContext, endpoint string) (string, string, chan struct{}, error) {
lock.Lock()
defer lock.Unlock()
Expand Down
5 changes: 5 additions & 0 deletions internal/server/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"io"
"net/http"
"strconv"

"github.com/lf-edge/ekuiper/internal/io/http/httpserver"
"github.com/lf-edge/ekuiper/internal/topo/context"
Expand All @@ -45,6 +46,10 @@ func connectionHandler(w http.ResponseWriter, r *http.Request) {
return
}
switch r.Method {
case http.MethodGet:
exists := httpserver.CheckWebsocketEndpoint(cb.Endpoint)
w.WriteHeader(http.StatusOK)
w.Write([]byte(strconv.FormatBool(exists)))
case http.MethodPost:
_, _, _, err := httpserver.RegisterWebSocketEndpoint(context.Background(), cb.Endpoint)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
r.HandleFunc("/connection/websocket", connectionHandler).Methods(http.MethodPost, http.MethodDelete)
r.HandleFunc("/connection/websocket", connectionHandler).Methods(http.MethodGet, http.MethodPost, http.MethodDelete)
// Register extended routes
for k, v := range components {
logger.Infof("register rest endpoint for component %s", k)
Expand Down
2 changes: 1 addition & 1 deletion internal/server/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (suite *RestTestSuite) SetupTest() {
r.HandleFunc("/data/export", configurationExportHandler).Methods(http.MethodGet, http.MethodPost)
r.HandleFunc("/data/import", configurationImportHandler).Methods(http.MethodPost)
r.HandleFunc("/data/import/status", configurationStatusHandler).Methods(http.MethodGet)
r.HandleFunc("/connection/websocket", connectionHandler).Methods(http.MethodPost, http.MethodDelete)
r.HandleFunc("/connection/websocket", connectionHandler).Methods(http.MethodGet, http.MethodPost, http.MethodDelete)
suite.r = r
}

Expand Down
9 changes: 5 additions & 4 deletions internal/topo/connection/clients/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
)

type WebSocketConnectionConfig struct {
Addr string `json:"addr"`
Path string `json:"path"`
MaxConnRetry int `json:"maxConnRetry"`
tlsConfig *tls.Config
Addr string `json:"addr"`
Path string `json:"path"`
MaxConnRetry int `json:"maxConnRetry"`
CheckConnection bool `json:"checkConnection"`
tlsConfig *tls.Config
}

type tlsConf struct {
Expand Down
31 changes: 21 additions & 10 deletions internal/topo/connection/clients/websocket/websocket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@ import (
"github.com/lf-edge/ekuiper/internal/io/http/httpserver"
"github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
"github.com/lf-edge/ekuiper/internal/topo/connection/clients"
"github.com/lf-edge/ekuiper/internal/topo/context"
"github.com/lf-edge/ekuiper/pkg/api"
)

type websocketServerConnWrapper struct {
endpoint string
recvTopic string
sendTopic string
connSelector string
done chan struct{}
endpoint string
recvTopic string
sendTopic string
connSelector string
done chan struct{}
checkConnection bool

sync.RWMutex
isFinished bool
Expand All @@ -44,12 +46,21 @@ func (wsw *websocketServerConnWrapper) isFinish() bool {
}

func newWebsocketServerConnWrapper(config *WebSocketConnectionConfig) (clients.ClientWrapper, error) {
recvTopic, sendTopic, done, err := httpserver.GetWebsocketEndpointCh(config.Path)
if err != nil {
return nil, err
if config.CheckConnection {
recvTopic, sendTopic, done, err := httpserver.GetWebsocketEndpointCh(config.Path)
if err != nil {
return nil, err
}
wsw := &websocketServerConnWrapper{endpoint: config.Path, recvTopic: recvTopic, sendTopic: sendTopic, done: done, refCount: 1, checkConnection: true}
return wsw, nil
} else {
recvTopic, sendTopic, done, err := httpserver.RegisterWebSocketEndpoint(context.Background(), config.Path)
if err != nil {
return nil, err
}
wsw := &websocketServerConnWrapper{endpoint: config.Path, recvTopic: recvTopic, sendTopic: sendTopic, done: done, refCount: 1, checkConnection: false}
return wsw, nil
}
wsw := &websocketServerConnWrapper{endpoint: config.Path, recvTopic: recvTopic, sendTopic: sendTopic, done: done, refCount: 1}
return wsw, nil
}

func (wsw *websocketServerConnWrapper) process(ctx api.StreamContext, subChan []api.TopicChannel, messageErrors chan error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,21 @@ var (
)

func TestWebsocketServerConn(t *testing.T) {
conf.InitConf()
// no endpoint, create client failed
_, err := newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3"})
_, err := newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3", CheckConnection: true})
require.Error(t, err)

conf.InitConf()
// no endpoint, create client success
_, err = newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3", CheckConnection: false})
require.NoError(t, err)

ctx := context.NewMockContext("123", "123")
_, _, _, err = httpserver.RegisterWebSocketEndpoint(ctx, "/ws3")
require.NoError(t, err)

// no connection, create client failed
_, err = newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3"})
_, err = newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3", CheckConnection: true})
require.Error(t, err)

c, err := createOneConn()
Expand All @@ -54,7 +58,7 @@ func TestWebsocketServerConn(t *testing.T) {
serverPubCh = make(chan map[string]interface{})

conf.InitConf()
cli, err := newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3"})
cli, err := newWebsocketServerConnWrapper(&WebSocketConnectionConfig{Path: "/ws3", CheckConnection: true})
require.NoError(t, err)
require.NotNil(t, cli)

Expand Down

0 comments on commit dcbd01b

Please sign in to comment.