diff --git a/broker.go b/broker.go new file mode 100644 index 0000000..644ea13 --- /dev/null +++ b/broker.go @@ -0,0 +1,166 @@ +package wampproto + +import ( + "fmt" + "sync" + + "github.com/xconnio/wampproto-go/messages" +) + +const ( + OptAcknowledge = "acknowledge" +) + +type Broker struct { + subscriptionsByTopic map[string]*Subscription + subscriptionsBySession map[int64]map[int64]*Subscription + sessions map[int64]*SessionDetails + + idGen *SessionScopeIDGenerator + sync.Mutex +} + +func NewBroker() *Broker { + return &Broker{ + sessions: map[int64]*SessionDetails{}, + subscriptionsByTopic: make(map[string]*Subscription), + subscriptionsBySession: make(map[int64]map[int64]*Subscription), + idGen: &SessionScopeIDGenerator{}, + } +} + +func (b *Broker) AddSession(details *SessionDetails) error { + b.Lock() + defer b.Unlock() + + _, exists := b.subscriptionsBySession[details.ID()] + if exists { + return fmt.Errorf("broker: cannot add session %b, it already exists", details.ID()) + } + + b.subscriptionsBySession[details.ID()] = map[int64]*Subscription{} + b.sessions[details.ID()] = details + return nil +} + +func (b *Broker) RemoveSession(id int64) error { + b.Lock() + defer b.Unlock() + + subscriptions, exists := b.subscriptionsBySession[id] + if !exists { + return fmt.Errorf("broker: cannot remove session %b, it doesn't exist", id) + } + + delete(b.subscriptionsBySession, id) + for _, v := range subscriptions { + subscription, ok := b.subscriptionsByTopic[v.Topic] + if ok { + delete(subscription.Subscribers, id) + } + + if len(subscription.Subscribers) == 0 { + delete(b.subscriptionsByTopic, v.Topic) + } + } + + delete(b.sessions, id) + + return nil +} + +func (b *Broker) HasSubscription(topic string) bool { + b.Lock() + defer b.Unlock() + + _, exists := b.subscriptionsByTopic[topic] + return exists +} + +func (b *Broker) ReceiveMessage(sessionID int64, msg messages.Message) (*MessageWithRecipient, error) { + b.Lock() + defer b.Unlock() + + switch msg.Type() { + case messages.MessageTypeSubscribe: + _, exists := b.subscriptionsBySession[sessionID] + if !exists { + return nil, fmt.Errorf("dealer: cannot subscribe, session %d doesn't exist", sessionID) + } + + subscribe := msg.(*messages.Subscribe) + subscription, exists := b.subscriptionsByTopic[subscribe.Topic()] + if exists { + subscription.Subscribers[sessionID] = sessionID + } else { + subscription = &Subscription{ + ID: b.idGen.NextID(), + Topic: subscribe.Topic(), + Subscribers: map[int64]int64{sessionID: sessionID}, + } + } + + b.subscriptionsBySession[sessionID][subscription.ID] = subscription + + subscribed := messages.NewSubscribed(subscribe.RequestID(), subscription.ID) + result := &MessageWithRecipient{Message: subscribed, Recipient: sessionID} + return result, nil + case messages.MessageTypeUnSubscribe: + unsubscribe := msg.(*messages.UnSubscribe) + subscriptions, exists := b.subscriptionsBySession[sessionID] + if !exists { + return nil, fmt.Errorf("dealer: cannot unsubscribe, session %d doesn't exist", sessionID) + } + + subscription, exists := subscriptions[unsubscribe.SubscriptionID()] + if !exists { + return nil, fmt.Errorf("broker: cannot unsubscribe non-existent subscription %d", + unsubscribe.SubscriptionID()) + } + + delete(subscription.Subscribers, sessionID) + if len(subscription.Subscribers) == 0 { + delete(b.subscriptionsByTopic, subscription.Topic) + } + + delete(b.subscriptionsBySession[sessionID], subscription.ID) + + unsubscribed := messages.NewUnSubscribed(unsubscribe.RequestID()) + result := &MessageWithRecipient{Message: unsubscribed, Recipient: sessionID} + return result, nil + case messages.MessageTypeError: + return nil, fmt.Errorf("dealer: error handling is not implemented yet") + default: + return nil, fmt.Errorf("dealer: received unexpected message of type %T", msg) + } +} + +func (b *Broker) ReceivePublish(sessionID int64, publish *messages.Publish) (*Publication, error) { + b.Lock() + defer b.Unlock() + + _, exists := b.subscriptionsBySession[sessionID] + if !exists { + return nil, fmt.Errorf("broker: cannot publish, session %d doesn't exist", sessionID) + } + + result := &Publication{} + publicationID := b.idGen.NextID() + + subscription, exists := b.subscriptionsByTopic[publish.Topic()] + if exists && len(subscription.Subscribers) > 0 { + event := messages.NewEvent(subscription.ID, publicationID, nil, publish.Args(), publish.KwArgs()) + result.Event = event + for _, subscriber := range subscription.Subscribers { + result.recipients = append(result.recipients, subscriber) + } + } + + ack, ok := publish.Options()[OptAcknowledge].(bool) + if ok && ack { + published := messages.NewPublished(publish.RequestID(), publicationID) + result.Ack = &MessageWithRecipient{Message: published, Recipient: sessionID} + } + + return result, nil +} diff --git a/broker_test.go b/broker_test.go new file mode 100644 index 0000000..90a8fb8 --- /dev/null +++ b/broker_test.go @@ -0,0 +1,54 @@ +package wampproto_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/xconnio/wampproto-go" + "github.com/xconnio/wampproto-go/messages" +) + +func TestBrokerAddRemoveSession(t *testing.T) { + broker := wampproto.NewBroker() + + t.Run("RemoveNonSession", func(t *testing.T) { + err := broker.RemoveSession(1) + require.Error(t, err) + }) + + t.Run("AddRemove", func(t *testing.T) { + details := wampproto.NewSessionDetails(1, "realm", "authid", "anonymous", false) + err := broker.AddSession(details) + require.NoError(t, err) + + err = broker.RemoveSession(details.ID()) + require.NoError(t, err) + + err = broker.RemoveSession(details.ID()) + require.Error(t, err) + }) +} + +func TestBrokerPublish(t *testing.T) { + broker := wampproto.NewBroker() + + details := wampproto.NewSessionDetails(1, "realm", "authid", "anonymous", false) + err := broker.AddSession(details) + require.NoError(t, err) + + args := []any{1, 2} + kwArgs := map[string]any{"name": "alex"} + options := map[string]any{wampproto.OptAcknowledge: true} + + t.Run("NoSubscriber", func(t *testing.T) { + publish := messages.NewPublish(1, options, "foo.bar", args, kwArgs) + publication, err := broker.ReceivePublish(details.ID(), publish) + require.NoError(t, err) + require.NotNil(t, publication) + + require.Equal(t, publication.Ack.Recipient, details.ID()) + require.Equal(t, publication.Ack.Message.Type(), messages.MessageTypePublished) + require.Nil(t, publication.Event) + }) +} diff --git a/types.go b/types.go index 52d1621..9c9f001 100644 --- a/types.go +++ b/types.go @@ -45,3 +45,15 @@ type MessageWithRecipient struct { Message messages.Message Recipient int64 } + +type Subscription struct { + ID int64 + Topic string + Subscribers map[int64]int64 +} + +type Publication struct { + Event *messages.Event + recipients []int64 + Ack *MessageWithRecipient +}