-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsubscribe.go
103 lines (90 loc) · 2.38 KB
/
subscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package qlcchain
import (
"encoding/json"
"fmt"
"golang.org/x/net/websocket"
)
type Subscribe struct {
ws *websocket.Conn
subscribeID string
Stopped chan bool
}
func NewSubscribe(url string) *Subscribe {
ws, err := websocket.Dial(url, "", url)
if err != nil {
fmt.Println("websocket dial: ", err)
}
return &Subscribe{
ws: ws,
Stopped: make(chan bool),
}
}
type subscribeInfo struct {
Jsonrpc string `json:"jsonrpc"`
ID int `json:"id"`
Result string `json:"result"`
}
type resultInfo struct {
Subscription string `json:"subscription"`
Result interface{} `json:"result"`
}
type publishInfo struct {
Jsonrpc string `json:"jsonrpc"`
Method string `json:"method"`
Params resultInfo `json:"params"`
}
// type unsubscribeInfo struct {
// Jsonrpc string `json:"jsonrpc"`
// Id int `json:"id"`
// Result bool `json:"result"`
//}
func (s *Subscribe) subscribe(request string) error {
if err := websocket.Message.Send(s.ws, request); err != nil {
return fmt.Errorf("send error: %s", err)
}
var response string
if err := websocket.Message.Receive(s.ws, &response); err != nil {
return fmt.Errorf("receive message: %v ", err)
}
reply := new(subscribeInfo)
err := json.Unmarshal([]byte(response), &reply)
if err != nil {
return fmt.Errorf("subscribe, Can not decode data: %s ", err)
}
s.subscribeID = reply.Result
return nil
}
func (s *Subscribe) publish() (interface{}, bool) {
var response string
err := websocket.Message.Receive(s.ws, &response)
if err != nil { // if call Close() or sever stopped, connect closed, can not receive message
fmt.Println("receive publish message: ", err)
s.closeConnection()
return nil, true
}
reply := new(publishInfo)
err = json.Unmarshal([]byte(response), reply)
if err != nil {
fmt.Println("Can not decode publish data: ", err)
s.closeConnection()
return nil, true
}
if reply.Params.Result == nil { // if call Unsubscribe(), connect closed, receive message is nil
s.closeConnection()
return nil, true
}
return reply.Params.Result, false
}
func (s *Subscribe) Unsubscribe(request string) error {
if err := websocket.Message.Send(s.ws, request); err != nil {
return fmt.Errorf("unsubscribe error: %s", err)
}
return nil
}
func (s *Subscribe) Close() error {
s.closeConnection()
return s.ws.Close()
}
func (s *Subscribe) closeConnection() {
s.Stopped <- true
}