Skip to content

Commit

Permalink
- added chan with last update time
Browse files Browse the repository at this point in the history
  • Loading branch information
andrey_pugach committed Aug 24, 2023
1 parent 0b5a822 commit c38b827
Showing 1 changed file with 41 additions and 29 deletions.
70 changes: 41 additions & 29 deletions websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,23 +148,26 @@ type WebSocketClient struct {
// Error channel
errors chan error
// Downstream message channel
messages chan *WebSocketDownstreamMessage
conn *fastws.Conn
token *WebSocketTokenModel
server *WebSocketServerModel
enableHeartbeat bool
skipVerifyTls bool
timeout time.Duration
messages chan *WebSocketDownstreamMessage
conn *fastws.Conn
token *WebSocketTokenModel
server *WebSocketServerModel
enableHeartbeat bool
skipVerifyTls bool
useLastUpdateTime bool
lastUpdateTime chan time.Time
timeout time.Duration
}

var defaultTimeout = time.Second * 5

// WebSocketClientOpts defines the options for the client
// during the websocket connection.
type WebSocketClientOpts struct {
Token *WebSocketTokenModel
TLSSkipVerify bool
Timeout time.Duration
Token *WebSocketTokenModel
TLSSkipVerify bool
UseLastUpdateTime bool
Timeout time.Duration
}

// NewWebSocketClient creates an instance of WebSocketClient.
Expand All @@ -179,25 +182,27 @@ func (as *ApiService) NewWebSocketClient(token *WebSocketTokenModel) *WebSocketC
// NewWebSocketClientOpts creates an instance of WebSocketClient with the parsed options.
func (as *ApiService) NewWebSocketClientOpts(opts WebSocketClientOpts) *WebSocketClient {
wc := &WebSocketClient{
wg: &sync.WaitGroup{},
done: make(chan struct{}),
errors: make(chan error, 1),
pongs: make(chan string, 1),
acks: make(chan string, 1),
token: opts.Token,
messages: make(chan *WebSocketDownstreamMessage, 2048),
skipVerifyTls: opts.TLSSkipVerify,
timeout: opts.Timeout,
wg: &sync.WaitGroup{},
done: make(chan struct{}),
errors: make(chan error, 1),
pongs: make(chan string, 1),
acks: make(chan string, 1),
lastUpdateTime: make(chan time.Time, 2048),
useLastUpdateTime: opts.UseLastUpdateTime,
token: opts.Token,
messages: make(chan *WebSocketDownstreamMessage, 2048),
skipVerifyTls: opts.TLSSkipVerify,
timeout: opts.Timeout,
}
return wc
}

// Connect connects the WebSocket server.
func (wc *WebSocketClient) Connect() (<-chan *WebSocketDownstreamMessage, <-chan error, error) {
func (wc *WebSocketClient) Connect() (<-chan *WebSocketDownstreamMessage, <-chan time.Time, <-chan error, error) {
// Find out a server
s, err := wc.token.Servers.RandomServer()
if err != nil {
return wc.messages, wc.errors, err
return wc.messages, wc.lastUpdateTime, wc.errors, err
}
wc.server = s

Expand All @@ -211,7 +216,7 @@ func (wc *WebSocketClient) Connect() (<-chan *WebSocketDownstreamMessage, <-chan

uri, err := url.Parse(s.Endpoint)
if err != nil {
return wc.messages, wc.errors, err
return wc.messages, wc.lastUpdateTime, wc.errors, err
}

port := ":443"
Expand All @@ -226,13 +231,13 @@ func (wc *WebSocketClient) Connect() (<-chan *WebSocketDownstreamMessage, <-chan
if scheme == "http" {
netConn, err = net.Dial("tcp", addr)
if err != nil {
return wc.messages, wc.errors, err
return wc.messages, wc.lastUpdateTime, wc.errors, err
}
} else {
netConn, err = tls.DialWithDialer(&net.Dialer{Timeout: 15 * time.Second}, "tcp", addr,
&tls.Config{InsecureSkipVerify: wc.skipVerifyTls})
if err != nil {
return wc.messages, wc.errors, err
return wc.messages, wc.lastUpdateTime, wc.errors, err
}
}

Expand All @@ -241,7 +246,7 @@ func (wc *WebSocketClient) Connect() (<-chan *WebSocketDownstreamMessage, <-chan
// Connect ws server
wc.conn, err = fastws.Client(netConn, u)
if err != nil {
return wc.messages, wc.errors, err
return wc.messages, wc.lastUpdateTime, wc.errors, err
}

wc.conn.WriteTimeout = time.Second * 45
Expand All @@ -252,17 +257,20 @@ func (wc *WebSocketClient) Connect() (<-chan *WebSocketDownstreamMessage, <-chan
var buf []byte
m := &WebSocketDownstreamMessage{}
_, buf, err = wc.conn.ReadMessage(buf[:0])
if wc.useLastUpdateTime {
wc.lastUpdateTime <- time.Now()
}
if err != nil {
return wc.messages, wc.errors, err
return wc.messages, wc.lastUpdateTime, wc.errors, err
}
if err := json.Unmarshal(buf, m); err != nil {
return wc.messages, wc.errors, err
return wc.messages, wc.lastUpdateTime, wc.errors, err
}
if DebugMode {
logrus.Debugf("Received a WebSocket message: %s", ToJsonString(m))
}
if m.Type == ErrorMessage {
return wc.messages, wc.errors, errors.Errorf("Error message: %s", ToJsonString(m))
return wc.messages, wc.lastUpdateTime, wc.errors, errors.Errorf("Error message: %s", ToJsonString(m))
}
if m.Type == WelcomeMessage {
break
Expand All @@ -273,13 +281,14 @@ func (wc *WebSocketClient) Connect() (<-chan *WebSocketDownstreamMessage, <-chan
go wc.read()
go wc.keepHeartbeat()

return wc.messages, wc.errors, nil
return wc.messages, wc.lastUpdateTime, wc.errors, nil
}

func (wc *WebSocketClient) read() {
defer func() {
close(wc.pongs)
close(wc.messages)
close(wc.lastUpdateTime)
wc.wg.Done()
}()

Expand Down Expand Up @@ -321,6 +330,9 @@ func (wc *WebSocketClient) read() {
default:
wc.errors <- errors.Errorf("Unknown message type: %s", m.Type)
}
if wc.useLastUpdateTime {
wc.lastUpdateTime <- time.Now()
}
}
}
}
Expand Down

0 comments on commit c38b827

Please sign in to comment.