Skip to content

Commit 0d0d7ba

Browse files
vigodskyArtur Abelian
and
Artur Abelian
authored
feat: added spot ws service, moved client to common, refactoring (#633)
Co-authored-by: Artur Abelian <[email protected]>
1 parent fce4cf2 commit 0d0d7ba

15 files changed

+1126
-275
lines changed

v2/futures/client_ws.go v2/common/websocket/client.go

+56-95
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package futures
1+
package websocket
22

33
import (
44
"context"
@@ -13,11 +13,9 @@ import (
1313

1414
"github.com/gorilla/websocket"
1515
"github.com/jpillora/backoff"
16-
17-
"github.com/adshao/go-binance/v2/common"
1816
)
1917

20-
//go:generate mockgen -source client_ws.go -destination mock/client_ws.go -package mock
18+
//go:generate mockgen -source client.go -destination mock/client.go -package mock
2119

2220
const (
2321
// reconnectMinInterval define reconnect min interval
@@ -33,22 +31,21 @@ var (
3331

3432
// ErrorWsIdAlreadySent defines that request with the same id was already sent
3533
ErrorWsIdAlreadySent = errors.New("ws error: request with same id already sent")
34+
35+
// KeepAlivePingDeadline defines deadline to send ping frame
36+
KeepAlivePingDeadline = 10 * time.Second
3637
)
3738

3839
// messageId define id field of request/response
3940
type messageId struct {
4041
Id string `json:"id"`
4142
}
4243

43-
// ClientWs define API websocket client
44-
type ClientWs struct {
45-
APIKey string
46-
SecretKey string
44+
// client define API websocket client
45+
type client struct {
4746
Debug bool
48-
KeyType string
49-
TimeOffset int64
5047
logger *log.Logger
51-
conn wsConnection
48+
conn Connection
5249
connMu sync.Mutex
5350
reconnectSignal chan struct{}
5451
connectionEstablishedSignal chan struct{}
@@ -58,18 +55,15 @@ type ClientWs struct {
5855
reconnectCount int64
5956
}
6057

61-
func (c *ClientWs) debug(format string, v ...interface{}) {
58+
func (c *client) debug(format string, v ...interface{}) {
6259
if c.Debug {
6360
c.logger.Println(fmt.Sprintf(format, v...))
6461
}
6562
}
6663

67-
// NewClientWs init ClientWs
68-
func NewClientWs(conn wsConnection, apiKey, secretKey string) (*ClientWs, error) {
69-
client := &ClientWs{
70-
APIKey: apiKey,
71-
SecretKey: secretKey,
72-
KeyType: common.KeyTypeHmac,
64+
// NewClient init client
65+
func NewClient(conn Connection) (Client, error) {
66+
client := &client{
7367
logger: log.New(os.Stderr, "Binance-golang ", log.LstdFlags),
7468
conn: conn,
7569
connMu: sync.Mutex{},
@@ -86,21 +80,17 @@ func NewClientWs(conn wsConnection, apiKey, secretKey string) (*ClientWs, error)
8680
return client, nil
8781
}
8882

89-
type wsClient interface {
83+
type Client interface {
9084
Write(id string, data []byte) error
9185
WriteSync(id string, data []byte, timeout time.Duration) ([]byte, error)
9286
GetReadChannel() <-chan []byte
9387
GetReadErrorChannel() <-chan error
94-
GetApiKey() string
95-
GetSecretKey() string
96-
GetTimeOffset() int64
97-
GetKeyType() string
9888
GetReconnectCount() int64
9989
Wait(timeout time.Duration)
10090
}
10191

10292
// Write sends data into websocket connection
103-
func (c *ClientWs) Write(id string, data []byte) error {
93+
func (c *client) Write(id string, data []byte) error {
10494
c.connMu.Lock()
10595
defer c.connMu.Unlock()
10696

@@ -120,7 +110,7 @@ func (c *ClientWs) Write(id string, data []byte) error {
120110

121111
// WriteSync sends data to the websocket connection and waits for a response synchronously
122112
// Should be used separately from the asynchronous Write method (do not send anything in parallel)
123-
func (c *ClientWs) WriteSync(id string, data []byte, timeout time.Duration) ([]byte, error) {
113+
func (c *client) WriteSync(id string, data []byte, timeout time.Duration) ([]byte, error) {
124114
c.connMu.Lock()
125115
defer c.connMu.Unlock()
126116

@@ -157,36 +147,20 @@ func (c *ClientWs) WriteSync(id string, data []byte, timeout time.Duration) ([]b
157147
}
158148
}
159149

160-
func (c *ClientWs) GetReadChannel() <-chan []byte {
150+
func (c *client) GetReadChannel() <-chan []byte {
161151
return c.readC
162152
}
163153

164-
func (c *ClientWs) GetReadErrorChannel() <-chan error {
154+
func (c *client) GetReadErrorChannel() <-chan error {
165155
return c.readErrChan
166156
}
167157

168-
func (c *ClientWs) GetApiKey() string {
169-
return c.APIKey
170-
}
171-
172-
func (c *ClientWs) GetSecretKey() string {
173-
return c.SecretKey
174-
}
175-
176-
func (c *ClientWs) GetTimeOffset() int64 {
177-
return c.TimeOffset
178-
}
179-
180-
func (c *ClientWs) GetKeyType() string {
181-
return c.KeyType
182-
}
183-
184-
func (c *ClientWs) Wait(timeout time.Duration) {
158+
func (c *client) Wait(timeout time.Duration) {
185159
c.wait(timeout)
186160
}
187161

188162
// read data from connection
189-
func (c *ClientWs) read() {
163+
func (c *client) read() {
190164
defer func() {
191165
// reading from closed connection 1000 times caused panic
192166
// prevent panic for any case
@@ -231,7 +205,7 @@ func (c *ClientWs) read() {
231205

232206
// wait until all responses received
233207
// make sure that you are not sending requests
234-
func (c *ClientWs) wait(timeout time.Duration) {
208+
func (c *client) wait(timeout time.Duration) {
235209
doneC := make(chan struct{})
236210

237211
ctx, cancel := context.WithCancel(context.Background())
@@ -260,7 +234,7 @@ func (c *ClientWs) wait(timeout time.Duration) {
260234
}
261235

262236
// handleReconnect waits for reconnect signal and starts reconnect
263-
func (c *ClientWs) handleReconnect() {
237+
func (c *client) handleReconnect() {
264238
for _ = range c.reconnectSignal {
265239
c.debug("reconnect: received signal")
266240

@@ -285,10 +259,10 @@ func (c *ClientWs) handleReconnect() {
285259
}
286260

287261
// startReconnect starts reconnect loop with increasing delay
288-
func (c *ClientWs) startReconnect(b *backoff.Backoff) *connection {
262+
func (c *client) startReconnect(b *backoff.Backoff) Connection {
289263
for {
290264
atomic.AddInt64(&c.reconnectCount, 1)
291-
conn, err := newConnection()
265+
conn, err := c.conn.RestoreConnection()
292266
if err != nil {
293267
delay := b.Duration()
294268
c.debug("reconnect: error while reconnecting. try in %s", delay.Round(time.Millisecond))
@@ -301,7 +275,9 @@ func (c *ClientWs) startReconnect(b *backoff.Backoff) *connection {
301275
}
302276

303277
// GetReconnectCount returns reconnect counter value
304-
func (c *ClientWs) GetReconnectCount() int64 { return atomic.LoadInt64(&c.reconnectCount) }
278+
func (c *client) GetReconnectCount() int64 {
279+
return atomic.LoadInt64(&c.reconnectCount)
280+
}
305281

306282
// NewRequestList creates request list
307283
func NewRequestList() RequestList {
@@ -356,37 +332,47 @@ func (l *RequestList) IsAlreadyInList(id string) bool {
356332
return false
357333
}
358334

359-
// constructor for connection
360-
func newConnection() (*connection, error) {
361-
conn, err := WsApiInitReadWriteConn()
335+
// NewConnection constructor for connection
336+
func NewConnection(
337+
initUnderlyingWsConnFn func() (*websocket.Conn, error),
338+
isKeepAliveNeeded bool,
339+
keepaliveTimeout time.Duration,
340+
) (Connection, error) {
341+
underlyingWsConn, err := initUnderlyingWsConnFn()
362342
if err != nil {
363343
return nil, err
364344
}
365345

366346
wsConn := &connection{
367-
conn: conn,
368-
connectionMu: sync.Mutex{},
369-
lastResponseMu: sync.Mutex{},
347+
conn: underlyingWsConn,
348+
connectionMu: sync.Mutex{},
349+
lastResponseMu: sync.Mutex{},
350+
initUnderlyingWsConnFn: initUnderlyingWsConnFn,
351+
keepaliveTimeout: keepaliveTimeout,
370352
}
371353

372-
if WebsocketKeepalive {
373-
go wsConn.keepAlive(WebsocketTimeoutReadWriteConnection)
354+
if isKeepAliveNeeded {
355+
go wsConn.keepAlive(keepaliveTimeout)
374356
}
375357

376358
return wsConn, nil
377359
}
378360

379-
// instance of single connection with keepalive handler
361+
// connection is an instance of single ws connection with keepalive handler
380362
type connection struct {
381-
conn *websocket.Conn
382-
connectionMu sync.Mutex
383-
lastResponse time.Time
384-
lastResponseMu sync.Mutex
363+
conn *websocket.Conn
364+
connectionMu sync.Mutex
365+
lastResponse time.Time
366+
lastResponseMu sync.Mutex
367+
initUnderlyingWsConnFn func() (*websocket.Conn, error)
368+
keepaliveTimeout time.Duration
369+
isKeepAliveNeeded bool
385370
}
386371

387-
type wsConnection interface {
372+
type Connection interface {
388373
WriteMessage(messageType int, data []byte) error
389374
ReadMessage() (messageType int, p []byte, err error)
375+
RestoreConnection() (Connection, error)
390376
}
391377

392378
// WriteMessage is a thread-safe method for conn.WriteMessage
@@ -401,6 +387,11 @@ func (c *connection) ReadMessage() (int, []byte, error) {
401387
return c.conn.ReadMessage()
402388
}
403389

390+
// RestoreConnection recreates ws connection with the same underlying connection callback and keepalive timeout
391+
func (c *connection) RestoreConnection() (Connection, error) {
392+
return NewConnection(c.initUnderlyingWsConnFn, c.isKeepAliveNeeded, c.keepaliveTimeout)
393+
}
394+
404395
// keepAlive handles ping-pong for connection
405396
func (c *connection) keepAlive(timeout time.Duration) {
406397
ticker := time.NewTicker(timeout)
@@ -455,41 +446,11 @@ func (c *connection) ping() error {
455446
c.connectionMu.Lock()
456447
defer c.connectionMu.Unlock()
457448

458-
deadline := time.Now().Add(10 * time.Second)
449+
deadline := time.Now().Add(KeepAlivePingDeadline)
459450
err := c.conn.WriteControl(websocket.PingMessage, []byte{}, deadline)
460451
if err != nil {
461452
return err
462453
}
463454

464455
return nil
465456
}
466-
467-
// NewOrderPlaceWsService init OrderPlaceWsService
468-
func NewOrderPlaceWsService(apiKey, secretKey string) (*OrderPlaceWsService, error) {
469-
conn, err := newConnection()
470-
if err != nil {
471-
return nil, err
472-
}
473-
474-
client, err := NewClientWs(conn, apiKey, secretKey)
475-
if err != nil {
476-
return nil, err
477-
}
478-
479-
return &OrderPlaceWsService{c: client}, nil
480-
}
481-
482-
// NewOrderCancelWsService init OrderCancelWsService
483-
func NewOrderCancelWsService(apiKey, secretKey string) (*OrderCancelWsService, error) {
484-
conn, err := newConnection()
485-
if err != nil {
486-
return nil, err
487-
}
488-
489-
client, err := NewClientWs(conn, apiKey, secretKey)
490-
if err != nil {
491-
return nil, err
492-
}
493-
494-
return &OrderCancelWsService{c: client}, nil
495-
}

0 commit comments

Comments
 (0)