diff --git a/.dockerignore b/.dockerignore
index 0f3fdd3..f152a5b 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -20,6 +20,7 @@
# Docker related files
.dockerignore
+Dockerfile*
docker-compose.yml
# Documentation
diff --git a/.gitignore b/.gitignore
index 18b3360..540ea1e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,6 @@
inx-mqtt
go.work
go.work.sum
+
+# IDE related files
+.vscode/
diff --git a/Dockerfile.debug b/Dockerfile.debug
new file mode 100644
index 0000000..7e5be3a
--- /dev/null
+++ b/Dockerfile.debug
@@ -0,0 +1,55 @@
+# https://hub.docker.com/_/golang
+FROM golang:1.21-bullseye AS build
+
+# Ensure ca-certificates are up to date
+RUN update-ca-certificates
+
+# Set the current Working Directory inside the container
+RUN mkdir /scratch
+WORKDIR /scratch
+
+# Prepare the folder where we are putting all the files
+RUN mkdir /app
+
+# Copy everything from the current directory to the PWD(Present Working Directory) inside the container
+COPY . .
+
+# Download go modules
+RUN go mod download
+RUN go mod verify
+
+# Build the binary (disable inlining and optimizations that can interfere with debugging)
+RUN CGO_ENABLED=0 go build -a -gcflags "all=-N -l" -o /app/inx-mqtt .
+
+# Copy the assets
+COPY ./config_defaults.json /app/config.json
+
+############################
+# Image
+############################
+FROM golang:1.21-alpine
+
+# Install delve
+RUN CGO_ENABLED=0 go install -ldflags "-s -w -extldflags '-static'" github.com/go-delve/delve/cmd/dlv@latest
+
+# Create the nonroot user
+ARG USERNAME=nonroot_user
+ARG USER_UID=65532
+ARG USER_GID=$USER_UID
+ARG USER_HOME=/home/nonroot
+RUN addgroup --g $USER_GID $USERNAME && adduser --disabled-password --uid $USER_UID --ingroup "${USERNAME}" --shell /sbin/nologin --home $USER_HOME $USERNAME
+
+EXPOSE 1883/tcp
+EXPOSE 1888/tcp
+# Delve
+EXPOSE 4000
+
+# Copy the app dir into distroless image
+COPY --chown=nonroot:nonroot --from=build /app /app
+
+WORKDIR /app
+
+# Set USER nonroot
+USER $USERNAME
+
+ENTRYPOINT [ "/go/bin/dlv", "--listen=:4000", "--headless=true", "--log=true", "--accept-multiclient", "--api-version=2", "exec", "/app/inx-mqtt", "--" ]
\ No newline at end of file
diff --git a/components/app/app.go b/components/app/app.go
index 3e0e617..37ad2da 100644
--- a/components/app/app.go
+++ b/components/app/app.go
@@ -14,7 +14,7 @@ var (
Name = "inx-mqtt"
// Version of the app.
- Version = "2.0.0-alpha.5"
+ Version = "2.0.0-alpha.6"
)
func App() *app.App {
diff --git a/components/mqtt/component.go b/components/mqtt/component.go
index 83db251..fa9e400 100644
--- a/components/mqtt/component.go
+++ b/components/mqtt/component.go
@@ -45,21 +45,23 @@ func provide(c *dig.Container) error {
return c.Provide(func(deps inDeps) (*mqtt.Server, error) {
broker, err := broker.NewBroker(
- broker.WithBufferSize(ParamsMQTT.BufferSize),
- broker.WithBufferBlockSize(ParamsMQTT.BufferBlockSize),
- broker.WithMaxTopicSubscriptionsPerClient(ParamsMQTT.Subscriptions.MaxTopicSubscriptionsPerClient),
- broker.WithTopicCleanupThresholdCount(ParamsMQTT.Subscriptions.TopicsCleanupThresholdCount),
- broker.WithTopicCleanupThresholdRatio(ParamsMQTT.Subscriptions.TopicsCleanupThresholdRatio),
broker.WithWebsocketEnabled(ParamsMQTT.Websocket.Enabled),
broker.WithWebsocketBindAddress(ParamsMQTT.Websocket.BindAddress),
broker.WithTCPEnabled(ParamsMQTT.TCP.Enabled),
broker.WithTCPBindAddress(ParamsMQTT.TCP.BindAddress),
- broker.WithTCPAuthEnabled(ParamsMQTT.TCP.Auth.Enabled),
- broker.WithTCPAuthPasswordSalt(ParamsMQTT.TCP.Auth.PasswordSalt),
- broker.WithTCPAuthUsers(ParamsMQTT.TCP.Auth.Users),
broker.WithTCPTLSEnabled(ParamsMQTT.TCP.TLS.Enabled),
broker.WithTCPTLSCertificatePath(ParamsMQTT.TCP.TLS.CertificatePath),
broker.WithTCPTLSPrivateKeyPath(ParamsMQTT.TCP.TLS.PrivateKeyPath),
+ broker.WithAuthPasswordSalt(ParamsMQTT.Auth.PasswordSalt),
+ broker.WithAuthUsers(ParamsMQTT.Auth.Users),
+ broker.WithPublicTopics(ParamsMQTT.PublicTopics),
+ broker.WithProtectedTopics(ParamsMQTT.ProtectedTopics),
+ broker.WithMaxTopicSubscriptionsPerClient(ParamsMQTT.Subscriptions.MaxTopicSubscriptionsPerClient),
+ broker.WithTopicCleanupThresholdCount(ParamsMQTT.Subscriptions.TopicsCleanupThresholdCount),
+ broker.WithTopicCleanupThresholdRatio(ParamsMQTT.Subscriptions.TopicsCleanupThresholdRatio),
+ broker.WithMaximumClientWritesPending(ParamsMQTT.MaximumClientWritesPending),
+ broker.WithClientWriteBufferSize(ParamsMQTT.ClientWriteBufferSize),
+ broker.WithClientReadBufferSize(ParamsMQTT.ClientReadBufferSize),
)
if err != nil {
return nil, ierrors.Wrap(err, "failed to create MQTT broker")
diff --git a/components/mqtt/params.go b/components/mqtt/params.go
index e1da867..5b2827b 100644
--- a/components/mqtt/params.go
+++ b/components/mqtt/params.go
@@ -3,15 +3,6 @@ package mqtt
import "github.com/iotaledger/hive.go/app"
type ParametersMQTT struct {
- BufferSize int `default:"0" usage:"the size of the client buffers in bytes"`
- BufferBlockSize int `default:"0" usage:"the size per client buffer R/W block in bytes"`
-
- Subscriptions struct {
- MaxTopicSubscriptionsPerClient int `default:"1000" usage:"the maximum number of topic subscriptions per client before the client gets dropped (DOS protection)"`
- TopicsCleanupThresholdCount int `default:"10000" usage:"the number of deleted topics that trigger a garbage collection of the subscription manager"`
- TopicsCleanupThresholdRatio float32 `default:"1.0" usage:"the ratio of subscribed topics to deleted topics that trigger a garbage collection of the subscription manager"`
- }
-
Websocket struct {
Enabled bool `default:"true" usage:"whether to enable the websocket connection of the MQTT broker"`
BindAddress string `default:"localhost:1888" usage:"the websocket bind address on which the MQTT broker listens on"`
@@ -22,25 +13,46 @@ type ParametersMQTT struct {
Enabled bool `default:"false" usage:"whether to enable the TCP connection of the MQTT broker"`
BindAddress string `default:"localhost:1883" usage:"the TCP bind address on which the MQTT broker listens on"`
- Auth struct {
- Enabled bool `default:"false" usage:"whether to enable auth for TCP connections"`
- PasswordSalt string `default:"0000000000000000000000000000000000000000000000000000000000000000" usage:"the auth salt used for hashing the passwords of the users"`
- Users map[string]string `usage:"the list of allowed users with their password+salt as a scrypt hash"`
- }
-
TLS struct {
Enabled bool `default:"false" usage:"whether to enable TLS for TCP connections"`
PrivateKeyPath string `default:"private_key.pem" usage:"the path to the private key file (x509 PEM) for TCP connections with TLS"`
CertificatePath string `default:"certificate.pem" usage:"the path to the certificate file (x509 PEM) for TCP connections with TLS"`
} `name:"tls"`
} `name:"tcp"`
+
+ Auth struct {
+ PasswordSalt string `default:"0000000000000000000000000000000000000000000000000000000000000000" usage:"the auth salt used for hashing the passwords of the users"`
+ Users map[string]string `usage:"the list of allowed users with their password+salt as a scrypt hash"`
+ }
+
+ PublicTopics []string `usage:"the MQTT topics which can be subscribed to without authorization. Wildcards using * are allowed"`
+ ProtectedTopics []string `usage:"the MQTT topics which only can be subscribed to with valid authorization. Wildcards using * are allowed"`
+
+ Subscriptions struct {
+ MaxTopicSubscriptionsPerClient int `default:"1000" usage:"the maximum number of topic subscriptions per client before the client gets dropped (DOS protection)"`
+ TopicsCleanupThresholdCount int `default:"10000" usage:"the number of deleted topics that trigger a garbage collection of the subscription manager"`
+ TopicsCleanupThresholdRatio float32 `default:"1.0" usage:"the ratio of subscribed topics to deleted topics that trigger a garbage collection of the subscription manager"`
+ }
+
+ MaximumClientWritesPending int `default:"0" usage:"the maximum number of pending message writes for a client"`
+ ClientWriteBufferSize int `default:"0" usage:"the size of the client write buffer"`
+ ClientReadBufferSize int `default:"0" usage:"the size of the client read buffer"`
}
-var ParamsMQTT = &ParametersMQTT{}
+var ParamsMQTT = &ParametersMQTT{
+ PublicTopics: []string{
+ "commitments/*",
+ "blocks/*",
+ "transactions/*",
+ "block-metadata/*",
+ "outputs/*",
+ },
+ ProtectedTopics: []string{},
+}
var params = &app.ComponentParams{
Params: map[string]any{
"mqtt": ParamsMQTT,
},
- Masked: nil,
+ Masked: []string{"mqtt.auth.passwordSalt"},
}
diff --git a/components/prometheus/component.go b/components/prometheus/component.go
index bcf3e6c..29b0e69 100644
--- a/components/prometheus/component.go
+++ b/components/prometheus/component.go
@@ -7,13 +7,13 @@ import (
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
- "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/dig"
"github.com/iotaledger/hive.go/app"
+ "github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/inx-mqtt/pkg/daemon"
"github.com/iotaledger/inx-mqtt/pkg/mqtt"
)
@@ -94,7 +94,7 @@ func run() error {
go func() {
Component.LogInfof("You can now access the Prometheus exporter using: http://%s/metrics", ParamsPrometheus.BindAddress)
- if err := deps.PrometheusEcho.Start(ParamsPrometheus.BindAddress); err != nil && !errors.Is(err, http.ErrServerClosed) {
+ if err := deps.PrometheusEcho.Start(ParamsPrometheus.BindAddress); err != nil && !ierrors.Is(err, http.ErrServerClosed) {
Component.LogWarnf("Stopped Prometheus exporter due to an error (%s)", err)
}
}()
diff --git a/components/prometheus/metrics.go b/components/prometheus/metrics.go
index b32be61..a609edc 100644
--- a/components/prometheus/metrics.go
+++ b/components/prometheus/metrics.go
@@ -9,22 +9,25 @@ import (
var (
mqttBrokerAppInfo *prometheus.GaugeVec
mqttBrokerStarted prometheus.Gauge
+ mqttBrokerTime prometheus.Gauge
mqttBrokerUptime prometheus.Gauge
- mqttBrokerBytesRecv prometheus.Gauge
+ mqttBrokerBytesReceived prometheus.Gauge
mqttBrokerBytesSent prometheus.Gauge
mqttBrokerClientsConnected prometheus.Gauge
mqttBrokerClientsDisconnected prometheus.Gauge
- mqttBrokerClientsMax prometheus.Gauge
+ mqttBrokerClientsMaximum prometheus.Gauge
mqttBrokerClientsTotal prometheus.Gauge
- mqttBrokerConnectionsTotal prometheus.Gauge
- mqttBrokerMessagesRecv prometheus.Gauge
+ mqttBrokerMessagesReceived prometheus.Gauge
mqttBrokerMessagesSent prometheus.Gauge
- mqttBrokerPublishDropped prometheus.Gauge
- mqttBrokerPublishRecv prometheus.Gauge
- mqttBrokerPublishSent prometheus.Gauge
+ mqttBrokerMessagesDropped prometheus.Gauge
mqttBrokerRetained prometheus.Gauge
mqttBrokerInflight prometheus.Gauge
+ mqttBrokerInflightDropped prometheus.Gauge
mqttBrokerSubscriptions prometheus.Gauge
+ mqttBrokerPacketsReceived prometheus.Gauge
+ mqttBrokerPacketsSent prometheus.Gauge
+ mqttBrokerMemoryAlloc prometheus.Gauge
+ mqttBrokerThreads prometheus.Gauge
mqttBrokerSubscriptionManagerSubscribersSize prometheus.Gauge
mqttBrokerSubscriptionManagerTopicsSize prometheus.Gauge
)
@@ -58,49 +61,57 @@ func registerNewMQTTBrokerGauge(registry *prometheus.Registry, name string, help
func registerMQTTMetrics(registry *prometheus.Registry) {
mqttBrokerAppInfo = registerNewMQTTBrokerGaugeVec(registry, "app_info", []string{"name", "version", "broker_version"}, "The current version of the server.")
mqttBrokerStarted = registerNewMQTTBrokerGauge(registry, "started", "The time the server started in unix seconds.")
+ mqttBrokerTime = registerNewMQTTBrokerGauge(registry, "time", "Current time on the server.")
mqttBrokerUptime = registerNewMQTTBrokerGauge(registry, "uptime", "The number of seconds the server has been online.")
- mqttBrokerBytesRecv = registerNewMQTTBrokerGauge(registry, "bytes_recv", "The total number of bytes received in all packets.")
- mqttBrokerBytesSent = registerNewMQTTBrokerGauge(registry, "bytes_sent", "The total number of bytes sent to clients.")
- mqttBrokerClientsConnected = registerNewMQTTBrokerGauge(registry, "clients_connected", "The number of currently connected clients.")
- mqttBrokerClientsDisconnected = registerNewMQTTBrokerGauge(registry, "clients_disconnected", "The number of disconnected non-cleansession clients.")
- mqttBrokerClientsMax = registerNewMQTTBrokerGauge(registry, "clients_max", "The maximum number of clients that have been concurrently connected.")
- mqttBrokerClientsTotal = registerNewMQTTBrokerGauge(registry, "clients_total", "The sum of all clients, connected and disconnected.")
- mqttBrokerConnectionsTotal = registerNewMQTTBrokerGauge(registry, "connections_total", "The sum number of clients which have ever connected.")
- mqttBrokerMessagesRecv = registerNewMQTTBrokerGauge(registry, "messages_recv", "The total number of packets received.")
- mqttBrokerMessagesSent = registerNewMQTTBrokerGauge(registry, "messages_sent", "The total number of packets sent.")
- mqttBrokerPublishDropped = registerNewMQTTBrokerGauge(registry, "publish_dropped", "The number of in-flight publish messages which were dropped.")
- mqttBrokerPublishRecv = registerNewMQTTBrokerGauge(registry, "publish_recv", "The total number of received publish packets.")
- mqttBrokerPublishSent = registerNewMQTTBrokerGauge(registry, "publish_sent", "The total number of sent publish packets.")
- mqttBrokerRetained = registerNewMQTTBrokerGauge(registry, "retained", "The number of messages currently retained.")
+ mqttBrokerBytesReceived = registerNewMQTTBrokerGauge(registry, "bytes_received", "Total number of bytes received since the broker started.")
+ mqttBrokerBytesSent = registerNewMQTTBrokerGauge(registry, "bytes_sent", "Total number of bytes sent since the broker started.")
+ mqttBrokerClientsConnected = registerNewMQTTBrokerGauge(registry, "clients_connected", "Number of currently connected clients.")
+ mqttBrokerClientsDisconnected = registerNewMQTTBrokerGauge(registry, "clients_disconnected", "Total number of persistent clients (with clean session disabled) that are registered at the broker but are currently disconnected.")
+ mqttBrokerClientsMaximum = registerNewMQTTBrokerGauge(registry, "clients_maximum", "Maximum number of active clients that have been connected.")
+ mqttBrokerClientsTotal = registerNewMQTTBrokerGauge(registry, "clients_total", "Total number of connected and disconnected clients with a persistent session currently connected and registered.")
+ mqttBrokerMessagesReceived = registerNewMQTTBrokerGauge(registry, "messages_received", "Total number of publish messages received.")
+ mqttBrokerMessagesSent = registerNewMQTTBrokerGauge(registry, "messages_sent", "Total number of publish messages sent.")
+ mqttBrokerMessagesDropped = registerNewMQTTBrokerGauge(registry, "messages_dropped", "Total number of publish messages dropped to slow subscriber.")
+ mqttBrokerRetained = registerNewMQTTBrokerGauge(registry, "retained", "Total number of retained messages active on the broker.")
mqttBrokerInflight = registerNewMQTTBrokerGauge(registry, "inflight", "The number of messages currently in-flight.")
- mqttBrokerSubscriptions = registerNewMQTTBrokerGauge(registry, "subscriptions", "The total number of filter subscriptions.")
+ mqttBrokerInflightDropped = registerNewMQTTBrokerGauge(registry, "inflight_dropped", "The number of inflight messages which were dropped.")
+ mqttBrokerSubscriptions = registerNewMQTTBrokerGauge(registry, "subscriptions", "Total number of subscriptions active on the broker.")
+ mqttBrokerPacketsReceived = registerNewMQTTBrokerGauge(registry, "packets_received", "The total number of publish messages received.")
+ mqttBrokerPacketsSent = registerNewMQTTBrokerGauge(registry, "packets_sent", "Total number of messages of any type sent since the broker started.")
+ mqttBrokerMemoryAlloc = registerNewMQTTBrokerGauge(registry, "memory_alloc", "Memory currently allocated.")
+ mqttBrokerThreads = registerNewMQTTBrokerGauge(registry, "threads", "Number of active goroutines, named as threads for platform ambiguity.")
mqttBrokerSubscriptionManagerSubscribersSize = registerNewMQTTBrokerGauge(registry, "subscription_manager_subscribers_size", "The number of active subscribers in the subscription manager.")
mqttBrokerSubscriptionManagerTopicsSize = registerNewMQTTBrokerGauge(registry, "subscription_manager_topics_size", "The number of active topics in the subscription manager.")
}
func collectMQTTBroker(server *mqtt.Server) {
+ brokerSystemInfo := server.MQTTBroker.SystemInfo()
+
mqttBrokerAppInfo.With(prometheus.Labels{
"name": Component.App().Info().Name,
"version": Component.App().Info().Version,
- "broker_version": server.MQTTBroker.SystemInfo().Version,
+ "broker_version": brokerSystemInfo.Version,
}).Set(1)
- mqttBrokerStarted.Set(float64(server.MQTTBroker.SystemInfo().Started))
- mqttBrokerUptime.Set(float64(server.MQTTBroker.SystemInfo().Uptime))
- mqttBrokerBytesRecv.Set(float64(server.MQTTBroker.SystemInfo().BytesRecv))
- mqttBrokerBytesSent.Set(float64(server.MQTTBroker.SystemInfo().BytesSent))
- mqttBrokerClientsConnected.Set(float64(server.MQTTBroker.SystemInfo().ClientsConnected))
- mqttBrokerClientsDisconnected.Set(float64(server.MQTTBroker.SystemInfo().ClientsDisconnected))
- mqttBrokerClientsMax.Set(float64(server.MQTTBroker.SystemInfo().ClientsMax))
- mqttBrokerClientsTotal.Set(float64(server.MQTTBroker.SystemInfo().ClientsTotal))
- mqttBrokerConnectionsTotal.Set(float64(server.MQTTBroker.SystemInfo().ConnectionsTotal))
- mqttBrokerMessagesRecv.Set(float64(server.MQTTBroker.SystemInfo().MessagesRecv))
- mqttBrokerMessagesSent.Set(float64(server.MQTTBroker.SystemInfo().MessagesSent))
- mqttBrokerPublishDropped.Set(float64(server.MQTTBroker.SystemInfo().PublishDropped))
- mqttBrokerPublishRecv.Set(float64(server.MQTTBroker.SystemInfo().PublishRecv))
- mqttBrokerPublishSent.Set(float64(server.MQTTBroker.SystemInfo().PublishSent))
- mqttBrokerRetained.Set(float64(server.MQTTBroker.SystemInfo().Retained))
- mqttBrokerInflight.Set(float64(server.MQTTBroker.SystemInfo().Inflight))
- mqttBrokerSubscriptions.Set(float64(server.MQTTBroker.SystemInfo().Subscriptions))
+ mqttBrokerStarted.Set(float64(brokerSystemInfo.Started))
+ mqttBrokerTime.Set(float64(brokerSystemInfo.Time))
+ mqttBrokerUptime.Set(float64(brokerSystemInfo.Uptime))
+ mqttBrokerBytesReceived.Set(float64(brokerSystemInfo.BytesReceived))
+ mqttBrokerBytesSent.Set(float64(brokerSystemInfo.BytesSent))
+ mqttBrokerClientsConnected.Set(float64(brokerSystemInfo.ClientsConnected))
+ mqttBrokerClientsDisconnected.Set(float64(brokerSystemInfo.ClientsDisconnected))
+ mqttBrokerClientsMaximum.Set(float64(brokerSystemInfo.ClientsMaximum))
+ mqttBrokerClientsTotal.Set(float64(brokerSystemInfo.ClientsTotal))
+ mqttBrokerMessagesReceived.Set(float64(brokerSystemInfo.MessagesReceived))
+ mqttBrokerMessagesSent.Set(float64(brokerSystemInfo.MessagesSent))
+ mqttBrokerMessagesDropped.Set(float64(brokerSystemInfo.MessagesDropped))
+ mqttBrokerRetained.Set(float64(brokerSystemInfo.Retained))
+ mqttBrokerInflight.Set(float64(brokerSystemInfo.Inflight))
+ mqttBrokerInflightDropped.Set(float64(brokerSystemInfo.InflightDropped))
+ mqttBrokerSubscriptions.Set(float64(brokerSystemInfo.Subscriptions))
+ mqttBrokerPacketsReceived.Set(float64(brokerSystemInfo.PacketsReceived))
+ mqttBrokerPacketsSent.Set(float64(brokerSystemInfo.PacketsSent))
+ mqttBrokerMemoryAlloc.Set(float64(brokerSystemInfo.MemoryAlloc))
+ mqttBrokerThreads.Set(float64(brokerSystemInfo.Threads))
mqttBrokerSubscriptionManagerSubscribersSize.Set(float64(server.MQTTBroker.SubscribersSize()))
mqttBrokerSubscriptionManagerTopicsSize.Set(float64(server.MQTTBroker.TopicsSize()))
}
diff --git a/config_defaults.json b/config_defaults.json
index 2f946d9..bca5233 100644
--- a/config_defaults.json
+++ b/config_defaults.json
@@ -29,13 +29,6 @@
"targetNetworkName": ""
},
"mqtt": {
- "bufferSize": 0,
- "bufferBlockSize": 0,
- "subscriptions": {
- "maxTopicSubscriptionsPerClient": 1000,
- "topicsCleanupThresholdCount": 10000,
- "topicsCleanupThresholdRatio": 1
- },
"websocket": {
"enabled": true,
"bindAddress": "localhost:1888",
@@ -44,17 +37,32 @@
"tcp": {
"enabled": false,
"bindAddress": "localhost:1883",
- "auth": {
- "enabled": false,
- "passwordSalt": "0000000000000000000000000000000000000000000000000000000000000000",
- "users": null
- },
"tls": {
"enabled": false,
"privateKeyPath": "private_key.pem",
"certificatePath": "certificate.pem"
}
- }
+ },
+ "auth": {
+ "passwordSalt": "0000000000000000000000000000000000000000000000000000000000000000",
+ "users": null
+ },
+ "publicTopics": [
+ "commitments/*",
+ "blocks/*",
+ "transactions/*",
+ "block-metadata/*",
+ "outputs/*"
+ ],
+ "protectedTopics": [],
+ "subscriptions": {
+ "maxTopicSubscriptionsPerClient": 1000,
+ "topicsCleanupThresholdCount": 10000,
+ "topicsCleanupThresholdRatio": 1
+ },
+ "maximumClientWritesPending": 0,
+ "clientWriteBufferSize": 0,
+ "clientReadBufferSize": 0
},
"profiling": {
"enabled": false,
diff --git a/configuration.md b/configuration.md
index 743249c..704469f 100755
--- a/configuration.md
+++ b/configuration.md
@@ -131,21 +131,17 @@ Example:
## 4. MQTT
-| Name | Description | Type | Default value |
-| ------------------------------------ | --------------------------------------------- | ------ | ------------- |
-| bufferSize | The size of the client buffers in bytes | int | 0 |
-| bufferBlockSize | The size per client buffer R/W block in bytes | int | 0 |
-| [subscriptions](#mqtt_subscriptions) | Configuration for subscriptions | object | |
-| [websocket](#mqtt_websocket) | Configuration for websocket | object | |
-| [tcp](#mqtt_tcp) | Configuration for TCP | object | |
-
-### Subscriptions
-
-| Name | Description | Type | Default value |
-| ------------------------------ | -------------------------------------------------------------------------------------------------------------- | ----- | ------------- |
-| maxTopicSubscriptionsPerClient | The maximum number of topic subscriptions per client before the client gets dropped (DOS protection) | int | 1000 |
-| topicsCleanupThresholdCount | The number of deleted topics that trigger a garbage collection of the subscription manager | int | 10000 |
-| topicsCleanupThresholdRatio | The ratio of subscribed topics to deleted topics that trigger a garbage collection of the subscription manager | float | 1.0 |
+| Name | Description | Type | Default value |
+| ------------------------------------ | ------------------------------------------------------------------------------------------------------- | ------ | -------------------------------------------------------------------------------- |
+| [websocket](#mqtt_websocket) | Configuration for websocket | object | |
+| [tcp](#mqtt_tcp) | Configuration for TCP | object | |
+| [auth](#mqtt_auth) | Configuration for auth | object | |
+| publicTopics | The MQTT topics which can be subscribed to without authorization. Wildcards using \* are allowed | array | commitments/\*
blocks/\*
transactions/\*
block-metadata/\*
outputs/\* |
+| protectedTopics | The MQTT topics which only can be subscribed to with valid authorization. Wildcards using \* are allowed | array | |
+| [subscriptions](#mqtt_subscriptions) | Configuration for subscriptions | object | |
+| maximumClientWritesPending | The maximum number of pending message writes for a client | int | 0 |
+| clientWriteBufferSize | The size of the client write buffer | int | 0 |
+| clientReadBufferSize | The size of the client read buffer | int | 0 |
### Websocket
@@ -157,20 +153,11 @@ Example:
### TCP
-| Name | Description | Type | Default value |
-| ---------------------- | -------------------------------------------------------- | ------- | ---------------- |
-| enabled | Whether to enable the TCP connection of the MQTT broker | boolean | false |
-| bindAddress | The TCP bind address on which the MQTT broker listens on | string | "localhost:1883" |
-| [auth](#mqtt_tcp_auth) | Configuration for auth | object | |
-| [tls](#mqtt_tcp_tls) | Configuration for TLS | object | |
-
-### Auth
-
-| Name | Description | Type | Default value |
-| ------------ | ------------------------------------------------------------------- | ------- | ------------------------------------------------------------------ |
-| enabled | Whether to enable auth for TCP connections | boolean | false |
-| passwordSalt | The auth salt used for hashing the passwords of the users | string | "0000000000000000000000000000000000000000000000000000000000000000" |
-| users | The list of allowed users with their password+salt as a scrypt hash | object | [] |
+| Name | Description | Type | Default value |
+| -------------------- | -------------------------------------------------------- | ------- | ---------------- |
+| enabled | Whether to enable the TCP connection of the MQTT broker | boolean | false |
+| bindAddress | The TCP bind address on which the MQTT broker listens on | string | "localhost:1883" |
+| [tls](#mqtt_tcp_tls) | Configuration for TLS | object | |
### TLS
@@ -180,18 +167,26 @@ Example:
| privateKeyPath | The path to the private key file (x509 PEM) for TCP connections with TLS | string | "private_key.pem" |
| certificatePath | The path to the certificate file (x509 PEM) for TCP connections with TLS | string | "certificate.pem" |
+### Auth
+
+| Name | Description | Type | Default value |
+| ------------ | ------------------------------------------------------------------- | ------ | ------------------------------------------------------------------ |
+| passwordSalt | The auth salt used for hashing the passwords of the users | string | "0000000000000000000000000000000000000000000000000000000000000000" |
+| users | The list of allowed users with their password+salt as a scrypt hash | object | [] |
+
+### Subscriptions
+
+| Name | Description | Type | Default value |
+| ------------------------------ | -------------------------------------------------------------------------------------------------------------- | ----- | ------------- |
+| maxTopicSubscriptionsPerClient | The maximum number of topic subscriptions per client before the client gets dropped (DOS protection) | int | 1000 |
+| topicsCleanupThresholdCount | The number of deleted topics that trigger a garbage collection of the subscription manager | int | 10000 |
+| topicsCleanupThresholdRatio | The ratio of subscribed topics to deleted topics that trigger a garbage collection of the subscription manager | float | 1.0 |
+
Example:
```json
{
"mqtt": {
- "bufferSize": 0,
- "bufferBlockSize": 0,
- "subscriptions": {
- "maxTopicSubscriptionsPerClient": 1000,
- "topicsCleanupThresholdCount": 10000,
- "topicsCleanupThresholdRatio": 1
- },
"websocket": {
"enabled": true,
"bindAddress": "localhost:1888",
@@ -200,17 +195,32 @@ Example:
"tcp": {
"enabled": false,
"bindAddress": "localhost:1883",
- "auth": {
- "enabled": false,
- "passwordSalt": "0000000000000000000000000000000000000000000000000000000000000000",
- "users": null
- },
"tls": {
"enabled": false,
"privateKeyPath": "private_key.pem",
"certificatePath": "certificate.pem"
}
- }
+ },
+ "auth": {
+ "passwordSalt": "0000000000000000000000000000000000000000000000000000000000000000",
+ "users": null
+ },
+ "publicTopics": [
+ "commitments/*",
+ "blocks/*",
+ "transactions/*",
+ "block-metadata/*",
+ "outputs/*"
+ ],
+ "protectedTopics": [],
+ "subscriptions": {
+ "maxTopicSubscriptionsPerClient": 1000,
+ "topicsCleanupThresholdCount": 10000,
+ "topicsCleanupThresholdRatio": 1
+ },
+ "maximumClientWritesPending": 0,
+ "clientWriteBufferSize": 0,
+ "clientReadBufferSize": 0
}
}
```
diff --git a/go.mod b/go.mod
index 0b6d910..3e4ce9c 100644
--- a/go.mod
+++ b/go.mod
@@ -2,21 +2,18 @@ module github.com/iotaledger/inx-mqtt
go 1.21
-replace github.com/mochi-co/mqtt => github.com/alexsporn/mqtt v0.0.0-20220909140721-d60c438960a4
-
require (
- github.com/iotaledger/hive.go/app v0.0.0-20231206114953-6a65a82e30ad
- github.com/iotaledger/hive.go/ierrors v0.0.0-20231206114953-6a65a82e30ad
- github.com/iotaledger/hive.go/lo v0.0.0-20231206114953-6a65a82e30ad
- github.com/iotaledger/hive.go/logger v0.0.0-20231206114953-6a65a82e30ad
- github.com/iotaledger/hive.go/runtime v0.0.0-20231206114953-6a65a82e30ad
- github.com/iotaledger/hive.go/web v0.0.0-20231130122510-e3dddb0214f0
+ github.com/iotaledger/hive.go/app v0.0.0-20231207181026-f482ac139305
+ github.com/iotaledger/hive.go/ierrors v0.0.0-20231207181026-f482ac139305
+ github.com/iotaledger/hive.go/lo v0.0.0-20231207181026-f482ac139305
+ github.com/iotaledger/hive.go/logger v0.0.0-20231207181026-f482ac139305
+ github.com/iotaledger/hive.go/runtime v0.0.0-20231207181026-f482ac139305
+ github.com/iotaledger/hive.go/web v0.0.0-20231207181026-f482ac139305
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231206124511-b78dc962031f
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231206124145-f773dfe3927e
github.com/iotaledger/iota.go/v4 v4.0.0-20231206123921-2af411eef0b5
github.com/labstack/echo/v4 v4.11.3
- github.com/mochi-co/mqtt v1.3.2
- github.com/pkg/errors v0.9.1
+ github.com/mochi-mqtt/server/v2 v2.4.2
github.com/prometheus/client_golang v1.17.0
github.com/stretchr/testify v1.8.4
go.uber.org/dig v1.17.1
@@ -46,12 +43,12 @@ require (
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/iancoleman/orderedmap v0.3.0 // indirect
- github.com/iotaledger/hive.go/constraints v0.0.0-20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/crypto v0.0.0-20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/ds v0.0.0-20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/stringify v0.0.0-20231206114953-6a65a82e30ad // indirect
+ github.com/iotaledger/hive.go/constraints v0.0.0-20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/crypto v0.0.0-20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/ds v0.0.0-20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/stringify v0.0.0-20231207181026-f482ac139305 // indirect
github.com/knadh/koanf v1.5.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/labstack/gommon v0.4.1 // indirect
@@ -64,7 +61,7 @@ require (
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
- github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5 // indirect
+ github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
diff --git a/go.sum b/go.sum
index ccfe246..3bfa23c 100644
--- a/go.sum
+++ b/go.sum
@@ -8,8 +8,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
-github.com/alexsporn/mqtt v0.0.0-20220909140721-d60c438960a4 h1:gH1kgQ6DT3UWcls15FgGBuoose4XfthuqM9AW+H+iXc=
-github.com/alexsporn/mqtt v0.0.0-20220909140721-d60c438960a4/go.mod h1:o0lhQFWL8QtR1+8a9JZmbY8FhZ89MF8vGOGHJNFbCB8=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
@@ -184,30 +182,30 @@ github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXei
github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc=
github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE=
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
-github.com/iotaledger/hive.go/app v0.0.0-20231206114953-6a65a82e30ad h1:v7dkbVLSsmzgOWT2vjvv1MdKQXvqFbvIkx8mvh6VK7g=
-github.com/iotaledger/hive.go/app v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:hTHKGFbZnuiW8yEgDuuL7ZjQTCnl8bXyHLmj3LPa648=
-github.com/iotaledger/hive.go/constraints v0.0.0-20231206114953-6a65a82e30ad h1:4XL7IIvdsWHxSKQfU+sgq3H9egN54053LF9TwMfDcTg=
-github.com/iotaledger/hive.go/constraints v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:dOBOM2s4se3HcWefPe8sQLUalGXJ8yVXw58oK8jke3s=
-github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231206114953-6a65a82e30ad h1:iNzb/Oy/nucIOXOzRcwSqqFsaeKwr2JZpZYSLp8xjlE=
-github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231206114953-6a65a82e30ad/go.mod h1:CO28KMA6Pp5LJPiigPQQ276zQofES+jMod08U5pyRFA=
-github.com/iotaledger/hive.go/crypto v0.0.0-20231206114953-6a65a82e30ad h1:pUL2UZbF4S8FIV7uKo9p+IGfZ658K1VNorQ6rzDMRvs=
-github.com/iotaledger/hive.go/crypto v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:7vHoF//1Pt3nu0l8nDIw7bEgv2GfbL3kSgjp7Rdqhd4=
-github.com/iotaledger/hive.go/ds v0.0.0-20231206114953-6a65a82e30ad h1:adLrD6dOEkM5Xdg6AOPt9/HYqy/pQ5FrprDpW4/VqUU=
-github.com/iotaledger/hive.go/ds v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:NmZRIoxtL6iQdVK6n5W+JOx58K/0Yn8k7WuSvpKPQ+M=
-github.com/iotaledger/hive.go/ierrors v0.0.0-20231206114953-6a65a82e30ad h1:WDl58zJKHfwbzHs+ZB8Jq3YNgVQE5Neu2NeaX3FZuyU=
-github.com/iotaledger/hive.go/ierrors v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:HcE8B5lP96enc/OALTb2/rIIi+yOLouRoHOKRclKmC8=
-github.com/iotaledger/hive.go/lo v0.0.0-20231206114953-6a65a82e30ad h1:qpCsjw+InLL824QPu3lY/osck4DhucBKhCs5/E8OH+A=
-github.com/iotaledger/hive.go/lo v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:ETXGXymFyNcUq2t4I9e7ZK18f9bxUWYat4pjZ9W0rWc=
-github.com/iotaledger/hive.go/logger v0.0.0-20231206114953-6a65a82e30ad h1:fazCxogqOLDEPNDPWYDLTDpYmwgTJgIaC2Z6VN52S4M=
-github.com/iotaledger/hive.go/logger v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:hVaVODS+Uik0obf3SVEHFQNruUko/uqIgD/GKwhn49M=
-github.com/iotaledger/hive.go/runtime v0.0.0-20231206114953-6a65a82e30ad h1:HpupWK8iqFt+Sdogkh2/N8ojalmevYy+FzhjOuy7Y7E=
-github.com/iotaledger/hive.go/runtime v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:Z9NFsByMh1Kf98f3v3ifeZRycbS2db1hjswTQG1MxnE=
-github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231206114953-6a65a82e30ad h1:c8uwbBZDqpiCNN9/9Jji7Z4lL0GdVnORp8WMouiuknk=
-github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231206114953-6a65a82e30ad/go.mod h1:FoH3T6yKlZJp8xm8K+zsQiibSynp32v21CpWx8xkek8=
-github.com/iotaledger/hive.go/stringify v0.0.0-20231206114953-6a65a82e30ad h1:VC3OgdSbyngY7/gxVj66fKd/nGmN6P0/myr348nx7vA=
-github.com/iotaledger/hive.go/stringify v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs=
-github.com/iotaledger/hive.go/web v0.0.0-20231130122510-e3dddb0214f0 h1:bIfWp0AOQXMv3VkDAOQwqvMYLz1Ila1Z2hqiY2RJ3Io=
-github.com/iotaledger/hive.go/web v0.0.0-20231130122510-e3dddb0214f0/go.mod h1:L/CLz7skt9dvidhBOw2gmMGhmrUBHXlA0b3paugdsE4=
+github.com/iotaledger/hive.go/app v0.0.0-20231207181026-f482ac139305 h1:NPeZUkOI0xTKnuWefzcLRKfHr/shXmfoXDsfqXq8P0Q=
+github.com/iotaledger/hive.go/app v0.0.0-20231207181026-f482ac139305/go.mod h1:hTHKGFbZnuiW8yEgDuuL7ZjQTCnl8bXyHLmj3LPa648=
+github.com/iotaledger/hive.go/constraints v0.0.0-20231207181026-f482ac139305 h1:ar+IWfqO7B1M5+kuKGUJnfg0i/YuuM1oN5i8byp/F7A=
+github.com/iotaledger/hive.go/constraints v0.0.0-20231207181026-f482ac139305/go.mod h1:dOBOM2s4se3HcWefPe8sQLUalGXJ8yVXw58oK8jke3s=
+github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231207181026-f482ac139305 h1:bKDeNAB2zm5mDJgmyRiHBbekkg9OtnfYVGKKRqKyLKk=
+github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231207181026-f482ac139305/go.mod h1:CO28KMA6Pp5LJPiigPQQ276zQofES+jMod08U5pyRFA=
+github.com/iotaledger/hive.go/crypto v0.0.0-20231207181026-f482ac139305 h1:OR2TClxTtst906F4tok9xzhBTKO81qrUFdxIAoaZVvE=
+github.com/iotaledger/hive.go/crypto v0.0.0-20231207181026-f482ac139305/go.mod h1:7vHoF//1Pt3nu0l8nDIw7bEgv2GfbL3kSgjp7Rdqhd4=
+github.com/iotaledger/hive.go/ds v0.0.0-20231207181026-f482ac139305 h1:07ujwFv6qWLMRm32ZWZqMpgEugtsIoJ8vCmsn+vpO0E=
+github.com/iotaledger/hive.go/ds v0.0.0-20231207181026-f482ac139305/go.mod h1:NmZRIoxtL6iQdVK6n5W+JOx58K/0Yn8k7WuSvpKPQ+M=
+github.com/iotaledger/hive.go/ierrors v0.0.0-20231207181026-f482ac139305 h1:v7/zMhNcr6hibXFZXZ4xV/S27ESUytQFgUQ1oo10iic=
+github.com/iotaledger/hive.go/ierrors v0.0.0-20231207181026-f482ac139305/go.mod h1:HcE8B5lP96enc/OALTb2/rIIi+yOLouRoHOKRclKmC8=
+github.com/iotaledger/hive.go/lo v0.0.0-20231207181026-f482ac139305 h1:zxVbTEWutMvZhS0VLu/OmBk2WpMjrXQ7l67VBwsExtc=
+github.com/iotaledger/hive.go/lo v0.0.0-20231207181026-f482ac139305/go.mod h1:ETXGXymFyNcUq2t4I9e7ZK18f9bxUWYat4pjZ9W0rWc=
+github.com/iotaledger/hive.go/logger v0.0.0-20231207181026-f482ac139305 h1:ZuOnh3vNZqamoN1ibSj0ZMzIWsYZi6fpBp7BhTi2Qf4=
+github.com/iotaledger/hive.go/logger v0.0.0-20231207181026-f482ac139305/go.mod h1:hVaVODS+Uik0obf3SVEHFQNruUko/uqIgD/GKwhn49M=
+github.com/iotaledger/hive.go/runtime v0.0.0-20231207181026-f482ac139305 h1:7CW1/EbG+RvkjbyOf6JA1u1feax/cpex/6a8CLbaA4I=
+github.com/iotaledger/hive.go/runtime v0.0.0-20231207181026-f482ac139305/go.mod h1:Z9NFsByMh1Kf98f3v3ifeZRycbS2db1hjswTQG1MxnE=
+github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231207181026-f482ac139305 h1:p79SQs2hE6Mo/MNEfEUfrKTY7gWp6c3oUeqK6I+o1lQ=
+github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231207181026-f482ac139305/go.mod h1:FoH3T6yKlZJp8xm8K+zsQiibSynp32v21CpWx8xkek8=
+github.com/iotaledger/hive.go/stringify v0.0.0-20231207181026-f482ac139305 h1:KjbaklWvZb4zIcXBETHzl6XFTAf8wtAlFDfaF0Z1Daw=
+github.com/iotaledger/hive.go/stringify v0.0.0-20231207181026-f482ac139305/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs=
+github.com/iotaledger/hive.go/web v0.0.0-20231207181026-f482ac139305 h1:dVzMN31qnSSvxTFzdkq93YNkVOFYsXqvy1Pqx5+OQTg=
+github.com/iotaledger/hive.go/web v0.0.0-20231207181026-f482ac139305/go.mod h1:0ke/puDLuEQptxeUiMaCayFFg6Bqwtcis4o0i6C9uxI=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231206124511-b78dc962031f h1:V68Ijq1A64gB9r0Rhc4ybLGH66rXqZ2Ly0L4uuaLrMg=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231206124511-b78dc962031f/go.mod h1:Dy3Gv4Dn1zufB177x6IXETP3zTeiWQ1+HMVQR0Bt/ew=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231206124145-f773dfe3927e h1:jbtiUlmTpTdGiRBW1pniPSqRcDMJaIW8fGS+uORryas=
@@ -279,6 +277,8 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
+github.com/mochi-mqtt/server/v2 v2.4.2 h1:x7xC41Qn/ek1hOWNcZraRm+Cmqc2yrfhD5VA1NFnXhc=
+github.com/mochi-mqtt/server/v2 v2.4.2/go.mod h1:M1lZnLbyowXUyQBIlHYlX1wasxXqv/qFWwQxAzfphwA=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
@@ -299,12 +299,11 @@ github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAv
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
-github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5 h1:+qIP3OMrT7SN5kLnTcVEISPOMB/97RyAKTg1UWA738E=
-github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4=
+github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 h1:jik8PHtAIsPlCRJjJzl4udgEf7hawInF9texMeO2jrU=
+github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
-github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
diff --git a/pkg/broker/auth.go b/pkg/broker/auth.go
deleted file mode 100644
index a8d25de..0000000
--- a/pkg/broker/auth.go
+++ /dev/null
@@ -1,87 +0,0 @@
-package broker
-
-import (
- "encoding/hex"
- "errors"
- "fmt"
-
- "github.com/iotaledger/hive.go/web/basicauth"
-)
-
-// AuthAllowEveryone allows everyone, but without write permission.
-type AuthAllowEveryone struct{}
-
-// Authenticate returns true if a username and password are acceptable.
-//
-//nolint:revive // we don't want to remove the user variable here, even if unused
-func (a *AuthAllowEveryone) Authenticate(user, password []byte) bool {
- return true
-}
-
-// ACL returns true if a user has access permissions to read or write on a topic.
-//
-//nolint:revive // we don't want to remove the user variable here, even if unused
-func (a *AuthAllowEveryone) ACL(user []byte, topic string, write bool) bool {
- // clients are not allowed to write
- return !write
-}
-
-// AuthAllowBasicAuth allows users that authenticate with basic auth, but without write permission.
-type AuthAllowBasicAuth struct {
- Salt []byte
- Users map[string][]byte
-}
-
-func NewAuthAllowUsers(passwordSaltHex string, users map[string]string) (*AuthAllowBasicAuth, error) {
-
- if len(passwordSaltHex) != 64 {
- return nil, errors.New("password salt must be 64 (hex encoded) in length")
- }
-
- passwordSalt, err := hex.DecodeString(passwordSaltHex)
- if err != nil {
- return nil, fmt.Errorf("parsing password salt failed: %w", err)
- }
-
- usersWithHashedPasswords := make(map[string][]byte)
- for user, passwordHashHex := range users {
- if len(passwordHashHex) != 64 {
- return nil, fmt.Errorf("password hash for user %s must be 64 (hex encoded scrypt hash) in length", user)
- }
-
- password, err := hex.DecodeString(passwordHashHex)
- if err != nil {
- return nil, fmt.Errorf("parsing password hash for user %s failed: %w", user, err)
- }
-
- usersWithHashedPasswords[user] = password
- }
-
- return &AuthAllowBasicAuth{
- Users: usersWithHashedPasswords,
- Salt: passwordSalt,
- }, nil
-}
-
-// Authenticate returns true if a username and password are acceptable.
-func (a *AuthAllowBasicAuth) Authenticate(user, password []byte) bool {
- // If the user exists in the auth users map, and the password is correct,
- // then they can connect to the server.
- if hashedPassword, exists := a.Users[string(user)]; exists {
-
- // error is ignored because it returns false in case it can't be derived
- if valid, _ := basicauth.VerifyPassword(password, a.Salt, hashedPassword); valid {
- return true
- }
- }
-
- return false
-}
-
-// ACL returns true if a user has access permissions to read or write on a topic.
-//
-//nolint:revive // we don't want to remove the user variable here, even if unused
-func (a *AuthAllowBasicAuth) ACL(user []byte, topic string, write bool) bool {
- // clients are not allowed to write
- return !write
-}
diff --git a/pkg/broker/broker.go b/pkg/broker/broker.go
index 2a11260..b75786d 100644
--- a/pkg/broker/broker.go
+++ b/pkg/broker/broker.go
@@ -2,16 +2,15 @@ package broker
import (
"crypto/tls"
- "errors"
- "fmt"
+ "math"
"net"
- mqtt "github.com/mochi-co/mqtt/server"
- "github.com/mochi-co/mqtt/server/events"
- "github.com/mochi-co/mqtt/server/listeners"
- "github.com/mochi-co/mqtt/server/listeners/auth"
- "github.com/mochi-co/mqtt/server/system"
+ mqtt "github.com/mochi-mqtt/server/v2"
+ "github.com/mochi-mqtt/server/v2/listeners"
+ "github.com/mochi-mqtt/server/v2/system"
+ "github.com/iotaledger/hive.go/ierrors"
+ "github.com/iotaledger/hive.go/web/basicauth"
"github.com/iotaledger/hive.go/web/subscriptionmanager"
)
@@ -48,28 +47,42 @@ func NewBroker(brokerOpts ...Option) (Broker, error) {
opts.ApplyOnDefault(brokerOpts...)
if !opts.WebsocketEnabled && !opts.TCPEnabled {
- return nil, errors.New("at least websocket or TCP must be enabled")
+ return nil, ierrors.New("at least websocket or TCP must be enabled")
}
- server := mqtt.NewServer(&mqtt.Options{
- BufferSize: opts.BufferSize,
- BufferBlockSize: opts.BufferBlockSize,
- InflightTTL: 30,
+ server := mqtt.New(&mqtt.Options{
+ // Capabilities defines the server features and behavior.
+ Capabilities: &mqtt.Capabilities{
+ MaximumSessionExpiryInterval: 0, // we don't keep disconnected sessions
+ MaximumMessageExpiryInterval: 10, // maximum message expiry if message expiry is 0 or over
+ ReceiveMaximum: 1024, // maximum number of concurrent qos messages per client
+ MaximumQos: 0, // we don't support QoS 1 and 2, we only "fire and forget"
+ RetainAvailable: 0, // retain messages is disabled in our usecase, we handle it ourselves
+ MaximumPacketSize: 1024, // we allow some bytes for the CONNECT and SUBSCRIBE messages, but otherwise the client is not allowed to send any messages anyways
+ TopicAliasMaximum: math.MaxUint16, // maximum topic alias value (default)
+ WildcardSubAvailable: 0, // wildcard subscriptions are disabled because we handle them ourselves, not on server side
+ SubIDAvailable: 0, // subscription identifiers are disabled, since we don't enable wildcard subscriptions in the server
+ SharedSubAvailable: 0, // shared subscriptions are prohibited in our usecase
+ MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0) (default)
+ MaximumClientWritesPending: int32(opts.MaximumClientWritesPending),
+ },
+ ClientNetWriteBufferSize: opts.ClientWriteBufferSize,
+ ClientNetReadBufferSize: opts.ClientReadBufferSize,
+ InlineClient: true, // this needs to be set to true to allow the broker to send messages itself
})
if opts.WebsocketEnabled {
// check websocket bind address
_, _, err := net.SplitHostPort(opts.WebsocketBindAddress)
if err != nil {
- return nil, fmt.Errorf("parsing websocket bind address (%s) failed: %w", opts.WebsocketBindAddress, err)
+ return nil, ierrors.Errorf("parsing websocket bind address (%s) failed: %w", opts.WebsocketBindAddress, err)
}
- ws := listeners.NewWebsocket("ws1", opts.WebsocketBindAddress)
- if err := server.AddListener(ws, &listeners.Config{
- Auth: &AuthAllowEveryone{},
- TLS: nil,
- }); err != nil {
- return nil, fmt.Errorf("adding websocket listener failed: %w", err)
+ // skip TLS config since it is added via traefik in our recommended setup anyway
+ ws := listeners.NewWebsocket("ws1", opts.WebsocketBindAddress, &listeners.Config{TLSConfig: nil})
+
+ if err := server.AddListener(ws); err != nil {
+ return nil, ierrors.Errorf("adding websocket listener failed: %w", err)
}
}
@@ -77,48 +90,34 @@ func NewBroker(brokerOpts ...Option) (Broker, error) {
// check tcp bind address
_, _, err := net.SplitHostPort(opts.TCPBindAddress)
if err != nil {
- return nil, fmt.Errorf("parsing TCP bind address (%s) failed: %w", opts.TCPBindAddress, err)
- }
-
- tcp := listeners.NewTCP("t1", opts.TCPBindAddress)
-
- var tcpAuthController auth.Controller
- if opts.TCPAuthEnabled {
- var err error
- tcpAuthController, err = NewAuthAllowUsers(opts.TCPAuthPasswordSalt, opts.TCPAuthUsers)
- if err != nil {
- return nil, fmt.Errorf("enabling TCP Authentication failed: %w", err)
- }
- } else {
- tcpAuthController = &AuthAllowEveryone{}
+ return nil, ierrors.Errorf("parsing TCP bind address (%s) failed: %w", opts.TCPBindAddress, err)
}
var tlsConfig *tls.Config
if opts.TCPTLSEnabled {
- var err error
-
tlsConfig, err = NewTLSConfig(opts.TCPTLSCertificatePath, opts.TCPTLSPrivateKeyPath)
if err != nil {
- return nil, fmt.Errorf("enabling TCP TLS failed: %w", err)
+ return nil, ierrors.Errorf("enabling TCP TLS failed: %w", err)
}
}
- if err := server.AddListener(tcp, &listeners.Config{
- Auth: tcpAuthController,
+ tcp := listeners.NewTCP("t1", opts.TCPBindAddress, &listeners.Config{
TLSConfig: tlsConfig,
- }); err != nil {
- return nil, fmt.Errorf("adding TCP listener failed: %w", err)
+ })
+
+ if err := server.AddListener(tcp); err != nil {
+ return nil, ierrors.Errorf("adding TCP listener failed: %w", err)
}
}
- s := subscriptionmanager.New(
+ subscriptionManager := subscriptionmanager.New(
subscriptionmanager.WithMaxTopicSubscriptionsPerClient[string, string](opts.MaxTopicSubscriptionsPerClient),
subscriptionmanager.WithCleanupThresholdCount[string, string](opts.TopicCleanupThresholdCount),
subscriptionmanager.WithCleanupThresholdRatio[string, string](opts.TopicCleanupThresholdRatio),
)
// this event is used to drop malicious clients
- unhook := s.Events().DropClient.Hook(func(event *subscriptionmanager.DropClientEvent[string]) {
+ unhook := subscriptionManager.Events().DropClient.Hook(func(event *subscriptionmanager.DropClientEvent[string]) {
client, exists := server.Clients.Get(event.ClientID)
if !exists {
return
@@ -131,27 +130,25 @@ func NewBroker(brokerOpts ...Option) (Broker, error) {
server.Clients.Delete(event.ClientID)
}).Unhook
- // bind the broker events to the SubscriptionManager to track the subscriptions
- server.Events.OnConnect = func(cl events.Client, pk events.Packet) {
- s.Connect(cl.ID)
- }
-
- server.Events.OnDisconnect = func(cl events.Client, err error) {
- s.Disconnect(cl.ID)
+ basicAuthManager, err := basicauth.NewBasicAuthManager(opts.AuthUsers, opts.AuthPasswordSalt)
+ if err != nil {
+ return nil, ierrors.Errorf("failed to create basic auth manager: %w", err)
}
- server.Events.OnSubscribe = func(topic string, cl events.Client, qos byte) {
- s.Subscribe(cl.ID, topic)
+ // bind the broker events to the SubscriptionManager to track the subscriptions
+ brokerHook, err := NewBrokerHook(subscriptionManager, basicAuthManager, opts.PublicTopics, opts.ProtectedTopics)
+ if err != nil {
+ return nil, ierrors.Errorf("failed to create broker hook: %w", err)
}
- server.Events.OnUnsubscribe = func(topic string, cl events.Client) {
- s.Unsubscribe(cl.ID, topic)
+ if err := server.AddHook(brokerHook, nil); err != nil {
+ return nil, ierrors.Errorf("failed to add broker hook for subscriptions: %w", err)
}
return &broker{
server: server,
opts: opts,
- subscriptionManager: s,
+ subscriptionManager: subscriptionManager,
unhook: unhook,
}, nil
}
@@ -182,12 +179,13 @@ func (b *broker) HasSubscribers(topic string) bool {
// Send publishes a message.
func (b *broker) Send(topic string, payload []byte) error {
- return b.server.Publish(topic, payload, false)
+ // we publish all messages with QoS 0 ("fire and forget")
+ return b.server.Publish(topic, payload, false, 0)
}
// SystemInfo returns the metrics of the broker.
func (b *broker) SystemInfo() *system.Info {
- return b.server.System
+ return b.server.Info.Clone()
}
// SubscribersSize returns the size of the underlying map of the SubscriptionManager.
diff --git a/pkg/broker/broker_hook.go b/pkg/broker/broker_hook.go
new file mode 100644
index 0000000..973b20b
--- /dev/null
+++ b/pkg/broker/broker_hook.go
@@ -0,0 +1,181 @@
+package broker
+
+import (
+ "bytes"
+ "regexp"
+ "strings"
+
+ mqtt "github.com/mochi-mqtt/server/v2"
+ "github.com/mochi-mqtt/server/v2/packets"
+
+ "github.com/iotaledger/hive.go/ierrors"
+ "github.com/iotaledger/hive.go/web/basicauth"
+ "github.com/iotaledger/hive.go/web/subscriptionmanager"
+)
+
+// BrokerHook is the glue code between the mochi-mqtt server and inx-mqtt.
+// It is responsible for keeping the subscription manager up to date.
+// It also handles authorization and protection of topics.
+//
+//nolint:revive
+type BrokerHook struct {
+ mqtt.HookBase
+
+ subscriptionManager *subscriptionmanager.SubscriptionManager[string, string]
+ basicAuthManager *basicauth.BasicAuthManager
+ matchPublicTopics func(topic string) bool
+ matchProtectedTopics func(topic string) bool
+}
+
+func NewBrokerHook(subscriptionManager *subscriptionmanager.SubscriptionManager[string, string],
+ basicAuthManager *basicauth.BasicAuthManager,
+ publicTopics []string,
+ protectedTopics []string) (*BrokerHook, error) {
+
+ regexesPublicTopics, err := compileTopicsAsRegexes(publicTopics)
+ if err != nil {
+ return nil, ierrors.Wrap(err, "failed to compile public topics")
+ }
+
+ regexesProtectedTopics, err := compileTopicsAsRegexes(protectedTopics)
+ if err != nil {
+ return nil, ierrors.Wrap(err, "failed to compile protected topics")
+ }
+
+ matchPublicTopics := func(topic string) bool {
+ loweredTopic := strings.ToLower(topic)
+
+ for _, reg := range regexesPublicTopics {
+ if reg.MatchString(loweredTopic) {
+ return true
+ }
+ }
+
+ return false
+ }
+
+ matchProtectedTopics := func(topic string) bool {
+ loweredTopic := strings.ToLower(topic)
+
+ for _, reg := range regexesProtectedTopics {
+ if reg.MatchString(loweredTopic) {
+ return true
+ }
+ }
+
+ return false
+ }
+
+ return &BrokerHook{
+ subscriptionManager: subscriptionManager,
+ basicAuthManager: basicAuthManager,
+ matchPublicTopics: matchPublicTopics,
+ matchProtectedTopics: matchProtectedTopics,
+ }, nil
+}
+
+// ID returns the ID of the hook.
+func (h *BrokerHook) ID() string {
+ return "iota-mqtt-broker-hook"
+}
+
+// Provides indicates which methods a hook provides. The default is none - this method
+// should be overridden by the embedding hook.
+func (h *BrokerHook) Provides(b byte) bool {
+ //nolint:gocritic // false positive, the argument order is correct
+ return bytes.Contains([]byte{
+ mqtt.OnConnectAuthenticate,
+ mqtt.OnACLCheck,
+ mqtt.OnSessionEstablished,
+ mqtt.OnDisconnect,
+ mqtt.OnSubscribed,
+ mqtt.OnUnsubscribed,
+ }, []byte{b})
+}
+
+// OnConnectAuthenticate returns true if the connecting client has rules which provide access
+// in the auth ledger.
+func (h *BrokerHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
+ username := string(cl.Properties.Username)
+ password := pk.Connect.Password
+
+ // check if the given user exists in the users map
+ if !h.basicAuthManager.Exists(username) {
+ // if the user doesn't exist, the user is still allowed to connect, but without access to the protected topics.
+ return true
+ }
+
+ // if the user exists in the users map, we need to check if the given password is correct, otherwise the user is not allowed to connect.
+ // All successfully connected users which are know in the users map automatically get elevated priviledges to access protected topics.
+ return h.basicAuthManager.VerifyUsernameAndPasswordBytes(username, password)
+}
+
+// OnACLCheck returns true if the connecting client has matching read or write access to subscribe
+// or publish to a given topic.
+func (h *BrokerHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
+ // clients are not allowed to write
+ if write {
+ return false
+ }
+
+ // check if the topic is protected
+ if h.matchProtectedTopics(topic) {
+ // check if the client is authenticated
+ // if the client is not authenticated, it is not allowed to access protected topics
+ return h.basicAuthManager.Exists(string(cl.Properties.Username))
+ }
+
+ // allow everyone to read public topics
+ return h.matchPublicTopics(topic)
+}
+
+// OnSessionEstablished is called when a new client establishes a session (after OnConnect).
+func (h *BrokerHook) OnSessionEstablished(cl *mqtt.Client, _ packets.Packet) {
+ h.subscriptionManager.Connect(cl.ID)
+}
+
+// OnDisconnect is called when a client is disconnected for any reason.
+func (h *BrokerHook) OnDisconnect(cl *mqtt.Client, _ error, _ bool) {
+ h.subscriptionManager.Disconnect(cl.ID)
+}
+
+// OnSubscribed is called when a client subscribes to one or more filters.
+func (h *BrokerHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, _ []byte) {
+ for _, subscription := range pk.Filters {
+ h.subscriptionManager.Subscribe(cl.ID, subscription.Filter)
+ }
+}
+
+// OnUnsubscribed is called when a client unsubscribes from one or more filters.
+func (h *BrokerHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
+ for _, subscription := range pk.Filters {
+ h.subscriptionManager.Unsubscribe(cl.ID, subscription.Filter)
+ }
+}
+
+func compileTopicAsRegex(topic string) *regexp.Regexp {
+
+ r := regexp.QuoteMeta(topic)
+ r = strings.ReplaceAll(r, `\*`, "(.*?)")
+ r += "$"
+
+ reg, err := regexp.Compile(r)
+ if err != nil {
+ return nil
+ }
+
+ return reg
+}
+
+func compileTopicsAsRegexes(topics []string) ([]*regexp.Regexp, error) {
+ regexes := make([]*regexp.Regexp, len(topics))
+ for i, topic := range topics {
+ reg := compileTopicAsRegex(topic)
+ if reg == nil {
+ return nil, ierrors.Errorf("invalid topic in config: %s", topic)
+ }
+ regexes[i] = reg
+ }
+
+ return regexes, nil
+}
diff --git a/pkg/broker/options.go b/pkg/broker/options.go
index 0f1cad3..dd650b8 100644
--- a/pkg/broker/options.go
+++ b/pkg/broker/options.go
@@ -2,57 +2,65 @@ package broker
// Options are options around the broker.
type Options struct {
- // BufferSize is the size of the client buffers in bytes.
- BufferSize int
- // BufferBlockSize is the size per client buffer R/W block in bytes.
- BufferBlockSize int
-
- // MaxTopicSubscriptionsPerClient defines the maximum number of topic subscriptions per client before the client gets dropped (DOS protection).
- MaxTopicSubscriptionsPerClient int
- // TopicCleanupThresholdCount defines the number of deleted topics that trigger a garbage collection of the SubscriptionManager.
- TopicCleanupThresholdCount int
- // TopicCleanupThresholdRatio defines the ratio of subscribed topics to deleted topics that trigger a garbage collection of the SubscriptionManager.
- TopicCleanupThresholdRatio float32
-
// WebsocketEnabled defines whether to enable the websocket connection of the MQTT broker.
WebsocketEnabled bool
// WebsocketBindAddress defines the websocket bind address on which the MQTT broker listens on.
WebsocketBindAddress string
+
// TCPEnabled defines whether to enable the TCP connection of the MQTT broker.
TCPEnabled bool
// TCPBindAddress defines the TCP bind address on which the MQTT broker listens on.
TCPBindAddress string
-
- // TCPAuthEnabled defines whether to enable auth for TCP connections.
- TCPAuthEnabled bool
- // TCPAuthPasswordSalt is the auth salt used for hashing the passwords of the users.
- TCPAuthPasswordSalt string
- // TCPAuthUsers is the list of allowed users with their password+salt as a scrypt hash.
- TCPAuthUsers map[string]string
-
// TCPTLSEnabled defines whether to enable TLS for TCP connections.
TCPTLSEnabled bool
// TCPTLSCertificatePath is the path to the certificate file (x509 PEM) for TCP connections with TLS.
TCPTLSCertificatePath string
// TCPTLSPrivateKeyPath is the path to the private key file (x509 PEM) for TCP connections with TLS.
TCPTLSPrivateKeyPath string
+
+ // AuthPasswordSalt is the auth salt used for hashing the passwords of the users.
+ AuthPasswordSalt string
+ // AuthUsers is the list of allowed users with their password+salt as a scrypt hash.
+ AuthUsers map[string]string
+
+ // PublicTopics are the MQTT topics which can be subscribed to without authorization. Wildcards using * are allowed
+ PublicTopics []string
+ // ProtectedTopics are the MQTT topics which only can be subscribed to with valid authorization. Wildcards using * are allowed
+ ProtectedTopics []string
+
+ // MaxTopicSubscriptionsPerClient defines the maximum number of topic subscriptions per client before the client gets dropped (DOS protection).
+ MaxTopicSubscriptionsPerClient int
+ // TopicCleanupThresholdCount defines the number of deleted topics that trigger a garbage collection of the SubscriptionManager.
+ TopicCleanupThresholdCount int
+ // TopicCleanupThresholdRatio defines the ratio of subscribed topics to deleted topics that trigger a garbage collection of the SubscriptionManager.
+ TopicCleanupThresholdRatio float32
+
+ // MaximumClientWritesPending specifies the maximum number of pending message writes for a client.
+ MaximumClientWritesPending int
+ // ClientWriteBufferSize specifies the size of the client write buffer.
+ ClientWriteBufferSize int
+ // ClientNetReadBufferSize specifies the size of the client read buffer.
+ ClientReadBufferSize int
}
var defaultOpts = []Option{
- WithBufferSize(0),
- WithBufferBlockSize(0),
- WithTopicCleanupThresholdCount(10000),
- WithTopicCleanupThresholdRatio(1.0),
WithWebsocketEnabled(true),
WithWebsocketBindAddress("localhost:1888"),
WithTCPEnabled(false),
WithTCPBindAddress("localhost:1883"),
- WithTCPAuthEnabled(false),
- WithTCPAuthPasswordSalt("0000000000000000000000000000000000000000000000000000000000000000"),
- WithTCPAuthUsers(map[string]string{}),
WithTCPTLSEnabled(false),
WithTCPTLSCertificatePath(""),
WithTCPTLSPrivateKeyPath(""),
+ WithAuthPasswordSalt("0000000000000000000000000000000000000000000000000000000000000000"),
+ WithAuthUsers(map[string]string{}),
+ WithPublicTopics([]string{"*"}),
+ WithProtectedTopics([]string{}),
+ WithMaxTopicSubscriptionsPerClient(1000),
+ WithTopicCleanupThresholdCount(10000),
+ WithTopicCleanupThresholdRatio(1.0),
+ WithMaximumClientWritesPending(1024 * 8),
+ WithClientWriteBufferSize(1024 * 2),
+ WithClientReadBufferSize(1024 * 2),
}
// applies the given Option.
@@ -71,107 +79,121 @@ func (bo *Options) ApplyOnDefault(opts ...Option) {
// Option is a function which sets an option on a Options instance.
type Option func(options *Options)
-// WithBufferSize sets the size of the client buffers in bytes.
-func WithBufferSize(bufferSize int) Option {
+// WithWebsocketEnabled sets whether to enable the websocket connection of the MQTT broker.
+func WithWebsocketEnabled(websocketEnabled bool) Option {
return func(options *Options) {
- options.BufferSize = bufferSize
+ options.WebsocketEnabled = websocketEnabled
}
}
-// WithBufferBlockSize sets the size per client buffer R/W block in bytes.
-func WithBufferBlockSize(bufferBlockSize int) Option {
+// WithWebsocketBindAddress sets the websocket bind address on which the MQTT broker listens on.
+func WithWebsocketBindAddress(websocketBindAddress string) Option {
return func(options *Options) {
- options.BufferBlockSize = bufferBlockSize
+ options.WebsocketBindAddress = websocketBindAddress
}
}
-// WithMaxTopicSubscriptionsPerClient sets the maximum number of topic subscriptions per client before the client gets dropped (DOS protection).
-func WithMaxTopicSubscriptionsPerClient(maxTopicSubscriptionsPerClient int) Option {
+// WithTCPEnabled sets whether to enable the TCP connection of the MQTT broker.
+func WithTCPEnabled(tcpEnabled bool) Option {
return func(options *Options) {
- options.MaxTopicSubscriptionsPerClient = maxTopicSubscriptionsPerClient
+ options.TCPEnabled = tcpEnabled
}
}
-// WithTopicCleanupThresholdCount sets the number of deleted topics that trigger a garbage collection of the SubscriptionManager.
-func WithTopicCleanupThresholdCount(topicCleanupThresholdCount int) Option {
+// WithTCPBindAddress sets the TCP bind address on which the MQTT broker listens on.
+func WithTCPBindAddress(tcpBindAddress string) Option {
return func(options *Options) {
- options.TopicCleanupThresholdCount = topicCleanupThresholdCount
+ options.TCPBindAddress = tcpBindAddress
}
}
-// WithTopicCleanupThresholdRatio the ratio of subscribed topics to deleted topics that trigger a garbage collection of the SubscriptionManager.
-func WithTopicCleanupThresholdRatio(topicCleanupThresholdRatio float32) Option {
+// WithTCPTLSEnabled sets whether to enable TLS for TCP connections.
+func WithTCPTLSEnabled(tcpTLSEnabled bool) Option {
return func(options *Options) {
- options.TopicCleanupThresholdRatio = topicCleanupThresholdRatio
+ options.TCPTLSEnabled = tcpTLSEnabled
}
}
-// WithWebsocketEnabled sets whether to enable the websocket connection of the MQTT broker.
-func WithWebsocketEnabled(websocketEnabled bool) Option {
+// WithTCPTLSCertificatePath sets the path to the certificate file (x509 PEM) for TCP connections with TLS.
+func WithTCPTLSCertificatePath(tcpTLSCertificatePath string) Option {
return func(options *Options) {
- options.WebsocketEnabled = websocketEnabled
+ options.TCPTLSCertificatePath = tcpTLSCertificatePath
}
}
-// WithWebsocketBindAddress sets the websocket bind address on which the MQTT broker listens on.
-func WithWebsocketBindAddress(websocketBindAddress string) Option {
+// WithTCPTLSPrivateKeyPath sets the path to the private key file (x509 PEM) for TCP connections with TLS.
+func WithTCPTLSPrivateKeyPath(tcpTLSPrivateKeyPath string) Option {
return func(options *Options) {
- options.WebsocketBindAddress = websocketBindAddress
+ options.TCPTLSPrivateKeyPath = tcpTLSPrivateKeyPath
}
}
-// WithTCPEnabled sets whether to enable the TCP connection of the MQTT broker.
-func WithTCPEnabled(tcpEnabled bool) Option {
+// WithAuthPasswordSalt sets the auth salt used for hashing the passwords of the users.
+func WithAuthPasswordSalt(tcpAuthPasswordSalt string) Option {
return func(options *Options) {
- options.TCPEnabled = tcpEnabled
+ options.AuthPasswordSalt = tcpAuthPasswordSalt
}
}
-// WithTCPBindAddress sets the TCP bind address on which the MQTT broker listens on.
-func WithTCPBindAddress(tcpBindAddress string) Option {
+// WithAuthUsers sets the list of allowed users with their password+salt as a scrypt hash.
+func WithAuthUsers(tcpAuthUsers map[string]string) Option {
return func(options *Options) {
- options.TCPBindAddress = tcpBindAddress
+ options.AuthUsers = tcpAuthUsers
+ }
+}
+
+// WithPublicTopics sets the MQTT topics which can be subscribed to without authorization. Wildcards using * are allowed.
+func WithPublicTopics(publicTopics []string) Option {
+ return func(options *Options) {
+ options.PublicTopics = publicTopics
}
}
-// WithTCPAuthEnabled sets whether to enable auth for TCP connections.
-func WithTCPAuthEnabled(tcpAuthEnabled bool) Option {
+// WithProtectedTopics sets the MQTT topics which only can be subscribed to with valid authorization. Wildcards using * are allowed.
+func WithProtectedTopics(protectedTopics []string) Option {
return func(options *Options) {
- options.TCPAuthEnabled = tcpAuthEnabled
+ options.ProtectedTopics = protectedTopics
}
}
-// WithTCPAuthPasswordSalt sets the auth salt used for hashing the passwords of the users.
-func WithTCPAuthPasswordSalt(tcpAuthPasswordSalt string) Option {
+// WithMaxTopicSubscriptionsPerClient sets the maximum number of topic subscriptions per client before the client gets dropped (DOS protection).
+func WithMaxTopicSubscriptionsPerClient(maxTopicSubscriptionsPerClient int) Option {
return func(options *Options) {
- options.TCPAuthPasswordSalt = tcpAuthPasswordSalt
+ options.MaxTopicSubscriptionsPerClient = maxTopicSubscriptionsPerClient
}
}
-// WithTCPAuthUsers sets the list of allowed users with their password+salt as a scrypt hash.
-func WithTCPAuthUsers(tcpAuthUsers map[string]string) Option {
+// WithTopicCleanupThresholdCount sets the number of deleted topics that trigger a garbage collection of the SubscriptionManager.
+func WithTopicCleanupThresholdCount(topicCleanupThresholdCount int) Option {
return func(options *Options) {
- options.TCPAuthUsers = tcpAuthUsers
+ options.TopicCleanupThresholdCount = topicCleanupThresholdCount
}
}
-// WithTCPTLSEnabled sets whether to enable TLS for TCP connections.
-func WithTCPTLSEnabled(tcpTLSEnabled bool) Option {
+// WithTopicCleanupThresholdRatio the ratio of subscribed topics to deleted topics that trigger a garbage collection of the SubscriptionManager.
+func WithTopicCleanupThresholdRatio(topicCleanupThresholdRatio float32) Option {
return func(options *Options) {
- options.TCPTLSEnabled = tcpTLSEnabled
+ options.TopicCleanupThresholdRatio = topicCleanupThresholdRatio
}
}
-// WithTCPTLSCertificatePath sets the path to the certificate file (x509 PEM) for TCP connections with TLS.
-func WithTCPTLSCertificatePath(tcpTLSCertificatePath string) Option {
+// WithMaximumClientWritesPending specifies the maximum number of pending message writes for a client.
+func WithMaximumClientWritesPending(maximumClientWritesPending int) Option {
return func(options *Options) {
- options.TCPTLSCertificatePath = tcpTLSCertificatePath
+ options.MaximumClientWritesPending = maximumClientWritesPending
}
}
-// WithTCPTLSPrivateKeyPath sets the path to the private key file (x509 PEM) for TCP connections with TLS.
-func WithTCPTLSPrivateKeyPath(tcpTLSPrivateKeyPath string) Option {
+// WithClientWriteBufferSize specifies the size of the client write buffer.
+func WithClientWriteBufferSize(clientWriteBufferSize int) Option {
return func(options *Options) {
- options.TCPTLSPrivateKeyPath = tcpTLSPrivateKeyPath
+ options.ClientWriteBufferSize = clientWriteBufferSize
+ }
+}
+
+// WithClientReadBufferSize specifies the size of the client read buffer.
+func WithClientReadBufferSize(clientReadBufferSize int) Option {
+ return func(options *Options) {
+ options.ClientReadBufferSize = clientReadBufferSize
}
}
diff --git a/pkg/broker/tls.go b/pkg/broker/tls.go
index 6a5d3f7..b9762e1 100644
--- a/pkg/broker/tls.go
+++ b/pkg/broker/tls.go
@@ -2,8 +2,9 @@ package broker
import (
"crypto/tls"
- "fmt"
"os"
+
+ "github.com/iotaledger/hive.go/ierrors"
)
func NewTLSConfig(tcpTLSCertificatePath string, tcpTLSPrivateKeyPath string) (*tls.Config, error) {
@@ -11,34 +12,34 @@ func NewTLSConfig(tcpTLSCertificatePath string, tcpTLSPrivateKeyPath string) (*t
if _, err := os.Stat(tcpTLSCertificatePath); err != nil {
if os.IsNotExist(err) {
// file does not exist
- return nil, fmt.Errorf("TCP TLS certificate file not found (%s)", tcpTLSCertificatePath)
+ return nil, ierrors.Errorf("TCP TLS certificate file not found (%s)", tcpTLSCertificatePath)
}
- return nil, fmt.Errorf("unable to check TCP TLS certificate file (%s): %w", tcpTLSCertificatePath, err)
+ return nil, ierrors.Errorf("unable to check TCP TLS certificate file (%s): %w", tcpTLSCertificatePath, err)
}
if _, err := os.Stat(tcpTLSPrivateKeyPath); err != nil {
if os.IsNotExist(err) {
// file does not exist
- return nil, fmt.Errorf("TCP TLS private key file not found (%s)", tcpTLSPrivateKeyPath)
+ return nil, ierrors.Errorf("TCP TLS private key file not found (%s)", tcpTLSPrivateKeyPath)
}
- return nil, fmt.Errorf("unable to check TCP TLS private key file (%s): %w", tcpTLSPrivateKeyPath, err)
+ return nil, ierrors.Errorf("unable to check TCP TLS private key file (%s): %w", tcpTLSPrivateKeyPath, err)
}
tcpTLSCertificate, err := os.ReadFile(tcpTLSCertificatePath)
if err != nil {
- return nil, fmt.Errorf("unable to read TCP TLS certificate: %w", err)
+ return nil, ierrors.Errorf("unable to read TCP TLS certificate: %w", err)
}
tcpTLSPrivateKey, err := os.ReadFile(tcpTLSPrivateKeyPath)
if err != nil {
- return nil, fmt.Errorf("unable to read TCP TLS private key: %w", err)
+ return nil, ierrors.Errorf("unable to read TCP TLS private key: %w", err)
}
cert, err := tls.X509KeyPair(tcpTLSCertificate, tcpTLSPrivateKey)
if err != nil {
- return nil, fmt.Errorf("loading TCP TLS configuration failed: %w", err)
+ return nil, ierrors.Errorf("loading TCP TLS configuration failed: %w", err)
}
return &tls.Config{
diff --git a/pkg/mqtt/server.go b/pkg/mqtt/server.go
index cd385cc..15fe724 100644
--- a/pkg/mqtt/server.go
+++ b/pkg/mqtt/server.go
@@ -7,7 +7,6 @@ import (
"sync"
"time"
- "github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -357,7 +356,7 @@ func (s *Server) startListenIfNeeded(ctx context.Context, grpcCall string, liste
go func() {
s.LogInfof("Listen to %s", grpcCall)
- if err := listenFunc(sub.context); err != nil && !errors.Is(err, context.Canceled) {
+ if err := listenFunc(sub.context); err != nil && !ierrors.Is(err, context.Canceled) {
s.LogErrorf("Finished listen to %s with error: %s", grpcCall, err.Error())
if status.Code(err) == codes.Unavailable && s.shutdownHandler != nil {
s.shutdownHandler.SelfShutdown("INX became unavailable", true)
diff --git a/pkg/testsuite/broker_mock.go b/pkg/testsuite/broker_mock.go
index 30f1018..866d598 100644
--- a/pkg/testsuite/broker_mock.go
+++ b/pkg/testsuite/broker_mock.go
@@ -5,7 +5,7 @@ import (
"sync"
"testing"
- "github.com/mochi-co/mqtt/server/system"
+ "github.com/mochi-mqtt/server/v2/system"
"github.com/stretchr/testify/require"
"github.com/iotaledger/hive.go/web/subscriptionmanager"
diff --git a/tools/gendoc/go.mod b/tools/gendoc/go.mod
index 13899fe..2466217 100644
--- a/tools/gendoc/go.mod
+++ b/tools/gendoc/go.mod
@@ -5,7 +5,7 @@ go 1.21
replace github.com/iotaledger/inx-mqtt => ../../
require (
- github.com/iotaledger/hive.go/app v0.0.0-20231206114953-6a65a82e30ad
+ github.com/iotaledger/hive.go/app v0.0.0-20231207181026-f482ac139305
github.com/iotaledger/hive.go/apputils v0.0.0-20230829152614-7afc7a4d89b3
github.com/iotaledger/inx-mqtt v0.0.0-00010101000000-000000000000
)
@@ -33,17 +33,17 @@ require (
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/iancoleman/orderedmap v0.3.0 // indirect
- github.com/iotaledger/hive.go/constraints v0.0.0-20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/crypto v0.0.0-20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/ds v0.0.0-20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/ierrors v0.0.0-20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/lo v0.0.0-20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/logger v0.0.0-20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/runtime v0.0.0-20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/stringify v0.0.0-20231206114953-6a65a82e30ad // indirect
- github.com/iotaledger/hive.go/web v0.0.0-20231130122510-e3dddb0214f0 // indirect
+ github.com/iotaledger/hive.go/constraints v0.0.0-20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/crypto v0.0.0-20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/ds v0.0.0-20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/ierrors v0.0.0-20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/lo v0.0.0-20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/logger v0.0.0-20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/runtime v0.0.0-20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/stringify v0.0.0-20231207181026-f482ac139305 // indirect
+ github.com/iotaledger/hive.go/web v0.0.0-20231207181026-f482ac139305 // indirect
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231206124511-b78dc962031f // indirect
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231206124145-f773dfe3927e // indirect
github.com/iotaledger/iota.go/v4 v4.0.0-20231206123921-2af411eef0b5 // indirect
@@ -57,12 +57,11 @@ require (
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
- github.com/mochi-co/mqtt v1.3.2 // indirect
+ github.com/mochi-mqtt/server/v2 v2.4.2 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
- github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5 // indirect
- github.com/pkg/errors v0.9.1 // indirect
+ github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect
github.com/prometheus/client_golang v1.17.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
diff --git a/tools/gendoc/go.sum b/tools/gendoc/go.sum
index eb6ccd0..b3fa888 100644
--- a/tools/gendoc/go.sum
+++ b/tools/gendoc/go.sum
@@ -184,32 +184,32 @@ github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXei
github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc=
github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE=
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
-github.com/iotaledger/hive.go/app v0.0.0-20231206114953-6a65a82e30ad h1:v7dkbVLSsmzgOWT2vjvv1MdKQXvqFbvIkx8mvh6VK7g=
-github.com/iotaledger/hive.go/app v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:hTHKGFbZnuiW8yEgDuuL7ZjQTCnl8bXyHLmj3LPa648=
+github.com/iotaledger/hive.go/app v0.0.0-20231207181026-f482ac139305 h1:NPeZUkOI0xTKnuWefzcLRKfHr/shXmfoXDsfqXq8P0Q=
+github.com/iotaledger/hive.go/app v0.0.0-20231207181026-f482ac139305/go.mod h1:hTHKGFbZnuiW8yEgDuuL7ZjQTCnl8bXyHLmj3LPa648=
github.com/iotaledger/hive.go/apputils v0.0.0-20230829152614-7afc7a4d89b3 h1:4aVJTc0KS77uEw0Tny4r0n1ORwcbAQDECaCclgf/6lE=
github.com/iotaledger/hive.go/apputils v0.0.0-20230829152614-7afc7a4d89b3/go.mod h1:TZeAqieDu+xDOZp2e9+S+8pZp1PrfgcwLUnxmd8IgLU=
-github.com/iotaledger/hive.go/constraints v0.0.0-20231206114953-6a65a82e30ad h1:4XL7IIvdsWHxSKQfU+sgq3H9egN54053LF9TwMfDcTg=
-github.com/iotaledger/hive.go/constraints v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:dOBOM2s4se3HcWefPe8sQLUalGXJ8yVXw58oK8jke3s=
-github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231206114953-6a65a82e30ad h1:iNzb/Oy/nucIOXOzRcwSqqFsaeKwr2JZpZYSLp8xjlE=
-github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231206114953-6a65a82e30ad/go.mod h1:CO28KMA6Pp5LJPiigPQQ276zQofES+jMod08U5pyRFA=
-github.com/iotaledger/hive.go/crypto v0.0.0-20231206114953-6a65a82e30ad h1:pUL2UZbF4S8FIV7uKo9p+IGfZ658K1VNorQ6rzDMRvs=
-github.com/iotaledger/hive.go/crypto v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:7vHoF//1Pt3nu0l8nDIw7bEgv2GfbL3kSgjp7Rdqhd4=
-github.com/iotaledger/hive.go/ds v0.0.0-20231206114953-6a65a82e30ad h1:adLrD6dOEkM5Xdg6AOPt9/HYqy/pQ5FrprDpW4/VqUU=
-github.com/iotaledger/hive.go/ds v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:NmZRIoxtL6iQdVK6n5W+JOx58K/0Yn8k7WuSvpKPQ+M=
-github.com/iotaledger/hive.go/ierrors v0.0.0-20231206114953-6a65a82e30ad h1:WDl58zJKHfwbzHs+ZB8Jq3YNgVQE5Neu2NeaX3FZuyU=
-github.com/iotaledger/hive.go/ierrors v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:HcE8B5lP96enc/OALTb2/rIIi+yOLouRoHOKRclKmC8=
-github.com/iotaledger/hive.go/lo v0.0.0-20231206114953-6a65a82e30ad h1:qpCsjw+InLL824QPu3lY/osck4DhucBKhCs5/E8OH+A=
-github.com/iotaledger/hive.go/lo v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:ETXGXymFyNcUq2t4I9e7ZK18f9bxUWYat4pjZ9W0rWc=
-github.com/iotaledger/hive.go/logger v0.0.0-20231206114953-6a65a82e30ad h1:fazCxogqOLDEPNDPWYDLTDpYmwgTJgIaC2Z6VN52S4M=
-github.com/iotaledger/hive.go/logger v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:hVaVODS+Uik0obf3SVEHFQNruUko/uqIgD/GKwhn49M=
-github.com/iotaledger/hive.go/runtime v0.0.0-20231206114953-6a65a82e30ad h1:HpupWK8iqFt+Sdogkh2/N8ojalmevYy+FzhjOuy7Y7E=
-github.com/iotaledger/hive.go/runtime v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:Z9NFsByMh1Kf98f3v3ifeZRycbS2db1hjswTQG1MxnE=
-github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231206114953-6a65a82e30ad h1:c8uwbBZDqpiCNN9/9Jji7Z4lL0GdVnORp8WMouiuknk=
-github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231206114953-6a65a82e30ad/go.mod h1:FoH3T6yKlZJp8xm8K+zsQiibSynp32v21CpWx8xkek8=
-github.com/iotaledger/hive.go/stringify v0.0.0-20231206114953-6a65a82e30ad h1:VC3OgdSbyngY7/gxVj66fKd/nGmN6P0/myr348nx7vA=
-github.com/iotaledger/hive.go/stringify v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs=
-github.com/iotaledger/hive.go/web v0.0.0-20231130122510-e3dddb0214f0 h1:bIfWp0AOQXMv3VkDAOQwqvMYLz1Ila1Z2hqiY2RJ3Io=
-github.com/iotaledger/hive.go/web v0.0.0-20231130122510-e3dddb0214f0/go.mod h1:L/CLz7skt9dvidhBOw2gmMGhmrUBHXlA0b3paugdsE4=
+github.com/iotaledger/hive.go/constraints v0.0.0-20231207181026-f482ac139305 h1:ar+IWfqO7B1M5+kuKGUJnfg0i/YuuM1oN5i8byp/F7A=
+github.com/iotaledger/hive.go/constraints v0.0.0-20231207181026-f482ac139305/go.mod h1:dOBOM2s4se3HcWefPe8sQLUalGXJ8yVXw58oK8jke3s=
+github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231207181026-f482ac139305 h1:bKDeNAB2zm5mDJgmyRiHBbekkg9OtnfYVGKKRqKyLKk=
+github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231207181026-f482ac139305/go.mod h1:CO28KMA6Pp5LJPiigPQQ276zQofES+jMod08U5pyRFA=
+github.com/iotaledger/hive.go/crypto v0.0.0-20231207181026-f482ac139305 h1:OR2TClxTtst906F4tok9xzhBTKO81qrUFdxIAoaZVvE=
+github.com/iotaledger/hive.go/crypto v0.0.0-20231207181026-f482ac139305/go.mod h1:7vHoF//1Pt3nu0l8nDIw7bEgv2GfbL3kSgjp7Rdqhd4=
+github.com/iotaledger/hive.go/ds v0.0.0-20231207181026-f482ac139305 h1:07ujwFv6qWLMRm32ZWZqMpgEugtsIoJ8vCmsn+vpO0E=
+github.com/iotaledger/hive.go/ds v0.0.0-20231207181026-f482ac139305/go.mod h1:NmZRIoxtL6iQdVK6n5W+JOx58K/0Yn8k7WuSvpKPQ+M=
+github.com/iotaledger/hive.go/ierrors v0.0.0-20231207181026-f482ac139305 h1:v7/zMhNcr6hibXFZXZ4xV/S27ESUytQFgUQ1oo10iic=
+github.com/iotaledger/hive.go/ierrors v0.0.0-20231207181026-f482ac139305/go.mod h1:HcE8B5lP96enc/OALTb2/rIIi+yOLouRoHOKRclKmC8=
+github.com/iotaledger/hive.go/lo v0.0.0-20231207181026-f482ac139305 h1:zxVbTEWutMvZhS0VLu/OmBk2WpMjrXQ7l67VBwsExtc=
+github.com/iotaledger/hive.go/lo v0.0.0-20231207181026-f482ac139305/go.mod h1:ETXGXymFyNcUq2t4I9e7ZK18f9bxUWYat4pjZ9W0rWc=
+github.com/iotaledger/hive.go/logger v0.0.0-20231207181026-f482ac139305 h1:ZuOnh3vNZqamoN1ibSj0ZMzIWsYZi6fpBp7BhTi2Qf4=
+github.com/iotaledger/hive.go/logger v0.0.0-20231207181026-f482ac139305/go.mod h1:hVaVODS+Uik0obf3SVEHFQNruUko/uqIgD/GKwhn49M=
+github.com/iotaledger/hive.go/runtime v0.0.0-20231207181026-f482ac139305 h1:7CW1/EbG+RvkjbyOf6JA1u1feax/cpex/6a8CLbaA4I=
+github.com/iotaledger/hive.go/runtime v0.0.0-20231207181026-f482ac139305/go.mod h1:Z9NFsByMh1Kf98f3v3ifeZRycbS2db1hjswTQG1MxnE=
+github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231207181026-f482ac139305 h1:p79SQs2hE6Mo/MNEfEUfrKTY7gWp6c3oUeqK6I+o1lQ=
+github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231207181026-f482ac139305/go.mod h1:FoH3T6yKlZJp8xm8K+zsQiibSynp32v21CpWx8xkek8=
+github.com/iotaledger/hive.go/stringify v0.0.0-20231207181026-f482ac139305 h1:KjbaklWvZb4zIcXBETHzl6XFTAf8wtAlFDfaF0Z1Daw=
+github.com/iotaledger/hive.go/stringify v0.0.0-20231207181026-f482ac139305/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs=
+github.com/iotaledger/hive.go/web v0.0.0-20231207181026-f482ac139305 h1:dVzMN31qnSSvxTFzdkq93YNkVOFYsXqvy1Pqx5+OQTg=
+github.com/iotaledger/hive.go/web v0.0.0-20231207181026-f482ac139305/go.mod h1:0ke/puDLuEQptxeUiMaCayFFg6Bqwtcis4o0i6C9uxI=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231206124511-b78dc962031f h1:V68Ijq1A64gB9r0Rhc4ybLGH66rXqZ2Ly0L4uuaLrMg=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231206124511-b78dc962031f/go.mod h1:Dy3Gv4Dn1zufB177x6IXETP3zTeiWQ1+HMVQR0Bt/ew=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231206124145-f773dfe3927e h1:jbtiUlmTpTdGiRBW1pniPSqRcDMJaIW8fGS+uORryas=
@@ -281,8 +281,8 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
-github.com/mochi-co/mqtt v1.3.2 h1:cRqBjKdL1yCEWkz/eHWtaN/ZSpkMpK66+biZnrLrHC8=
-github.com/mochi-co/mqtt v1.3.2/go.mod h1:o0lhQFWL8QtR1+8a9JZmbY8FhZ89MF8vGOGHJNFbCB8=
+github.com/mochi-mqtt/server/v2 v2.4.2 h1:x7xC41Qn/ek1hOWNcZraRm+Cmqc2yrfhD5VA1NFnXhc=
+github.com/mochi-mqtt/server/v2 v2.4.2/go.mod h1:M1lZnLbyowXUyQBIlHYlX1wasxXqv/qFWwQxAzfphwA=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
@@ -303,12 +303,11 @@ github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAv
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
-github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5 h1:+qIP3OMrT7SN5kLnTcVEISPOMB/97RyAKTg1UWA738E=
-github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4=
+github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 h1:jik8PHtAIsPlCRJjJzl4udgEf7hawInF9texMeO2jrU=
+github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
-github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=