-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
226 lines (205 loc) · 6.61 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package main
import (
"io"
"log"
"fmt"
"time"
"runtime"
"strconv"
"net/http"
"math/rand"
"encoding/json"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/go-redis/redis"
)
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func RandStringRunes(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
type ADNMetaRespone struct {
Type string `json:"type"`
Connection_id string `json:"connection_id"`
}
type ADNPingDataRespone struct {
Id uint64 `json:"id"`
}
type ADNRepsonse struct {
Meta ADNMetaRespone `json:"meta"`
//Data string `json:"data"`
}
type ADNPingRepsonse struct {
Meta ADNMetaRespone `json:"meta"`
Data ADNPingDataRespone `json:"data"`
}
func main() {
rand.Seed(time.Now().UnixNano())
redisClient := redis.NewClient(&redis.Options{
Addr: "192.168.253.145:6379",
Password: "", // no password set
DB: 0, // use default DB
})
pong, err := redisClient.Ping().Result()
fmt.Println(pong, err)
log.Println("AppDotNetWS v 0.2 TLS server starting on port 8000")
http.ListenAndServeTLS(":8000", "/root/.acme.sh/api.sapphire.moe/fullchain.cer", "/root/.acme.sh/api.sapphire.moe/api.sapphire.moe.key", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Println("Request", r.RemoteAddr)
reqConnId := r.URL.Query().Get("connection_id")
if (reqConnId != "") {
// FIXME: if reqConnId is set, we need to boot any others using it
// check redis, msg redis to disconnect
// then reuse
// otherwise just recreate
log.Println("reqConnId", reqConnId)
}
var connectionId = RandStringRunes(64)
// now connectionId is sorted
token := r.URL.Query().Get("access_token")
// we need to tie token to connectionId
var connTokenKey string
connTokenKey += "token_"
connTokenKey += connectionId
rErr := redisClient.Set(connTokenKey, token, 0).Err()
if rErr != nil {
panic(rErr)
}
// we need to tie auto_delete to connectionId
autoDelete := r.URL.Query().Get("auto_delete")
var autoDelKey string
autoDelKey += "autoDelete_"
autoDelKey += connectionId
rErr = redisClient.Set(autoDelKey, autoDelete, 0).Err()
if rErr != nil {
panic(rErr)
}
// javascript can't read websocket headers
header := http.Header{
"X-Go-Version": []string{runtime.Version()},
"Connection-Id": []string{connectionId},
}
conn, _, _, err := ws.UpgradeHTTP(r, w, header)
if err != nil {
// handle error
}
pubsub := redisClient.Subscribe(connectionId)
// Wait for subscription to be created before publishing message.
subscr, err := pubsub.ReceiveTimeout(time.Second)
if err != nil {
panic(err)
}
fmt.Println(subscr)
// one thread to pump redis
go func() {
defer pubsub.Close()
for {
msg, err := pubsub.ReceiveMessage()
if err != nil {
panic(err)
}
fmt.Println(msg.Channel, msg.Payload)
//var dat map[string]interface{}
var dat ADNRepsonse
if err := json.Unmarshal([]byte(msg.Payload), &dat); err != nil {
fmt.Println("Couldnt Unmarshal Payload into ADNRepsonse")
}
fmt.Println("MetaType", dat)
if (dat.Meta.Type == "ping") {
var res ADNPingRepsonse
if err := json.Unmarshal([]byte(msg.Payload), &res); err != nil {
fmt.Println("Couldnt Unmarshal Payload into ADNPingRepsonse", err)
continue // we just wont pong
}
fmt.Println("pong!")
redisClient.Publish("AppDotNetWS", "pong_"+connectionId+"_"+strconv.FormatUint(res.Data.Id, 10))
continue
}
// demarshall
err = wsutil.WriteServerMessage(conn, ws.OpText, []byte(msg.Payload))
if err != nil {
// handle error
log.Println("redis write message err", err)
redisClient.Del(connTokenKey)
redisClient.Publish("AppDotNetWS", "disconnect_"+connectionId)
pubsub.Close()
// we need to kill the websocket too tbh
conn.Close()
break
}
}
}()
// another thread to listen to socket and handle closing the socket
go func() {
defer conn.Close()
log.Println("WebSocket Connected", conn.RemoteAddr(), connectionId)
var (
state = ws.StateServerSide
reader = wsutil.NewReader(conn, state)
writer = wsutil.NewWriter(conn, state, ws.OpText)
)
// javascript can't read websocket headers
// The user stream endpoint will return the negotiated connection_id in HTTP headers (https)
// or initial message (websocket).
// send initial JSON
msg := []byte(`{
"meta": {
"connection_id": "` + connectionId + `"
},
"data": {
}
}`)
err = wsutil.WriteServerMessage(conn, ws.OpText, msg)
if err != nil {
// handle error
log.Println("conn write message err", err)
}
for {
header, err := reader.NextFrame()
if err != nil {
// handle error
log.Println("frame err", err)
}
if header.OpCode == ws.OpClose {
log.Println("opclose closing connection", conn.RemoteAddr())
redisClient.Del(connTokenKey)
redisClient.Publish("AppDotNetWS", "disconnect_"+connectionId)
conn.Close()
break
}
// https://godoc.org/github.com/gobwas/ws#Header
log.Println("Frame Header", header)
log.Println("Frame Content", reader)
var buff [32 * 1024]byte
if (header.OpCode.IsControl()) {
log.Println("control frame", reader)
} else {
if (header.Length > 32 * 1024) {
log.Println("skipping large frame")
reader.Discard()
} else {
_, err = io.ReadFull(reader, buff[0:header.Length])
if err == nil {
s := string(buff[:header.Length])
if header.OpCode.IsData() {
log.Println("data frame", s)
}
}
}
}
if err != nil {
log.Println("err is closing connection", conn.RemoteAddr())
redisClient.Del(connTokenKey)
redisClient.Publish("AppDotNetWS", "disconnect_"+connectionId)
//redisClient.Publish(connectionId, "disconnect")
conn.Close()
break
}
// Reset writer to write frame with right operation code.
writer.Reset(conn, state, header.OpCode)
}
}()
}))
}