Skip to content

Commit

Permalink
Separate broker and server options
Browse files Browse the repository at this point in the history
  • Loading branch information
muXxer committed Nov 21, 2023
1 parent 55e4f9b commit 83cf182
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 132 deletions.
22 changes: 16 additions & 6 deletions components/mqtt/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/iotaledger/hive.go/app"
"github.com/iotaledger/hive.go/app/shutdown"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/inx-app/pkg/nodebridge"
"github.com/iotaledger/inx-mqtt/pkg/broker"
"github.com/iotaledger/inx-mqtt/pkg/daemon"
Expand Down Expand Up @@ -38,23 +39,19 @@ func provide(c *dig.Container) error {

type inDeps struct {
dig.In
NodeBridge *nodebridge.NodeBridge
NodeBridge nodebridge.NodeBridge
*shutdown.ShutdownHandler
}

return c.Provide(func(deps inDeps) (*mqtt.Server, error) {
return mqtt.NewServer(
Component.Logger(),
deps.NodeBridge,
deps.ShutdownHandler,
broker, err := broker.NewBroker(
broker.WithBufferSize(ParamsMQTT.BufferSize),
broker.WithBufferBlockSize(ParamsMQTT.BufferBlockSize),
broker.WithMaxTopicSubscriptionsPerClient(ParamsMQTT.Subscriptions.MaxTopicSubscriptionsPerClient),
broker.WithTopicCleanupThresholdCount(ParamsMQTT.Subscriptions.TopicsCleanupThresholdCount),
broker.WithTopicCleanupThresholdRatio(ParamsMQTT.Subscriptions.TopicsCleanupThresholdRatio),
broker.WithWebsocketEnabled(ParamsMQTT.Websocket.Enabled),
broker.WithWebsocketBindAddress(ParamsMQTT.Websocket.BindAddress),
broker.WithWebsocketAdvertiseAddress(ParamsMQTT.Websocket.AdvertiseAddress),
broker.WithTCPEnabled(ParamsMQTT.TCP.Enabled),
broker.WithTCPBindAddress(ParamsMQTT.TCP.BindAddress),
broker.WithTCPAuthEnabled(ParamsMQTT.TCP.Auth.Enabled),
Expand All @@ -64,6 +61,19 @@ func provide(c *dig.Container) error {
broker.WithTCPTLSCertificatePath(ParamsMQTT.TCP.TLS.CertificatePath),
broker.WithTCPTLSPrivateKeyPath(ParamsMQTT.TCP.TLS.PrivateKeyPath),
)
if err != nil {
return nil, ierrors.Wrap(err, "failed to create MQTT broker")
}

return mqtt.NewServer(
Component.Logger(),
deps.NodeBridge,
broker,
deps.ShutdownHandler,
mqtt.WithWebsocketEnabled(ParamsMQTT.Websocket.Enabled),
mqtt.WithWebsocketBindAddress(ParamsMQTT.Websocket.BindAddress),
mqtt.WithWebsocketAdvertiseAddress(ParamsMQTT.Websocket.AdvertiseAddress),
)
})
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/iotaledger/hive.go/lo v0.0.0-20231113110812-4ca2b6cc9a42
github.com/iotaledger/hive.go/logger v0.0.0-20231113110812-4ca2b6cc9a42
github.com/iotaledger/hive.go/web v0.0.0-20231113110812-4ca2b6cc9a42
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231121121055-b13a176c5180
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231121161429-6546932cb164
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231120082637-ccd5b8465251
github.com/iotaledger/iota.go/v4 v4.0.0-20231120063545-80c263f28140
github.com/labstack/echo/v4 v4.11.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ github.com/iotaledger/hive.go/stringify v0.0.0-20231113110812-4ca2b6cc9a42 h1:9c
github.com/iotaledger/hive.go/stringify v0.0.0-20231113110812-4ca2b6cc9a42/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs=
github.com/iotaledger/hive.go/web v0.0.0-20231113110812-4ca2b6cc9a42 h1:HNh4L30LGePtoaoVOdk6qKu94CyZBTok0tByFlZz2mI=
github.com/iotaledger/hive.go/web v0.0.0-20231113110812-4ca2b6cc9a42/go.mod h1:L/CLz7skt9dvidhBOw2gmMGhmrUBHXlA0b3paugdsE4=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231121121055-b13a176c5180 h1:hAVWoyAF4FE+1gUd2IqvTBDTnQ4Z0GKE6qc8qw9QPqg=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231121121055-b13a176c5180/go.mod h1:iFiY6UukYeL8D3N1mtg4jh/9lxTBhzG0QgtD+w0gpps=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231121161429-6546932cb164 h1:DNBOmhRgNk0hprkWgXxIFga1diFbeoX/dTNWIUJnQKI=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231121161429-6546932cb164/go.mod h1:iFiY6UukYeL8D3N1mtg4jh/9lxTBhzG0QgtD+w0gpps=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231120082637-ccd5b8465251 h1:bYGO8jXNXJNMGPG9etGW7WXfLbRU9ofx1xdd29/sS9M=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231120082637-ccd5b8465251/go.mod h1:chzj8FDIeXHIh3D52QTZ7imADlzdkhg7o7E2Qr85MJ8=
github.com/iotaledger/iota.go/v4 v4.0.0-20231120063545-80c263f28140 h1:8zHRYT1KADR9bOLUg7Ia4XA3StBHzV4Tb2Qtp42KLN8=
Expand Down
119 changes: 71 additions & 48 deletions pkg/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,56 +15,77 @@ import (
"github.com/iotaledger/hive.go/web/subscriptionmanager"
)

type Broker interface {
// Events returns the events of the broker.
Events() subscriptionmanager.Events[string, string]
// Start the broker.
Start() error
// Stop the broker.
Stop() error
// HasSubscribers returns true if the topic has subscribers.
HasSubscribers(topic string) bool
// Send publishes a message.
Send(topic string, payload []byte) error
// SystemInfo returns the metrics of the broker.
SystemInfo() *system.Info
// SubscribersSize returns the size of the underlying map of the SubscriptionManager.
SubscribersSize() int
// TopicsSize returns the size of all underlying maps of the SubscriptionManager.
TopicsSize() int
}

// Broker is a simple mqtt publisher abstraction.
type Broker struct {
broker *mqtt.Server
opts *BrokerOptions
type broker struct {
server *mqtt.Server
opts *Options
subscriptionManager *subscriptionmanager.SubscriptionManager[string, string]
unhook func()
}

// NewBroker creates a new broker.
func NewBroker(brokerOpts *BrokerOptions) (*Broker, error) {
func NewBroker(brokerOpts ...Option) (Broker, error) {
opts := &Options{}
opts.ApplyOnDefault(brokerOpts...)

if !brokerOpts.WebsocketEnabled && !brokerOpts.TCPEnabled {
if !opts.WebsocketEnabled && !opts.TCPEnabled {
return nil, errors.New("at least websocket or TCP must be enabled")
}

broker := mqtt.NewServer(&mqtt.Options{
BufferSize: brokerOpts.BufferSize,
BufferBlockSize: brokerOpts.BufferBlockSize,
server := mqtt.NewServer(&mqtt.Options{
BufferSize: opts.BufferSize,
BufferBlockSize: opts.BufferBlockSize,
InflightTTL: 30,
})

if brokerOpts.WebsocketEnabled {
if opts.WebsocketEnabled {
// check websocket bind address
_, _, err := net.SplitHostPort(brokerOpts.WebsocketBindAddress)
_, _, err := net.SplitHostPort(opts.WebsocketBindAddress)
if err != nil {
return nil, fmt.Errorf("parsing websocket bind address (%s) failed: %w", brokerOpts.WebsocketBindAddress, err)
return nil, fmt.Errorf("parsing websocket bind address (%s) failed: %w", opts.WebsocketBindAddress, err)
}

ws := listeners.NewWebsocket("ws1", brokerOpts.WebsocketBindAddress)
if err := broker.AddListener(ws, &listeners.Config{
ws := listeners.NewWebsocket("ws1", opts.WebsocketBindAddress)
if err := server.AddListener(ws, &listeners.Config{
Auth: &AuthAllowEveryone{},
TLS: nil,
}); err != nil {
return nil, fmt.Errorf("adding websocket listener failed: %w", err)
}
}

if brokerOpts.TCPEnabled {
if opts.TCPEnabled {
// check tcp bind address
_, _, err := net.SplitHostPort(brokerOpts.TCPBindAddress)
_, _, err := net.SplitHostPort(opts.TCPBindAddress)
if err != nil {
return nil, fmt.Errorf("parsing TCP bind address (%s) failed: %w", brokerOpts.TCPBindAddress, err)
return nil, fmt.Errorf("parsing TCP bind address (%s) failed: %w", opts.TCPBindAddress, err)
}

tcp := listeners.NewTCP("t1", brokerOpts.TCPBindAddress)
tcp := listeners.NewTCP("t1", opts.TCPBindAddress)

var tcpAuthController auth.Controller
if brokerOpts.TCPAuthEnabled {
if opts.TCPAuthEnabled {
var err error
tcpAuthController, err = NewAuthAllowUsers(brokerOpts.TCPAuthPasswordSalt, brokerOpts.TCPAuthUsers)
tcpAuthController, err = NewAuthAllowUsers(opts.TCPAuthPasswordSalt, opts.TCPAuthUsers)
if err != nil {
return nil, fmt.Errorf("enabling TCP Authentication failed: %w", err)
}
Expand All @@ -73,16 +94,16 @@ func NewBroker(brokerOpts *BrokerOptions) (*Broker, error) {
}

var tlsConfig *tls.Config
if brokerOpts.TCPTLSEnabled {
if opts.TCPTLSEnabled {
var err error

tlsConfig, err = NewTLSConfig(brokerOpts.TCPTLSCertificatePath, brokerOpts.TCPTLSPrivateKeyPath)
tlsConfig, err = NewTLSConfig(opts.TCPTLSCertificatePath, opts.TCPTLSPrivateKeyPath)
if err != nil {
return nil, fmt.Errorf("enabling TCP TLS failed: %w", err)
}
}

if err := broker.AddListener(tcp, &listeners.Config{
if err := server.AddListener(tcp, &listeners.Config{
Auth: tcpAuthController,
TLSConfig: tlsConfig,
}); err != nil {
Expand All @@ -91,14 +112,14 @@ func NewBroker(brokerOpts *BrokerOptions) (*Broker, error) {
}

s := subscriptionmanager.New(
subscriptionmanager.WithMaxTopicSubscriptionsPerClient[string, string](brokerOpts.MaxTopicSubscriptionsPerClient),
subscriptionmanager.WithCleanupThresholdCount[string, string](brokerOpts.TopicCleanupThresholdCount),
subscriptionmanager.WithCleanupThresholdRatio[string, string](brokerOpts.TopicCleanupThresholdRatio),
subscriptionmanager.WithMaxTopicSubscriptionsPerClient[string, string](opts.MaxTopicSubscriptionsPerClient),
subscriptionmanager.WithCleanupThresholdCount[string, string](opts.TopicCleanupThresholdCount),
subscriptionmanager.WithCleanupThresholdRatio[string, string](opts.TopicCleanupThresholdRatio),
)

// this event is used to drop malicious clients
unhook := s.Events().DropClient.Hook(func(event *subscriptionmanager.DropClientEvent[string]) {
client, exists := broker.Clients.Get(event.ClientID)
client, exists := server.Clients.Get(event.ClientID)
if !exists {
return
}
Expand All @@ -107,72 +128,74 @@ func NewBroker(brokerOpts *BrokerOptions) (*Broker, error) {
client.Stop(event.Reason)

// delete the client from the broker
broker.Clients.Delete(event.ClientID)
server.Clients.Delete(event.ClientID)
}).Unhook

// bind the broker events to the SubscriptionManager to track the subscriptions
broker.Events.OnConnect = func(cl events.Client, pk events.Packet) {
server.Events.OnConnect = func(cl events.Client, pk events.Packet) {
s.Connect(cl.ID)
}

broker.Events.OnDisconnect = func(cl events.Client, err error) {
server.Events.OnDisconnect = func(cl events.Client, err error) {
s.Disconnect(cl.ID)
}

broker.Events.OnSubscribe = func(topic string, cl events.Client, qos byte) {
server.Events.OnSubscribe = func(topic string, cl events.Client, qos byte) {
s.Subscribe(cl.ID, topic)
}

broker.Events.OnUnsubscribe = func(topic string, cl events.Client) {
server.Events.OnUnsubscribe = func(topic string, cl events.Client) {
s.Unsubscribe(cl.ID, topic)
}

return &Broker{
broker: broker,
opts: brokerOpts,
return &broker{
server: server,
opts: opts,
subscriptionManager: s,
unhook: unhook,
}, nil
}

func (b *Broker) Events() subscriptionmanager.Events[string, string] {
// Events returns the events of the broker.
func (b *broker) Events() subscriptionmanager.Events[string, string] {
return *b.subscriptionManager.Events()
}

// Start the broker.
func (b *Broker) Start() error {
return b.broker.Serve()
func (b *broker) Start() error {
return b.server.Serve()
}

// Stop the broker.
func (b *Broker) Stop() error {
func (b *broker) Stop() error {
if b.unhook != nil {
b.unhook()
}

return b.broker.Close()
return b.server.Close()
}

// SystemInfo returns the metrics of the broker.
func (b *Broker) SystemInfo() *system.Info {
return b.broker.System
}

func (b *Broker) HasSubscribers(topic string) bool {
func (b *broker) HasSubscribers(topic string) bool {
return b.subscriptionManager.TopicHasSubscribers(topic)
}

// Send publishes a message.
func (b *Broker) Send(topic string, payload []byte) error {
return b.broker.Publish(topic, payload, false)
func (b *broker) Send(topic string, payload []byte) error {
return b.server.Publish(topic, payload, false)
}

// SystemInfo returns the metrics of the broker.
func (b *broker) SystemInfo() *system.Info {
return b.server.System
}

// SubscribersSize returns the size of the underlying map of the SubscriptionManager.
func (b *Broker) SubscribersSize() int {
func (b *broker) SubscribersSize() int {
return b.subscriptionManager.SubscribersSize()
}

// TopicsSize returns the size of all underlying maps of the SubscriptionManager.
func (b *Broker) TopicsSize() int {
func (b *broker) TopicsSize() int {
return b.subscriptionManager.TopicsSize()
}
Loading

0 comments on commit 83cf182

Please sign in to comment.