Skip to content

Commit

Permalink
Fixing races (#53)
Browse files Browse the repository at this point in the history
* Fixing races

During work with library I encounted couple races

For example:
panic: runtime error: slice bounds out of range [:1026] with capacity 1024

goroutine 125171 [running]:
bufio.(*Reader).Peek(0xc012610de0, 0x2)
	/usr/local/go/src/bufio/bufio.go:165 +0x16a
github.com/gorilla/websocket.(*Conn).read(0xc01210cc60, 0x0?)
	/go/pkg/mod/github.com/gorilla/[email protected]/conn.go:378 +0x26
github.com/gorilla/websocket.(*Conn).advanceFrame(0xc01210cc60)
	/go/pkg/mod/github.com/gorilla/[email protected]/conn.go:824 +0x6d
github.com/gorilla/websocket.(*Conn).NextReader(0xc01210cc60)
	/go/pkg/mod/github.com/gorilla/[email protected]/conn.go:1034 +0x13e
github.com/gorilla/websocket.(*Conn).ReadMessage(0xc00e924620?)
	/go/pkg/mod/github.com/gorilla/[email protected]/conn.go:1120 +0x13
github.com/aopoltorzhicky/go_kraken/websocket.(*Kraken).listenSocket(0xc0003ba0e0)
	/go/pkg/mod/github.com/aopoltorzhicky/[email protected]/websocket/kraken.go:198 +0x168
created by github.com/aopoltorzhicky/go_kraken/websocket.(*Kraken).managerThread in goroutine 82
	/go/pkg/mod/github.com/aopoltorzhicky/[email protected]/websocket/kraken.go:109 +0x227

So following patch should addres this problem.
Issues:
1. Multiple listenSocket methods
2. Possible lock leaks
3. Shouldn't be sends in parallel
4. Switch to close, multiple listeners

Refactor

* only one close
  • Loading branch information
kubmichael authored Jun 5, 2024
1 parent 314d408 commit 89349a3
Showing 1 changed file with 33 additions and 39 deletions.
72 changes: 33 additions & 39 deletions websocket/kraken.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,20 @@ type Kraken struct {
readTimeout time.Duration
heartbeatTimeout time.Duration

msg chan Update
connect chan struct{}
stop chan struct{}
msg chan Update
stop chan struct{}

wg sync.WaitGroup
lock sync.RWMutex
}

// New -
// NewKraken -
func NewKraken(url string, opts ...KrakenOption) *Kraken {
kraken := Kraken{
url: url,
reconnectTimeout: 5 * time.Second,
readTimeout: 15 * time.Second,
heartbeatTimeout: 10 * time.Second,
subscriptions: make(map[int64]*SubscriptionStatus),
connect: make(chan struct{}, 1),
msg: make(chan Update, 1024),
stop: make(chan struct{}, 1),
}
Expand All @@ -53,15 +51,10 @@ func NewKraken(url string, opts ...KrakenOption) *Kraken {

// Connect to the Kraken API, this should only be called once.
func (k *Kraken) Connect() error {
k.wg.Add(1)
go k.managerThread()

if err := k.dial(); err != nil {
return err
}

k.wg.Add(1)
go k.listenSocket()
go k.managerThread()

return nil
}
Expand All @@ -85,38 +78,43 @@ func (k *Kraken) dial() error {
}

func (k *Kraken) managerThread() {
defer k.wg.Done()

heartbeat := time.NewTicker(k.heartbeatTimeout)
defer heartbeat.Stop()

connect := make(chan struct{})
stopListener := make(chan struct{})
reconnectCh := make(chan struct{})
go k.listenSocket(stopListener, reconnectCh)

for {
select {
case <-k.stop:
return
case <-k.connect:
case <-connect:
time.Sleep(k.reconnectTimeout)

log.Warnf("reconnecting...")

if err := k.dial(); err != nil {
log.Error(err)
k.connect <- struct{}{}
connect <- struct{}{}
continue
}

k.wg.Add(1)
go k.listenSocket()

if err := k.resubscribe(); err != nil {
log.Error(err)
}

stopListener = make(chan struct{})
reconnectCh = make(chan struct{})
go k.listenSocket(stopListener, reconnectCh)
case <-reconnectCh:
connect <- struct{}{}
case <-k.stop:
return
case <-heartbeat.C:
if err := k.send(PingRequest{
Event: EventPing,
}); err != nil {
log.Println(err)
k.connect <- struct{}{}
close(stopListener)
connect <- struct{}{}
}
}
}
Expand Down Expand Up @@ -149,11 +147,6 @@ func (k *Kraken) Listen() <-chan Update {

// Close - provides an interface for a user initiated shutdown.
func (k *Kraken) Close() error {
for i := 0; i < 2; i++ {
k.stop <- struct{}{}
}
k.wg.Wait()

if k.conn != nil {
if err := k.conn.Close(); err != nil {
return err
Expand All @@ -162,11 +155,12 @@ func (k *Kraken) Close() error {

close(k.stop)
close(k.msg)
close(k.connect)
return nil
}

func (k *Kraken) send(msg interface{}) error {
k.lock.Lock()
defer k.lock.Unlock()
if k.conn == nil {
return nil
}
Expand All @@ -178,14 +172,14 @@ func (k *Kraken) send(msg interface{}) error {
return k.conn.WriteMessage(websocket.TextMessage, data)
}

func (k *Kraken) listenSocket() {
defer k.wg.Done()

if k.conn == nil {
func (k *Kraken) listenSocket(stop chan struct{}, reconnectCh chan struct{}) {
defer close(reconnectCh)
conn := k.conn
if conn == nil {
return
}

if err := k.conn.SetReadDeadline(time.Now().Add(k.readTimeout)); err != nil {
if err := conn.SetReadDeadline(time.Now().Add(k.readTimeout)); err != nil {
log.Error(err)
return
}
Expand All @@ -194,17 +188,17 @@ func (k *Kraken) listenSocket() {
select {
case <-k.stop:
return
case <-stop:
return
default:
_, msg, err := k.conn.ReadMessage()
_, msg, err := conn.ReadMessage()
if err != nil {
log.Error(err)
k.connect <- struct{}{}
return
}

if err := k.conn.SetReadDeadline(time.Now().Add(k.readTimeout)); err != nil {
if err := conn.SetReadDeadline(time.Now().Add(k.readTimeout)); err != nil {
log.Error(err)
k.connect <- struct{}{}
return
}

Expand Down

0 comments on commit 89349a3

Please sign in to comment.