-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathserver.go
148 lines (130 loc) · 3.21 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package redplex
import (
"bufio"
"fmt"
"net"
"sync"
"github.com/sirupsen/logrus"
)
// toSendQueueLimit is the capacity of each connection's outgoing message
// queue: the number of pubsub messages that may be buffered while waiting to
// be written to the client. Once a connection's queue holds this many
// messages, connection.Write drops further messages until the queue drains.
const toSendQueueLimit = 128
// Server is the redplex server which accepts connections and talks to the
// underlying Pubsub implementation.
type Server struct {
// l is the listener that Listen accepts client connections from.
l net.Listener
// pubsub is started by Listen and shared with every accepted connection.
pubsub *Pubsub
}
// NewServer builds a Redplex Server around an existing listener and Pubsub.
// Clients are accepted from l once Listen is called, and each accepted
// connection is handed the same pubsub for its subscriptions. Nothing is
// started here; call Listen to begin serving.
func NewServer(l net.Listener, pubsub *Pubsub) *Server {
	srv := new(Server)
	srv.l = l
	srv.pubsub = pubsub
	return srv
}
// Listen starts the Pubsub in a background goroutine and then accepts
// incoming connections until the listener fails. Every accepted connection is
// served on its own goroutine with an outgoing queue of toSendQueueLimit
// messages.
//
// Listen blocks for the lifetime of the server and returns the first error
// reported by the listener's Accept (for example, after the listener has been
// closed).
func (s *Server) Listen() error {
	go s.pubsub.Start()

	for {
		cnx, err := s.l.Accept()
		if err != nil {
			return err
		}

		// The condition variable is constructed in place rather than by
		// dereferencing sync.NewCond: copying a sync.Cond value is what
		// `go vet` (copylocks) reports, and the zero Cond with L set is
		// ready to use.
		client := &connection{
			cnx:        cnx,
			pubsub:     s.pubsub,
			toSendCond: sync.Cond{L: &sync.Mutex{}},
			toSend:     make([][]byte, 0, toSendQueueLimit),
		}
		go client.Start()
	}
}
// Close releases the server's resources: it closes the listener, which makes
// a blocked Accept in Listen return with an error, and then closes the Pubsub.
func (s *Server) Close() {
	// Close has no return value, so the listener's close error is
	// deliberately discarded.
	_ = s.l.Close()
	s.pubsub.Close()
}
// connection holds the per-client state for one accepted net.Conn. Start
// reads commands from the client, loopWrite drains queued pubsub messages to
// it, and Write enqueues messages for loopWrite.
type connection struct {
// cnx is the client's network connection.
cnx net.Conn
// pubsub receives this connection's subscribe and unsubscribe requests.
pubsub *Pubsub
// listeners is not referenced by the code visible in this file; presumably
// it is maintained by the Pubsub implementation (TODO confirm).
listeners []*Listener
// toSendCond is signalled whenever toSend or isClosed changes. Its lock
// (toSendCond.L) guards both of those fields.
toSendCond sync.Cond
// isClosed is set when Start returns. After that, Write discards messages
// and loopWrite exits.
isClosed bool
// toSend is the queue of outgoing messages. Write never grows it beyond
// the capacity it was created with.
toSend [][]byte
}
// Start serves a single client connection. It launches the background writer
// (loopWrite) and then reads commands from the client until a read fails, a
// write to the client fails, or the client sends the quit command.
//
// Subscribe and unsubscribe commands (plain and pattern variants) are applied
// to the Pubsub once per argument, after which one SubscribeResponse per
// argument is written back. Any other command is answered with an
// "-ERR unknown command" reply and the loop continues.
//
// On return the connection is marked closed (waking loopWrite so it can
// exit), the network connection is closed, all of this connection's
// subscriptions are removed, and the client gauge is decremented.
func (s *connection) Start() {
	reader := bufio.NewReader(s.cnx)

	defer func() {
		s.toSendCond.L.Lock()
		s.isClosed = true
		s.toSendCond.Broadcast()
		s.toSendCond.L.Unlock()

		// Release the socket explicitly instead of leaving it to the
		// garbage collector. This also interrupts a loopWrite that is
		// blocked writing to a slow or dead client. The error is ignored
		// because the connection is being discarded either way.
		_ = s.cnx.Close()

		s.pubsub.UnsubscribeAll(s)
		clientsCount.Dec()
	}()

	clientsCount.Inc()
	logrus.Debug("redplex/server: accepted connection")
	go s.loopWrite()

	for {
		method, args, err := ParseRequest(reader)
		if err != nil {
			logrus.WithError(err).Debug("redplex/server: error reading command, terminating client connection")
			return
		}

		// The first Listener field distinguishes the pattern (P*) commands
		// from the plain ones.
		switch method {
		case commandSubscribe:
			for _, channel := range args {
				s.pubsub.Subscribe(Listener{false, string(channel), s})
			}
		case commandPSubscribe:
			for _, channel := range args {
				s.pubsub.Subscribe(Listener{true, string(channel), s})
			}
		case commandUnsubscribe:
			for _, channel := range args {
				s.pubsub.Unsubscribe(Listener{false, string(channel), s})
			}
		case commandPUnsubscribe:
			for _, channel := range args {
				s.pubsub.Unsubscribe(Listener{true, string(channel), s})
			}
		case commandQuit:
			logrus.Debug("redplex/server: terminating connection at client's request")
			return
		default:
			if _, err := fmt.Fprintf(s.cnx, "-ERR unknown command '%s'\r\n", method); err != nil {
				logrus.WithError(err).Debug("redplex/server: error writing to client, terminating client connection")
				return
			}
			continue
		}

		// Acknowledge each channel affected by the command.
		for _, channel := range args {
			if _, err := s.cnx.Write(SubscribeResponse(method, channel)); err != nil {
				logrus.WithError(err).Debug("redplex/server: error writing to client, terminating client connection")
				return
			}
		}
	}
}
// loopWrite is the connection's background writer. It sleeps on toSendCond
// until messages have been queued by Write or the connection has been marked
// closed. When messages are available it takes the whole queue while holding
// the lock, releases the lock, and writes the batch to the client in a single
// net.Buffers write. It returns as soon as it observes isClosed; anything
// still queued at that point is not sent.
func (s *connection) loopWrite() {
	var batch net.Buffers

	for {
		s.toSendCond.L.Lock()
		for !s.isClosed && len(s.toSend) == 0 {
			s.toSendCond.Wait()
		}

		closed := s.isClosed
		if !closed {
			// Move the queued messages into the local batch so the
			// network write happens outside the lock.
			batch = append(batch, s.toSend...)
			s.toSend = s.toSend[:0]
		}
		s.toSendCond.L.Unlock()

		if closed {
			return
		}

		batch.WriteTo(s.cnx)
		batch = batch[:0]
	}
}
// Write implements Writable.Write. It queues b for delivery by loopWrite and
// wakes the writer. The message is silently dropped when the connection has
// been closed or the queue is already at capacity. The slice is stored as-is
// (not copied).
func (s *connection) Write(b []byte) {
	s.toSendCond.L.Lock()
	defer s.toSendCond.L.Unlock()

	if s.isClosed || len(s.toSend) >= cap(s.toSend) {
		return
	}

	s.toSend = append(s.toSend, b)
	s.toSendCond.Broadcast()
}