Skip to content

Commit

Permalink
Upgrade MQTT broker to v2
Browse files Browse the repository at this point in the history
  • Loading branch information
muXxer committed Dec 7, 2023
1 parent aae771d commit 72f6205
Show file tree
Hide file tree
Showing 22 changed files with 656 additions and 438 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

# Docker related files
.dockerignore
Dockerfile*
docker-compose.yml

# Documentation
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@
inx-mqtt
go.work
go.work.sum

# IDE related files
.vscode/
55 changes: 55 additions & 0 deletions Dockerfile.debug
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# https://hub.docker.com/_/golang
FROM golang:1.21-bullseye AS build

# Ensure ca-certificates are up to date
RUN update-ca-certificates

# Set the current Working Directory inside the container
RUN mkdir /scratch
WORKDIR /scratch

# Prepare the folder where we are putting all the files
RUN mkdir /app

# Copy everything from the current directory to the PWD(Present Working Directory) inside the container
COPY . .

# Download go modules
RUN go mod download
RUN go mod verify

# Build the binary (disable inlining and optimizations that can interfere with debugging)
RUN CGO_ENABLED=0 go build -a -gcflags "all=-N -l" -o /app/inx-mqtt .

# Copy the assets
COPY ./config_defaults.json /app/config.json

############################
# Image
############################
FROM golang:1.21-alpine

# Install delve
RUN CGO_ENABLED=0 go install -ldflags "-s -w -extldflags '-static'" github.com/go-delve/delve/cmd/dlv@latest

# Create the nonroot user
ARG USERNAME=nonroot_user
ARG USER_UID=65532
ARG USER_GID=$USER_UID
ARG USER_HOME=/home/nonroot
RUN addgroup --g $USER_GID $USERNAME && adduser --disabled-password --uid $USER_UID --ingroup "${USERNAME}" --shell /sbin/nologin --home $USER_HOME $USERNAME

EXPOSE 1883/tcp
EXPOSE 1888/tcp
# Delve
EXPOSE 4000

# Copy the app dir into distroless image
COPY --chown=nonroot:nonroot --from=build /app /app

WORKDIR /app

# Set USER nonroot
USER $USERNAME

ENTRYPOINT [ "/go/bin/dlv", "--listen=:4000", "--headless=true", "--log=true", "--accept-multiclient", "--api-version=2", "exec", "/app/inx-mqtt", "--" ]
2 changes: 1 addition & 1 deletion components/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var (
Name = "inx-mqtt"

// Version of the app.
Version = "2.0.0-alpha.5"
Version = "2.0.0-alpha.6"
)

func App() *app.App {
Expand Down
18 changes: 10 additions & 8 deletions components/mqtt/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,23 @@ func provide(c *dig.Container) error {

return c.Provide(func(deps inDeps) (*mqtt.Server, error) {
broker, err := broker.NewBroker(
broker.WithBufferSize(ParamsMQTT.BufferSize),
broker.WithBufferBlockSize(ParamsMQTT.BufferBlockSize),
broker.WithMaxTopicSubscriptionsPerClient(ParamsMQTT.Subscriptions.MaxTopicSubscriptionsPerClient),
broker.WithTopicCleanupThresholdCount(ParamsMQTT.Subscriptions.TopicsCleanupThresholdCount),
broker.WithTopicCleanupThresholdRatio(ParamsMQTT.Subscriptions.TopicsCleanupThresholdRatio),
broker.WithWebsocketEnabled(ParamsMQTT.Websocket.Enabled),
broker.WithWebsocketBindAddress(ParamsMQTT.Websocket.BindAddress),
broker.WithTCPEnabled(ParamsMQTT.TCP.Enabled),
broker.WithTCPBindAddress(ParamsMQTT.TCP.BindAddress),
broker.WithTCPAuthEnabled(ParamsMQTT.TCP.Auth.Enabled),
broker.WithTCPAuthPasswordSalt(ParamsMQTT.TCP.Auth.PasswordSalt),
broker.WithTCPAuthUsers(ParamsMQTT.TCP.Auth.Users),
broker.WithTCPTLSEnabled(ParamsMQTT.TCP.TLS.Enabled),
broker.WithTCPTLSCertificatePath(ParamsMQTT.TCP.TLS.CertificatePath),
broker.WithTCPTLSPrivateKeyPath(ParamsMQTT.TCP.TLS.PrivateKeyPath),
broker.WithAuthPasswordSalt(ParamsMQTT.Auth.PasswordSalt),
broker.WithAuthUsers(ParamsMQTT.Auth.Users),
broker.WithPublicTopics(ParamsMQTT.PublicTopics),
broker.WithProtectedTopics(ParamsMQTT.ProtectedTopics),
broker.WithMaxTopicSubscriptionsPerClient(ParamsMQTT.Subscriptions.MaxTopicSubscriptionsPerClient),
broker.WithTopicCleanupThresholdCount(ParamsMQTT.Subscriptions.TopicsCleanupThresholdCount),
broker.WithTopicCleanupThresholdRatio(ParamsMQTT.Subscriptions.TopicsCleanupThresholdRatio),
broker.WithMaximumClientWritesPending(ParamsMQTT.MaximumClientWritesPending),
broker.WithClientWriteBufferSize(ParamsMQTT.ClientWriteBufferSize),
broker.WithClientReadBufferSize(ParamsMQTT.ClientReadBufferSize),
)
if err != nil {
return nil, ierrors.Wrap(err, "failed to create MQTT broker")
Expand Down
46 changes: 29 additions & 17 deletions components/mqtt/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,6 @@ package mqtt
import "github.com/iotaledger/hive.go/app"

type ParametersMQTT struct {
BufferSize int `default:"0" usage:"the size of the client buffers in bytes"`
BufferBlockSize int `default:"0" usage:"the size per client buffer R/W block in bytes"`

Subscriptions struct {
MaxTopicSubscriptionsPerClient int `default:"1000" usage:"the maximum number of topic subscriptions per client before the client gets dropped (DOS protection)"`
TopicsCleanupThresholdCount int `default:"10000" usage:"the number of deleted topics that trigger a garbage collection of the subscription manager"`
TopicsCleanupThresholdRatio float32 `default:"1.0" usage:"the ratio of subscribed topics to deleted topics that trigger a garbage collection of the subscription manager"`
}

Websocket struct {
Enabled bool `default:"true" usage:"whether to enable the websocket connection of the MQTT broker"`
BindAddress string `default:"localhost:1888" usage:"the websocket bind address on which the MQTT broker listens on"`
Expand All @@ -22,25 +13,46 @@ type ParametersMQTT struct {
Enabled bool `default:"false" usage:"whether to enable the TCP connection of the MQTT broker"`
BindAddress string `default:"localhost:1883" usage:"the TCP bind address on which the MQTT broker listens on"`

Auth struct {
Enabled bool `default:"false" usage:"whether to enable auth for TCP connections"`
PasswordSalt string `default:"0000000000000000000000000000000000000000000000000000000000000000" usage:"the auth salt used for hashing the passwords of the users"`
Users map[string]string `usage:"the list of allowed users with their password+salt as a scrypt hash"`
}

TLS struct {
Enabled bool `default:"false" usage:"whether to enable TLS for TCP connections"`
PrivateKeyPath string `default:"private_key.pem" usage:"the path to the private key file (x509 PEM) for TCP connections with TLS"`
CertificatePath string `default:"certificate.pem" usage:"the path to the certificate file (x509 PEM) for TCP connections with TLS"`
} `name:"tls"`
} `name:"tcp"`

Auth struct {
PasswordSalt string `default:"0000000000000000000000000000000000000000000000000000000000000000" usage:"the auth salt used for hashing the passwords of the users"`
Users map[string]string `usage:"the list of allowed users with their password+salt as a scrypt hash"`
}

PublicTopics []string `usage:"the MQTT topics which can be subscribed to without authorization. Wildcards using * are allowed"`
ProtectedTopics []string `usage:"the MQTT topics which only can be subscribed to with valid authorization. Wildcards using * are allowed"`

Subscriptions struct {
MaxTopicSubscriptionsPerClient int `default:"1000" usage:"the maximum number of topic subscriptions per client before the client gets dropped (DOS protection)"`
TopicsCleanupThresholdCount int `default:"10000" usage:"the number of deleted topics that trigger a garbage collection of the subscription manager"`
TopicsCleanupThresholdRatio float32 `default:"1.0" usage:"the ratio of subscribed topics to deleted topics that trigger a garbage collection of the subscription manager"`
}

MaximumClientWritesPending int `default:"0" usage:"the maximum number of pending message writes for a client"`
ClientWriteBufferSize int `default:"0" usage:"the size of the client write buffer"`
ClientReadBufferSize int `default:"0" usage:"the size of the client read buffer"`
}

var ParamsMQTT = &ParametersMQTT{}
var ParamsMQTT = &ParametersMQTT{
PublicTopics: []string{
"commitments/*",
"blocks/*",
"transactions/*",
"block-metadata/*",
"outputs/*",
},
ProtectedTopics: []string{},
}

var params = &app.ComponentParams{
Params: map[string]any{
"mqtt": ParamsMQTT,
},
Masked: nil,
Masked: []string{"mqtt.auth.passwordSalt"},
}
4 changes: 2 additions & 2 deletions components/prometheus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (

"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/dig"

"github.com/iotaledger/hive.go/app"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/inx-mqtt/pkg/daemon"
"github.com/iotaledger/inx-mqtt/pkg/mqtt"
)
Expand Down Expand Up @@ -94,7 +94,7 @@ func run() error {

go func() {
Component.LogInfof("You can now access the Prometheus exporter using: http://%s/metrics", ParamsPrometheus.BindAddress)
if err := deps.PrometheusEcho.Start(ParamsPrometheus.BindAddress); err != nil && !errors.Is(err, http.ErrServerClosed) {
if err := deps.PrometheusEcho.Start(ParamsPrometheus.BindAddress); err != nil && !ierrors.Is(err, http.ErrServerClosed) {
Component.LogWarnf("Stopped Prometheus exporter due to an error (%s)", err)
}
}()
Expand Down
89 changes: 50 additions & 39 deletions components/prometheus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,25 @@ import (
var (
mqttBrokerAppInfo *prometheus.GaugeVec
mqttBrokerStarted prometheus.Gauge
mqttBrokerTime prometheus.Gauge
mqttBrokerUptime prometheus.Gauge
mqttBrokerBytesRecv prometheus.Gauge
mqttBrokerBytesReceived prometheus.Gauge
mqttBrokerBytesSent prometheus.Gauge
mqttBrokerClientsConnected prometheus.Gauge
mqttBrokerClientsDisconnected prometheus.Gauge
mqttBrokerClientsMax prometheus.Gauge
mqttBrokerClientsMaximum prometheus.Gauge
mqttBrokerClientsTotal prometheus.Gauge
mqttBrokerConnectionsTotal prometheus.Gauge
mqttBrokerMessagesRecv prometheus.Gauge
mqttBrokerMessagesReceived prometheus.Gauge
mqttBrokerMessagesSent prometheus.Gauge
mqttBrokerPublishDropped prometheus.Gauge
mqttBrokerPublishRecv prometheus.Gauge
mqttBrokerPublishSent prometheus.Gauge
mqttBrokerMessagesDropped prometheus.Gauge
mqttBrokerRetained prometheus.Gauge
mqttBrokerInflight prometheus.Gauge
mqttBrokerInflightDropped prometheus.Gauge
mqttBrokerSubscriptions prometheus.Gauge
mqttBrokerPacketsReceived prometheus.Gauge
mqttBrokerPacketsSent prometheus.Gauge
mqttBrokerMemoryAlloc prometheus.Gauge
mqttBrokerThreads prometheus.Gauge
mqttBrokerSubscriptionManagerSubscribersSize prometheus.Gauge
mqttBrokerSubscriptionManagerTopicsSize prometheus.Gauge
)
Expand Down Expand Up @@ -58,49 +61,57 @@ func registerNewMQTTBrokerGauge(registry *prometheus.Registry, name string, help
func registerMQTTMetrics(registry *prometheus.Registry) {
mqttBrokerAppInfo = registerNewMQTTBrokerGaugeVec(registry, "app_info", []string{"name", "version", "broker_version"}, "The current version of the server.")
mqttBrokerStarted = registerNewMQTTBrokerGauge(registry, "started", "The time the server started in unix seconds.")
mqttBrokerTime = registerNewMQTTBrokerGauge(registry, "time", "Current time on the server.")
mqttBrokerUptime = registerNewMQTTBrokerGauge(registry, "uptime", "The number of seconds the server has been online.")
mqttBrokerBytesRecv = registerNewMQTTBrokerGauge(registry, "bytes_recv", "The total number of bytes received in all packets.")
mqttBrokerBytesSent = registerNewMQTTBrokerGauge(registry, "bytes_sent", "The total number of bytes sent to clients.")
mqttBrokerClientsConnected = registerNewMQTTBrokerGauge(registry, "clients_connected", "The number of currently connected clients.")
mqttBrokerClientsDisconnected = registerNewMQTTBrokerGauge(registry, "clients_disconnected", "The number of disconnected non-cleansession clients.")
mqttBrokerClientsMax = registerNewMQTTBrokerGauge(registry, "clients_max", "The maximum number of clients that have been concurrently connected.")
mqttBrokerClientsTotal = registerNewMQTTBrokerGauge(registry, "clients_total", "The sum of all clients, connected and disconnected.")
mqttBrokerConnectionsTotal = registerNewMQTTBrokerGauge(registry, "connections_total", "The sum number of clients which have ever connected.")
mqttBrokerMessagesRecv = registerNewMQTTBrokerGauge(registry, "messages_recv", "The total number of packets received.")
mqttBrokerMessagesSent = registerNewMQTTBrokerGauge(registry, "messages_sent", "The total number of packets sent.")
mqttBrokerPublishDropped = registerNewMQTTBrokerGauge(registry, "publish_dropped", "The number of in-flight publish messages which were dropped.")
mqttBrokerPublishRecv = registerNewMQTTBrokerGauge(registry, "publish_recv", "The total number of received publish packets.")
mqttBrokerPublishSent = registerNewMQTTBrokerGauge(registry, "publish_sent", "The total number of sent publish packets.")
mqttBrokerRetained = registerNewMQTTBrokerGauge(registry, "retained", "The number of messages currently retained.")
mqttBrokerBytesReceived = registerNewMQTTBrokerGauge(registry, "bytes_received", "Total number of bytes received since the broker started.")
mqttBrokerBytesSent = registerNewMQTTBrokerGauge(registry, "bytes_sent", "Total number of bytes sent since the broker started.")
mqttBrokerClientsConnected = registerNewMQTTBrokerGauge(registry, "clients_connected", "Number of currently connected clients.")
mqttBrokerClientsDisconnected = registerNewMQTTBrokerGauge(registry, "clients_disconnected", "Total number of persistent clients (with clean session disabled) that are registered at the broker but are currently disconnected.")
mqttBrokerClientsMaximum = registerNewMQTTBrokerGauge(registry, "clients_maximum", "Maximum number of active clients that have been connected.")
mqttBrokerClientsTotal = registerNewMQTTBrokerGauge(registry, "clients_total", "Total number of connected and disconnected clients with a persistent session currently connected and registered.")
mqttBrokerMessagesReceived = registerNewMQTTBrokerGauge(registry, "messages_received", "Total number of publish messages received.")
mqttBrokerMessagesSent = registerNewMQTTBrokerGauge(registry, "messages_sent", "Total number of publish messages sent.")
mqttBrokerMessagesDropped = registerNewMQTTBrokerGauge(registry, "messages_dropped", "Total number of publish messages dropped to slow subscriber.")
mqttBrokerRetained = registerNewMQTTBrokerGauge(registry, "retained", "Total number of retained messages active on the broker.")
mqttBrokerInflight = registerNewMQTTBrokerGauge(registry, "inflight", "The number of messages currently in-flight.")
mqttBrokerSubscriptions = registerNewMQTTBrokerGauge(registry, "subscriptions", "The total number of filter subscriptions.")
mqttBrokerInflightDropped = registerNewMQTTBrokerGauge(registry, "inflight_dropped", "The number of inflight messages which were dropped.")
mqttBrokerSubscriptions = registerNewMQTTBrokerGauge(registry, "subscriptions", "Total number of subscriptions active on the broker.")
mqttBrokerPacketsReceived = registerNewMQTTBrokerGauge(registry, "packets_received", "The total number of publish messages received.")
mqttBrokerPacketsSent = registerNewMQTTBrokerGauge(registry, "packets_sent", "Total number of messages of any type sent since the broker started.")
mqttBrokerMemoryAlloc = registerNewMQTTBrokerGauge(registry, "memory_alloc", "Memory currently allocated.")
mqttBrokerThreads = registerNewMQTTBrokerGauge(registry, "threads", "Number of active goroutines, named as threads for platform ambiguity.")
mqttBrokerSubscriptionManagerSubscribersSize = registerNewMQTTBrokerGauge(registry, "subscription_manager_subscribers_size", "The number of active subscribers in the subscription manager.")
mqttBrokerSubscriptionManagerTopicsSize = registerNewMQTTBrokerGauge(registry, "subscription_manager_topics_size", "The number of active topics in the subscription manager.")
}

func collectMQTTBroker(server *mqtt.Server) {
brokerSystemInfo := server.MQTTBroker.SystemInfo()

mqttBrokerAppInfo.With(prometheus.Labels{
"name": Component.App().Info().Name,
"version": Component.App().Info().Version,
"broker_version": server.MQTTBroker.SystemInfo().Version,
"broker_version": brokerSystemInfo.Version,
}).Set(1)
mqttBrokerStarted.Set(float64(server.MQTTBroker.SystemInfo().Started))
mqttBrokerUptime.Set(float64(server.MQTTBroker.SystemInfo().Uptime))
mqttBrokerBytesRecv.Set(float64(server.MQTTBroker.SystemInfo().BytesRecv))
mqttBrokerBytesSent.Set(float64(server.MQTTBroker.SystemInfo().BytesSent))
mqttBrokerClientsConnected.Set(float64(server.MQTTBroker.SystemInfo().ClientsConnected))
mqttBrokerClientsDisconnected.Set(float64(server.MQTTBroker.SystemInfo().ClientsDisconnected))
mqttBrokerClientsMax.Set(float64(server.MQTTBroker.SystemInfo().ClientsMax))
mqttBrokerClientsTotal.Set(float64(server.MQTTBroker.SystemInfo().ClientsTotal))
mqttBrokerConnectionsTotal.Set(float64(server.MQTTBroker.SystemInfo().ConnectionsTotal))
mqttBrokerMessagesRecv.Set(float64(server.MQTTBroker.SystemInfo().MessagesRecv))
mqttBrokerMessagesSent.Set(float64(server.MQTTBroker.SystemInfo().MessagesSent))
mqttBrokerPublishDropped.Set(float64(server.MQTTBroker.SystemInfo().PublishDropped))
mqttBrokerPublishRecv.Set(float64(server.MQTTBroker.SystemInfo().PublishRecv))
mqttBrokerPublishSent.Set(float64(server.MQTTBroker.SystemInfo().PublishSent))
mqttBrokerRetained.Set(float64(server.MQTTBroker.SystemInfo().Retained))
mqttBrokerInflight.Set(float64(server.MQTTBroker.SystemInfo().Inflight))
mqttBrokerSubscriptions.Set(float64(server.MQTTBroker.SystemInfo().Subscriptions))
mqttBrokerStarted.Set(float64(brokerSystemInfo.Started))
mqttBrokerTime.Set(float64(brokerSystemInfo.Time))
mqttBrokerUptime.Set(float64(brokerSystemInfo.Uptime))
mqttBrokerBytesReceived.Set(float64(brokerSystemInfo.BytesReceived))
mqttBrokerBytesSent.Set(float64(brokerSystemInfo.BytesSent))
mqttBrokerClientsConnected.Set(float64(brokerSystemInfo.ClientsConnected))
mqttBrokerClientsDisconnected.Set(float64(brokerSystemInfo.ClientsDisconnected))
mqttBrokerClientsMaximum.Set(float64(brokerSystemInfo.ClientsMaximum))
mqttBrokerClientsTotal.Set(float64(brokerSystemInfo.ClientsTotal))
mqttBrokerMessagesReceived.Set(float64(brokerSystemInfo.MessagesReceived))
mqttBrokerMessagesSent.Set(float64(brokerSystemInfo.MessagesSent))
mqttBrokerMessagesDropped.Set(float64(brokerSystemInfo.MessagesDropped))
mqttBrokerRetained.Set(float64(brokerSystemInfo.Retained))
mqttBrokerInflight.Set(float64(brokerSystemInfo.Inflight))
mqttBrokerInflightDropped.Set(float64(brokerSystemInfo.InflightDropped))
mqttBrokerSubscriptions.Set(float64(brokerSystemInfo.Subscriptions))
mqttBrokerPacketsReceived.Set(float64(brokerSystemInfo.PacketsReceived))
mqttBrokerPacketsSent.Set(float64(brokerSystemInfo.PacketsSent))
mqttBrokerMemoryAlloc.Set(float64(brokerSystemInfo.MemoryAlloc))
mqttBrokerThreads.Set(float64(brokerSystemInfo.Threads))
mqttBrokerSubscriptionManagerSubscribersSize.Set(float64(server.MQTTBroker.SubscribersSize()))
mqttBrokerSubscriptionManagerTopicsSize.Set(float64(server.MQTTBroker.TopicsSize()))
}
Loading

0 comments on commit 72f6205

Please sign in to comment.