diff --git a/internal/io/http/httpserver/data_server.go b/internal/io/http/httpserver/data_server.go index d8fe80db0c..30b51040b9 100644 --- a/internal/io/http/httpserver/data_server.go +++ b/internal/io/http/httpserver/data_server.go @@ -156,7 +156,7 @@ func recvProcess(ctx api.StreamContext, c *websocket.Conn, endpoint string) { if r := recover(); r != nil { conf.Log.Infof("websocket recvProcess Process panic recovered, err:%v", r) } - conf.Log.Infof("websocket endpoint %v remove stop recvProcess", endpoint) + conf.Log.Infof("websocket endpoint %v stop recvProcess", endpoint) }() conf.Log.Infof("websocket endpoint %v start recvProcess", endpoint) topic := fmt.Sprintf("recv/%s/%s", WebsocketTopicPrefix, endpoint) @@ -203,6 +203,7 @@ func sendProcess(ctx api.StreamContext, c *websocket.Conn, endpoint string) { conf.Log.Infof("websocket endpoint %v start sendProcess", endpoint) topic := fmt.Sprintf("send/%s/%s", WebsocketTopicPrefix, endpoint) subCh := pubsub.CreateSub(topic, nil, "", 1024) + defer pubsub.CloseSourceConsumerChannel(topic, "") for { select { case <-ctx.Done(): @@ -260,13 +261,13 @@ func RegisterWebSocketEndpoint(ctx api.StreamContext, endpoint string) (string, pubsub.CreatePub(recvTopic) sendTopic := fmt.Sprintf("send/%s/%s", WebsocketTopicPrefix, endpoint) pubsub.CreatePub(sendTopic) - subCtx, cancel := ctx.WithCancel() router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) { c, err := upgrader.Upgrade(w, r, nil) if err != nil { conf.Log.Errorf("websocket upgrade error: %v", err) return } + subCtx, cancel := ctx.WithCancel() wsCtx.addConn(c, cancel) conf.Log.Infof("websocket endpint %v create connection", endpoint) go recvProcess(subCtx, c, endpoint) diff --git a/internal/topo/connection/clients/websocket/websocket_server_test.go b/internal/topo/connection/clients/websocket/websocket_server_test.go index a248a11a80..6904d7c228 100644 --- a/internal/topo/connection/clients/websocket/websocket_server_test.go +++ b/internal/topo/connection/clients/websocket/websocket_server_test.go @@ -54,6 +54,11 @@ func TestWebsocketServerConn(t *testing.T) { c, err := createOneConn(t) require.NoError(t, err) + c.Close() + // wait previous connection goroutine closed + time.Sleep(10 * time.Millisecond) + c, err = createOneConn(t) + require.NoError(t, err) serverRecvCh = make(chan map[string]interface{}) serverPubCh = make(chan map[string]interface{})