Skip to content

Commit

Permalink
fix: fix websocket reopen connection (#2444)
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <[email protected]>
  • Loading branch information
Yisaer committed Nov 28, 2023
1 parent d7766c9 commit 5cb22b3
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
5 changes: 3 additions & 2 deletions internal/io/http/httpserver/data_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func recvProcess(ctx api.StreamContext, c *websocket.Conn, endpoint string) {
if r := recover(); r != nil {
conf.Log.Infof("websocket recvProcess Process panic recovered, err:%v", r)
}
conf.Log.Infof("websocket endpoint %v remove stop recvProcess", endpoint)
conf.Log.Infof("websocket endpoint %v stop recvProcess", endpoint)
}()
conf.Log.Infof("websocket endpoint %v start recvProcess", endpoint)
topic := fmt.Sprintf("recv/%s/%s", WebsocketTopicPrefix, endpoint)
Expand Down Expand Up @@ -203,6 +203,7 @@ func sendProcess(ctx api.StreamContext, c *websocket.Conn, endpoint string) {
conf.Log.Infof("websocket endpoint %v start sendProcess", endpoint)
topic := fmt.Sprintf("send/%s/%s", WebsocketTopicPrefix, endpoint)
subCh := pubsub.CreateSub(topic, nil, "", 1024)
defer pubsub.CloseSourceConsumerChannel(topic, "")
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -260,13 +261,13 @@ func RegisterWebSocketEndpoint(ctx api.StreamContext, endpoint string) (string,
pubsub.CreatePub(recvTopic)
sendTopic := fmt.Sprintf("send/%s/%s", WebsocketTopicPrefix, endpoint)
pubsub.CreatePub(sendTopic)
subCtx, cancel := ctx.WithCancel()
router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
conf.Log.Errorf("websocket upgrade error: %v", err)
return
}
subCtx, cancel := ctx.WithCancel()
wsCtx.addConn(c, cancel)
conf.Log.Infof("websocket endpint %v create connection", endpoint)
go recvProcess(subCtx, c, endpoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func TestWebsocketServerConn(t *testing.T) {

c, err := createOneConn(t)
require.NoError(t, err)
c.Close()
// wait previous connection goroutine closed
time.Sleep(10 * time.Millisecond)
c, err = createOneConn(t)
require.NoError(t, err)

serverRecvCh = make(chan map[string]interface{})
serverPubCh = make(chan map[string]interface{})
Expand Down

0 comments on commit 5cb22b3

Please sign in to comment.