diff --git a/websocket/kraken.go b/websocket/kraken.go index 2288130..d584a83 100644 --- a/websocket/kraken.go +++ b/websocket/kraken.go @@ -24,14 +24,13 @@ type Kraken struct { readTimeout time.Duration heartbeatTimeout time.Duration - msg chan Update - connect chan struct{} - stop chan struct{} + msg chan Update + stop chan struct{} - wg sync.WaitGroup + lock sync.RWMutex } -// New - +// NewKraken - func NewKraken(url string, opts ...KrakenOption) *Kraken { kraken := Kraken{ url: url, @@ -39,7 +38,6 @@ func NewKraken(url string, opts ...KrakenOption) *Kraken { readTimeout: 15 * time.Second, heartbeatTimeout: 10 * time.Second, subscriptions: make(map[int64]*SubscriptionStatus), - connect: make(chan struct{}, 1), msg: make(chan Update, 1024), stop: make(chan struct{}, 1), } @@ -53,15 +51,10 @@ func NewKraken(url string, opts ...KrakenOption) *Kraken { // Connect to the Kraken API, this should only be called once. func (k *Kraken) Connect() error { - k.wg.Add(1) - go k.managerThread() - if err := k.dial(); err != nil { return err } - - k.wg.Add(1) - go k.listenSocket() + go k.managerThread() return nil } @@ -85,38 +78,43 @@ func (k *Kraken) dial() error { } func (k *Kraken) managerThread() { - defer k.wg.Done() - heartbeat := time.NewTicker(k.heartbeatTimeout) defer heartbeat.Stop() + connect := make(chan struct{}) + stopListener := make(chan struct{}) + reconnectCh := make(chan struct{}) + go k.listenSocket(stopListener, reconnectCh) + for { select { - case <-k.stop: - return - case <-k.connect: + case <-connect: time.Sleep(k.reconnectTimeout) - log.Warnf("reconnecting...") - if err := k.dial(); err != nil { log.Error(err) - k.connect <- struct{}{} + connect <- struct{}{} continue } - k.wg.Add(1) - go k.listenSocket() - if err := k.resubscribe(); err != nil { log.Error(err) } + + stopListener = make(chan struct{}) + reconnectCh = make(chan struct{}) + go k.listenSocket(stopListener, reconnectCh) + case <-reconnectCh: + connect <- struct{}{} + case <-k.stop: + return case <-heartbeat.C: if err := k.send(PingRequest{ Event: EventPing, }); err != nil { log.Println(err) - k.connect <- struct{}{} + close(stopListener) + connect <- struct{}{} } } } @@ -149,11 +147,6 @@ func (k *Kraken) Listen() <-chan Update { // Close - provides an interface for a user initiated shutdown. func (k *Kraken) Close() error { - for i := 0; i < 2; i++ { - k.stop <- struct{}{} - } - k.wg.Wait() - if k.conn != nil { if err := k.conn.Close(); err != nil { return err @@ -162,11 +155,12 @@ func (k *Kraken) Close() error { close(k.stop) close(k.msg) - close(k.connect) return nil } func (k *Kraken) send(msg interface{}) error { + k.lock.Lock() + defer k.lock.Unlock() if k.conn == nil { return nil } @@ -178,14 +172,14 @@ func (k *Kraken) send(msg interface{}) error { return k.conn.WriteMessage(websocket.TextMessage, data) } -func (k *Kraken) listenSocket() { - defer k.wg.Done() - - if k.conn == nil { +func (k *Kraken) listenSocket(stop chan struct{}, reconnectCh chan struct{}) { + defer close(reconnectCh) + conn := k.conn + if conn == nil { return } - if err := k.conn.SetReadDeadline(time.Now().Add(k.readTimeout)); err != nil { + if err := conn.SetReadDeadline(time.Now().Add(k.readTimeout)); err != nil { log.Error(err) return } @@ -194,17 +188,17 @@ func (k *Kraken) listenSocket() { select { case <-k.stop: return + case <-stop: + return default: - _, msg, err := k.conn.ReadMessage() + _, msg, err := conn.ReadMessage() if err != nil { log.Error(err) - k.connect <- struct{}{} return } - if err := k.conn.SetReadDeadline(time.Now().Add(k.readTimeout)); err != nil { + if err := conn.SetReadDeadline(time.Now().Add(k.readTimeout)); err != nil { log.Error(err) - k.connect <- struct{}{} return }