Skip to content

Commit

Permalink
fix: concurrent write to websocket error
Browse files Browse the repository at this point in the history
  • Loading branch information
bitxeno committed Jul 1, 2024
1 parent 8f3983d commit a6000fe
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 39 deletions.
61 changes: 61 additions & 0 deletions internal/manager/websocket_managerr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package manager

import (
"context"

"github.com/bitxeno/atvloadly/internal/model"
"github.com/gofiber/contrib/websocket"
)

type WebsocketManager struct {
ctx context.Context
cancel context.CancelFunc
conn *websocket.Conn
chMsg chan string
}

func NewWebsocketManager(conn *websocket.Conn) *WebsocketManager {
ctx, cancel := context.WithCancel(context.Background())
mgr := &WebsocketManager{
ctx: ctx,
cancel: cancel,
conn: conn,
chMsg: make(chan string, 100),
}

go mgr.runWriteMessage()

return mgr
}

func (mgr *WebsocketManager) runWriteMessage() {
for {
select {
case <-mgr.ctx.Done():
return
case msg := <-mgr.chMsg:
_ = mgr.conn.WriteMessage(websocket.TextMessage, []byte(msg))
}
}
}

func (mgr *WebsocketManager) ReadMessage() (*model.Message, error) {
var msg model.Message
if err := mgr.conn.ReadJSON(&msg); err != nil {
return nil, err
}

return &msg, nil
}

func (mgr *WebsocketManager) WriteMessage(msg string) {
mgr.chMsg <- msg
}

func (mgr *WebsocketManager) Cancel() {
mgr.cancel()
}

func (mgr *WebsocketManager) Context() context.Context {
return mgr.ctx
}
6 changes: 2 additions & 4 deletions internal/model/message.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package model

import "encoding/json"

// Message.Type
const (
MessageTypeInstall = 1
Expand All @@ -13,6 +11,6 @@ const (

// Message Websocket Communication data format
type Message struct {
Type int `json:"t"`
Data json.RawMessage `json:"d"`
Type int `json:"t"`
Data string `json:"d"`
}
57 changes: 22 additions & 35 deletions internal/service/websocket.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package service

import (
"context"
"encoding/json"
"fmt"

Expand All @@ -12,35 +11,29 @@ import (
)

func HandleInstallMessage(c *websocket.Conn) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
websocketMgr := manager.NewWebsocketManager(c)
defer websocketMgr.Cancel()
installMgr := manager.NewInteractiveInstallManager()
installMgr.OnOutput(func(line string) {
_ = c.WriteMessage(websocket.TextMessage, []byte(line))
websocketMgr.WriteMessage(line)
})
defer installMgr.Close()

for {
var msg model.Message
if err := c.ReadJSON(&msg); err != nil {
msg, err := websocketMgr.ReadMessage()
if err != nil {
// websocket client close
if websocket.IsUnexpectedCloseError(err) || websocket.IsCloseError(err) {
return
}
log.Err(err).Msg("Read websocket message error: ")
return
}
var data string
if err := json.Unmarshal(msg.Data, &data); err != nil {
msg := fmt.Sprintf("ERROR: %s", err.Error())
_ = c.WriteMessage(websocket.TextMessage, []byte(msg))
return
}

switch msg.Type {
case model.MessageTypeInstall:
var v model.InstalledApp
err := json.Unmarshal([]byte(data), &v)
err := json.Unmarshal([]byte(msg.Data), &v)
if err != nil {
msg := fmt.Sprintf("ERROR: %s", err.Error())
_ = c.WriteMessage(websocket.TextMessage, []byte(msg))
Expand All @@ -52,9 +45,9 @@ func HandleInstallMessage(c *websocket.Conn) {
continue
}

go runInstallMessage(ctx, c, installMgr, v)
go runInstallMessage(websocketMgr, installMgr, v)
case model.MessageType2FA:
code := data
code := msg.Data
installMgr.Write([]byte(code + "\n"))
default:
_ = c.WriteMessage(websocket.TextMessage, []byte("ERROR: invalid message type"))
Expand All @@ -63,48 +56,42 @@ func HandleInstallMessage(c *websocket.Conn) {
}
}

func runInstallMessage(ctx context.Context, c *websocket.Conn, installMgr *manager.InstallManager, v model.InstalledApp) {
err := installMgr.Start(ctx, v.UDID, v.Account, v.Password, v.IpaPath)
func runInstallMessage(mgr *manager.WebsocketManager, installMgr *manager.InstallManager, v model.InstalledApp) {
err := installMgr.Start(mgr.Context(), v.UDID, v.Account, v.Password, v.IpaPath)
if err != nil {
msg := fmt.Sprintf("ERROR: %s", err.Error())
_ = c.WriteMessage(websocket.TextMessage, []byte(msg))
mgr.WriteMessage(msg)
return
}
log.Infof("install exit: %s", v.IpaPath)
}

func HandlePairMessage(c *websocket.Conn) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
websocketMgr := manager.NewWebsocketManager(c)
defer websocketMgr.Cancel()
pairMgr := manager.NewPairManager()
pairMgr.OnOutput(func(line string) {
_ = c.WriteMessage(websocket.TextMessage, []byte(line))
websocketMgr.WriteMessage(line)
})
defer pairMgr.Close()

for {
var msg model.Message
if err := c.ReadJSON(&msg); err != nil {
msg, err := websocketMgr.ReadMessage()
if err != nil {
// websocket client close
if websocket.IsUnexpectedCloseError(err) || websocket.IsCloseError(err) {
return
}
log.Err(err).Msg("Read websocket message error: ")
return
}
var data string
if err := json.Unmarshal(msg.Data, &data); err != nil {
msg := fmt.Sprintf("ERROR: %s", err.Error())
_ = c.WriteMessage(websocket.TextMessage, []byte(msg))
return
}

switch msg.Type {
case model.MessageTypePair:
udid := data
go runPairMessage(ctx, c, pairMgr, udid)
udid := msg.Data
go runPairMessage(websocketMgr, pairMgr, udid)
case model.MessageTypePairConfirm:
code := data
code := msg.Data
pairMgr.Write([]byte(code + "\n"))
default:
_ = c.WriteMessage(websocket.TextMessage, []byte("ERROR: invalid message type"))
Expand All @@ -113,11 +100,11 @@ func HandlePairMessage(c *websocket.Conn) {
}
}

func runPairMessage(ctx context.Context, c *websocket.Conn, pairMgr *manager.PairManager, udid string) {
err := pairMgr.Start(ctx, udid)
func runPairMessage(mgr *manager.WebsocketManager, pairMgr *manager.PairManager, udid string) {
err := pairMgr.Start(mgr.Context(), udid)
if err != nil {
msg := fmt.Sprintf("ERROR: %s", err.Error())
_ = c.WriteMessage(websocket.TextMessage, []byte(msg))
mgr.WriteMessage(msg)
return
}
}

0 comments on commit a6000fe

Please sign in to comment.