-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstompy.go
376 lines (332 loc) · 12 KB
/
stompy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
// Package stompy is a STOMP client for communicating with STOMP-based messaging servers.
// It supports STOMP 1.1 and 1.2, and exposes a set of interfaces to allow easy mocking
// in tests. The main interface is StompClient.
package stompy
import (
"bufio"
"log"
"net"
"strconv"
"sync"
"time"
)
// Version strings of the stomp protocol that this client supports.
// They are the values collected in Supported and compared against the
// "version" header of the server's reply by versionCheck.
const (
// STOMP_1_1 is the version string for stomp 1.1.
STOMP_1_1 string = "1.1"
// STOMP_1_2 is the version string for stomp 1.2.
STOMP_1_2 string = "1.2"
)
var (
	// Supported lists the stomp protocol versions the client accepts from a
	// server; versionCheck compares the server's "version" header against it.
	Supported = []string{STOMP_1_1, STOMP_1_2}

	// DefaultDisconnectHandler is the fallback DisconnectHandler that Connect
	// installs when none has been registered. It only logs the error; it makes
	// no attempt to reconnect or to restore subscriptions.
	DefaultDisconnectHandler = func(err error) {
		log.Println("default disconnect handler: ", err)
	}
)
// ClientOpts holds the connection and authentication parameters for a Client.
// The whole struct is handed to the headers factory when the CONNECT frame is
// built.
type ClientOpts struct {
// Vhost is presumably the virtual host sent in the CONNECT headers — confirm in connectionHeaders.
Vhost string
// HostAndPort is the TCP address dialled by Connect (passed to net.DialTimeout).
HostAndPort string
// Timeout is the dial timeout passed to net.DialTimeout.
Timeout time.Duration
// User is presumably the login sent in the CONNECT headers — confirm in connectionHeaders.
User string
// PassCode is presumably the passcode sent in the CONNECT headers — confirm in connectionHeaders.
PassCode string
// Version selects the protocol version used for the header encoder/decoder and the headers factory.
Version string
}
// DisconnectHandler is called with the error when the network connection is
// lost. An implementation should handle reconnecting and setting up the
// subscribers again.
type DisconnectHandler func(error)
// StompConnector defines how the connection to the server is managed.
type StompConnector interface {
// Connect establishes the connection to the server.
Connect() error
// Disconnect tears the connection down.
Disconnect() error
// RegisterDisconnectHandler sets the handler that is invoked on connection errors.
RegisterDisconnectHandler(handler DisconnectHandler)
}
// StompSubscriber defines how subscriptions and message acknowledgement are handled.
type StompSubscriber interface {
// Subscribe accepts a destination (for example /test/test), a handler
// function for messages arriving on that subscription, any headers to
// override or set, and an optional receipt. It returns the subscription id.
Subscribe(destination string, handler SubscriptionHandler, headers StompHeaders, receipt *Receipt) (string, error)
// Unsubscribe removes the subscription with the given id.
Unsubscribe(subId string, headers StompHeaders, receipt *Receipt) error
// Ack acknowledges a received message.
Ack(msg Frame) error
// Nack tells the server that a received message was not acknowledged.
Nack(msg Frame) error
}
// StompPublisher defines how a message is published.
type StompPublisher interface {
// Publish accepts a destination, a body, any headers to override or set
// (such as content-type), and an optional receipt.
Publish(destination string, body []byte, headers StompHeaders, receipt *Receipt) error
}
// StompTransactor defines how transactions are carried out. Every method takes
// the transaction id, extra headers to add, and an optional receipt.
type StompTransactor interface {
// Begin starts the transaction identified by transId.
Begin(transId string, addedHeaders StompHeaders, receipt *Receipt) error
// Abort aborts the transaction identified by transId.
Abort(transId string, addedHeaders StompHeaders, receipt *Receipt) error
// Commit commits the transaction identified by transId.
Commit(transId string, addedHeaders StompHeaders, receipt *Receipt) error
}
// StompClient is the combination of the connector, subscriber, publisher and
// transactor interfaces. NewClient returns a value of this type.
type StompClient interface {
StompConnector
StompSubscriber
StompPublisher
StompTransactor
}
// messageStats is a mutex-guarded counter. The embedded sync.Mutex protects
// count.
type messageStats struct {
	sync.Mutex
	count int
}

// Increment adds one to the counter while holding the lock and returns the
// updated value.
func (s *messageStats) Increment() int {
	s.Lock()
	s.count++
	next := s.count
	s.Unlock()
	return next
}

// stats is the package-wide counter; handleReceipt uses it to number the
// receipt ids it generates.
var stats = &messageStats{}
// Client is the main, exported type for interacting with a stomp server. It is
// created by NewClient.
type Client struct {
// opts are the connection and auth options supplied to NewClient.
opts ClientOpts
// connectionErr carries connection errors. The goroutine started by
// RegisterDisconnectHandler ranges over it and calls DisconnectHandler for
// each ConnectionError it receives.
connectionErr chan error
// shutdown tells any loops (for example the read loop) to exit because we
// are disconnecting.
shutdown chan bool
// msgChan is the channel on which the read loop sends new messages.
msgChan chan Frame
// DisconnectHandler is the func that should react to a network disconnection.
DisconnectHandler DisconnectHandler
// conn is the TCP connection established by Connect.
conn net.Conn
// reader is used to read from the network socket.
reader stompSocketReader
// subscriptions holds the registered subscriptions; its dispatch method is
// started by Connect with msgChan.
subscriptions *subscriptions
// encoderDecoder is passed to writeFrame whenever a frame is written.
encoderDecoder encoderDecoder
// headersFactory builds the headers for each kind of frame.
headersFactory headers
}
// NewClient builds a stomp client from the given options. No network activity
// happens here; the connection is opened by Connect.
//
// The error and message channels are unbuffered, the shutdown channel has a
// buffer of one, and the subscription registry starts out empty. Both the
// header encoder/decoder and the headers factory are set to opts.Version.
func NewClient(opts ClientOpts) StompClient {
	return &Client{
		opts:           opts,
		connectionErr:  make(chan error),
		shutdown:       make(chan bool, 1),
		msgChan:        make(chan Frame),
		subscriptions:  &subscriptions{subs: make(map[string]subscription)},
		encoderDecoder: headerEncoderDecoder{opts.Version},
		headersFactory: headers{version: opts.Version},
	}
}
// Begin starts the transaction identified by transId on the stomp server.
//
// The transaction headers are built from transId and addedHeaders, then passed
// through handleReceipt together with receipt. Any error from handleReceipt or
// from writing the frame is returned.
func (client *Client) Begin(transId string, addedHeaders StompHeaders, receipt *Receipt) error {
	txHeaders, err := handleReceipt(client.headersFactory.transactionHeaders(transId, addedHeaders), receipt)
	if err != nil {
		return err
	}
	beginFrame := NewFrame(_COMAND_TRANSACTION_BEGIN, txHeaders, _NULLBUFF)
	return writeFrame(bufio.NewWriter(client.conn), beginFrame, client.encoderDecoder)
}
// Abort aborts the transaction identified by transId on the stomp server.
//
// The transaction headers are built from transId and addedHeaders, then passed
// through handleReceipt together with receipt. Any error from handleReceipt or
// from writing the frame is returned.
func (client *Client) Abort(transId string, addedHeaders StompHeaders, receipt *Receipt) error {
	txHeaders, err := handleReceipt(client.headersFactory.transactionHeaders(transId, addedHeaders), receipt)
	if err != nil {
		return err
	}
	abortFrame := NewFrame(_COMAND_TRANSACTION_ABORT, txHeaders, _NULLBUFF)
	return writeFrame(bufio.NewWriter(client.conn), abortFrame, client.encoderDecoder)
}
// Commit commits the transaction identified by transId on the stomp server.
//
// The transaction headers are built from transId and addedHeaders, then passed
// through handleReceipt together with receipt. Any error from handleReceipt or
// from writing the frame is returned.
func (client *Client) Commit(transId string, addedHeaders StompHeaders, receipt *Receipt) error {
	txHeaders, err := handleReceipt(client.headersFactory.transactionHeaders(transId, addedHeaders), receipt)
	if err != nil {
		return err
	}
	commitFrame := NewFrame(_COMAND_TRANSACTION_COMMIT, txHeaders, _NULLBUFF)
	return writeFrame(bufio.NewWriter(client.conn), commitFrame, client.encoderDecoder)
}
// Ack acknowledges receipt of msg with the stomp server.
//
// msg must carry a "message-id" header, otherwise a ClientError is returned
// and nothing is written. The "subscription", "ack" and "transaction" headers
// of msg are forwarded to the headers factory as they are (empty when absent).
func (client *Client) Ack(msg Frame) error {
	msgId, found := msg.Headers["message-id"]
	if !found {
		return ClientError("cannot ack message without message-id header")
	}
	ackHeaders := client.headersFactory.ackHeaders(
		msgId,
		msg.Headers["subscription"],
		msg.Headers["ack"],
		msg.Headers["transaction"],
	)
	ackFrame := NewFrame(_COMMAND_ACK, ackHeaders, _NULLBUFF)
	return writeFrame(bufio.NewWriter(client.conn), ackFrame, client.encoderDecoder)
}
// Nack tells the stomp server that msg was not acknowledged, so the server can
// decide what to do with it.
//
// msg must carry a "message-id" header, otherwise a ClientError is returned
// and nothing is written. The "subscription", "ack" and "transaction" headers
// of msg are forwarded to the headers factory as they are (empty when absent).
func (client *Client) Nack(msg Frame) error {
	msgId, ok := msg.Headers["message-id"]
	if !ok {
		// The message names the operation that failed; it previously said
		// "ack", which sent readers to the wrong call site.
		return ClientError("cannot nack message without message-id header")
	}
	transId := msg.Headers["transaction"]
	subId := msg.Headers["subscription"]
	ackid := msg.Headers["ack"]
	headers := client.headersFactory.nackHeaders(msgId, subId, ackid, transId)
	frame := NewFrame(_COMMAND_NACK, headers, _NULLBUFF)
	return writeFrame(bufio.NewWriter(client.conn), frame, client.encoderDecoder)
}
// Connect opens a TCP connection to opts.HostAndPort, sends the CONNECT frame
// and validates the server's reply.
//
// Steps:
//  1. If no DisconnectHandler is set, DefaultDisconnectHandler is registered.
//  2. The address is dialled with opts.Timeout. A dial failure is sent on the
//     connection error channel and returned as a ConnectionError.
//  3. The CONNECT frame is written and one reply frame is read.
//  4. The reply must not be an ERROR frame and must carry a supported version.
//  5. The background read loop and the subscription dispatcher are started.
//
// If any step after the dial fails, the freshly dialled socket is closed before
// the error is returned so that a failed handshake does not leak a connection.
// client.conn is left pointing at the closed connection, so later writes fail
// with an error.
func (client *Client) Connect() error {
	// Set up the default disconnect handler that just logs the error.
	if client.DisconnectHandler == nil {
		client.RegisterDisconnectHandler(DefaultDisconnectHandler)
	}

	conn, err := net.DialTimeout("tcp", client.opts.HostAndPort, client.opts.Timeout)
	if err != nil {
		connErr := ConnectionError(err.Error())
		// NOTE(review): this is a blocking send on a channel NewClient creates
		// unbuffered; it completes once the goroutine started by
		// RegisterDisconnectHandler receives it.
		client.connectionErr <- connErr
		return connErr
	}
	client.conn = conn

	// abort releases the socket when the handshake cannot be completed. The
	// Close error is deliberately dropped: the handshake failure is the error
	// the caller needs to see.
	abort := func(cause error) error {
		_ = conn.Close()
		return cause
	}

	// Set up the version specific header encoder.
	encoder := headerEncoderDecoder{client.opts.Version}
	client.reader = newStompReader(conn, client.shutdown, client.connectionErr, client.msgChan, encoder)

	headers, err := client.headersFactory.connectionHeaders(client.opts)
	if err != nil {
		return abort(ConnectionError(err.Error()))
	}

	connectFrame := NewFrame(_COMMAND_CONNECT, headers, _NULLBUFF)
	if err := writeFrame(bufio.NewWriter(conn), connectFrame, encoder); err != nil {
		client.sendConnectionError(err)
		return abort(err)
	}

	// Read one frame after writing CONNECT to check that we are connected.
	f, err := client.reader.readFrame()
	if err != nil {
		client.sendConnectionError(err)
		return abort(err)
	}
	if f.CommandString() == "ERROR" {
		return abort(ClientError("after initial connection recieved err : " + f.Headers["message"]))
	}
	if err := versionCheck(f); err != nil {
		return abort(err)
	}

	// Start the background read loop.
	go client.reader.startReadLoop()
	// Start the background dispatch of incoming messages to subscriptions.
	go client.subscriptions.dispatch(client.msgChan)
	return nil
}
// Disconnect sends a DISCONNECT frame, waits for its receipt and then tears the
// client down.
//
// It is a no-op returning nil when no connection was ever established. When
// there is one, a receipt with id "disconnect" is registered and the frame is
// written; a write error is returned immediately, before any teardown. After
// the receipt's Received channel yields, the read loop is signalled to stop,
// the subscriptions are replaced with a fresh set, the error, shutdown and
// message channels are closed, and the result of closing the socket is
// returned.
func (client *Client) Disconnect() error {
	if client.conn == nil {
		return nil
	}

	frameHeaders := StompHeaders{"receipt": "disconnect"}
	disconnectReceipt := NewReceipt(time.Second)
	awaitingReceipt.Add("disconnect", disconnectReceipt)

	out := bufio.NewWriter(client.conn)
	disconnectFrame := NewFrame(_COMMAND_DISCONNECT, frameHeaders, _NULLBUFF)
	if writeErr := writeFrame(out, disconnectFrame, client.encoderDecoder); writeErr != nil {
		return writeErr
	}

	// Block until the receipt for the DISCONNECT frame is signalled.
	<-disconnectReceipt.Received

	// Signal the read loop to shut down.
	client.shutdown <- true
	client.subscriptions = newSubscriptions()

	close(client.connectionErr)
	close(client.shutdown)
	close(client.msgChan)
	return client.conn.Close()
}
// RegisterDisconnectHandler stores handler as the client's DisconnectHandler and
// starts a goroutine that watches the connection error channel.
//
// The goroutine runs until the channel is closed. For each error that is a
// ConnectionError it calls whatever client.DisconnectHandler is at that moment;
// errors of any other type are ignored.
//
// TODO: every call starts another watcher goroutine, so there could be multiple
// handlers. We probably also don't want to fire this multiple times between
// disconnects; this likely needs more sophistication.
func (client *Client) RegisterDisconnectHandler(handler DisconnectHandler) {
	client.DisconnectHandler = handler

	notifications := client.connectionErr
	go func() {
		for connErr := range notifications {
			if _, isConnErr := connErr.(ConnectionError); !isConnErr {
				continue
			}
			client.DisconnectHandler(connErr)
		}
	}()
}
// Publish sends a SEND frame carrying body to destination.
//
// The send headers are built from destination and addedHeaders, then passed
// through handleReceipt together with receipt. Any error from handleReceipt or
// from writing the frame is returned.
func (client *Client) Publish(destination string, body []byte, addedHeaders StompHeaders, receipt *Receipt) error {
	sendHeaders, err := handleReceipt(client.headersFactory.sendHeaders(destination, addedHeaders), receipt)
	if err != nil {
		return err
	}
	sendFrame := NewFrame(_COMMAND_SEND, sendHeaders, body)
	out := bufio.NewWriter(client.conn)
	return writeFrame(out, sendFrame, client.encoderDecoder)
}
// Subscribe registers handler for messages sent to destination and sends a
// SUBSCRIBE frame to the server. The SubscriptionHandler will also receive
// RECEIPTS if a receipt header is set. The headers of interest are id and ack.
//
// It returns the id of the new subscription. An error is returned when the
// subscription cannot be created, when it cannot be added to the client's
// registry, when the receipt cannot be registered, or when the frame cannot be
// written.
//
// If a step fails after the subscription was added to the registry, the
// subscription is removed again. Without that, a subscription the server never
// heard about would stay registered locally.
func (client *Client) Subscribe(destination string, handler SubscriptionHandler, headers StompHeaders, receipt *Receipt) (string, error) {
	sub, err := newSubscription(destination, handler, headers)
	if err != nil {
		return "", err
	}
	if err := client.subscriptions.addSubscription(sub); err != nil {
		return "", err
	}

	subHeaders := client.headersFactory.subscribeHeaders(sub.Id, destination, headers)
	subHeaders, err = handleReceipt(subHeaders, receipt)
	if err != nil {
		// Roll back the local registration; nothing was sent to the server.
		client.subscriptions.removeSubscription(sub.Id)
		return "", err
	}

	frame := Frame{_COMMAND_SUBSCRIBE, subHeaders, _NULLBUFF}
	// TODO: think about what to do if we have no conn.
	if err := writeFrame(bufio.NewWriter(client.conn), frame, client.encoderDecoder); err != nil {
		// Roll back the local registration; the SUBSCRIBE frame was not written.
		client.subscriptions.removeSubscription(sub.Id)
		return "", err
	}
	return sub.Id, nil
}
// Unsubscribe takes the id of a subscription and removes that subscriber so it
// will no longer receive messages.
//
// The unsubscribe headers are built from id and headers and passed through
// handleReceipt together with receipt; an error there is returned before
// anything else happens. The subscription is removed from the client's registry
// before the UNSUBSCRIBE frame is written, so it is gone locally even when the
// write fails. The write error, if any, is returned.
func (client *Client) Unsubscribe(id string, headers StompHeaders, receipt *Receipt) error {
	unsubHeaders, err := handleReceipt(client.headersFactory.unSubscribeHeaders(id, headers), receipt)
	if err != nil {
		return err
	}
	unsubFrame := Frame{_COMMAND_UNSUBSCRIBE, unsubHeaders, _NULLBUFF}
	client.subscriptions.removeSubscription(id)
	return writeFrame(bufio.NewWriter(client.conn), unsubFrame, client.encoderDecoder)
}
// versionCheck verifies that the "version" header of f names one of the
// protocol versions listed in Supported. It returns nil on a match and a
// VersionError otherwise, including when the header is missing or empty.
func versionCheck(f Frame) error {
	negotiated := f.Headers["version"]
	if negotiated != "" {
		for _, candidate := range Supported {
			if candidate == negotiated {
				return nil
			}
		}
	}
	return VersionError("unsupported version " + negotiated)
}
// sendConnectionError forwards err on the connection error channel, but only
// when err is a ConnectionError. The send never blocks: if it cannot proceed
// immediately, the error is dropped.
func (client *Client) sendConnectionError(err error) {
	if _, isConnErr := err.(ConnectionError); !isConnErr {
		return
	}
	select {
	case client.connectionErr <- err:
	default:
		// The send would block; drop the notification.
	}
}
// handleReceipt registers receipt with awaitingReceipt so the caller can wait
// for the server's RECEIPT frame, and makes sure the outgoing headers carry a
// "receipt" header.
//
// When receipt is nil the headers are returned untouched. Otherwise the id from
// an existing "receipt" header is used; if there is none, an id of the form
// "message-<n>" is generated from the package-wide counter and stored in the
// headers. A nil headers map is replaced by a fresh one before the id is
// stored, so generating an id cannot panic.
//
// It returns the headers (possibly with the added "receipt" entry) and any
// error from awaitingReceipt.Add. That error used to be discarded for
// generated ids; it is now reported for both paths.
func handleReceipt(headers StompHeaders, receipt *Receipt) (StompHeaders, error) {
	if receipt == nil {
		return headers, nil
	}
	receiptId, ok := headers["receipt"]
	if !ok {
		receiptId = "message-" + strconv.Itoa(stats.Increment())
		if headers == nil {
			headers = StompHeaders{}
		}
		headers["receipt"] = receiptId
	}
	if err := awaitingReceipt.Add(receiptId, receipt); err != nil {
		return headers, err
	}
	return headers, nil
}