Skip to content

Commit 48a7e03

Browse files
committed
move sockjs to Centrifugo to start deprecation process
1 parent dbc0da4 commit 48a7e03

File tree

7 files changed

+677
-6
lines changed

7 files changed

+677
-6
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ require (
7070
github.com/golang/protobuf v1.5.3 // indirect
7171
github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect
7272
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
73-
github.com/igm/sockjs-go/v3 v3.0.2 // indirect
73+
github.com/igm/sockjs-go/v3 v3.0.2
7474
github.com/inconshreveable/mousetrap v1.1.0 // indirect
7575
github.com/josharian/intern v1.0.0 // indirect
7676
github.com/klauspost/compress v1.17.2 // indirect

internal/sockjs/cancelctx.go

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package sockjs
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// customCancelContext wraps context and cancels as soon as channel closed.
9+
type customCancelContext struct {
10+
context.Context
11+
ch <-chan struct{}
12+
}
13+
14+
// Deadline not used.
15+
func (c customCancelContext) Deadline() (time.Time, bool) { return time.Time{}, false }
16+
17+
// Done returns channel that will be closed as soon as connection closed.
18+
func (c customCancelContext) Done() <-chan struct{} { return c.ch }
19+
20+
// Err returns context error.
21+
func (c customCancelContext) Err() error {
22+
select {
23+
case <-c.ch:
24+
return context.Canceled
25+
default:
26+
return nil
27+
}
28+
}
29+
30+
// NewCancelContext returns a wrapper context around original context that will
31+
// be canceled on channel close.
32+
func NewCancelContext(ctx context.Context, ch <-chan struct{}) context.Context {
33+
return customCancelContext{Context: ctx, ch: ch}
34+
}

internal/sockjs/handler_sockjs.go

+278
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
package sockjs
2+
3+
import (
4+
"net/http"
5+
"sync"
6+
"time"
7+
8+
"github.com/centrifugal/centrifuge"
9+
10+
"github.com/centrifugal/protocol"
11+
"github.com/gorilla/websocket"
12+
"github.com/igm/sockjs-go/v3/sockjs"
13+
)
14+
15+
// Config represents config for SockJS handler.
16+
type Config struct {
17+
// HandlerPrefix sets prefix for SockJS handler endpoint path.
18+
HandlerPrefix string
19+
20+
// URL is an address to SockJS client javascript library. Required for iframe-based
21+
// transports to work. This URL should lead to the same SockJS client version as used
22+
// for connecting on the client side.
23+
URL string
24+
25+
// CheckOrigin allows deciding whether to use CORS or not in XHR case.
26+
// When false returned then CORS headers won't be set.
27+
CheckOrigin func(*http.Request) bool
28+
29+
// WebsocketCheckOrigin allows setting custom CheckOrigin func for underlying
30+
// Gorilla Websocket based websocket.Upgrader.
31+
WebsocketCheckOrigin func(*http.Request) bool
32+
33+
// WebsocketReadBufferSize is a parameter that is used for raw websocket.Upgrader.
34+
// If set to zero reasonable default value will be used.
35+
WebsocketReadBufferSize int
36+
37+
// WebsocketWriteBufferSize is a parameter that is used for raw websocket.Upgrader.
38+
// If set to zero reasonable default value will be used.
39+
WebsocketWriteBufferSize int
40+
41+
// WebsocketUseWriteBufferPool enables using buffer pool for writes in Websocket transport.
42+
WebsocketUseWriteBufferPool bool
43+
44+
// WebsocketWriteTimeout is maximum time of write message operation.
45+
// Slow client will be disconnected.
46+
// By default, 1 * time.Second will be used.
47+
WebsocketWriteTimeout time.Duration
48+
49+
centrifuge.PingPongConfig
50+
}
51+
52+
// Handler accepts SockJS connections. SockJS has a bunch of fallback
53+
// transports when WebSocket connection is not supported. It comes with additional
54+
// costs though: small protocol framing overhead, lack of binary support, more
55+
// goroutines per connection, and you need to use sticky session mechanism on
56+
// your load balancer in case you are using HTTP-based SockJS fallbacks and have
57+
// more than one Centrifuge Node on a backend (so SockJS to be able to emulate
58+
// bidirectional protocol). So if you can afford it - use WebsocketHandler only.
59+
type Handler struct {
60+
node *centrifuge.Node
61+
config Config
62+
handler http.Handler
63+
}
64+
65+
var writeBufferPool = &sync.Pool{}
66+
67+
// NewHandler creates new Handler.
68+
func NewHandler(node *centrifuge.Node, config Config) *Handler {
69+
options := sockjs.DefaultOptions
70+
71+
wsUpgrader := &websocket.Upgrader{
72+
ReadBufferSize: config.WebsocketReadBufferSize,
73+
WriteBufferSize: config.WebsocketWriteBufferSize,
74+
Error: func(w http.ResponseWriter, r *http.Request, status int, reason error) {},
75+
}
76+
wsUpgrader.CheckOrigin = config.WebsocketCheckOrigin
77+
if config.WebsocketUseWriteBufferPool {
78+
wsUpgrader.WriteBufferPool = writeBufferPool
79+
} else {
80+
wsUpgrader.WriteBufferSize = config.WebsocketWriteBufferSize
81+
}
82+
options.WebsocketUpgrader = wsUpgrader
83+
84+
// Override sockjs url. It's important to use the same SockJS
85+
// library version on client and server sides when using iframe
86+
// based SockJS transports, otherwise SockJS will raise error
87+
// about version mismatch.
88+
options.SockJSURL = config.URL
89+
options.CheckOrigin = config.CheckOrigin
90+
91+
wsWriteTimeout := config.WebsocketWriteTimeout
92+
if wsWriteTimeout == 0 {
93+
wsWriteTimeout = 1 * time.Second
94+
}
95+
options.WebsocketWriteTimeout = wsWriteTimeout
96+
97+
s := &Handler{
98+
node: node,
99+
config: config,
100+
}
101+
102+
options.HeartbeatDelay = 0
103+
s.handler = sockjs.NewHandler(config.HandlerPrefix, options, s.sockJSHandler)
104+
return s
105+
}
106+
107+
func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
108+
s.handler.ServeHTTP(rw, r)
109+
}
110+
111+
// sockJSHandler called when new client connection comes to SockJS endpoint.
112+
func (s *Handler) sockJSHandler(sess sockjs.Session) {
113+
s.handleSession(sess)
114+
}
115+
116+
// sockJSHandler called when new client connection comes to SockJS endpoint.
117+
func (s *Handler) handleSession(sess sockjs.Session) {
118+
// Separate goroutine for better GC of caller's data.
119+
go func() {
120+
transport := newSockjsTransport(sess, sockjsTransportOptions{
121+
pingPong: s.config.PingPongConfig,
122+
})
123+
124+
select {
125+
case <-s.node.NotifyShutdown():
126+
_ = transport.Close(centrifuge.DisconnectShutdown)
127+
return
128+
default:
129+
}
130+
131+
ctxCh := make(chan struct{})
132+
defer close(ctxCh)
133+
c, closeFn, err := centrifuge.NewClient(NewCancelContext(sess.Request().Context(), ctxCh), s.node, transport)
134+
if err != nil {
135+
s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error creating client", map[string]any{"transport": transportSockJS}))
136+
return
137+
}
138+
defer func() { _ = closeFn() }()
139+
140+
if s.node.LogEnabled(centrifuge.LogLevelDebug) {
141+
s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection established", map[string]any{"client": c.ID(), "transport": transportSockJS}))
142+
defer func(started time.Time) {
143+
s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "client connection completed", map[string]any{"client": c.ID(), "transport": transportSockJS, "duration": time.Since(started)}))
144+
}(time.Now())
145+
}
146+
147+
var needWaitLoop bool
148+
149+
for {
150+
if msg, err := sess.Recv(); err == nil {
151+
reader := GetStringReader(msg)
152+
if ok := centrifuge.HandleReadFrame(c, reader); !ok {
153+
PutStringReader(reader)
154+
needWaitLoop = true
155+
break
156+
}
157+
PutStringReader(reader)
158+
continue
159+
}
160+
break
161+
}
162+
163+
if needWaitLoop {
164+
// One extra loop till we get an error from session,
165+
// this is required to wait until close frame will be sent
166+
// into connection inside Client implementation and transport
167+
// closed with proper disconnect reason.
168+
for {
169+
if _, err := sess.Recv(); err != nil {
170+
break
171+
}
172+
}
173+
}
174+
}()
175+
}
176+
177+
const (
178+
transportSockJS = "sockjs"
179+
)
180+
181+
type sockjsTransportOptions struct {
182+
pingPong centrifuge.PingPongConfig
183+
}
184+
185+
type sockjsTransport struct {
186+
mu sync.RWMutex
187+
closeCh chan struct{}
188+
session sockjs.Session
189+
opts sockjsTransportOptions
190+
closed bool
191+
}
192+
193+
func newSockjsTransport(s sockjs.Session, opts sockjsTransportOptions) *sockjsTransport {
194+
t := &sockjsTransport{
195+
session: s,
196+
closeCh: make(chan struct{}),
197+
opts: opts,
198+
}
199+
return t
200+
}
201+
202+
// Name returns name of transport.
203+
func (t *sockjsTransport) Name() string {
204+
return transportSockJS
205+
}
206+
207+
// Protocol returns transport protocol.
208+
func (t *sockjsTransport) Protocol() centrifuge.ProtocolType {
209+
return centrifuge.ProtocolTypeJSON
210+
}
211+
212+
// ProtocolVersion returns transport ProtocolVersion.
213+
func (t *sockjsTransport) ProtocolVersion() centrifuge.ProtocolVersion {
214+
return centrifuge.ProtocolVersion2
215+
}
216+
217+
// Unidirectional returns whether transport is unidirectional.
218+
func (t *sockjsTransport) Unidirectional() bool {
219+
return false
220+
}
221+
222+
// Emulation ...
223+
func (t *sockjsTransport) Emulation() bool {
224+
return false
225+
}
226+
227+
// DisabledPushFlags ...
228+
func (t *sockjsTransport) DisabledPushFlags() uint64 {
229+
// SockJS has its own close frames to mimic WebSocket Close frames,
230+
// so we don't need to send Disconnect pushes.
231+
return centrifuge.PushFlagDisconnect
232+
}
233+
234+
// PingPongConfig ...
235+
func (t *sockjsTransport) PingPongConfig() centrifuge.PingPongConfig {
236+
return t.opts.pingPong
237+
}
238+
239+
// Write data to transport.
240+
func (t *sockjsTransport) Write(message []byte) error {
241+
select {
242+
case <-t.closeCh:
243+
return nil
244+
default:
245+
// No need to use protocol encoders here since
246+
// SockJS only supports JSON.
247+
return t.session.Send(string(message))
248+
}
249+
}
250+
251+
// WriteMany messages to transport.
252+
func (t *sockjsTransport) WriteMany(messages ...[]byte) error {
253+
select {
254+
case <-t.closeCh:
255+
return nil
256+
default:
257+
encoder := protocol.GetDataEncoder(protocol.Type(centrifuge.ProtocolTypeJSON))
258+
defer protocol.PutDataEncoder(protocol.Type(centrifuge.ProtocolTypeJSON), encoder)
259+
for i := range messages {
260+
_ = encoder.Encode(messages[i])
261+
}
262+
return t.session.Send(string(encoder.Finish()))
263+
}
264+
}
265+
266+
// Close closes transport.
267+
func (t *sockjsTransport) Close(disconnect centrifuge.Disconnect) error {
268+
t.mu.Lock()
269+
if t.closed {
270+
// Already closed, noop.
271+
t.mu.Unlock()
272+
return nil
273+
}
274+
t.closed = true
275+
close(t.closeCh)
276+
t.mu.Unlock()
277+
return t.session.Close(disconnect.Code, disconnect.Reason)
278+
}

0 commit comments

Comments
 (0)