diff --git a/Scripts/MQTTDemo/README.md b/Scripts/MQTTDemo/README.md deleted file mode 100644 index 3f25da4bab..0000000000 --- a/Scripts/MQTTDemo/README.md +++ /dev/null @@ -1,154 +0,0 @@ - -# Requirements - -- [Python](https://www.python.org/downloads/) - runs the demo script, `echo.py`, which runs broker and `echo` client; additional pip dependencies: - - [Paho](https://www.eclipse.org/paho/index.php?page=clients/python/index.php) - used to implement the `echo` MQTT client - - [Colorama](https://pypi.org/project/colorama/) - used to colorize log output of `echo.py` - -- [Mosquitto](https://mosquitto.org/download/) - MQTT broker used in the demo - -# Overview - -## Setup - -The demo has three components: the MQTT broker, the `echo` MQTT client and the RRF MQTT client. - -The RRF MQTT client publishes message sent via `M118` under a topic `topic-duet`. -The `echo` MQTT client is subscribed to this topic, which retransmits the message under the topic `topic-echo`. Since the RRF MQTT client in turn is subscribed to this topic, it receives and displays the retransmitted message. - -## Broker Configuration - -Broker configuration can be found in [mosquitto.conf](./mosquitto.conf). This configuration: -- disallows anonymous clients, allowing only clients with authentication credentials to connect -- specifies the password file, [passwords.txt](./passwords.txt), whose contents are the allowed client usernames and the corresponding password hashes -- runs the MQTT broker on a different port, 1884 instead of the typical 1883 - -## Password File - -The clear text contents of the password file [passwords.txt](./passwords.txt) are as follows: - -``` -test-echo:test-echo-pswd -test-duet:test-duet-pswd -``` - -Running the command `mosquitto_passwd -U passwords.txt` on the clear text contents will replace the password part (the text after the colon on each row) with its hash. - -The first row are the credentials for the `echo` client; the second are the credentials for the RRF MQTT client. - - -# Running the Demo - -## Host - -Open a command line/terminal and `cd` into this directory, then run the command below. - -``` -python echo.py -``` - -Running the [GCode commands on RepRapFirmware](#reprapfirmware), a log similar to the following one should be seen. - -``` -1671016326: mosquitto version 2.0.15 starting -1671016326: Config loaded from mosquitto.conf. -1671016326: Opening ipv4 listen socket on port 1884. -1671016326: Opening ipv6 listen socket on port 1884. -1671016326: mosquitto version 2.0.15 running -echo: connecting to 'localhost' on port '1884' as 'echo'... -1671016327: New connection from ::1:39785 on port 1884. -1671016327: New client connected from ::1:39785 as echo (p2, c1, k60, u'test-echo'). -1671016327: No will message specified. -1671016327: Sending CONNACK to echo (0, 0) -echo: connect succeeded, subscribing to topic 'topic-duet'... -1671016327: Received SUBSCRIBE from echo -1671016327: topic-duet (QoS 0) -1671016327: echo 0 topic-duet -1671016327: Sending SUBACK to echo -echo: subscribe succeeded, waiting for messages... -1671016359: New connection from 192.168.10.125:60423 on port 1884. -1671016359: New client connected from 192.168.10.125:60423 as duet (p2, c1, k400, u'test-duet'). -1671016359: No will message specified. -1671016359: Sending CONNACK to duet (0, 0) -1671016359: Received SUBSCRIBE from duet -1671016359: topic-echo (QoS 0) -1671016359: duet 0 topic-echo -1671016359: Sending SUBACK to duet -1671016365: Received PUBLISH from duet (d0, q0, r0, m0, 'topic-duet', ... (12 bytes)) -1671016365: Sending PUBLISH to echo (d0, q0, r0, m0, 'topic-duet', ... (12 bytes)) -echo: received message with topic 'topic-duet': 'b'duet-message\n'', echoing... -echo: echo succeeded -1671016365: Received PUBLISH from echo (d0, q0, r0, m0, 'topic-echo', ... (12 bytes)) -1671016365: Sending PUBLISH to duet (d0, q0, r0, m0, 'topic-echo', ... (12 bytes)) -1671016372: Received DISCONNECT from duet -1671016372: Client duet disconnected. - -``` - - -## RepRapFirmware - -The WiFi interface must be configured and enabled. - -### Enable debugging messages (optional) - -``` -M111 P2 S1 -``` - -### Configure the MQTT client - -``` -M586.4 C"duet" -M586.4 U"test-duet" K"test-duet-pswd" -M586.4 P"topic-duet" D0 R0 Q0 -M586.4 S"topic-echo" Q0 -``` - -- `C` - Client ID -- `U`, `K` - Username and password () -- `S`, `Q` - Subcription topic and corresponding QOS -- `P`, `D`, `R`, `Q` - Publish settings: topic, duplicate flag, retain flag, QOS -### Enable the MQTT protocol - -``` -M586 P4 R1884 H192.168.10.244 S1 -``` - -### Publish a message via M118 - -``` -M118 P6 S"duet-message" -``` - -This message will be echoed back by the `echo` client, under topic `topic-echo`. -Since the RRF MQTT client is subscribed to this topic, it should receive that message: - -``` -Received message from topic 'topic-echo': 'duet-message' -``` - -### Disable the MQTT Protocol - -``` -M586 P4 S0 -``` -# Scenarios - - -1. MQTT protocol is enabled before `echo.py` can be started. This means the broker is not yet started when the RRF MQTT client attempts to connect. - - - This should be ok, as the MQTT client will attempt reconnection automatically. - -2. The running `echo.py` is terminated while RRF MQTT client is connected. - - - This should be ok, as the MQTT client will attempt reconnection automatically. - -3. MQTT protocol is configured and enabled before starting the network interface via `M552 S1`. - - - This should be ok, the MQTT client will only attempt connection once the network interface is active. - -4. MQTT client is configured while connecting/connected. - - - This is not ok, and should results in an error GCode result. Configuration is - only possible while the protocol is disabled. diff --git a/Scripts/MQTTDemo/echo.py b/Scripts/MQTTDemo/echo.py deleted file mode 100644 index aa9ef7a601..0000000000 --- a/Scripts/MQTTDemo/echo.py +++ /dev/null @@ -1,66 +0,0 @@ -# Runs the MQTT broker and echo client. See README.md for more information -# about the demonstration this Python script is a part of. - -import paho.mqtt.client as mqtt -import subprocess -import threading -import time - -from colorama import Fore -from colorama import Style - -HOST = "localhost" -PORT = 1884 -USERNAME = "test-echo" -PASSWORD = "test-echo-pswd" -CLIENT_ID = "echo" -SUBSCRIBE_TOPIC = "topic-duet" -PUBLISH_TOPIC = "topic-echo" - -def broker(): - # Start the Mosquitto broker in verbose mode, using the config file. - subprocess.run(["mosquitto", "-v", "-c", "mosquitto.conf"]) - -# The callback for when the client receives a CONNACK response from the server. -def on_connect(client, userdata, flags, rc): - if rc == 0: - print(f"{Fore.YELLOW}echo: connect succeeded, subscribing to topic '{SUBSCRIBE_TOPIC}'...{Style.RESET_ALL}") - client.subscribe(SUBSCRIBE_TOPIC) - else: - print(f"{Fore.YELLOW}echo: connect failed, result code: {rc}{Style.RESET_ALL}") - -# The callback for when a PUBLISH message is received from the server. -def on_message(client, userdata, msg): - print(f"{Fore.YELLOW}echo: received message with topic '{msg.topic}': '{msg.payload}', echoing... {Style.RESET_ALL}") - res = client.publish(PUBLISH_TOPIC, msg.payload, msg.qos, msg.retain) - - if res[0] == 0: - print(f"{Fore.YELLOW}echo: echo succeeded {Style.RESET_ALL}") - else: - print(f"{Fore.YELLOW}echo: echo failed, result code: ${res[0]}{Style.RESET_ALL}") - -def on_subscribe(client, userdata, mid, granted_qos): - print(f"{Fore.YELLOW}echo: subscribe succeeded, waiting for messages...{Style.RESET_ALL}") - -def echo(): - client = mqtt.Client(CLIENT_ID) - client.username_pw_set(USERNAME, PASSWORD) - - client.on_connect = on_connect - client.on_message = on_message - client.on_subscribe = on_subscribe - - print(f"{Fore.YELLOW}echo: connecting to '{HOST}' on port '{PORT}' as '{CLIENT_ID}'{Style.RESET_ALL}...") - client.connect(HOST, PORT) - client.loop_forever() - -def main(): - broker_thread = threading.Thread(target=broker) - echo_thread = threading.Thread(target=echo) - - broker_thread.start() - time.sleep(1) # make sure the broker is up - echo_thread.start() - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/Scripts/MQTTDemo/mosquitto.conf b/Scripts/MQTTDemo/mosquitto.conf deleted file mode 100644 index 1691259e8a..0000000000 --- a/Scripts/MQTTDemo/mosquitto.conf +++ /dev/null @@ -1,3 +0,0 @@ -allow_anonymous false -password_file passwords.txt -listener 1884 \ No newline at end of file diff --git a/Scripts/MQTTDemo/passwords.txt b/Scripts/MQTTDemo/passwords.txt deleted file mode 100644 index a182dfb913..0000000000 --- a/Scripts/MQTTDemo/passwords.txt +++ /dev/null @@ -1,2 +0,0 @@ -test-echo:$7$101$cZHHLpYfn5d4WH/6$NZYAZYWAMAoFyFpfGKHAUSCu/x9czfaLg+xO5WIaIOy7ap74srHX9dpf62zsKaTRdVbRViTqV3zvYsZXDmwplg== -test-duet:$7$101$bjDUJ7R4hobUf1Ph$b8zrtkAb9HxZR30HnGnV4NSGAqF5kD8whXepqlSMHRqZRa6vzuQ66XyjUuSrE4khm3+O4PdHA2xQ/WGEz74zHw== diff --git a/src/GCodes/GCodes2.cpp b/src/GCodes/GCodes2.cpp index 0458188df8..1aecc634cf 100644 --- a/src/GCodes/GCodes2.cpp +++ b/src/GCodes/GCodes2.cpp @@ -2079,14 +2079,32 @@ bool GCodes::HandleMcode(GCodeBuffer& gb, const StringRef& reply) THROWS(GCodeEx } #endif +#if SUPPORT_MQTT + if ((type & MqttMessage) && (result != GCodeResult::error)) + { + String topic; + gb.MustSee('T'); + gb.GetQuotedString(topic.GetRef()); + + bool seen = false; + + uint32_t qos = 0; + gb.TryGetLimitedUIValue('Q', qos, seen, 3); + + bool retain = 0; + gb.TryGetBValue('R', retain, seen); + + bool dup = 0; + gb.TryGetBValue('D', dup, seen); + + reprap.GetNetwork().MqttPublish(message.c_str(), topic.c_str(), qos, retain, dup); + } +#endif + if (result != GCodeResult::error) { // Append newline and send the message to the destinations, - // except for MqttMessage - if (type != MqttMessage) - { - message.cat('\n'); - } + message.cat('\n'); platform.Message(type, message.c_str()); } } diff --git a/src/Networking/MQTT/MqttClient.cpp b/src/Networking/MQTT/MqttClient.cpp index 1f6fd59c4f..b464fba320 100644 --- a/src/Networking/MQTT/MqttClient.cpp +++ b/src/Networking/MQTT/MqttClient.cpp @@ -16,17 +16,19 @@ #include "mqtt.h" MqttClient::MqttClient(NetworkResponder *n, NetworkClient *c) noexcept - : NetworkClient(n, c), - prevSub(nullptr), currSub(nullptr), currBuf(nullptr), messageTimer(0), next(clients) + : NetworkClient(n, c), prevSub(nullptr), currSub(nullptr), messageTimer(0) { - clients = this; - - publishMutex.Create("MqttPublishBuffer"); - - memset(sendBuf, 0, sizeof(sendBuf)); - memset(recvBuf, 0, sizeof(recvBuf)); - - mqtt_init(&client, skt, sendBuf, sizeof(sendBuf), recvBuf, sizeof(recvBuf), PublishCallback); + username = nullptr; + password = nullptr; + id = nullptr; + willTopic = nullptr; + willMessage = nullptr; + keepAlive = MqttClient::DefaultKeepAlive; + connectFlags = MQTT_CONNECT_CLEAN_SESSION; + subs = nullptr; + inited = false; + memset(sendBuf, 0, SendBufferSize); + memset(recvBuf, 0, ReceiveBufferSize); } bool MqttClient::Spin() noexcept @@ -59,7 +61,7 @@ bool MqttClient::Spin() noexcept { // Check if there is a queued CONNECT message struct mqtt_queued_message* msg = mqtt_mq_find(&client.mq, MQTT_CONTROL_CONNECT, NULL); - bool connecting = msg && msg->state != MQTT_QUEUED_COMPLETE; + bool connecting = (msg != nullptr); if (connecting) { @@ -91,7 +93,7 @@ bool MqttClient::Spin() noexcept { // Check if there is a queued SUBSCRIBE message struct mqtt_queued_message* msg = mqtt_mq_find(&client.mq, MQTT_CONTROL_SUBSCRIBE, NULL); - bool subscribing = (msg && msg->state != MQTT_QUEUED_COMPLETE); + bool subscribing = (msg != nullptr); if (client.error == MQTT_ERROR_SUBSCRIBE_FAILED) { @@ -134,7 +136,6 @@ bool MqttClient::Spin() noexcept else { // No more topics, prepare to publish messages - currBuf = nullptr; responderState = ResponderState::active; } } @@ -143,35 +144,20 @@ bool MqttClient::Spin() noexcept case ResponderState::active: { - MutexLocker lock(publishMutex); + struct mqtt_queued_message* msg = mqtt_mq_find(&client.mq, MQTT_CONTROL_PUBLISH, NULL); + bool publishing = (msg != nullptr); - if (currBuf) + if (publishing) { - outBuf = OutputBuffer::Release(currBuf); res = true; } - - currBuf = outBuf; - - if (currBuf) + else { - uint8_t flags = 0; - - // If not specified, publish under the hostname - const char *const topic = publishTopic ? publishTopic : reprap.GetNetwork().GetHostname(); - - flags |= (publishQos == 0) ? MQTT_PUBLISH_QOS_0 : - (publishQos == 1 ? MQTT_PUBLISH_QOS_1 : MQTT_PUBLISH_QOS_2); - flags |= (retain) ? MQTT_PUBLISH_RETAIN : 0; - flags |= (duplicate) ? MQTT_PUBLISH_DUP : 0; - - const MQTTErrors mqttErr = mqtt_publish(&client, topic, currBuf->Data(), currBuf->DataLength(), flags); - if (mqttErr == MQTT_ERROR_SEND_BUFFER_IS_FULL) - { - currBuf = nullptr; // retry to publish the same buffer on the next loop - } - else + // Workaround for a temporary bug where the error MQTT_ERROR_SEND_BUFFER_IS_FULL + // is not cleared even if the send buffer has been drained already. + if (err == MQTT_ERROR_SEND_BUFFER_IS_FULL) { + client.error = MQTT_OK; res = true; } } @@ -181,12 +167,12 @@ bool MqttClient::Spin() noexcept case ResponderState::disconnecting: { const mqtt_queued_message *const msg = mqtt_mq_find(&client.mq, MQTT_CONTROL_DISCONNECT, NULL); - const bool disconnecting = (msg != nullptr && msg->state != MQTT_QUEUED_COMPLETE); + const bool disconnecting = (msg != nullptr); // If received ACK for DISCONNECT regardless of result, or the time has expired. if (!disconnecting || millis() - messageTimer >= MqttClient::MessageTimeout) { - ConnectionLost(); + NetworkClient::Terminate(MqttProtocol, interface); if (reprap.Debug(Module::Webserver)) { debugPrintf("MQTT disconnected\n"); @@ -210,11 +196,37 @@ bool MqttClient::Accept(Socket *s) noexcept if (responderState == ResponderState::free) { skt = s; - mqtt_reinit(&client, skt, sendBuf, SendBufferSize, recvBuf, ReceiveBufferSize); - mqtt_connect(&client, id, willTopic, willMessage, strlen(willMessage), username, password, MQTT_CONNECT_CLEAN_SESSION, keepAlive); - responderState = ResponderState::connecting; - messageTimer = millis(); - return true; + + MQTTErrors err = MQTT_OK; + + if (inited) + { + mqtt_reinit(&client, skt, sendBuf, SendBufferSize, recvBuf, ReceiveBufferSize); + } + else + { + err = mqtt_init(&client, skt, sendBuf, SendBufferSize, recvBuf, ReceiveBufferSize, PublishCallback); + if (err == MQTT_OK) + { + inited = true; + } + } + + if (err == MQTT_OK) + { + err = mqtt_connect(&client, id, willTopic, willMessage, strlen(willMessage), username, password, connectFlags , keepAlive); + if (err == MQTT_OK) + { + responderState = ResponderState::connecting; + messageTimer = millis(); + return true; + } + } + + if (reprap.Debug(Module::Webserver)) + { + debugPrintf("Failed to start MQTT connection with error: %s\n", mqtt_error_str(err)); + } } return false; @@ -266,14 +278,10 @@ void MqttClient::ConnectionLost() noexcept /* static */ GCodeResult MqttClient::Configure(GCodeBuffer &gb, const StringRef& reply) THROWS(GCodeException) { - // Since the config is shared, make sure the protocol is not active on any interface. - for (MqttClient *c = clients; c != nullptr; c = c->next) + if (instance->responderState != ResponderState::free) { - if (c->responderState != ResponderState::free) - { - reply.copy("Unable to configure MQTT when active on an interface"); - return GCodeResult::error; - } + reply.copy("Unable to configure MQTT when active on an interface"); + return GCodeResult::error; } String param; @@ -306,7 +314,7 @@ void MqttClient::ConnectionLost() noexcept // Set username gb.GetQuotedString(param.GetRef()); - if (!setMemb(username)) + if (!setMemb(instance->username)) { return GCodeResult::error; } @@ -316,50 +324,61 @@ void MqttClient::ConnectionLost() noexcept if (gb.Seen('K')) { gb.GetQuotedString(param.GetRef()); - if (!setMemb(password)) + if (!setMemb(instance->password)) { - clearMemb(username); + clearMemb(instance->username); return GCodeResult::error; } } else { - clearMemb(password); + clearMemb(instance->password); } if (reprap.Debug(Module::Webserver)) { - debugPrintf("Username set to '%s'", username); - if (password) + debugPrintf("Username set to '%s'", instance->username); + if (instance->password) { - debugPrintf("with password '%s'", password); + debugPrintf("with password '%s'", instance->password); } debugPrintf("\n"); } } - if (gb.Seen('C')) + if (gb.Seen('C')) // Client ID { - // Set the client ID gb.GetQuotedString(param.GetRef()); - if (!setMemb(id)) + if (!setMemb(instance->id)) { return GCodeResult::error; } if (reprap.Debug(Module::Webserver)) { - debugPrintf("Client ID set to '%s'\n", id); + debugPrintf("Client ID set to '%s'\n", instance->id); } } - if (gb.Seen('W')) // Will message and topic + if (gb.Seen('W')) // LWT message, topic, retain and qos { // Set the will message gb.GetQuotedString(param.GetRef()); - if (!setMemb(willMessage)) + uint32_t qos = 0; + bool retain = 0; + bool seen = false; + + // Check qos + gb.TryGetLimitedUIValue('Q', qos, seen, 3); + + + // Check retain flag + gb.TryGetBValue('R', retain, seen); + + + if (!setMemb(instance->willMessage)) { return GCodeResult::error; } @@ -369,43 +388,60 @@ void MqttClient::ConnectionLost() noexcept gb.MustSee('T'); { gb.GetQuotedString(param.GetRef()); - if (!setMemb(willTopic)) + if (!setMemb(instance->willTopic)) { - clearMemb(willMessage); + clearMemb(instance->willMessage); return GCodeResult::error; } } + uint8_t flags = 0; + + switch (qos) + { + case 1: + flags |= MQTT_CONNECT_WILL_QOS_1; + break; + + case 2: + flags |= MQTT_CONNECT_WILL_QOS_2; + break; + + case 0: + default: + flags |= MQTT_CONNECT_WILL_QOS_0; + break; + } + + if (retain) + { + flags |= MQTT_CONNECT_WILL_RETAIN; + } + + // Create mask from relevant flags + static constexpr uint8_t mask = MQTT_CONNECT_WILL_QOS_0 | MQTT_CONNECT_WILL_QOS_1 | MQTT_CONNECT_WILL_QOS_2 | MQTT_CONNECT_WILL_RETAIN; + instance->connectFlags = (instance->connectFlags & ~mask) | (flags & mask); + if (reprap.Debug(Module::Webserver)) { - debugPrintf("Will message set to '%s'", willMessage); - if (willTopic) - { - debugPrintf("with topic '%s'", willTopic); - } - debugPrintf("\n"); + debugPrintf("Set will message '%s' with topic '%s', QOS=%lu, retain = %s\n", + instance->willMessage, instance->willTopic, qos, retain ? "true": "false"); } } - if (gb.Seen('S')) // Subscription + if (gb.Seen('S')) // Subscribe topic { gb.GetQuotedString(param.GetRef()); - // Check the QOS first - int qos = 0; - if (gb.Seen('Q')) - { - qos = gb.GetIValue(); - if (qos < 0 || qos > 2) - { - reply.copy("Invalid subscription QOS"); - return GCodeResult::badOrMissingParameter; - } - } + bool seen = false; + + // Check the max QOS first + uint32_t qos = 0; + gb.TryGetLimitedUIValue('O', qos, seen, 3); // Then check if the topic is already in the subscriptions, Subscription *sub; - for (sub = subs; sub != nullptr; sub = sub->next) + for (sub = instance->subs; sub != nullptr; sub = sub->next) { if (strcmp(sub->topic, param.c_str()) == 0) { @@ -417,6 +453,11 @@ void MqttClient::ConnectionLost() noexcept { // Just overwrite the existing QOS sub->qos = qos; + + if (reprap.Debug(Module::Webserver)) + { + debugPrintf("Subscription topic '%s' max QOS changed to %d \n", param.c_str(), qos); + } } else { @@ -426,7 +467,7 @@ void MqttClient::ConnectionLost() noexcept if (!sub) { - reply.copy("Unable to allocate mem"); + reply.copy("Unable to allocate memory"); return GCodeResult::error; } @@ -434,96 +475,66 @@ void MqttClient::ConnectionLost() noexcept sub->qos = qos; // Append to list of subscriptions - sub->next = subs; - subs = sub; + sub->next = instance->subs; + instance->subs = sub; if (reprap.Debug(Module::Webserver)) { - debugPrintf("MQTT added topic %s with QOS %d to subscriptions\n", param.c_str(), qos); + debugPrintf("Topic '%s' added with max QOS=%d to subscriptions\n", param.c_str(), qos); } } } - if (gb.Seen('P')) // Publish - { - gb.GetQuotedString(param.GetRef()); + return GCodeResult::ok; +} - if (!setMemb(publishTopic)) - { - return GCodeResult::error; - } +/* static */void MqttClient::Disable() noexcept +{ + // Nothing needed here +} - retain = duplicate = 0; - publishQos = 0; +/* static */void MqttClient::Publish(const char *msg, const char *topic, int qos, bool retain, bool dup) noexcept +{ + if (instance->responderState == ResponderState::active) + { + uint8_t flags = 0; - if (gb.Seen('R')) + switch (qos) { - retain = gb.GetIValue(); + case 1: + flags |= MQTT_PUBLISH_QOS_1; + break; + + case 2: + flags |= MQTT_PUBLISH_QOS_2; + break; + + case 0: + default: + flags |= MQTT_PUBLISH_QOS_0; + break; } - if (gb.Seen('D')) + if (retain) { - duplicate = gb.GetIValue(); + flags |= MQTT_PUBLISH_RETAIN; } - if (gb.Seen('Q')) + if (dup) { - int qos = gb.GetIValue(); - - if (qos < 0 || qos > 2) - { - reply.copy("Invalid publish QOS"); - return GCodeResult::badOrMissingParameter; - } - else - { - publishQos = qos; - if (reprap.Debug(Module::Webserver)) - { - debugPrintf("MQTT publish QOS set to %d\n", publishQos); - } - } + flags |= MQTT_PUBLISH_DUP; } - if (reprap.Debug(Module::Webserver)) + const MQTTErrors err = mqtt_publish(&instance->client, topic, msg, strlen(msg), flags); + + if (err != MQTT_OK) { - debugPrintf("Publish topic '%s', with settings duplicate = %d retain = %d qos = %d\n", publishTopic, retain, duplicate, publishQos); + GetPlatform().MessageF(UsbMessage, "Failed to publish MQTT message with error: %s\n", mqtt_error_str(err)); } } - - return GCodeResult::ok; -} - -/* static */void MqttClient::Disable() noexcept -{ - // Nothing needed here -} - -/* static */void MqttClient::Publish(const char *msg) noexcept -{ - for (MqttClient *c = clients; c != nullptr; c = c->next) + else { - if (c->responderState == ResponderState::active) - { - if (strlen(msg) < OUTPUT_BUFFER_SIZE - 1) - { - MutexLocker lock(c->publishMutex); - OutputBuffer *buf = nullptr; - if (OutputBuffer::Allocate(buf)) - { - buf->copy(msg); - - if (c->outBuf) - { - c->outBuf->Append(buf); - } - else - { - c->outBuf = buf; - } - } - } - }; + GetPlatform().MessageF(UsbMessage, "Failed to publish MQTT message, client not active\n"); } } @@ -537,20 +548,14 @@ void MqttClient::ConnectionLost() noexcept GetPlatform().MessageF(UsbMessage, "Received message from topic '%s': '%s'\n", topic, message); } +MqttClient *MqttClient::Init(NetworkResponder *n, NetworkClient *c) noexcept +{ + instance = new MqttClient(n, c); + return instance; +} + /* Static members */ -char *MqttClient::username = nullptr; -char *MqttClient::password = nullptr; -char *MqttClient::id = nullptr; -char *MqttClient::willTopic = nullptr; -char *MqttClient::willMessage = nullptr; -size_t MqttClient::keepAlive = MqttClient::DefaultKeepAlive; - -char *MqttClient::publishTopic = nullptr; -uint8_t MqttClient:: publishQos = 0; -bool MqttClient::duplicate = false; -bool MqttClient::retain = false; - -MqttClient::Subscription *MqttClient::subs = nullptr; -MqttClient *MqttClient::clients = nullptr; + +MqttClient *MqttClient::instance = nullptr; #endif diff --git a/src/Networking/MQTT/MqttClient.h b/src/Networking/MQTT/MqttClient.h index 59176ab8a5..b4a2be640b 100644 --- a/src/Networking/MQTT/MqttClient.h +++ b/src/Networking/MQTT/MqttClient.h @@ -18,25 +18,27 @@ class MqttClient : public NetworkClient { public: - MqttClient(NetworkResponder *n, NetworkClient *c) noexcept; + static MqttClient *Init(NetworkResponder *n, NetworkClient *c) noexcept; bool Spin() noexcept override; bool Accept(Socket *s) noexcept override; void Terminate() noexcept override; void Diagnostics(MessageType mtype) const noexcept override; - bool HandlesProtocol(NetworkProtocol p) noexcept override; static GCodeResult Configure(GCodeBuffer &gb, const StringRef& reply) THROWS(GCodeException); static void Disable() noexcept; - static void Publish(const char *msg) noexcept; + static void Publish(const char *msg, const char *topic, int qos, bool retain, bool dup) noexcept; private: - static const int SendBufferSize = 2048; - static const int ReceiveBufferSize = 1024; - static const size_t DefaultKeepAlive = 400; - static const size_t MessageTimeout = 5000; - static const size_t ReconnectCooldown = 1000; + MqttClient(NetworkResponder *n, NetworkClient *c) noexcept; + + static constexpr int SendBufferSize = 1024; + static constexpr int ReceiveBufferSize = 1024; + + static constexpr size_t DefaultKeepAlive = 400; + static constexpr size_t MessageTimeout = 5000; + static constexpr size_t ReconnectCooldown = 1000; struct Subscription { @@ -55,34 +57,28 @@ class MqttClient : public NetworkClient void ConnectionLost() noexcept override; static void PublishCallback(void** state, struct mqtt_response_publish *published); - mqtt_client client; uint8_t sendBuf[SendBufferSize]; uint8_t recvBuf[ReceiveBufferSize]; + mqtt_client client; Subscription *prevSub, *currSub; // Used for subscribing to topics - OutputBuffer *currBuf; // Current message being published uint32_t messageTimer; // General purpose variable for keeping track of queued messages timeout - Mutex publishMutex; - - MqttClient *next; static MqttClient *instance; - // MQTT configuration, shared by all MqttClient's - static char *username; - static char *password; - static char *id; - static char *willTopic; - static char *willMessage; - static size_t keepAlive; - static char *publishTopic; - static uint8_t publishQos; - static bool duplicate; - static bool retain; - static Subscription *subs; - - static MqttClient *clients; // List of all MQTT clients + // MQTT configuration, shared by all MqttClient + char *username; + char *password; + char *id; + char *willTopic; + char *willMessage; + Subscription *subs; + size_t keepAlive; + uint8_t connectFlags; + bool inited; + + NetworkInterface *enabledInterface; }; #endif // SUPPORT_MQTT diff --git a/src/Networking/MQTT/mqtt_pal.cpp b/src/Networking/MQTT/mqtt_pal.cpp index ff0ef87bac..ce2f21708f 100644 --- a/src/Networking/MQTT/mqtt_pal.cpp +++ b/src/Networking/MQTT/mqtt_pal.cpp @@ -5,6 +5,7 @@ * Author: rechrtb */ #include +#include #include #include "mqtt.h" @@ -53,4 +54,21 @@ ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t sz, int fl } return res; -} \ No newline at end of file +} + +void mqtt_pal_mutex_init(mqtt_pal_mutex_t *mutex) +{ + static const char name[] = "MQTTClient"; + mutex->m = new Mutex(); + mutex->m->Create(name); +} + +void mqtt_pal_mutex_lock(mqtt_pal_mutex_t *mutex) +{ + mutex->m->Take(); +} + +void mqtt_pal_mutex_unlock(mqtt_pal_mutex_t *mutex) +{ + mutex->m->Release(); +} diff --git a/src/Networking/MQTT/mqtt_pal.h b/src/Networking/MQTT/mqtt_pal.h index a085a99885..ea80906ce3 100644 --- a/src/Networking/MQTT/mqtt_pal.h +++ b/src/Networking/MQTT/mqtt_pal.h @@ -26,11 +26,14 @@ typedef time_t mqtt_pal_time_t; extern uint32_t millis() noexcept; #define MQTT_PAL_TIME() (millis() / 1000) -// Mutex - define to nothing; MQTT-C is used in a single thread context. -typedef int mqtt_pal_mutex_t; -#define MQTT_PAL_MUTEX_LOCK(x) -#define MQTT_PAL_MUTEX_INIT(x) -#define MQTT_PAL_MUTEX_UNLOCK(x) +typedef struct +{ + struct Mutex *m; +} mqtt_pal_mutex_t; + +#define MQTT_PAL_MUTEX_INIT(m) mqtt_pal_mutex_init(m) +#define MQTT_PAL_MUTEX_LOCK(m) mqtt_pal_mutex_lock(m) +#define MQTT_PAL_MUTEX_UNLOCK(m) mqtt_pal_mutex_unlock(m) // Byte order functions #define MQTT_PAL_HTONS(s) __builtin_bswap16(s) @@ -81,6 +84,10 @@ ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, */ ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags); +void mqtt_pal_mutex_init(mqtt_pal_mutex_t *mutex); +void mqtt_pal_mutex_lock(mqtt_pal_mutex_t *mutex); +void mqtt_pal_mutex_unlock(mqtt_pal_mutex_t *mutex); + #ifdef __cplusplus } #endif diff --git a/src/Networking/Network.cpp b/src/Networking/Network.cpp index a431adfe5f..5facca11fe 100644 --- a/src/Networking/Network.cpp +++ b/src/Networking/Network.cpp @@ -226,6 +226,30 @@ GCodeResult Network::EnableProtocol(unsigned int interface, NetworkProtocol prot #if HAS_NETWORKING if (interface < GetNumNetworkInterfaces()) { + bool hasFree = false; + bool client = false; + // Check if there are enough clients to accomodate enabling the protocol. Check if + // a client is not yet associated with an interface, or there is already one on the same + // interface. + for (NetworkClient *c = clients; c != nullptr; c = c->GetNext()) + { + if (c->HandlesProtocol(protocol)) + { + hasFree |= c->GetInterface() == nullptr || c->GetInterface() == interfaces[interface]; + client |= true; + if (hasFree) + { + break; + } + } + } + + if (client && !hasFree) + { + reply.printf("No more instances for client protocol.\n"); + return GCodeResult::error; + } + return interfaces[interface]->EnableProtocol(protocol, port, ip, secure, reply); } @@ -493,10 +517,7 @@ void Network::Activate() noexcept #endif #if SUPPORT_MQTT - for (size_t i = 0; i < NumMqttClients; ++i) - { - responders = clients = new MqttClient(responders, clients); - } + responders = clients = MqttClient::Init(responders, clients); #endif // Finally, create the network task @@ -842,9 +863,9 @@ void Network::HandleTelnetGCodeReply(const char *msg) noexcept } #if SUPPORT_MQTT -void Network::MqttPublish(const char *msg) noexcept +void Network::MqttPublish(const char *msg, const char *topic, int qos, bool retain, bool dup) noexcept { - MqttClient::Publish(msg); + MqttClient::Publish(msg, topic, qos, retain, dup); } #endif diff --git a/src/Networking/Network.h b/src/Networking/Network.h index 08b44224b2..b030e666fa 100644 --- a/src/Networking/Network.h +++ b/src/Networking/Network.h @@ -25,12 +25,10 @@ const size_t MaxNetworkInterfaces = 1; #if SAME70 const size_t NumHttpResponders = 6; // the number of concurrent HTTP requests we can process const size_t NumTelnetResponders = 2; // the number of concurrent Telnet sessions we support -const size_t NumMqttClients = 1; // the number of concurrent MQTT clients #else // Limit the number of HTTP responders to 4 because they take around 2K of memory each const size_t NumHttpResponders = 4; // the number of concurrent HTTP requests we can process const size_t NumTelnetResponders = 1; // the number of concurrent Telnet sessions we support -const size_t NumMqttClients = 1; // the number of concurrent MQTT clients #endif // not SAME70 const size_t NumFtpResponders = 1; // the number of concurrent FTP sessions we support @@ -119,7 +117,7 @@ class Network INHERIT_OBJECT_MODEL void HandleTelnetGCodeReply(OutputBuffer *buf) noexcept; #if SUPPORT_MQTT - void MqttPublish(const char *msg) noexcept; + void MqttPublish(const char *msg, const char *topic, int qos, bool retain, bool dup) noexcept; #endif uint32_t GetHttpReplySeq() noexcept; diff --git a/src/Networking/NetworkClient.cpp b/src/Networking/NetworkClient.cpp index 42bcdcbebe..1e8cb11a8d 100644 --- a/src/Networking/NetworkClient.cpp +++ b/src/Networking/NetworkClient.cpp @@ -88,6 +88,7 @@ void NetworkClient::Terminate(NetworkProtocol protocol, NetworkInterface *iface) if ((HandlesProtocol(protocol) || protocol == AnyProtocol) && interface == iface) { Terminate(); + interface = nullptr; } } @@ -102,7 +103,6 @@ bool NetworkClient::Start() noexcept void NetworkClient::ConnectionLost() noexcept { NetworkResponder::ConnectionLost(); - interface = nullptr; } NetworkClient *NetworkClient::clients = nullptr; diff --git a/src/Platform/Platform.cpp b/src/Platform/Platform.cpp index 4f57729772..6500b766f5 100644 --- a/src/Platform/Platform.cpp +++ b/src/Platform/Platform.cpp @@ -3378,13 +3378,6 @@ void Platform::RawMessage(MessageType type, const char *_ecv_array message) noex reprap.GetNetwork().HandleTelnetGCodeReply(message); } -#if SUPPORT_MQTT - if ((type & MqttMessage) != 0) - { - reprap.GetNetwork().MqttPublish(message); - } -#endif - if ((type & Aux2Message) != 0) { AppendAuxReply(1, message, message[0] == '{' || (type & RawMessageFlag) != 0);