Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add sans-io WAMP broker #41

Merged
merged 2 commits into from
Jun 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package wampproto

import (
"fmt"
"sync"

"github.com/xconnio/wampproto-go/messages"
)

const (
OptAcknowledge = "acknowledge"
)

type Broker struct {
subscriptionsByTopic map[string]*Subscription
subscriptionsBySession map[int64]map[int64]*Subscription
sessions map[int64]*SessionDetails

idGen *SessionScopeIDGenerator
sync.Mutex
}

func NewBroker() *Broker {
return &Broker{
sessions: map[int64]*SessionDetails{},
subscriptionsByTopic: make(map[string]*Subscription),
subscriptionsBySession: make(map[int64]map[int64]*Subscription),
idGen: &SessionScopeIDGenerator{},
}
}

func (b *Broker) AddSession(details *SessionDetails) error {
b.Lock()
defer b.Unlock()

_, exists := b.subscriptionsBySession[details.ID()]
if exists {
return fmt.Errorf("broker: cannot add session %b, it already exists", details.ID())
}

b.subscriptionsBySession[details.ID()] = map[int64]*Subscription{}
b.sessions[details.ID()] = details
return nil
}

func (b *Broker) RemoveSession(id int64) error {
b.Lock()
defer b.Unlock()

subscriptions, exists := b.subscriptionsBySession[id]
if !exists {
return fmt.Errorf("broker: cannot remove session %b, it doesn't exist", id)
}

delete(b.subscriptionsBySession, id)
for _, v := range subscriptions {
subscription, ok := b.subscriptionsByTopic[v.Topic]
if ok {
delete(subscription.Subscribers, id)
}

if len(subscription.Subscribers) == 0 {
delete(b.subscriptionsByTopic, v.Topic)
}
}

delete(b.sessions, id)

return nil
}

func (b *Broker) HasSubscription(topic string) bool {
b.Lock()
defer b.Unlock()

_, exists := b.subscriptionsByTopic[topic]
return exists
}

func (b *Broker) ReceiveMessage(sessionID int64, msg messages.Message) (*MessageWithRecipient, error) {
b.Lock()
defer b.Unlock()

switch msg.Type() {
case messages.MessageTypeSubscribe:
_, exists := b.subscriptionsBySession[sessionID]
if !exists {
return nil, fmt.Errorf("dealer: cannot subscribe, session %d doesn't exist", sessionID)
}

subscribe := msg.(*messages.Subscribe)
subscription, exists := b.subscriptionsByTopic[subscribe.Topic()]
if exists {
subscription.Subscribers[sessionID] = sessionID
} else {
subscription = &Subscription{
ID: b.idGen.NextID(),
Topic: subscribe.Topic(),
Subscribers: map[int64]int64{sessionID: sessionID},
}
}

b.subscriptionsBySession[sessionID][subscription.ID] = subscription

subscribed := messages.NewSubscribed(subscribe.RequestID(), subscription.ID)
result := &MessageWithRecipient{Message: subscribed, Recipient: sessionID}
return result, nil
case messages.MessageTypeUnSubscribe:
unsubscribe := msg.(*messages.UnSubscribe)
subscriptions, exists := b.subscriptionsBySession[sessionID]
if !exists {
return nil, fmt.Errorf("dealer: cannot unsubscribe, session %d doesn't exist", sessionID)
}

subscription, exists := subscriptions[unsubscribe.SubscriptionID()]
if !exists {
return nil, fmt.Errorf("broker: cannot unsubscribe non-existent subscription %d",
unsubscribe.SubscriptionID())
}

delete(subscription.Subscribers, sessionID)
if len(subscription.Subscribers) == 0 {
delete(b.subscriptionsByTopic, subscription.Topic)
}

delete(b.subscriptionsBySession[sessionID], subscription.ID)

unsubscribed := messages.NewUnSubscribed(unsubscribe.RequestID())
result := &MessageWithRecipient{Message: unsubscribed, Recipient: sessionID}
return result, nil
case messages.MessageTypeError:
return nil, fmt.Errorf("dealer: error handling is not implemented yet")
default:
return nil, fmt.Errorf("dealer: received unexpected message of type %T", msg)
}
}

func (b *Broker) ReceivePublish(sessionID int64, publish *messages.Publish) (*Publication, error) {
b.Lock()
defer b.Unlock()

_, exists := b.subscriptionsBySession[sessionID]
if !exists {
return nil, fmt.Errorf("broker: cannot publish, session %d doesn't exist", sessionID)
}

result := &Publication{}
publicationID := b.idGen.NextID()

subscription, exists := b.subscriptionsByTopic[publish.Topic()]
if exists && len(subscription.Subscribers) > 0 {
event := messages.NewEvent(subscription.ID, publicationID, nil, publish.Args(), publish.KwArgs())
result.Event = event
for _, subscriber := range subscription.Subscribers {
result.recipients = append(result.recipients, subscriber)
}
}

ack, ok := publish.Options()[OptAcknowledge].(bool)
if ok && ack {
published := messages.NewPublished(publish.RequestID(), publicationID)
result.Ack = &MessageWithRecipient{Message: published, Recipient: sessionID}
}

return result, nil
}
54 changes: 54 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package wampproto_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/xconnio/wampproto-go"
"github.com/xconnio/wampproto-go/messages"
)

func TestBrokerAddRemoveSession(t *testing.T) {
broker := wampproto.NewBroker()

t.Run("RemoveNonSession", func(t *testing.T) {
err := broker.RemoveSession(1)
require.Error(t, err)
})

t.Run("AddRemove", func(t *testing.T) {
details := wampproto.NewSessionDetails(1, "realm", "authid", "anonymous", false)
err := broker.AddSession(details)
require.NoError(t, err)

err = broker.RemoveSession(details.ID())
require.NoError(t, err)

err = broker.RemoveSession(details.ID())
require.Error(t, err)
})
}

func TestBrokerPublish(t *testing.T) {
broker := wampproto.NewBroker()

details := wampproto.NewSessionDetails(1, "realm", "authid", "anonymous", false)
err := broker.AddSession(details)
require.NoError(t, err)

args := []any{1, 2}
kwArgs := map[string]any{"name": "alex"}
options := map[string]any{wampproto.OptAcknowledge: true}

t.Run("NoSubscriber", func(t *testing.T) {
publish := messages.NewPublish(1, options, "foo.bar", args, kwArgs)
publication, err := broker.ReceivePublish(details.ID(), publish)
require.NoError(t, err)
require.NotNil(t, publication)

require.Equal(t, publication.Ack.Recipient, details.ID())
require.Equal(t, publication.Ack.Message.Type(), messages.MessageTypePublished)
require.Nil(t, publication.Event)
})
}
12 changes: 12 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,15 @@ type MessageWithRecipient struct {
Message messages.Message
Recipient int64
}

type Subscription struct {
ID int64
Topic string
Subscribers map[int64]int64
}

type Publication struct {
Event *messages.Event
recipients []int64
Ack *MessageWithRecipient
}
Loading