-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevents.go
179 lines (142 loc) · 4.02 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package web
import (
"fmt"
"net/http"
"golang.org/x/net/websocket"
)
const EVENTS_BUFFER = 100
type State interface{}
type Event interface{}
type clientSet map[chan Event]bool
// add to set of clients
func (clientSet clientSet) register(clientChan chan Event) {
clientSet[clientChan] = true
}
// remove from set on behalf of client requesting stop(); the clientChan may already be closed
func (clientSet clientSet) unregister(clientChan chan Event) {
delete(clientSet, clientChan)
}
// remove from set on behalf of server; closes the clientChan to tell the client
//
// the client may trigger .unregister() later, which will be a no-op
func (clientSet clientSet) drop(clientChan chan Event) {
close(clientChan)
delete(clientSet, clientChan)
}
// write event to client, drop client if stuck
func (clientSet clientSet) send(clientChan chan Event, event Event) {
select {
case clientChan <- event:
default:
// client dropped behind
clientSet.drop(clientChan)
}
}
// distribute events to clients, dropping clients if they are stuck
func (clientSet clientSet) publish(event Event) {
for clientChan, _ := range clientSet {
clientSet.send(clientChan, event)
}
}
func (clientSet clientSet) close() {
for clientChan, _ := range clientSet {
clientSet.drop(clientChan)
}
}
type EventConfig struct {
// recv from Events
StateFunc func() State
// send to Events
EventPush <-chan Event
}
// WebSocket publish/subscribe
type Events struct {
config EventConfig
registerChan chan chan Event
unregisterChan chan chan Event
}
// Publish events from chan
//
// Close chan to stop
func MakeEvents(config EventConfig) Events {
events := Events{
config: config,
registerChan: make(chan chan Event),
unregisterChan: make(chan chan Event),
}
go events.run(config)
return events
}
func (events Events) run(config EventConfig) {
clients := make(clientSet)
defer clients.close()
// panics any subscribed clients
defer close(events.registerChan)
defer close(events.unregisterChan)
for {
select {
case clientChan := <-events.registerChan:
clients.register(clientChan)
case clientChan := <-events.unregisterChan:
clients.unregister(clientChan)
case event, ok := <-config.EventPush:
if !ok {
return
}
// log.Printf("web:Events: publish: %v", event)
clients.publish(event)
}
}
}
// pull current state from sender
func (events Events) state() State {
if events.config.StateFunc != nil {
return events.config.StateFunc()
} else {
return struct{}{}
}
}
// each subscriber has its own chan to receive from Events
type eventsClient chan Event
// Register new client
//
// recv on the returned chan
func (events Events) listen() (State, eventsClient) {
eventChan := make(chan Event, EVENTS_BUFFER)
events.registerChan <- eventChan
return events.state(), eventChan
}
// Request server to stop sending us events
//
// XXX: panics with send on closed chan if server has stopped
func (events Events) stop(eventsClient eventsClient) {
events.unregisterChan <- eventsClient
}
// Return error if aborting, nil if events closed
func (eventsClient eventsClient) serveWebsocket(websocketConn *websocket.Conn, state State) error {
// initial state
if err := websocket.JSON.Send(websocketConn, state); err != nil {
return fmt.Errorf("webSocket.JSON.Send: %v", err)
}
// update events
for event := range eventsClient {
if err := websocket.JSON.Send(websocketConn, event); err != nil {
return fmt.Errorf("webSocket.JSON.Send: %v", err)
}
}
return nil
}
// goroutine-safe websocket subscriber
func (events Events) ServeWebsocket(websocketConn *websocket.Conn) {
var state, eventsClient = events.listen()
if err := eventsClient.serveWebsocket(websocketConn, state); err != nil {
// stop, assuming that server is still alive
// will panic if server has stopped
events.stop(eventsClient)
} else {
// we do not need to request stop, server has unregistered us
}
}
func (events Events) ServeHTTP(w http.ResponseWriter, r *http.Request) {
websocket.Handler(events.ServeWebsocket).ServeHTTP(w, r)
}