Skip to content

Commit f6cbfe1

Browse files
committed
namespace scoped web sockets
Signed-off-by: John Hosie <[email protected]>
1 parent 13bf4e1 commit f6cbfe1

File tree

8 files changed

+327
-21
lines changed

8 files changed

+327
-21
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ $(eval $(call makemock, internal/operations, Manager, operat
7979
$(eval $(call makemock, internal/multiparty, Manager, multipartymocks))
8080
$(eval $(call makemock, internal/apiserver, FFISwaggerGen, apiservermocks))
8181
$(eval $(call makemock, internal/apiserver, Server, apiservermocks))
82+
$(eval $(call makemock, internal/events/websockets, WebSocketsNamespaced, websocketsmocks))
8283

8384
firefly-nocgo: ${GOFILES}
8485
CGO_ENABLED=0 $(VGO) build -o ${BINARY_NAME}-nocgo -ldflags "-X main.buildDate=$(DATE) -X main.buildVersion=$(BUILD_VERSION) -X 'github.com/hyperledger/firefly/cmd.BuildVersionOverride=$(BUILD_VERSION)' -X 'github.com/hyperledger/firefly/cmd.BuildDate=$(DATE)' -X 'github.com/hyperledger/firefly/cmd.BuildCommit=$(GIT_REF)'" -tags=prod -tags=prod -v

internal/apiserver/server.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,9 @@ func (as *apiServer) createMuxRouter(ctx context.Context, mgr namespace.Manager)
385385
ws.(*websockets.WebSockets).SetAuthorizer(mgr)
386386
r.HandleFunc(`/ws`, ws.(*websockets.WebSockets).ServeHTTP)
387387

388+
// namespace scoped web sockets
389+
r.HandleFunc("/api/v1/namespaces/{ns}/ws", hf.APIWrapper(getNamespacedWebSocketHandler(ws.(*websockets.WebSockets), mgr)))
390+
388391
uiPath := config.GetString(coreconfig.UIPath)
389392
if uiPath != "" && config.GetBool(coreconfig.UIEnabled) {
390393
r.PathPrefix(`/ui`).Handler(newStaticHandler(uiPath, "index.html", `/ui`))
@@ -394,6 +397,23 @@ func (as *apiServer) createMuxRouter(ctx context.Context, mgr namespace.Manager)
394397
return r
395398
}
396399

400+
func getNamespacedWebSocketHandler(ws websockets.WebSocketsNamespaced, mgr namespace.Manager) ffapi.HandlerFunction {
401+
return func(res http.ResponseWriter, req *http.Request) (status int, err error) {
402+
403+
vars := mux.Vars(req)
404+
namespace := vars["ns"]
405+
or, err := mgr.Orchestrator(req.Context(), namespace, false)
406+
if err != nil || or == nil {
407+
return 404, i18n.NewError(req.Context(), coremsgs.Msg404NotFound)
408+
}
409+
410+
ws.ServeHTTPNamespaced(namespace, res, req)
411+
412+
return 200, nil
413+
}
414+
415+
}
416+
397417
func (as *apiServer) notFoundHandler(res http.ResponseWriter, req *http.Request) (status int, err error) {
398418
res.Header().Add("Content-Type", "application/json")
399419
return 404, i18n.NewError(req.Context(), coremsgs.Msg404NotFound)

internal/apiserver/server_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"context"
2222
"encoding/json"
23+
"errors"
2324
"fmt"
2425
"io"
2526
"mime/multipart"
@@ -43,6 +44,7 @@ import (
4344
"github.com/hyperledger/firefly/mocks/namespacemocks"
4445
"github.com/hyperledger/firefly/mocks/orchestratormocks"
4546
"github.com/hyperledger/firefly/mocks/spieventsmocks"
47+
"github.com/hyperledger/firefly/mocks/websocketsmocks"
4648
"github.com/hyperledger/firefly/pkg/core"
4749
"github.com/stretchr/testify/assert"
4850
"github.com/stretchr/testify/mock"
@@ -513,3 +515,37 @@ func TestGetOrchestratorMissingTag(t *testing.T) {
513515
_, err := getOrchestrator(context.Background(), &namespacemocks.Manager{}, "", nil)
514516
assert.Regexp(t, "FF10437", err)
515517
}
518+
519+
func TestGetNamespacedWebSocketHandler(t *testing.T) {
520+
mgr, _, _ := newTestServer()
521+
mwsns := &websocketsmocks.WebSocketsNamespaced{}
522+
mwsns.On("ServeHTTPNamespaced", "ns1", mock.Anything, mock.Anything).Return()
523+
524+
var b bytes.Buffer
525+
req := httptest.NewRequest("GET", "/api/v1/namespaces/ns1/ws", &b)
526+
req = mux.SetURLVars(req, map[string]string{"ns": "ns1"})
527+
req.Header.Set("Content-Type", "application/json; charset=utf-8")
528+
res := httptest.NewRecorder()
529+
530+
handler := getNamespacedWebSocketHandler(mwsns, mgr)
531+
status, err := handler(res, req)
532+
assert.NoError(t, err)
533+
assert.Equal(t, 200, status)
534+
}
535+
536+
func TestGetNamespacedWebSocketHandlerUnknownNamespace(t *testing.T) {
537+
mgr, _, _ := newTestServer()
538+
mwsns := &websocketsmocks.WebSocketsNamespaced{}
539+
540+
mgr.On("Orchestrator", mock.Anything, "unknown", false).Return(nil, errors.New("unknown namespace")).Maybe()
541+
var b bytes.Buffer
542+
req := httptest.NewRequest("GET", "/api/v1/namespaces/unknown/ws", &b)
543+
req = mux.SetURLVars(req, map[string]string{"ns": "unknown"})
544+
req.Header.Set("Content-Type", "application/json; charset=utf-8")
545+
res := httptest.NewRecorder()
546+
547+
handler := getNamespacedWebSocketHandler(mwsns, mgr)
548+
status, err := handler(res, req)
549+
assert.Error(t, err)
550+
assert.Equal(t, 404, status)
551+
}

internal/coremsgs/en_error_messages.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,4 +300,5 @@ var (
300300
MsgTokensRESTErrConflict = ffe("FF10459", "Conflict from tokens service: %s", 409)
301301
MsgBatchWithDataNotSupported = ffe("FF10460", "Provided subscription '%s' enables batching and withData which is not supported", 400)
302302
MsgBatchDeliveryNotSupported = ffe("FF10461", "Batch delivery not supported by transport '%s'", 400)
303+
MsgWSWrongNamespace = ffe("FF10462", "Websocket request received on a namespace scoped connection but the provided namespace does not match")
303304
)

internal/events/websockets/websocket_connection.go

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,23 +38,25 @@ type websocketStartedSub struct {
3838
}
3939

4040
type websocketConnection struct {
41-
ctx context.Context
42-
ws *WebSockets
43-
wsConn *websocket.Conn
44-
cancelCtx func()
45-
connID string
46-
sendMessages chan interface{}
47-
senderDone chan struct{}
48-
receiverDone chan struct{}
49-
autoAck bool
50-
started []*websocketStartedSub
51-
inflight []*core.EventDeliveryResponse
52-
mux sync.Mutex
53-
closed bool
54-
remoteAddr string
55-
userAgent string
56-
header http.Header
57-
auth core.Authorizer
41+
ctx context.Context
42+
ws *WebSockets
43+
wsConn *websocket.Conn
44+
cancelCtx func()
45+
connID string
46+
sendMessages chan interface{}
47+
senderDone chan struct{}
48+
receiverDone chan struct{}
49+
autoAck bool
50+
started []*websocketStartedSub
51+
inflight []*core.EventDeliveryResponse
52+
mux sync.Mutex
53+
closed bool
54+
remoteAddr string
55+
userAgent string
56+
header http.Header
57+
auth core.Authorizer
58+
namespaceScoped bool // if true then any request to listen is asserted to be in the context of namespace
59+
namespace string
5860
}
5961

6062
func newConnection(pCtx context.Context, ws *WebSockets, wsConn *websocket.Conn, req *http.Request, auth core.Authorizer) *websocketConnection {
@@ -80,6 +82,18 @@ func newConnection(pCtx context.Context, ws *WebSockets, wsConn *websocket.Conn,
8082
return wc
8183
}
8284

85+
func (wc *websocketConnection) assertNamespace(namespace string) (string, error) {
86+
87+
if wc.namespaceScoped {
88+
if namespace == "" {
89+
namespace = wc.namespace
90+
} else if namespace != wc.namespace {
91+
return "", i18n.NewError(wc.ctx, coremsgs.MsgWSWrongNamespace)
92+
}
93+
}
94+
return namespace, nil
95+
}
96+
8397
// processAutoStart gives a helper to specify query parameters to auto-start your subscription
8498
func (wc *websocketConnection) processAutoStart(req *http.Request) {
8599
query := req.URL.Query()
@@ -88,12 +102,18 @@ func (wc *websocketConnection) processAutoStart(req *http.Request) {
88102
_, hasName := query["name"]
89103
autoAck, hasAutoack := req.URL.Query()["autoack"]
90104
isAutoack := hasAutoack && (len(autoAck) == 0 || autoAck[0] != "false")
105+
namespace, err := wc.assertNamespace(query.Get("namespace"))
106+
if err != nil {
107+
wc.protocolError(err)
108+
return
109+
}
110+
91111
if hasEphemeral || hasName {
92112
filter := core.NewSubscriptionFilterFromQuery(query)
93113
err := wc.handleStart(&core.WSStart{
94114
AutoAck: &isAutoack,
95115
Ephemeral: isEphemeral,
96-
Namespace: query.Get("namespace"),
116+
Namespace: namespace,
97117
Name: query.Get("name"),
98118
Filter: filter,
99119
})
@@ -157,7 +177,10 @@ func (wc *websocketConnection) receiveLoop() {
157177
var msg core.WSStart
158178
err = json.Unmarshal(msgData, &msg)
159179
if err == nil {
160-
err = wc.authorizeMessage(msg.Namespace)
180+
msg.Namespace, err = wc.assertNamespace(msg.Namespace)
181+
if err == nil {
182+
err = wc.authorizeMessage(msg.Namespace)
183+
}
161184
if err == nil {
162185
err = wc.handleStart(&msg)
163186
}
@@ -251,6 +274,14 @@ func (wc *websocketConnection) restartForNamespace(ns string, startTime time.Tim
251274
}
252275

253276
func (wc *websocketConnection) handleStart(start *core.WSStart) (err error) {
277+
// this will very likely already be checked before we get here but
278+
// it doesn't do any harm to do a final assertion just in case it hasn't been done yet
279+
280+
start.Namespace, err = wc.assertNamespace(start.Namespace)
281+
if err != nil {
282+
return err
283+
}
284+
254285
wc.mux.Lock()
255286
if start.AutoAck != nil {
256287
if *start.AutoAck != wc.autoAck && len(wc.started) > 0 {

internal/events/websockets/websockets.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ import (
3131
"github.com/hyperledger/firefly/pkg/events"
3232
)
3333

34+
type WebSocketsNamespaced interface {
35+
ServeHTTPNamespaced(namespace string, res http.ResponseWriter, req *http.Request)
36+
}
37+
3438
type WebSockets struct {
3539
ctx context.Context
3640
capabilities *events.Capabilities
@@ -122,6 +126,25 @@ func (ws *WebSockets) ServeHTTP(res http.ResponseWriter, req *http.Request) {
122126
wc.processAutoStart(req)
123127
}
124128

129+
func (ws *WebSockets) ServeHTTPNamespaced(namespace string, res http.ResponseWriter, req *http.Request) {
130+
131+
wsConn, err := ws.upgrader.Upgrade(res, req, nil)
132+
if err != nil {
133+
log.L(ws.ctx).Errorf("WebSocket upgrade failed: %s", err)
134+
return
135+
}
136+
137+
ws.connMux.Lock()
138+
wc := newConnection(ws.ctx, ws, wsConn, req, ws.auth)
139+
wc.namespaceScoped = true
140+
wc.namespace = namespace
141+
ws.connections[wc.connID] = wc
142+
ws.connMux.Unlock()
143+
144+
wc.processAutoStart(req)
145+
146+
}
147+
125148
func (ws *WebSockets) ack(connID string, inflight *core.EventDeliveryResponse) {
126149
if cb, ok := ws.callbacks.handlers[inflight.Subscription.Namespace]; ok {
127150
cb.DeliveryResponse(connID, inflight)

0 commit comments

Comments
 (0)