diff --git a/.dockerignore b/.dockerignore index 0f3fdd3..f152a5b 100644 --- a/.dockerignore +++ b/.dockerignore @@ -20,6 +20,7 @@ # Docker related files .dockerignore +Dockerfile* docker-compose.yml # Documentation diff --git a/.gitignore b/.gitignore index 18b3360..540ea1e 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ inx-mqtt go.work go.work.sum + +# IDE related files +.vscode/ diff --git a/Dockerfile.debug b/Dockerfile.debug new file mode 100644 index 0000000..7e5be3a --- /dev/null +++ b/Dockerfile.debug @@ -0,0 +1,55 @@ +# https://hub.docker.com/_/golang +FROM golang:1.21-bullseye AS build + +# Ensure ca-certificates are up to date +RUN update-ca-certificates + +# Set the current Working Directory inside the container +RUN mkdir /scratch +WORKDIR /scratch + +# Prepare the folder where we are putting all the files +RUN mkdir /app + +# Copy everything from the current directory to the PWD(Present Working Directory) inside the container +COPY . . + +# Download go modules +RUN go mod download +RUN go mod verify + +# Build the binary (disable inlining and optimizations that can interfere with debugging) +RUN CGO_ENABLED=0 go build -a -gcflags "all=-N -l" -o /app/inx-mqtt . + +# Copy the assets +COPY ./config_defaults.json /app/config.json + +############################ +# Image +############################ +FROM golang:1.21-alpine + +# Install delve +RUN CGO_ENABLED=0 go install -ldflags "-s -w -extldflags '-static'" github.com/go-delve/delve/cmd/dlv@latest + +# Create the nonroot user +ARG USERNAME=nonroot_user +ARG USER_UID=65532 +ARG USER_GID=$USER_UID +ARG USER_HOME=/home/nonroot +RUN addgroup --g $USER_GID $USERNAME && adduser --disabled-password --uid $USER_UID --ingroup "${USERNAME}" --shell /sbin/nologin --home $USER_HOME $USERNAME + +EXPOSE 1883/tcp +EXPOSE 1888/tcp +# Delve +EXPOSE 4000 + +# Copy the app dir into distroless image +COPY --chown=nonroot:nonroot --from=build /app /app + +WORKDIR /app + +# Set USER nonroot +USER $USERNAME + +ENTRYPOINT [ "/go/bin/dlv", "--listen=:4000", "--headless=true", "--log=true", "--accept-multiclient", "--api-version=2", "exec", "/app/inx-mqtt", "--" ] \ No newline at end of file diff --git a/components/app/app.go b/components/app/app.go index 3e0e617..37ad2da 100644 --- a/components/app/app.go +++ b/components/app/app.go @@ -14,7 +14,7 @@ var ( Name = "inx-mqtt" // Version of the app. - Version = "2.0.0-alpha.5" + Version = "2.0.0-alpha.6" ) func App() *app.App { diff --git a/components/mqtt/component.go b/components/mqtt/component.go index 83db251..fa9e400 100644 --- a/components/mqtt/component.go +++ b/components/mqtt/component.go @@ -45,21 +45,23 @@ func provide(c *dig.Container) error { return c.Provide(func(deps inDeps) (*mqtt.Server, error) { broker, err := broker.NewBroker( - broker.WithBufferSize(ParamsMQTT.BufferSize), - broker.WithBufferBlockSize(ParamsMQTT.BufferBlockSize), - broker.WithMaxTopicSubscriptionsPerClient(ParamsMQTT.Subscriptions.MaxTopicSubscriptionsPerClient), - broker.WithTopicCleanupThresholdCount(ParamsMQTT.Subscriptions.TopicsCleanupThresholdCount), - broker.WithTopicCleanupThresholdRatio(ParamsMQTT.Subscriptions.TopicsCleanupThresholdRatio), broker.WithWebsocketEnabled(ParamsMQTT.Websocket.Enabled), broker.WithWebsocketBindAddress(ParamsMQTT.Websocket.BindAddress), broker.WithTCPEnabled(ParamsMQTT.TCP.Enabled), broker.WithTCPBindAddress(ParamsMQTT.TCP.BindAddress), - broker.WithTCPAuthEnabled(ParamsMQTT.TCP.Auth.Enabled), - broker.WithTCPAuthPasswordSalt(ParamsMQTT.TCP.Auth.PasswordSalt), - broker.WithTCPAuthUsers(ParamsMQTT.TCP.Auth.Users), broker.WithTCPTLSEnabled(ParamsMQTT.TCP.TLS.Enabled), broker.WithTCPTLSCertificatePath(ParamsMQTT.TCP.TLS.CertificatePath), broker.WithTCPTLSPrivateKeyPath(ParamsMQTT.TCP.TLS.PrivateKeyPath), + broker.WithAuthPasswordSalt(ParamsMQTT.Auth.PasswordSalt), + broker.WithAuthUsers(ParamsMQTT.Auth.Users), + broker.WithPublicTopics(ParamsMQTT.PublicTopics), + broker.WithProtectedTopics(ParamsMQTT.ProtectedTopics), + broker.WithMaxTopicSubscriptionsPerClient(ParamsMQTT.Subscriptions.MaxTopicSubscriptionsPerClient), + broker.WithTopicCleanupThresholdCount(ParamsMQTT.Subscriptions.TopicsCleanupThresholdCount), + broker.WithTopicCleanupThresholdRatio(ParamsMQTT.Subscriptions.TopicsCleanupThresholdRatio), + broker.WithMaximumClientWritesPending(ParamsMQTT.MaximumClientWritesPending), + broker.WithClientWriteBufferSize(ParamsMQTT.ClientWriteBufferSize), + broker.WithClientReadBufferSize(ParamsMQTT.ClientReadBufferSize), ) if err != nil { return nil, ierrors.Wrap(err, "failed to create MQTT broker") diff --git a/components/mqtt/params.go b/components/mqtt/params.go index e1da867..5b2827b 100644 --- a/components/mqtt/params.go +++ b/components/mqtt/params.go @@ -3,15 +3,6 @@ package mqtt import "github.com/iotaledger/hive.go/app" type ParametersMQTT struct { - BufferSize int `default:"0" usage:"the size of the client buffers in bytes"` - BufferBlockSize int `default:"0" usage:"the size per client buffer R/W block in bytes"` - - Subscriptions struct { - MaxTopicSubscriptionsPerClient int `default:"1000" usage:"the maximum number of topic subscriptions per client before the client gets dropped (DOS protection)"` - TopicsCleanupThresholdCount int `default:"10000" usage:"the number of deleted topics that trigger a garbage collection of the subscription manager"` - TopicsCleanupThresholdRatio float32 `default:"1.0" usage:"the ratio of subscribed topics to deleted topics that trigger a garbage collection of the subscription manager"` - } - Websocket struct { Enabled bool `default:"true" usage:"whether to enable the websocket connection of the MQTT broker"` BindAddress string `default:"localhost:1888" usage:"the websocket bind address on which the MQTT broker listens on"` @@ -22,25 +13,46 @@ type ParametersMQTT struct { Enabled bool `default:"false" usage:"whether to enable the TCP connection of the MQTT broker"` BindAddress string `default:"localhost:1883" usage:"the TCP bind address on which the MQTT broker listens on"` - Auth struct { - Enabled bool `default:"false" usage:"whether to enable auth for TCP connections"` - PasswordSalt string `default:"0000000000000000000000000000000000000000000000000000000000000000" usage:"the auth salt used for hashing the passwords of the users"` - Users map[string]string `usage:"the list of allowed users with their password+salt as a scrypt hash"` - } - TLS struct { Enabled bool `default:"false" usage:"whether to enable TLS for TCP connections"` PrivateKeyPath string `default:"private_key.pem" usage:"the path to the private key file (x509 PEM) for TCP connections with TLS"` CertificatePath string `default:"certificate.pem" usage:"the path to the certificate file (x509 PEM) for TCP connections with TLS"` } `name:"tls"` } `name:"tcp"` + + Auth struct { + PasswordSalt string `default:"0000000000000000000000000000000000000000000000000000000000000000" usage:"the auth salt used for hashing the passwords of the users"` + Users map[string]string `usage:"the list of allowed users with their password+salt as a scrypt hash"` + } + + PublicTopics []string `usage:"the MQTT topics which can be subscribed to without authorization. Wildcards using * are allowed"` + ProtectedTopics []string `usage:"the MQTT topics which only can be subscribed to with valid authorization. Wildcards using * are allowed"` + + Subscriptions struct { + MaxTopicSubscriptionsPerClient int `default:"1000" usage:"the maximum number of topic subscriptions per client before the client gets dropped (DOS protection)"` + TopicsCleanupThresholdCount int `default:"10000" usage:"the number of deleted topics that trigger a garbage collection of the subscription manager"` + TopicsCleanupThresholdRatio float32 `default:"1.0" usage:"the ratio of subscribed topics to deleted topics that trigger a garbage collection of the subscription manager"` + } + + MaximumClientWritesPending int `default:"0" usage:"the maximum number of pending message writes for a client"` + ClientWriteBufferSize int `default:"0" usage:"the size of the client write buffer"` + ClientReadBufferSize int `default:"0" usage:"the size of the client read buffer"` } -var ParamsMQTT = &ParametersMQTT{} +var ParamsMQTT = &ParametersMQTT{ + PublicTopics: []string{ + "commitments/*", + "blocks/*", + "transactions/*", + "block-metadata/*", + "outputs/*", + }, + ProtectedTopics: []string{}, +} var params = &app.ComponentParams{ Params: map[string]any{ "mqtt": ParamsMQTT, }, - Masked: nil, + Masked: []string{"mqtt.auth.passwordSalt"}, } diff --git a/components/prometheus/component.go b/components/prometheus/component.go index bcf3e6c..29b0e69 100644 --- a/components/prometheus/component.go +++ b/components/prometheus/component.go @@ -7,13 +7,13 @@ import ( "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/dig" "github.com/iotaledger/hive.go/app" + "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/inx-mqtt/pkg/daemon" "github.com/iotaledger/inx-mqtt/pkg/mqtt" ) @@ -94,7 +94,7 @@ func run() error { go func() { Component.LogInfof("You can now access the Prometheus exporter using: http://%s/metrics", ParamsPrometheus.BindAddress) - if err := deps.PrometheusEcho.Start(ParamsPrometheus.BindAddress); err != nil && !errors.Is(err, http.ErrServerClosed) { + if err := deps.PrometheusEcho.Start(ParamsPrometheus.BindAddress); err != nil && !ierrors.Is(err, http.ErrServerClosed) { Component.LogWarnf("Stopped Prometheus exporter due to an error (%s)", err) } }() diff --git a/components/prometheus/metrics.go b/components/prometheus/metrics.go index b32be61..a609edc 100644 --- a/components/prometheus/metrics.go +++ b/components/prometheus/metrics.go @@ -9,22 +9,25 @@ import ( var ( mqttBrokerAppInfo *prometheus.GaugeVec mqttBrokerStarted prometheus.Gauge + mqttBrokerTime prometheus.Gauge mqttBrokerUptime prometheus.Gauge - mqttBrokerBytesRecv prometheus.Gauge + mqttBrokerBytesReceived prometheus.Gauge mqttBrokerBytesSent prometheus.Gauge mqttBrokerClientsConnected prometheus.Gauge mqttBrokerClientsDisconnected prometheus.Gauge - mqttBrokerClientsMax prometheus.Gauge + mqttBrokerClientsMaximum prometheus.Gauge mqttBrokerClientsTotal prometheus.Gauge - mqttBrokerConnectionsTotal prometheus.Gauge - mqttBrokerMessagesRecv prometheus.Gauge + mqttBrokerMessagesReceived prometheus.Gauge mqttBrokerMessagesSent prometheus.Gauge - mqttBrokerPublishDropped prometheus.Gauge - mqttBrokerPublishRecv prometheus.Gauge - mqttBrokerPublishSent prometheus.Gauge + mqttBrokerMessagesDropped prometheus.Gauge mqttBrokerRetained prometheus.Gauge mqttBrokerInflight prometheus.Gauge + mqttBrokerInflightDropped prometheus.Gauge mqttBrokerSubscriptions prometheus.Gauge + mqttBrokerPacketsReceived prometheus.Gauge + mqttBrokerPacketsSent prometheus.Gauge + mqttBrokerMemoryAlloc prometheus.Gauge + mqttBrokerThreads prometheus.Gauge mqttBrokerSubscriptionManagerSubscribersSize prometheus.Gauge mqttBrokerSubscriptionManagerTopicsSize prometheus.Gauge ) @@ -58,49 +61,57 @@ func registerNewMQTTBrokerGauge(registry *prometheus.Registry, name string, help func registerMQTTMetrics(registry *prometheus.Registry) { mqttBrokerAppInfo = registerNewMQTTBrokerGaugeVec(registry, "app_info", []string{"name", "version", "broker_version"}, "The current version of the server.") mqttBrokerStarted = registerNewMQTTBrokerGauge(registry, "started", "The time the server started in unix seconds.") + mqttBrokerTime = registerNewMQTTBrokerGauge(registry, "time", "Current time on the server.") mqttBrokerUptime = registerNewMQTTBrokerGauge(registry, "uptime", "The number of seconds the server has been online.") - mqttBrokerBytesRecv = registerNewMQTTBrokerGauge(registry, "bytes_recv", "The total number of bytes received in all packets.") - mqttBrokerBytesSent = registerNewMQTTBrokerGauge(registry, "bytes_sent", "The total number of bytes sent to clients.") - mqttBrokerClientsConnected = registerNewMQTTBrokerGauge(registry, "clients_connected", "The number of currently connected clients.") - mqttBrokerClientsDisconnected = registerNewMQTTBrokerGauge(registry, "clients_disconnected", "The number of disconnected non-cleansession clients.") - mqttBrokerClientsMax = registerNewMQTTBrokerGauge(registry, "clients_max", "The maximum number of clients that have been concurrently connected.") - mqttBrokerClientsTotal = registerNewMQTTBrokerGauge(registry, "clients_total", "The sum of all clients, connected and disconnected.") - mqttBrokerConnectionsTotal = registerNewMQTTBrokerGauge(registry, "connections_total", "The sum number of clients which have ever connected.") - mqttBrokerMessagesRecv = registerNewMQTTBrokerGauge(registry, "messages_recv", "The total number of packets received.") - mqttBrokerMessagesSent = registerNewMQTTBrokerGauge(registry, "messages_sent", "The total number of packets sent.") - mqttBrokerPublishDropped = registerNewMQTTBrokerGauge(registry, "publish_dropped", "The number of in-flight publish messages which were dropped.") - mqttBrokerPublishRecv = registerNewMQTTBrokerGauge(registry, "publish_recv", "The total number of received publish packets.") - mqttBrokerPublishSent = registerNewMQTTBrokerGauge(registry, "publish_sent", "The total number of sent publish packets.") - mqttBrokerRetained = registerNewMQTTBrokerGauge(registry, "retained", "The number of messages currently retained.") + mqttBrokerBytesReceived = registerNewMQTTBrokerGauge(registry, "bytes_received", "Total number of bytes received since the broker started.") + mqttBrokerBytesSent = registerNewMQTTBrokerGauge(registry, "bytes_sent", "Total number of bytes sent since the broker started.") + mqttBrokerClientsConnected = registerNewMQTTBrokerGauge(registry, "clients_connected", "Number of currently connected clients.") + mqttBrokerClientsDisconnected = registerNewMQTTBrokerGauge(registry, "clients_disconnected", "Total number of persistent clients (with clean session disabled) that are registered at the broker but are currently disconnected.") + mqttBrokerClientsMaximum = registerNewMQTTBrokerGauge(registry, "clients_maximum", "Maximum number of active clients that have been connected.") + mqttBrokerClientsTotal = registerNewMQTTBrokerGauge(registry, "clients_total", "Total number of connected and disconnected clients with a persistent session currently connected and registered.") + mqttBrokerMessagesReceived = registerNewMQTTBrokerGauge(registry, "messages_received", "Total number of publish messages received.") + mqttBrokerMessagesSent = registerNewMQTTBrokerGauge(registry, "messages_sent", "Total number of publish messages sent.") + mqttBrokerMessagesDropped = registerNewMQTTBrokerGauge(registry, "messages_dropped", "Total number of publish messages dropped to slow subscriber.") + mqttBrokerRetained = registerNewMQTTBrokerGauge(registry, "retained", "Total number of retained messages active on the broker.") mqttBrokerInflight = registerNewMQTTBrokerGauge(registry, "inflight", "The number of messages currently in-flight.") - mqttBrokerSubscriptions = registerNewMQTTBrokerGauge(registry, "subscriptions", "The total number of filter subscriptions.") + mqttBrokerInflightDropped = registerNewMQTTBrokerGauge(registry, "inflight_dropped", "The number of inflight messages which were dropped.") + mqttBrokerSubscriptions = registerNewMQTTBrokerGauge(registry, "subscriptions", "Total number of subscriptions active on the broker.") + mqttBrokerPacketsReceived = registerNewMQTTBrokerGauge(registry, "packets_received", "The total number of publish messages received.") + mqttBrokerPacketsSent = registerNewMQTTBrokerGauge(registry, "packets_sent", "Total number of messages of any type sent since the broker started.") + mqttBrokerMemoryAlloc = registerNewMQTTBrokerGauge(registry, "memory_alloc", "Memory currently allocated.") + mqttBrokerThreads = registerNewMQTTBrokerGauge(registry, "threads", "Number of active goroutines, named as threads for platform ambiguity.") mqttBrokerSubscriptionManagerSubscribersSize = registerNewMQTTBrokerGauge(registry, "subscription_manager_subscribers_size", "The number of active subscribers in the subscription manager.") mqttBrokerSubscriptionManagerTopicsSize = registerNewMQTTBrokerGauge(registry, "subscription_manager_topics_size", "The number of active topics in the subscription manager.") } func collectMQTTBroker(server *mqtt.Server) { + brokerSystemInfo := server.MQTTBroker.SystemInfo() + mqttBrokerAppInfo.With(prometheus.Labels{ "name": Component.App().Info().Name, "version": Component.App().Info().Version, - "broker_version": server.MQTTBroker.SystemInfo().Version, + "broker_version": brokerSystemInfo.Version, }).Set(1) - mqttBrokerStarted.Set(float64(server.MQTTBroker.SystemInfo().Started)) - mqttBrokerUptime.Set(float64(server.MQTTBroker.SystemInfo().Uptime)) - mqttBrokerBytesRecv.Set(float64(server.MQTTBroker.SystemInfo().BytesRecv)) - mqttBrokerBytesSent.Set(float64(server.MQTTBroker.SystemInfo().BytesSent)) - mqttBrokerClientsConnected.Set(float64(server.MQTTBroker.SystemInfo().ClientsConnected)) - mqttBrokerClientsDisconnected.Set(float64(server.MQTTBroker.SystemInfo().ClientsDisconnected)) - mqttBrokerClientsMax.Set(float64(server.MQTTBroker.SystemInfo().ClientsMax)) - mqttBrokerClientsTotal.Set(float64(server.MQTTBroker.SystemInfo().ClientsTotal)) - mqttBrokerConnectionsTotal.Set(float64(server.MQTTBroker.SystemInfo().ConnectionsTotal)) - mqttBrokerMessagesRecv.Set(float64(server.MQTTBroker.SystemInfo().MessagesRecv)) - mqttBrokerMessagesSent.Set(float64(server.MQTTBroker.SystemInfo().MessagesSent)) - mqttBrokerPublishDropped.Set(float64(server.MQTTBroker.SystemInfo().PublishDropped)) - mqttBrokerPublishRecv.Set(float64(server.MQTTBroker.SystemInfo().PublishRecv)) - mqttBrokerPublishSent.Set(float64(server.MQTTBroker.SystemInfo().PublishSent)) - mqttBrokerRetained.Set(float64(server.MQTTBroker.SystemInfo().Retained)) - mqttBrokerInflight.Set(float64(server.MQTTBroker.SystemInfo().Inflight)) - mqttBrokerSubscriptions.Set(float64(server.MQTTBroker.SystemInfo().Subscriptions)) + mqttBrokerStarted.Set(float64(brokerSystemInfo.Started)) + mqttBrokerTime.Set(float64(brokerSystemInfo.Time)) + mqttBrokerUptime.Set(float64(brokerSystemInfo.Uptime)) + mqttBrokerBytesReceived.Set(float64(brokerSystemInfo.BytesReceived)) + mqttBrokerBytesSent.Set(float64(brokerSystemInfo.BytesSent)) + mqttBrokerClientsConnected.Set(float64(brokerSystemInfo.ClientsConnected)) + mqttBrokerClientsDisconnected.Set(float64(brokerSystemInfo.ClientsDisconnected)) + mqttBrokerClientsMaximum.Set(float64(brokerSystemInfo.ClientsMaximum)) + mqttBrokerClientsTotal.Set(float64(brokerSystemInfo.ClientsTotal)) + mqttBrokerMessagesReceived.Set(float64(brokerSystemInfo.MessagesReceived)) + mqttBrokerMessagesSent.Set(float64(brokerSystemInfo.MessagesSent)) + mqttBrokerMessagesDropped.Set(float64(brokerSystemInfo.MessagesDropped)) + mqttBrokerRetained.Set(float64(brokerSystemInfo.Retained)) + mqttBrokerInflight.Set(float64(brokerSystemInfo.Inflight)) + mqttBrokerInflightDropped.Set(float64(brokerSystemInfo.InflightDropped)) + mqttBrokerSubscriptions.Set(float64(brokerSystemInfo.Subscriptions)) + mqttBrokerPacketsReceived.Set(float64(brokerSystemInfo.PacketsReceived)) + mqttBrokerPacketsSent.Set(float64(brokerSystemInfo.PacketsSent)) + mqttBrokerMemoryAlloc.Set(float64(brokerSystemInfo.MemoryAlloc)) + mqttBrokerThreads.Set(float64(brokerSystemInfo.Threads)) mqttBrokerSubscriptionManagerSubscribersSize.Set(float64(server.MQTTBroker.SubscribersSize())) mqttBrokerSubscriptionManagerTopicsSize.Set(float64(server.MQTTBroker.TopicsSize())) } diff --git a/config_defaults.json b/config_defaults.json index 2f946d9..bca5233 100644 --- a/config_defaults.json +++ b/config_defaults.json @@ -29,13 +29,6 @@ "targetNetworkName": "" }, "mqtt": { - "bufferSize": 0, - "bufferBlockSize": 0, - "subscriptions": { - "maxTopicSubscriptionsPerClient": 1000, - "topicsCleanupThresholdCount": 10000, - "topicsCleanupThresholdRatio": 1 - }, "websocket": { "enabled": true, "bindAddress": "localhost:1888", @@ -44,17 +37,32 @@ "tcp": { "enabled": false, "bindAddress": "localhost:1883", - "auth": { - "enabled": false, - "passwordSalt": "0000000000000000000000000000000000000000000000000000000000000000", - "users": null - }, "tls": { "enabled": false, "privateKeyPath": "private_key.pem", "certificatePath": "certificate.pem" } - } + }, + "auth": { + "passwordSalt": "0000000000000000000000000000000000000000000000000000000000000000", + "users": null + }, + "publicTopics": [ + "commitments/*", + "blocks/*", + "transactions/*", + "block-metadata/*", + "outputs/*" + ], + "protectedTopics": [], + "subscriptions": { + "maxTopicSubscriptionsPerClient": 1000, + "topicsCleanupThresholdCount": 10000, + "topicsCleanupThresholdRatio": 1 + }, + "maximumClientWritesPending": 0, + "clientWriteBufferSize": 0, + "clientReadBufferSize": 0 }, "profiling": { "enabled": false, diff --git a/configuration.md b/configuration.md index 743249c..704469f 100755 --- a/configuration.md +++ b/configuration.md @@ -131,21 +131,17 @@ Example: ## 4. MQTT -| Name | Description | Type | Default value | -| ------------------------------------ | --------------------------------------------- | ------ | ------------- | -| bufferSize | The size of the client buffers in bytes | int | 0 | -| bufferBlockSize | The size per client buffer R/W block in bytes | int | 0 | -| [subscriptions](#mqtt_subscriptions) | Configuration for subscriptions | object | | -| [websocket](#mqtt_websocket) | Configuration for websocket | object | | -| [tcp](#mqtt_tcp) | Configuration for TCP | object | | - -### Subscriptions - -| Name | Description | Type | Default value | -| ------------------------------ | -------------------------------------------------------------------------------------------------------------- | ----- | ------------- | -| maxTopicSubscriptionsPerClient | The maximum number of topic subscriptions per client before the client gets dropped (DOS protection) | int | 1000 | -| topicsCleanupThresholdCount | The number of deleted topics that trigger a garbage collection of the subscription manager | int | 10000 | -| topicsCleanupThresholdRatio | The ratio of subscribed topics to deleted topics that trigger a garbage collection of the subscription manager | float | 1.0 | +| Name | Description | Type | Default value | +| ------------------------------------ | ------------------------------------------------------------------------------------------------------- | ------ | -------------------------------------------------------------------------------- | +| [websocket](#mqtt_websocket) | Configuration for websocket | object | | +| [tcp](#mqtt_tcp) | Configuration for TCP | object | | +| [auth](#mqtt_auth) | Configuration for auth | object | | +| publicTopics | The MQTT topics which can be subscribed to without authorization. Wildcards using \* are allowed | array | commitments/\*
blocks/\*
transactions/\*
block-metadata/\*
outputs/\* | +| protectedTopics | The MQTT topics which only can be subscribed to with valid authorization. Wildcards using \* are allowed | array | | +| [subscriptions](#mqtt_subscriptions) | Configuration for subscriptions | object | | +| maximumClientWritesPending | The maximum number of pending message writes for a client | int | 0 | +| clientWriteBufferSize | The size of the client write buffer | int | 0 | +| clientReadBufferSize | The size of the client read buffer | int | 0 | ### Websocket @@ -157,20 +153,11 @@ Example: ### TCP -| Name | Description | Type | Default value | -| ---------------------- | -------------------------------------------------------- | ------- | ---------------- | -| enabled | Whether to enable the TCP connection of the MQTT broker | boolean | false | -| bindAddress | The TCP bind address on which the MQTT broker listens on | string | "localhost:1883" | -| [auth](#mqtt_tcp_auth) | Configuration for auth | object | | -| [tls](#mqtt_tcp_tls) | Configuration for TLS | object | | - -### Auth - -| Name | Description | Type | Default value | -| ------------ | ------------------------------------------------------------------- | ------- | ------------------------------------------------------------------ | -| enabled | Whether to enable auth for TCP connections | boolean | false | -| passwordSalt | The auth salt used for hashing the passwords of the users | string | "0000000000000000000000000000000000000000000000000000000000000000" | -| users | The list of allowed users with their password+salt as a scrypt hash | object | [] | +| Name | Description | Type | Default value | +| -------------------- | -------------------------------------------------------- | ------- | ---------------- | +| enabled | Whether to enable the TCP connection of the MQTT broker | boolean | false | +| bindAddress | The TCP bind address on which the MQTT broker listens on | string | "localhost:1883" | +| [tls](#mqtt_tcp_tls) | Configuration for TLS | object | | ### TLS @@ -180,18 +167,26 @@ Example: | privateKeyPath | The path to the private key file (x509 PEM) for TCP connections with TLS | string | "private_key.pem" | | certificatePath | The path to the certificate file (x509 PEM) for TCP connections with TLS | string | "certificate.pem" | +### Auth + +| Name | Description | Type | Default value | +| ------------ | ------------------------------------------------------------------- | ------ | ------------------------------------------------------------------ | +| passwordSalt | The auth salt used for hashing the passwords of the users | string | "0000000000000000000000000000000000000000000000000000000000000000" | +| users | The list of allowed users with their password+salt as a scrypt hash | object | [] | + +### Subscriptions + +| Name | Description | Type | Default value | +| ------------------------------ | -------------------------------------------------------------------------------------------------------------- | ----- | ------------- | +| maxTopicSubscriptionsPerClient | The maximum number of topic subscriptions per client before the client gets dropped (DOS protection) | int | 1000 | +| topicsCleanupThresholdCount | The number of deleted topics that trigger a garbage collection of the subscription manager | int | 10000 | +| topicsCleanupThresholdRatio | The ratio of subscribed topics to deleted topics that trigger a garbage collection of the subscription manager | float | 1.0 | + Example: ```json { "mqtt": { - "bufferSize": 0, - "bufferBlockSize": 0, - "subscriptions": { - "maxTopicSubscriptionsPerClient": 1000, - "topicsCleanupThresholdCount": 10000, - "topicsCleanupThresholdRatio": 1 - }, "websocket": { "enabled": true, "bindAddress": "localhost:1888", @@ -200,17 +195,32 @@ Example: "tcp": { "enabled": false, "bindAddress": "localhost:1883", - "auth": { - "enabled": false, - "passwordSalt": "0000000000000000000000000000000000000000000000000000000000000000", - "users": null - }, "tls": { "enabled": false, "privateKeyPath": "private_key.pem", "certificatePath": "certificate.pem" } - } + }, + "auth": { + "passwordSalt": "0000000000000000000000000000000000000000000000000000000000000000", + "users": null + }, + "publicTopics": [ + "commitments/*", + "blocks/*", + "transactions/*", + "block-metadata/*", + "outputs/*" + ], + "protectedTopics": [], + "subscriptions": { + "maxTopicSubscriptionsPerClient": 1000, + "topicsCleanupThresholdCount": 10000, + "topicsCleanupThresholdRatio": 1 + }, + "maximumClientWritesPending": 0, + "clientWriteBufferSize": 0, + "clientReadBufferSize": 0 } } ``` diff --git a/go.mod b/go.mod index 0b6d910..3e4ce9c 100644 --- a/go.mod +++ b/go.mod @@ -2,21 +2,18 @@ module github.com/iotaledger/inx-mqtt go 1.21 -replace github.com/mochi-co/mqtt => github.com/alexsporn/mqtt v0.0.0-20220909140721-d60c438960a4 - require ( - github.com/iotaledger/hive.go/app v0.0.0-20231206114953-6a65a82e30ad - github.com/iotaledger/hive.go/ierrors v0.0.0-20231206114953-6a65a82e30ad - github.com/iotaledger/hive.go/lo v0.0.0-20231206114953-6a65a82e30ad - github.com/iotaledger/hive.go/logger v0.0.0-20231206114953-6a65a82e30ad - github.com/iotaledger/hive.go/runtime v0.0.0-20231206114953-6a65a82e30ad - github.com/iotaledger/hive.go/web v0.0.0-20231130122510-e3dddb0214f0 + github.com/iotaledger/hive.go/app v0.0.0-20231207181026-f482ac139305 + github.com/iotaledger/hive.go/ierrors v0.0.0-20231207181026-f482ac139305 + github.com/iotaledger/hive.go/lo v0.0.0-20231207181026-f482ac139305 + github.com/iotaledger/hive.go/logger v0.0.0-20231207181026-f482ac139305 + github.com/iotaledger/hive.go/runtime v0.0.0-20231207181026-f482ac139305 + github.com/iotaledger/hive.go/web v0.0.0-20231207181026-f482ac139305 github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231206124511-b78dc962031f github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231206124145-f773dfe3927e github.com/iotaledger/iota.go/v4 v4.0.0-20231206123921-2af411eef0b5 github.com/labstack/echo/v4 v4.11.3 - github.com/mochi-co/mqtt v1.3.2 - github.com/pkg/errors v0.9.1 + github.com/mochi-mqtt/server/v2 v2.4.2 github.com/prometheus/client_golang v1.17.0 github.com/stretchr/testify v1.8.4 go.uber.org/dig v1.17.1 @@ -46,12 +43,12 @@ require ( github.com/hashicorp/go-version v1.6.0 // indirect github.com/holiman/uint256 v1.2.4 // indirect github.com/iancoleman/orderedmap v0.3.0 // indirect - github.com/iotaledger/hive.go/constraints v0.0.0-20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/crypto v0.0.0-20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/ds v0.0.0-20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/stringify v0.0.0-20231206114953-6a65a82e30ad // indirect + github.com/iotaledger/hive.go/constraints v0.0.0-20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/crypto v0.0.0-20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/ds v0.0.0-20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/stringify v0.0.0-20231207181026-f482ac139305 // indirect github.com/knadh/koanf v1.5.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/labstack/gommon v0.4.1 // indirect @@ -64,7 +61,7 @@ require ( github.com/mr-tron/base58 v1.2.0 // indirect github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect - github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5 // indirect + github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect diff --git a/go.sum b/go.sum index ccfe246..3bfa23c 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/alexsporn/mqtt v0.0.0-20220909140721-d60c438960a4 h1:gH1kgQ6DT3UWcls15FgGBuoose4XfthuqM9AW+H+iXc= -github.com/alexsporn/mqtt v0.0.0-20220909140721-d60c438960a4/go.mod h1:o0lhQFWL8QtR1+8a9JZmbY8FhZ89MF8vGOGHJNFbCB8= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -184,30 +182,30 @@ github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXei github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc= github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE= github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= -github.com/iotaledger/hive.go/app v0.0.0-20231206114953-6a65a82e30ad h1:v7dkbVLSsmzgOWT2vjvv1MdKQXvqFbvIkx8mvh6VK7g= -github.com/iotaledger/hive.go/app v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:hTHKGFbZnuiW8yEgDuuL7ZjQTCnl8bXyHLmj3LPa648= -github.com/iotaledger/hive.go/constraints v0.0.0-20231206114953-6a65a82e30ad h1:4XL7IIvdsWHxSKQfU+sgq3H9egN54053LF9TwMfDcTg= -github.com/iotaledger/hive.go/constraints v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:dOBOM2s4se3HcWefPe8sQLUalGXJ8yVXw58oK8jke3s= -github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231206114953-6a65a82e30ad h1:iNzb/Oy/nucIOXOzRcwSqqFsaeKwr2JZpZYSLp8xjlE= -github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231206114953-6a65a82e30ad/go.mod h1:CO28KMA6Pp5LJPiigPQQ276zQofES+jMod08U5pyRFA= -github.com/iotaledger/hive.go/crypto v0.0.0-20231206114953-6a65a82e30ad h1:pUL2UZbF4S8FIV7uKo9p+IGfZ658K1VNorQ6rzDMRvs= -github.com/iotaledger/hive.go/crypto v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:7vHoF//1Pt3nu0l8nDIw7bEgv2GfbL3kSgjp7Rdqhd4= -github.com/iotaledger/hive.go/ds v0.0.0-20231206114953-6a65a82e30ad h1:adLrD6dOEkM5Xdg6AOPt9/HYqy/pQ5FrprDpW4/VqUU= -github.com/iotaledger/hive.go/ds v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:NmZRIoxtL6iQdVK6n5W+JOx58K/0Yn8k7WuSvpKPQ+M= -github.com/iotaledger/hive.go/ierrors v0.0.0-20231206114953-6a65a82e30ad h1:WDl58zJKHfwbzHs+ZB8Jq3YNgVQE5Neu2NeaX3FZuyU= -github.com/iotaledger/hive.go/ierrors v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:HcE8B5lP96enc/OALTb2/rIIi+yOLouRoHOKRclKmC8= -github.com/iotaledger/hive.go/lo v0.0.0-20231206114953-6a65a82e30ad h1:qpCsjw+InLL824QPu3lY/osck4DhucBKhCs5/E8OH+A= -github.com/iotaledger/hive.go/lo v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:ETXGXymFyNcUq2t4I9e7ZK18f9bxUWYat4pjZ9W0rWc= -github.com/iotaledger/hive.go/logger v0.0.0-20231206114953-6a65a82e30ad h1:fazCxogqOLDEPNDPWYDLTDpYmwgTJgIaC2Z6VN52S4M= -github.com/iotaledger/hive.go/logger v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:hVaVODS+Uik0obf3SVEHFQNruUko/uqIgD/GKwhn49M= -github.com/iotaledger/hive.go/runtime v0.0.0-20231206114953-6a65a82e30ad h1:HpupWK8iqFt+Sdogkh2/N8ojalmevYy+FzhjOuy7Y7E= -github.com/iotaledger/hive.go/runtime v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:Z9NFsByMh1Kf98f3v3ifeZRycbS2db1hjswTQG1MxnE= -github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231206114953-6a65a82e30ad h1:c8uwbBZDqpiCNN9/9Jji7Z4lL0GdVnORp8WMouiuknk= -github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231206114953-6a65a82e30ad/go.mod h1:FoH3T6yKlZJp8xm8K+zsQiibSynp32v21CpWx8xkek8= -github.com/iotaledger/hive.go/stringify v0.0.0-20231206114953-6a65a82e30ad h1:VC3OgdSbyngY7/gxVj66fKd/nGmN6P0/myr348nx7vA= -github.com/iotaledger/hive.go/stringify v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs= -github.com/iotaledger/hive.go/web v0.0.0-20231130122510-e3dddb0214f0 h1:bIfWp0AOQXMv3VkDAOQwqvMYLz1Ila1Z2hqiY2RJ3Io= -github.com/iotaledger/hive.go/web v0.0.0-20231130122510-e3dddb0214f0/go.mod h1:L/CLz7skt9dvidhBOw2gmMGhmrUBHXlA0b3paugdsE4= +github.com/iotaledger/hive.go/app v0.0.0-20231207181026-f482ac139305 h1:NPeZUkOI0xTKnuWefzcLRKfHr/shXmfoXDsfqXq8P0Q= +github.com/iotaledger/hive.go/app v0.0.0-20231207181026-f482ac139305/go.mod h1:hTHKGFbZnuiW8yEgDuuL7ZjQTCnl8bXyHLmj3LPa648= +github.com/iotaledger/hive.go/constraints v0.0.0-20231207181026-f482ac139305 h1:ar+IWfqO7B1M5+kuKGUJnfg0i/YuuM1oN5i8byp/F7A= +github.com/iotaledger/hive.go/constraints v0.0.0-20231207181026-f482ac139305/go.mod h1:dOBOM2s4se3HcWefPe8sQLUalGXJ8yVXw58oK8jke3s= +github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231207181026-f482ac139305 h1:bKDeNAB2zm5mDJgmyRiHBbekkg9OtnfYVGKKRqKyLKk= +github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231207181026-f482ac139305/go.mod h1:CO28KMA6Pp5LJPiigPQQ276zQofES+jMod08U5pyRFA= +github.com/iotaledger/hive.go/crypto v0.0.0-20231207181026-f482ac139305 h1:OR2TClxTtst906F4tok9xzhBTKO81qrUFdxIAoaZVvE= +github.com/iotaledger/hive.go/crypto v0.0.0-20231207181026-f482ac139305/go.mod h1:7vHoF//1Pt3nu0l8nDIw7bEgv2GfbL3kSgjp7Rdqhd4= +github.com/iotaledger/hive.go/ds v0.0.0-20231207181026-f482ac139305 h1:07ujwFv6qWLMRm32ZWZqMpgEugtsIoJ8vCmsn+vpO0E= +github.com/iotaledger/hive.go/ds v0.0.0-20231207181026-f482ac139305/go.mod h1:NmZRIoxtL6iQdVK6n5W+JOx58K/0Yn8k7WuSvpKPQ+M= +github.com/iotaledger/hive.go/ierrors v0.0.0-20231207181026-f482ac139305 h1:v7/zMhNcr6hibXFZXZ4xV/S27ESUytQFgUQ1oo10iic= +github.com/iotaledger/hive.go/ierrors v0.0.0-20231207181026-f482ac139305/go.mod h1:HcE8B5lP96enc/OALTb2/rIIi+yOLouRoHOKRclKmC8= +github.com/iotaledger/hive.go/lo v0.0.0-20231207181026-f482ac139305 h1:zxVbTEWutMvZhS0VLu/OmBk2WpMjrXQ7l67VBwsExtc= +github.com/iotaledger/hive.go/lo v0.0.0-20231207181026-f482ac139305/go.mod h1:ETXGXymFyNcUq2t4I9e7ZK18f9bxUWYat4pjZ9W0rWc= +github.com/iotaledger/hive.go/logger v0.0.0-20231207181026-f482ac139305 h1:ZuOnh3vNZqamoN1ibSj0ZMzIWsYZi6fpBp7BhTi2Qf4= +github.com/iotaledger/hive.go/logger v0.0.0-20231207181026-f482ac139305/go.mod h1:hVaVODS+Uik0obf3SVEHFQNruUko/uqIgD/GKwhn49M= +github.com/iotaledger/hive.go/runtime v0.0.0-20231207181026-f482ac139305 h1:7CW1/EbG+RvkjbyOf6JA1u1feax/cpex/6a8CLbaA4I= +github.com/iotaledger/hive.go/runtime v0.0.0-20231207181026-f482ac139305/go.mod h1:Z9NFsByMh1Kf98f3v3ifeZRycbS2db1hjswTQG1MxnE= +github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231207181026-f482ac139305 h1:p79SQs2hE6Mo/MNEfEUfrKTY7gWp6c3oUeqK6I+o1lQ= +github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231207181026-f482ac139305/go.mod h1:FoH3T6yKlZJp8xm8K+zsQiibSynp32v21CpWx8xkek8= +github.com/iotaledger/hive.go/stringify v0.0.0-20231207181026-f482ac139305 h1:KjbaklWvZb4zIcXBETHzl6XFTAf8wtAlFDfaF0Z1Daw= +github.com/iotaledger/hive.go/stringify v0.0.0-20231207181026-f482ac139305/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs= +github.com/iotaledger/hive.go/web v0.0.0-20231207181026-f482ac139305 h1:dVzMN31qnSSvxTFzdkq93YNkVOFYsXqvy1Pqx5+OQTg= +github.com/iotaledger/hive.go/web v0.0.0-20231207181026-f482ac139305/go.mod h1:0ke/puDLuEQptxeUiMaCayFFg6Bqwtcis4o0i6C9uxI= github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231206124511-b78dc962031f h1:V68Ijq1A64gB9r0Rhc4ybLGH66rXqZ2Ly0L4uuaLrMg= github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231206124511-b78dc962031f/go.mod h1:Dy3Gv4Dn1zufB177x6IXETP3zTeiWQ1+HMVQR0Bt/ew= github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231206124145-f773dfe3927e h1:jbtiUlmTpTdGiRBW1pniPSqRcDMJaIW8fGS+uORryas= @@ -279,6 +277,8 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/mochi-mqtt/server/v2 v2.4.2 h1:x7xC41Qn/ek1hOWNcZraRm+Cmqc2yrfhD5VA1NFnXhc= +github.com/mochi-mqtt/server/v2 v2.4.2/go.mod h1:M1lZnLbyowXUyQBIlHYlX1wasxXqv/qFWwQxAzfphwA= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= @@ -299,12 +299,11 @@ github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAv github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= -github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5 h1:+qIP3OMrT7SN5kLnTcVEISPOMB/97RyAKTg1UWA738E= -github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= +github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 h1:jik8PHtAIsPlCRJjJzl4udgEf7hawInF9texMeO2jrU= +github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/pkg/broker/auth.go b/pkg/broker/auth.go deleted file mode 100644 index a8d25de..0000000 --- a/pkg/broker/auth.go +++ /dev/null @@ -1,87 +0,0 @@ -package broker - -import ( - "encoding/hex" - "errors" - "fmt" - - "github.com/iotaledger/hive.go/web/basicauth" -) - -// AuthAllowEveryone allows everyone, but without write permission. -type AuthAllowEveryone struct{} - -// Authenticate returns true if a username and password are acceptable. -// -//nolint:revive // we don't want to remove the user variable here, even if unused -func (a *AuthAllowEveryone) Authenticate(user, password []byte) bool { - return true -} - -// ACL returns true if a user has access permissions to read or write on a topic. -// -//nolint:revive // we don't want to remove the user variable here, even if unused -func (a *AuthAllowEveryone) ACL(user []byte, topic string, write bool) bool { - // clients are not allowed to write - return !write -} - -// AuthAllowBasicAuth allows users that authenticate with basic auth, but without write permission. -type AuthAllowBasicAuth struct { - Salt []byte - Users map[string][]byte -} - -func NewAuthAllowUsers(passwordSaltHex string, users map[string]string) (*AuthAllowBasicAuth, error) { - - if len(passwordSaltHex) != 64 { - return nil, errors.New("password salt must be 64 (hex encoded) in length") - } - - passwordSalt, err := hex.DecodeString(passwordSaltHex) - if err != nil { - return nil, fmt.Errorf("parsing password salt failed: %w", err) - } - - usersWithHashedPasswords := make(map[string][]byte) - for user, passwordHashHex := range users { - if len(passwordHashHex) != 64 { - return nil, fmt.Errorf("password hash for user %s must be 64 (hex encoded scrypt hash) in length", user) - } - - password, err := hex.DecodeString(passwordHashHex) - if err != nil { - return nil, fmt.Errorf("parsing password hash for user %s failed: %w", user, err) - } - - usersWithHashedPasswords[user] = password - } - - return &AuthAllowBasicAuth{ - Users: usersWithHashedPasswords, - Salt: passwordSalt, - }, nil -} - -// Authenticate returns true if a username and password are acceptable. -func (a *AuthAllowBasicAuth) Authenticate(user, password []byte) bool { - // If the user exists in the auth users map, and the password is correct, - // then they can connect to the server. - if hashedPassword, exists := a.Users[string(user)]; exists { - - // error is ignored because it returns false in case it can't be derived - if valid, _ := basicauth.VerifyPassword(password, a.Salt, hashedPassword); valid { - return true - } - } - - return false -} - -// ACL returns true if a user has access permissions to read or write on a topic. -// -//nolint:revive // we don't want to remove the user variable here, even if unused -func (a *AuthAllowBasicAuth) ACL(user []byte, topic string, write bool) bool { - // clients are not allowed to write - return !write -} diff --git a/pkg/broker/broker.go b/pkg/broker/broker.go index 2a11260..b75786d 100644 --- a/pkg/broker/broker.go +++ b/pkg/broker/broker.go @@ -2,16 +2,15 @@ package broker import ( "crypto/tls" - "errors" - "fmt" + "math" "net" - mqtt "github.com/mochi-co/mqtt/server" - "github.com/mochi-co/mqtt/server/events" - "github.com/mochi-co/mqtt/server/listeners" - "github.com/mochi-co/mqtt/server/listeners/auth" - "github.com/mochi-co/mqtt/server/system" + mqtt "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/listeners" + "github.com/mochi-mqtt/server/v2/system" + "github.com/iotaledger/hive.go/ierrors" + "github.com/iotaledger/hive.go/web/basicauth" "github.com/iotaledger/hive.go/web/subscriptionmanager" ) @@ -48,28 +47,42 @@ func NewBroker(brokerOpts ...Option) (Broker, error) { opts.ApplyOnDefault(brokerOpts...) if !opts.WebsocketEnabled && !opts.TCPEnabled { - return nil, errors.New("at least websocket or TCP must be enabled") + return nil, ierrors.New("at least websocket or TCP must be enabled") } - server := mqtt.NewServer(&mqtt.Options{ - BufferSize: opts.BufferSize, - BufferBlockSize: opts.BufferBlockSize, - InflightTTL: 30, + server := mqtt.New(&mqtt.Options{ + // Capabilities defines the server features and behavior. + Capabilities: &mqtt.Capabilities{ + MaximumSessionExpiryInterval: 0, // we don't keep disconnected sessions + MaximumMessageExpiryInterval: 10, // maximum message expiry if message expiry is 0 or over + ReceiveMaximum: 1024, // maximum number of concurrent qos messages per client + MaximumQos: 0, // we don't support QoS 1 and 2, we only "fire and forget" + RetainAvailable: 0, // retain messages is disabled in our usecase, we handle it ourselves + MaximumPacketSize: 1024, // we allow some bytes for the CONNECT and SUBSCRIBE messages, but otherwise the client is not allowed to send any messages anyways + TopicAliasMaximum: math.MaxUint16, // maximum topic alias value (default) + WildcardSubAvailable: 0, // wildcard subscriptions are disabled because we handle them ourselves, not on server side + SubIDAvailable: 0, // subscription identifiers are disabled, since we don't enable wildcard subscriptions in the server + SharedSubAvailable: 0, // shared subscriptions are prohibited in our usecase + MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0) (default) + MaximumClientWritesPending: int32(opts.MaximumClientWritesPending), + }, + ClientNetWriteBufferSize: opts.ClientWriteBufferSize, + ClientNetReadBufferSize: opts.ClientReadBufferSize, + InlineClient: true, // this needs to be set to true to allow the broker to send messages itself }) if opts.WebsocketEnabled { // check websocket bind address _, _, err := net.SplitHostPort(opts.WebsocketBindAddress) if err != nil { - return nil, fmt.Errorf("parsing websocket bind address (%s) failed: %w", opts.WebsocketBindAddress, err) + return nil, ierrors.Errorf("parsing websocket bind address (%s) failed: %w", opts.WebsocketBindAddress, err) } - ws := listeners.NewWebsocket("ws1", opts.WebsocketBindAddress) - if err := server.AddListener(ws, &listeners.Config{ - Auth: &AuthAllowEveryone{}, - TLS: nil, - }); err != nil { - return nil, fmt.Errorf("adding websocket listener failed: %w", err) + // skip TLS config since it is added via traefik in our recommended setup anyway + ws := listeners.NewWebsocket("ws1", opts.WebsocketBindAddress, &listeners.Config{TLSConfig: nil}) + + if err := server.AddListener(ws); err != nil { + return nil, ierrors.Errorf("adding websocket listener failed: %w", err) } } @@ -77,48 +90,34 @@ func NewBroker(brokerOpts ...Option) (Broker, error) { // check tcp bind address _, _, err := net.SplitHostPort(opts.TCPBindAddress) if err != nil { - return nil, fmt.Errorf("parsing TCP bind address (%s) failed: %w", opts.TCPBindAddress, err) - } - - tcp := listeners.NewTCP("t1", opts.TCPBindAddress) - - var tcpAuthController auth.Controller - if opts.TCPAuthEnabled { - var err error - tcpAuthController, err = NewAuthAllowUsers(opts.TCPAuthPasswordSalt, opts.TCPAuthUsers) - if err != nil { - return nil, fmt.Errorf("enabling TCP Authentication failed: %w", err) - } - } else { - tcpAuthController = &AuthAllowEveryone{} + return nil, ierrors.Errorf("parsing TCP bind address (%s) failed: %w", opts.TCPBindAddress, err) } var tlsConfig *tls.Config if opts.TCPTLSEnabled { - var err error - tlsConfig, err = NewTLSConfig(opts.TCPTLSCertificatePath, opts.TCPTLSPrivateKeyPath) if err != nil { - return nil, fmt.Errorf("enabling TCP TLS failed: %w", err) + return nil, ierrors.Errorf("enabling TCP TLS failed: %w", err) } } - if err := server.AddListener(tcp, &listeners.Config{ - Auth: tcpAuthController, + tcp := listeners.NewTCP("t1", opts.TCPBindAddress, &listeners.Config{ TLSConfig: tlsConfig, - }); err != nil { - return nil, fmt.Errorf("adding TCP listener failed: %w", err) + }) + + if err := server.AddListener(tcp); err != nil { + return nil, ierrors.Errorf("adding TCP listener failed: %w", err) } } - s := subscriptionmanager.New( + subscriptionManager := subscriptionmanager.New( subscriptionmanager.WithMaxTopicSubscriptionsPerClient[string, string](opts.MaxTopicSubscriptionsPerClient), subscriptionmanager.WithCleanupThresholdCount[string, string](opts.TopicCleanupThresholdCount), subscriptionmanager.WithCleanupThresholdRatio[string, string](opts.TopicCleanupThresholdRatio), ) // this event is used to drop malicious clients - unhook := s.Events().DropClient.Hook(func(event *subscriptionmanager.DropClientEvent[string]) { + unhook := subscriptionManager.Events().DropClient.Hook(func(event *subscriptionmanager.DropClientEvent[string]) { client, exists := server.Clients.Get(event.ClientID) if !exists { return @@ -131,27 +130,25 @@ func NewBroker(brokerOpts ...Option) (Broker, error) { server.Clients.Delete(event.ClientID) }).Unhook - // bind the broker events to the SubscriptionManager to track the subscriptions - server.Events.OnConnect = func(cl events.Client, pk events.Packet) { - s.Connect(cl.ID) - } - - server.Events.OnDisconnect = func(cl events.Client, err error) { - s.Disconnect(cl.ID) + basicAuthManager, err := basicauth.NewBasicAuthManager(opts.AuthUsers, opts.AuthPasswordSalt) + if err != nil { + return nil, ierrors.Errorf("failed to create basic auth manager: %w", err) } - server.Events.OnSubscribe = func(topic string, cl events.Client, qos byte) { - s.Subscribe(cl.ID, topic) + // bind the broker events to the SubscriptionManager to track the subscriptions + brokerHook, err := NewBrokerHook(subscriptionManager, basicAuthManager, opts.PublicTopics, opts.ProtectedTopics) + if err != nil { + return nil, ierrors.Errorf("failed to create broker hook: %w", err) } - server.Events.OnUnsubscribe = func(topic string, cl events.Client) { - s.Unsubscribe(cl.ID, topic) + if err := server.AddHook(brokerHook, nil); err != nil { + return nil, ierrors.Errorf("failed to add broker hook for subscriptions: %w", err) } return &broker{ server: server, opts: opts, - subscriptionManager: s, + subscriptionManager: subscriptionManager, unhook: unhook, }, nil } @@ -182,12 +179,13 @@ func (b *broker) HasSubscribers(topic string) bool { // Send publishes a message. func (b *broker) Send(topic string, payload []byte) error { - return b.server.Publish(topic, payload, false) + // we publish all messages with QoS 0 ("fire and forget") + return b.server.Publish(topic, payload, false, 0) } // SystemInfo returns the metrics of the broker. func (b *broker) SystemInfo() *system.Info { - return b.server.System + return b.server.Info.Clone() } // SubscribersSize returns the size of the underlying map of the SubscriptionManager. diff --git a/pkg/broker/broker_hook.go b/pkg/broker/broker_hook.go new file mode 100644 index 0000000..973b20b --- /dev/null +++ b/pkg/broker/broker_hook.go @@ -0,0 +1,181 @@ +package broker + +import ( + "bytes" + "regexp" + "strings" + + mqtt "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/packets" + + "github.com/iotaledger/hive.go/ierrors" + "github.com/iotaledger/hive.go/web/basicauth" + "github.com/iotaledger/hive.go/web/subscriptionmanager" +) + +// BrokerHook is the glue code between the mochi-mqtt server and inx-mqtt. +// It is responsible for keeping the subscription manager up to date. +// It also handles authorization and protection of topics. +// +//nolint:revive +type BrokerHook struct { + mqtt.HookBase + + subscriptionManager *subscriptionmanager.SubscriptionManager[string, string] + basicAuthManager *basicauth.BasicAuthManager + matchPublicTopics func(topic string) bool + matchProtectedTopics func(topic string) bool +} + +func NewBrokerHook(subscriptionManager *subscriptionmanager.SubscriptionManager[string, string], + basicAuthManager *basicauth.BasicAuthManager, + publicTopics []string, + protectedTopics []string) (*BrokerHook, error) { + + regexesPublicTopics, err := compileTopicsAsRegexes(publicTopics) + if err != nil { + return nil, ierrors.Wrap(err, "failed to compile public topics") + } + + regexesProtectedTopics, err := compileTopicsAsRegexes(protectedTopics) + if err != nil { + return nil, ierrors.Wrap(err, "failed to compile protected topics") + } + + matchPublicTopics := func(topic string) bool { + loweredTopic := strings.ToLower(topic) + + for _, reg := range regexesPublicTopics { + if reg.MatchString(loweredTopic) { + return true + } + } + + return false + } + + matchProtectedTopics := func(topic string) bool { + loweredTopic := strings.ToLower(topic) + + for _, reg := range regexesProtectedTopics { + if reg.MatchString(loweredTopic) { + return true + } + } + + return false + } + + return &BrokerHook{ + subscriptionManager: subscriptionManager, + basicAuthManager: basicAuthManager, + matchPublicTopics: matchPublicTopics, + matchProtectedTopics: matchProtectedTopics, + }, nil +} + +// ID returns the ID of the hook. +func (h *BrokerHook) ID() string { + return "iota-mqtt-broker-hook" +} + +// Provides indicates which methods a hook provides. The default is none - this method +// should be overridden by the embedding hook. +func (h *BrokerHook) Provides(b byte) bool { + //nolint:gocritic // false positive, the argument order is correct + return bytes.Contains([]byte{ + mqtt.OnConnectAuthenticate, + mqtt.OnACLCheck, + mqtt.OnSessionEstablished, + mqtt.OnDisconnect, + mqtt.OnSubscribed, + mqtt.OnUnsubscribed, + }, []byte{b}) +} + +// OnConnectAuthenticate returns true if the connecting client has rules which provide access +// in the auth ledger. +func (h *BrokerHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool { + username := string(cl.Properties.Username) + password := pk.Connect.Password + + // check if the given user exists in the users map + if !h.basicAuthManager.Exists(username) { + // if the user doesn't exist, the user is still allowed to connect, but without access to the protected topics. + return true + } + + // if the user exists in the users map, we need to check if the given password is correct, otherwise the user is not allowed to connect. + // All successfully connected users which are know in the users map automatically get elevated priviledges to access protected topics. + return h.basicAuthManager.VerifyUsernameAndPasswordBytes(username, password) +} + +// OnACLCheck returns true if the connecting client has matching read or write access to subscribe +// or publish to a given topic. +func (h *BrokerHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool { + // clients are not allowed to write + if write { + return false + } + + // check if the topic is protected + if h.matchProtectedTopics(topic) { + // check if the client is authenticated + // if the client is not authenticated, it is not allowed to access protected topics + return h.basicAuthManager.Exists(string(cl.Properties.Username)) + } + + // allow everyone to read public topics + return h.matchPublicTopics(topic) +} + +// OnSessionEstablished is called when a new client establishes a session (after OnConnect). +func (h *BrokerHook) OnSessionEstablished(cl *mqtt.Client, _ packets.Packet) { + h.subscriptionManager.Connect(cl.ID) +} + +// OnDisconnect is called when a client is disconnected for any reason. +func (h *BrokerHook) OnDisconnect(cl *mqtt.Client, _ error, _ bool) { + h.subscriptionManager.Disconnect(cl.ID) +} + +// OnSubscribed is called when a client subscribes to one or more filters. +func (h *BrokerHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, _ []byte) { + for _, subscription := range pk.Filters { + h.subscriptionManager.Subscribe(cl.ID, subscription.Filter) + } +} + +// OnUnsubscribed is called when a client unsubscribes from one or more filters. +func (h *BrokerHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) { + for _, subscription := range pk.Filters { + h.subscriptionManager.Unsubscribe(cl.ID, subscription.Filter) + } +} + +func compileTopicAsRegex(topic string) *regexp.Regexp { + + r := regexp.QuoteMeta(topic) + r = strings.ReplaceAll(r, `\*`, "(.*?)") + r += "$" + + reg, err := regexp.Compile(r) + if err != nil { + return nil + } + + return reg +} + +func compileTopicsAsRegexes(topics []string) ([]*regexp.Regexp, error) { + regexes := make([]*regexp.Regexp, len(topics)) + for i, topic := range topics { + reg := compileTopicAsRegex(topic) + if reg == nil { + return nil, ierrors.Errorf("invalid topic in config: %s", topic) + } + regexes[i] = reg + } + + return regexes, nil +} diff --git a/pkg/broker/options.go b/pkg/broker/options.go index 0f1cad3..dd650b8 100644 --- a/pkg/broker/options.go +++ b/pkg/broker/options.go @@ -2,57 +2,65 @@ package broker // Options are options around the broker. type Options struct { - // BufferSize is the size of the client buffers in bytes. - BufferSize int - // BufferBlockSize is the size per client buffer R/W block in bytes. - BufferBlockSize int - - // MaxTopicSubscriptionsPerClient defines the maximum number of topic subscriptions per client before the client gets dropped (DOS protection). - MaxTopicSubscriptionsPerClient int - // TopicCleanupThresholdCount defines the number of deleted topics that trigger a garbage collection of the SubscriptionManager. - TopicCleanupThresholdCount int - // TopicCleanupThresholdRatio defines the ratio of subscribed topics to deleted topics that trigger a garbage collection of the SubscriptionManager. - TopicCleanupThresholdRatio float32 - // WebsocketEnabled defines whether to enable the websocket connection of the MQTT broker. WebsocketEnabled bool // WebsocketBindAddress defines the websocket bind address on which the MQTT broker listens on. WebsocketBindAddress string + // TCPEnabled defines whether to enable the TCP connection of the MQTT broker. TCPEnabled bool // TCPBindAddress defines the TCP bind address on which the MQTT broker listens on. TCPBindAddress string - - // TCPAuthEnabled defines whether to enable auth for TCP connections. - TCPAuthEnabled bool - // TCPAuthPasswordSalt is the auth salt used for hashing the passwords of the users. - TCPAuthPasswordSalt string - // TCPAuthUsers is the list of allowed users with their password+salt as a scrypt hash. - TCPAuthUsers map[string]string - // TCPTLSEnabled defines whether to enable TLS for TCP connections. TCPTLSEnabled bool // TCPTLSCertificatePath is the path to the certificate file (x509 PEM) for TCP connections with TLS. TCPTLSCertificatePath string // TCPTLSPrivateKeyPath is the path to the private key file (x509 PEM) for TCP connections with TLS. TCPTLSPrivateKeyPath string + + // AuthPasswordSalt is the auth salt used for hashing the passwords of the users. + AuthPasswordSalt string + // AuthUsers is the list of allowed users with their password+salt as a scrypt hash. + AuthUsers map[string]string + + // PublicTopics are the MQTT topics which can be subscribed to without authorization. Wildcards using * are allowed + PublicTopics []string + // ProtectedTopics are the MQTT topics which only can be subscribed to with valid authorization. Wildcards using * are allowed + ProtectedTopics []string + + // MaxTopicSubscriptionsPerClient defines the maximum number of topic subscriptions per client before the client gets dropped (DOS protection). + MaxTopicSubscriptionsPerClient int + // TopicCleanupThresholdCount defines the number of deleted topics that trigger a garbage collection of the SubscriptionManager. + TopicCleanupThresholdCount int + // TopicCleanupThresholdRatio defines the ratio of subscribed topics to deleted topics that trigger a garbage collection of the SubscriptionManager. + TopicCleanupThresholdRatio float32 + + // MaximumClientWritesPending specifies the maximum number of pending message writes for a client. + MaximumClientWritesPending int + // ClientWriteBufferSize specifies the size of the client write buffer. + ClientWriteBufferSize int + // ClientNetReadBufferSize specifies the size of the client read buffer. + ClientReadBufferSize int } var defaultOpts = []Option{ - WithBufferSize(0), - WithBufferBlockSize(0), - WithTopicCleanupThresholdCount(10000), - WithTopicCleanupThresholdRatio(1.0), WithWebsocketEnabled(true), WithWebsocketBindAddress("localhost:1888"), WithTCPEnabled(false), WithTCPBindAddress("localhost:1883"), - WithTCPAuthEnabled(false), - WithTCPAuthPasswordSalt("0000000000000000000000000000000000000000000000000000000000000000"), - WithTCPAuthUsers(map[string]string{}), WithTCPTLSEnabled(false), WithTCPTLSCertificatePath(""), WithTCPTLSPrivateKeyPath(""), + WithAuthPasswordSalt("0000000000000000000000000000000000000000000000000000000000000000"), + WithAuthUsers(map[string]string{}), + WithPublicTopics([]string{"*"}), + WithProtectedTopics([]string{}), + WithMaxTopicSubscriptionsPerClient(1000), + WithTopicCleanupThresholdCount(10000), + WithTopicCleanupThresholdRatio(1.0), + WithMaximumClientWritesPending(1024 * 8), + WithClientWriteBufferSize(1024 * 2), + WithClientReadBufferSize(1024 * 2), } // applies the given Option. @@ -71,107 +79,121 @@ func (bo *Options) ApplyOnDefault(opts ...Option) { // Option is a function which sets an option on a Options instance. type Option func(options *Options) -// WithBufferSize sets the size of the client buffers in bytes. -func WithBufferSize(bufferSize int) Option { +// WithWebsocketEnabled sets whether to enable the websocket connection of the MQTT broker. +func WithWebsocketEnabled(websocketEnabled bool) Option { return func(options *Options) { - options.BufferSize = bufferSize + options.WebsocketEnabled = websocketEnabled } } -// WithBufferBlockSize sets the size per client buffer R/W block in bytes. -func WithBufferBlockSize(bufferBlockSize int) Option { +// WithWebsocketBindAddress sets the websocket bind address on which the MQTT broker listens on. +func WithWebsocketBindAddress(websocketBindAddress string) Option { return func(options *Options) { - options.BufferBlockSize = bufferBlockSize + options.WebsocketBindAddress = websocketBindAddress } } -// WithMaxTopicSubscriptionsPerClient sets the maximum number of topic subscriptions per client before the client gets dropped (DOS protection). -func WithMaxTopicSubscriptionsPerClient(maxTopicSubscriptionsPerClient int) Option { +// WithTCPEnabled sets whether to enable the TCP connection of the MQTT broker. +func WithTCPEnabled(tcpEnabled bool) Option { return func(options *Options) { - options.MaxTopicSubscriptionsPerClient = maxTopicSubscriptionsPerClient + options.TCPEnabled = tcpEnabled } } -// WithTopicCleanupThresholdCount sets the number of deleted topics that trigger a garbage collection of the SubscriptionManager. -func WithTopicCleanupThresholdCount(topicCleanupThresholdCount int) Option { +// WithTCPBindAddress sets the TCP bind address on which the MQTT broker listens on. +func WithTCPBindAddress(tcpBindAddress string) Option { return func(options *Options) { - options.TopicCleanupThresholdCount = topicCleanupThresholdCount + options.TCPBindAddress = tcpBindAddress } } -// WithTopicCleanupThresholdRatio the ratio of subscribed topics to deleted topics that trigger a garbage collection of the SubscriptionManager. -func WithTopicCleanupThresholdRatio(topicCleanupThresholdRatio float32) Option { +// WithTCPTLSEnabled sets whether to enable TLS for TCP connections. +func WithTCPTLSEnabled(tcpTLSEnabled bool) Option { return func(options *Options) { - options.TopicCleanupThresholdRatio = topicCleanupThresholdRatio + options.TCPTLSEnabled = tcpTLSEnabled } } -// WithWebsocketEnabled sets whether to enable the websocket connection of the MQTT broker. -func WithWebsocketEnabled(websocketEnabled bool) Option { +// WithTCPTLSCertificatePath sets the path to the certificate file (x509 PEM) for TCP connections with TLS. +func WithTCPTLSCertificatePath(tcpTLSCertificatePath string) Option { return func(options *Options) { - options.WebsocketEnabled = websocketEnabled + options.TCPTLSCertificatePath = tcpTLSCertificatePath } } -// WithWebsocketBindAddress sets the websocket bind address on which the MQTT broker listens on. -func WithWebsocketBindAddress(websocketBindAddress string) Option { +// WithTCPTLSPrivateKeyPath sets the path to the private key file (x509 PEM) for TCP connections with TLS. +func WithTCPTLSPrivateKeyPath(tcpTLSPrivateKeyPath string) Option { return func(options *Options) { - options.WebsocketBindAddress = websocketBindAddress + options.TCPTLSPrivateKeyPath = tcpTLSPrivateKeyPath } } -// WithTCPEnabled sets whether to enable the TCP connection of the MQTT broker. -func WithTCPEnabled(tcpEnabled bool) Option { +// WithAuthPasswordSalt sets the auth salt used for hashing the passwords of the users. +func WithAuthPasswordSalt(tcpAuthPasswordSalt string) Option { return func(options *Options) { - options.TCPEnabled = tcpEnabled + options.AuthPasswordSalt = tcpAuthPasswordSalt } } -// WithTCPBindAddress sets the TCP bind address on which the MQTT broker listens on. -func WithTCPBindAddress(tcpBindAddress string) Option { +// WithAuthUsers sets the list of allowed users with their password+salt as a scrypt hash. +func WithAuthUsers(tcpAuthUsers map[string]string) Option { return func(options *Options) { - options.TCPBindAddress = tcpBindAddress + options.AuthUsers = tcpAuthUsers + } +} + +// WithPublicTopics sets the MQTT topics which can be subscribed to without authorization. Wildcards using * are allowed. +func WithPublicTopics(publicTopics []string) Option { + return func(options *Options) { + options.PublicTopics = publicTopics } } -// WithTCPAuthEnabled sets whether to enable auth for TCP connections. -func WithTCPAuthEnabled(tcpAuthEnabled bool) Option { +// WithProtectedTopics sets the MQTT topics which only can be subscribed to with valid authorization. Wildcards using * are allowed. +func WithProtectedTopics(protectedTopics []string) Option { return func(options *Options) { - options.TCPAuthEnabled = tcpAuthEnabled + options.ProtectedTopics = protectedTopics } } -// WithTCPAuthPasswordSalt sets the auth salt used for hashing the passwords of the users. -func WithTCPAuthPasswordSalt(tcpAuthPasswordSalt string) Option { +// WithMaxTopicSubscriptionsPerClient sets the maximum number of topic subscriptions per client before the client gets dropped (DOS protection). +func WithMaxTopicSubscriptionsPerClient(maxTopicSubscriptionsPerClient int) Option { return func(options *Options) { - options.TCPAuthPasswordSalt = tcpAuthPasswordSalt + options.MaxTopicSubscriptionsPerClient = maxTopicSubscriptionsPerClient } } -// WithTCPAuthUsers sets the list of allowed users with their password+salt as a scrypt hash. -func WithTCPAuthUsers(tcpAuthUsers map[string]string) Option { +// WithTopicCleanupThresholdCount sets the number of deleted topics that trigger a garbage collection of the SubscriptionManager. +func WithTopicCleanupThresholdCount(topicCleanupThresholdCount int) Option { return func(options *Options) { - options.TCPAuthUsers = tcpAuthUsers + options.TopicCleanupThresholdCount = topicCleanupThresholdCount } } -// WithTCPTLSEnabled sets whether to enable TLS for TCP connections. -func WithTCPTLSEnabled(tcpTLSEnabled bool) Option { +// WithTopicCleanupThresholdRatio the ratio of subscribed topics to deleted topics that trigger a garbage collection of the SubscriptionManager. +func WithTopicCleanupThresholdRatio(topicCleanupThresholdRatio float32) Option { return func(options *Options) { - options.TCPTLSEnabled = tcpTLSEnabled + options.TopicCleanupThresholdRatio = topicCleanupThresholdRatio } } -// WithTCPTLSCertificatePath sets the path to the certificate file (x509 PEM) for TCP connections with TLS. -func WithTCPTLSCertificatePath(tcpTLSCertificatePath string) Option { +// WithMaximumClientWritesPending specifies the maximum number of pending message writes for a client. +func WithMaximumClientWritesPending(maximumClientWritesPending int) Option { return func(options *Options) { - options.TCPTLSCertificatePath = tcpTLSCertificatePath + options.MaximumClientWritesPending = maximumClientWritesPending } } -// WithTCPTLSPrivateKeyPath sets the path to the private key file (x509 PEM) for TCP connections with TLS. -func WithTCPTLSPrivateKeyPath(tcpTLSPrivateKeyPath string) Option { +// WithClientWriteBufferSize specifies the size of the client write buffer. +func WithClientWriteBufferSize(clientWriteBufferSize int) Option { return func(options *Options) { - options.TCPTLSPrivateKeyPath = tcpTLSPrivateKeyPath + options.ClientWriteBufferSize = clientWriteBufferSize + } +} + +// WithClientReadBufferSize specifies the size of the client read buffer. +func WithClientReadBufferSize(clientReadBufferSize int) Option { + return func(options *Options) { + options.ClientReadBufferSize = clientReadBufferSize } } diff --git a/pkg/broker/tls.go b/pkg/broker/tls.go index 6a5d3f7..b9762e1 100644 --- a/pkg/broker/tls.go +++ b/pkg/broker/tls.go @@ -2,8 +2,9 @@ package broker import ( "crypto/tls" - "fmt" "os" + + "github.com/iotaledger/hive.go/ierrors" ) func NewTLSConfig(tcpTLSCertificatePath string, tcpTLSPrivateKeyPath string) (*tls.Config, error) { @@ -11,34 +12,34 @@ func NewTLSConfig(tcpTLSCertificatePath string, tcpTLSPrivateKeyPath string) (*t if _, err := os.Stat(tcpTLSCertificatePath); err != nil { if os.IsNotExist(err) { // file does not exist - return nil, fmt.Errorf("TCP TLS certificate file not found (%s)", tcpTLSCertificatePath) + return nil, ierrors.Errorf("TCP TLS certificate file not found (%s)", tcpTLSCertificatePath) } - return nil, fmt.Errorf("unable to check TCP TLS certificate file (%s): %w", tcpTLSCertificatePath, err) + return nil, ierrors.Errorf("unable to check TCP TLS certificate file (%s): %w", tcpTLSCertificatePath, err) } if _, err := os.Stat(tcpTLSPrivateKeyPath); err != nil { if os.IsNotExist(err) { // file does not exist - return nil, fmt.Errorf("TCP TLS private key file not found (%s)", tcpTLSPrivateKeyPath) + return nil, ierrors.Errorf("TCP TLS private key file not found (%s)", tcpTLSPrivateKeyPath) } - return nil, fmt.Errorf("unable to check TCP TLS private key file (%s): %w", tcpTLSPrivateKeyPath, err) + return nil, ierrors.Errorf("unable to check TCP TLS private key file (%s): %w", tcpTLSPrivateKeyPath, err) } tcpTLSCertificate, err := os.ReadFile(tcpTLSCertificatePath) if err != nil { - return nil, fmt.Errorf("unable to read TCP TLS certificate: %w", err) + return nil, ierrors.Errorf("unable to read TCP TLS certificate: %w", err) } tcpTLSPrivateKey, err := os.ReadFile(tcpTLSPrivateKeyPath) if err != nil { - return nil, fmt.Errorf("unable to read TCP TLS private key: %w", err) + return nil, ierrors.Errorf("unable to read TCP TLS private key: %w", err) } cert, err := tls.X509KeyPair(tcpTLSCertificate, tcpTLSPrivateKey) if err != nil { - return nil, fmt.Errorf("loading TCP TLS configuration failed: %w", err) + return nil, ierrors.Errorf("loading TCP TLS configuration failed: %w", err) } return &tls.Config{ diff --git a/pkg/mqtt/server.go b/pkg/mqtt/server.go index cd385cc..15fe724 100644 --- a/pkg/mqtt/server.go +++ b/pkg/mqtt/server.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/pkg/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -357,7 +356,7 @@ func (s *Server) startListenIfNeeded(ctx context.Context, grpcCall string, liste go func() { s.LogInfof("Listen to %s", grpcCall) - if err := listenFunc(sub.context); err != nil && !errors.Is(err, context.Canceled) { + if err := listenFunc(sub.context); err != nil && !ierrors.Is(err, context.Canceled) { s.LogErrorf("Finished listen to %s with error: %s", grpcCall, err.Error()) if status.Code(err) == codes.Unavailable && s.shutdownHandler != nil { s.shutdownHandler.SelfShutdown("INX became unavailable", true) diff --git a/pkg/testsuite/broker_mock.go b/pkg/testsuite/broker_mock.go index 30f1018..866d598 100644 --- a/pkg/testsuite/broker_mock.go +++ b/pkg/testsuite/broker_mock.go @@ -5,7 +5,7 @@ import ( "sync" "testing" - "github.com/mochi-co/mqtt/server/system" + "github.com/mochi-mqtt/server/v2/system" "github.com/stretchr/testify/require" "github.com/iotaledger/hive.go/web/subscriptionmanager" diff --git a/tools/gendoc/go.mod b/tools/gendoc/go.mod index 13899fe..2466217 100644 --- a/tools/gendoc/go.mod +++ b/tools/gendoc/go.mod @@ -5,7 +5,7 @@ go 1.21 replace github.com/iotaledger/inx-mqtt => ../../ require ( - github.com/iotaledger/hive.go/app v0.0.0-20231206114953-6a65a82e30ad + github.com/iotaledger/hive.go/app v0.0.0-20231207181026-f482ac139305 github.com/iotaledger/hive.go/apputils v0.0.0-20230829152614-7afc7a4d89b3 github.com/iotaledger/inx-mqtt v0.0.0-00010101000000-000000000000 ) @@ -33,17 +33,17 @@ require ( github.com/hashicorp/go-version v1.6.0 // indirect github.com/holiman/uint256 v1.2.4 // indirect github.com/iancoleman/orderedmap v0.3.0 // indirect - github.com/iotaledger/hive.go/constraints v0.0.0-20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/crypto v0.0.0-20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/ds v0.0.0-20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/ierrors v0.0.0-20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/lo v0.0.0-20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/logger v0.0.0-20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/runtime v0.0.0-20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/stringify v0.0.0-20231206114953-6a65a82e30ad // indirect - github.com/iotaledger/hive.go/web v0.0.0-20231130122510-e3dddb0214f0 // indirect + github.com/iotaledger/hive.go/constraints v0.0.0-20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/crypto v0.0.0-20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/ds v0.0.0-20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/ierrors v0.0.0-20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/lo v0.0.0-20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/logger v0.0.0-20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/runtime v0.0.0-20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/stringify v0.0.0-20231207181026-f482ac139305 // indirect + github.com/iotaledger/hive.go/web v0.0.0-20231207181026-f482ac139305 // indirect github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231206124511-b78dc962031f // indirect github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231206124145-f773dfe3927e // indirect github.com/iotaledger/iota.go/v4 v4.0.0-20231206123921-2af411eef0b5 // indirect @@ -57,12 +57,11 @@ require ( github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect - github.com/mochi-co/mqtt v1.3.2 // indirect + github.com/mochi-mqtt/server/v2 v2.4.2 // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect - github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5 // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect github.com/prometheus/client_golang v1.17.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect diff --git a/tools/gendoc/go.sum b/tools/gendoc/go.sum index eb6ccd0..b3fa888 100644 --- a/tools/gendoc/go.sum +++ b/tools/gendoc/go.sum @@ -184,32 +184,32 @@ github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXei github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJn+Ichc= github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE= github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= -github.com/iotaledger/hive.go/app v0.0.0-20231206114953-6a65a82e30ad h1:v7dkbVLSsmzgOWT2vjvv1MdKQXvqFbvIkx8mvh6VK7g= -github.com/iotaledger/hive.go/app v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:hTHKGFbZnuiW8yEgDuuL7ZjQTCnl8bXyHLmj3LPa648= +github.com/iotaledger/hive.go/app v0.0.0-20231207181026-f482ac139305 h1:NPeZUkOI0xTKnuWefzcLRKfHr/shXmfoXDsfqXq8P0Q= +github.com/iotaledger/hive.go/app v0.0.0-20231207181026-f482ac139305/go.mod h1:hTHKGFbZnuiW8yEgDuuL7ZjQTCnl8bXyHLmj3LPa648= github.com/iotaledger/hive.go/apputils v0.0.0-20230829152614-7afc7a4d89b3 h1:4aVJTc0KS77uEw0Tny4r0n1ORwcbAQDECaCclgf/6lE= github.com/iotaledger/hive.go/apputils v0.0.0-20230829152614-7afc7a4d89b3/go.mod h1:TZeAqieDu+xDOZp2e9+S+8pZp1PrfgcwLUnxmd8IgLU= -github.com/iotaledger/hive.go/constraints v0.0.0-20231206114953-6a65a82e30ad h1:4XL7IIvdsWHxSKQfU+sgq3H9egN54053LF9TwMfDcTg= -github.com/iotaledger/hive.go/constraints v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:dOBOM2s4se3HcWefPe8sQLUalGXJ8yVXw58oK8jke3s= -github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231206114953-6a65a82e30ad h1:iNzb/Oy/nucIOXOzRcwSqqFsaeKwr2JZpZYSLp8xjlE= -github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231206114953-6a65a82e30ad/go.mod h1:CO28KMA6Pp5LJPiigPQQ276zQofES+jMod08U5pyRFA= -github.com/iotaledger/hive.go/crypto v0.0.0-20231206114953-6a65a82e30ad h1:pUL2UZbF4S8FIV7uKo9p+IGfZ658K1VNorQ6rzDMRvs= -github.com/iotaledger/hive.go/crypto v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:7vHoF//1Pt3nu0l8nDIw7bEgv2GfbL3kSgjp7Rdqhd4= -github.com/iotaledger/hive.go/ds v0.0.0-20231206114953-6a65a82e30ad h1:adLrD6dOEkM5Xdg6AOPt9/HYqy/pQ5FrprDpW4/VqUU= -github.com/iotaledger/hive.go/ds v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:NmZRIoxtL6iQdVK6n5W+JOx58K/0Yn8k7WuSvpKPQ+M= -github.com/iotaledger/hive.go/ierrors v0.0.0-20231206114953-6a65a82e30ad h1:WDl58zJKHfwbzHs+ZB8Jq3YNgVQE5Neu2NeaX3FZuyU= -github.com/iotaledger/hive.go/ierrors v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:HcE8B5lP96enc/OALTb2/rIIi+yOLouRoHOKRclKmC8= -github.com/iotaledger/hive.go/lo v0.0.0-20231206114953-6a65a82e30ad h1:qpCsjw+InLL824QPu3lY/osck4DhucBKhCs5/E8OH+A= -github.com/iotaledger/hive.go/lo v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:ETXGXymFyNcUq2t4I9e7ZK18f9bxUWYat4pjZ9W0rWc= -github.com/iotaledger/hive.go/logger v0.0.0-20231206114953-6a65a82e30ad h1:fazCxogqOLDEPNDPWYDLTDpYmwgTJgIaC2Z6VN52S4M= -github.com/iotaledger/hive.go/logger v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:hVaVODS+Uik0obf3SVEHFQNruUko/uqIgD/GKwhn49M= -github.com/iotaledger/hive.go/runtime v0.0.0-20231206114953-6a65a82e30ad h1:HpupWK8iqFt+Sdogkh2/N8ojalmevYy+FzhjOuy7Y7E= -github.com/iotaledger/hive.go/runtime v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:Z9NFsByMh1Kf98f3v3ifeZRycbS2db1hjswTQG1MxnE= -github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231206114953-6a65a82e30ad h1:c8uwbBZDqpiCNN9/9Jji7Z4lL0GdVnORp8WMouiuknk= -github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231206114953-6a65a82e30ad/go.mod h1:FoH3T6yKlZJp8xm8K+zsQiibSynp32v21CpWx8xkek8= -github.com/iotaledger/hive.go/stringify v0.0.0-20231206114953-6a65a82e30ad h1:VC3OgdSbyngY7/gxVj66fKd/nGmN6P0/myr348nx7vA= -github.com/iotaledger/hive.go/stringify v0.0.0-20231206114953-6a65a82e30ad/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs= -github.com/iotaledger/hive.go/web v0.0.0-20231130122510-e3dddb0214f0 h1:bIfWp0AOQXMv3VkDAOQwqvMYLz1Ila1Z2hqiY2RJ3Io= -github.com/iotaledger/hive.go/web v0.0.0-20231130122510-e3dddb0214f0/go.mod h1:L/CLz7skt9dvidhBOw2gmMGhmrUBHXlA0b3paugdsE4= +github.com/iotaledger/hive.go/constraints v0.0.0-20231207181026-f482ac139305 h1:ar+IWfqO7B1M5+kuKGUJnfg0i/YuuM1oN5i8byp/F7A= +github.com/iotaledger/hive.go/constraints v0.0.0-20231207181026-f482ac139305/go.mod h1:dOBOM2s4se3HcWefPe8sQLUalGXJ8yVXw58oK8jke3s= +github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231207181026-f482ac139305 h1:bKDeNAB2zm5mDJgmyRiHBbekkg9OtnfYVGKKRqKyLKk= +github.com/iotaledger/hive.go/core v1.0.0-rc.3.0.20231207181026-f482ac139305/go.mod h1:CO28KMA6Pp5LJPiigPQQ276zQofES+jMod08U5pyRFA= +github.com/iotaledger/hive.go/crypto v0.0.0-20231207181026-f482ac139305 h1:OR2TClxTtst906F4tok9xzhBTKO81qrUFdxIAoaZVvE= +github.com/iotaledger/hive.go/crypto v0.0.0-20231207181026-f482ac139305/go.mod h1:7vHoF//1Pt3nu0l8nDIw7bEgv2GfbL3kSgjp7Rdqhd4= +github.com/iotaledger/hive.go/ds v0.0.0-20231207181026-f482ac139305 h1:07ujwFv6qWLMRm32ZWZqMpgEugtsIoJ8vCmsn+vpO0E= +github.com/iotaledger/hive.go/ds v0.0.0-20231207181026-f482ac139305/go.mod h1:NmZRIoxtL6iQdVK6n5W+JOx58K/0Yn8k7WuSvpKPQ+M= +github.com/iotaledger/hive.go/ierrors v0.0.0-20231207181026-f482ac139305 h1:v7/zMhNcr6hibXFZXZ4xV/S27ESUytQFgUQ1oo10iic= +github.com/iotaledger/hive.go/ierrors v0.0.0-20231207181026-f482ac139305/go.mod h1:HcE8B5lP96enc/OALTb2/rIIi+yOLouRoHOKRclKmC8= +github.com/iotaledger/hive.go/lo v0.0.0-20231207181026-f482ac139305 h1:zxVbTEWutMvZhS0VLu/OmBk2WpMjrXQ7l67VBwsExtc= +github.com/iotaledger/hive.go/lo v0.0.0-20231207181026-f482ac139305/go.mod h1:ETXGXymFyNcUq2t4I9e7ZK18f9bxUWYat4pjZ9W0rWc= +github.com/iotaledger/hive.go/logger v0.0.0-20231207181026-f482ac139305 h1:ZuOnh3vNZqamoN1ibSj0ZMzIWsYZi6fpBp7BhTi2Qf4= +github.com/iotaledger/hive.go/logger v0.0.0-20231207181026-f482ac139305/go.mod h1:hVaVODS+Uik0obf3SVEHFQNruUko/uqIgD/GKwhn49M= +github.com/iotaledger/hive.go/runtime v0.0.0-20231207181026-f482ac139305 h1:7CW1/EbG+RvkjbyOf6JA1u1feax/cpex/6a8CLbaA4I= +github.com/iotaledger/hive.go/runtime v0.0.0-20231207181026-f482ac139305/go.mod h1:Z9NFsByMh1Kf98f3v3ifeZRycbS2db1hjswTQG1MxnE= +github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231207181026-f482ac139305 h1:p79SQs2hE6Mo/MNEfEUfrKTY7gWp6c3oUeqK6I+o1lQ= +github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231207181026-f482ac139305/go.mod h1:FoH3T6yKlZJp8xm8K+zsQiibSynp32v21CpWx8xkek8= +github.com/iotaledger/hive.go/stringify v0.0.0-20231207181026-f482ac139305 h1:KjbaklWvZb4zIcXBETHzl6XFTAf8wtAlFDfaF0Z1Daw= +github.com/iotaledger/hive.go/stringify v0.0.0-20231207181026-f482ac139305/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs= +github.com/iotaledger/hive.go/web v0.0.0-20231207181026-f482ac139305 h1:dVzMN31qnSSvxTFzdkq93YNkVOFYsXqvy1Pqx5+OQTg= +github.com/iotaledger/hive.go/web v0.0.0-20231207181026-f482ac139305/go.mod h1:0ke/puDLuEQptxeUiMaCayFFg6Bqwtcis4o0i6C9uxI= github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231206124511-b78dc962031f h1:V68Ijq1A64gB9r0Rhc4ybLGH66rXqZ2Ly0L4uuaLrMg= github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231206124511-b78dc962031f/go.mod h1:Dy3Gv4Dn1zufB177x6IXETP3zTeiWQ1+HMVQR0Bt/ew= github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231206124145-f773dfe3927e h1:jbtiUlmTpTdGiRBW1pniPSqRcDMJaIW8fGS+uORryas= @@ -281,8 +281,8 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= -github.com/mochi-co/mqtt v1.3.2 h1:cRqBjKdL1yCEWkz/eHWtaN/ZSpkMpK66+biZnrLrHC8= -github.com/mochi-co/mqtt v1.3.2/go.mod h1:o0lhQFWL8QtR1+8a9JZmbY8FhZ89MF8vGOGHJNFbCB8= +github.com/mochi-mqtt/server/v2 v2.4.2 h1:x7xC41Qn/ek1hOWNcZraRm+Cmqc2yrfhD5VA1NFnXhc= +github.com/mochi-mqtt/server/v2 v2.4.2/go.mod h1:M1lZnLbyowXUyQBIlHYlX1wasxXqv/qFWwQxAzfphwA= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= @@ -303,12 +303,11 @@ github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAv github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= -github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5 h1:+qIP3OMrT7SN5kLnTcVEISPOMB/97RyAKTg1UWA738E= -github.com/petermattis/goid v0.0.0-20231126143041-f558c26febf5/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= +github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 h1:jik8PHtAIsPlCRJjJzl4udgEf7hawInF9texMeO2jrU= +github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=