Skip to content

Commit

Permalink
MQTT proof-of-concept for outgoing connections
Browse files Browse the repository at this point in the history
  • Loading branch information
rechrtb committed Nov 30, 2022
1 parent 30f839d commit f559125
Show file tree
Hide file tree
Showing 15 changed files with 537 additions and 15 deletions.
5 changes: 5 additions & 0 deletions .cproject
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,7 @@
<option id="gnu.c.compiler.option.misc.other.1176394435" name="Other flags" superClass="gnu.c.compiler.option.misc.other" useByScannerDiscovery="true" value="-c -mcpu=cortex-m4 -mthumb -mfpu=fpv4-sp-d16 -mfloat-abi=hard -mfp16-format=ieee -ffunction-sections -fdata-sections -nostdlib -Wundef -Wdouble-promotion -Werror=return-type -Werror=implicit -fsingle-precision-constant &quot;-Wa,-ahl=$*.s&quot;" valueType="string"/>
<option IS_BUILTIN_EMPTY="false" IS_VALUE_EMPTY="false" id="gnu.c.compiler.option.include.paths.468883064" name="Include paths (-I)" superClass="gnu.c.compiler.option.include.paths" useByScannerDiscovery="false" valueType="includePath">
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/CoreN2G}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/RRFLibraries/src/ThirdParty/MQTT-C/include}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/CoreN2G/src}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/CoreN2G/src//SAME5x_C21/SAME5x}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/CoreN2G/src/atmel/SAME54_DFP/1.1.134/include}&quot;"/>
Expand All @@ -1174,6 +1175,7 @@
</option>
<option IS_BUILTIN_EMPTY="false" IS_VALUE_EMPTY="false" id="gnu.c.compiler.option.preprocessor.def.symbols.126387254" name="Defined symbols (-D)" superClass="gnu.c.compiler.option.preprocessor.def.symbols" useByScannerDiscovery="false" valueType="definedSymbols">
<listOptionValue builtIn="false" value="__SAME54P20A__"/>
<listOptionValue builtIn="false" value="MQTTC_PAL_FILE=${workspace_loc:/RepRapFirmware/src/Networking/MqttClient.h}"/>
<listOptionValue builtIn="false" value="RTOS"/>
<listOptionValue builtIn="false" value="DUET3MINI_V04"/>
<listOptionValue builtIn="false" value="noexcept="/>
Expand Down Expand Up @@ -1212,6 +1214,8 @@
<option id="gnu.cpp.compiler.option.other.other.1951413447" name="Other flags" superClass="gnu.cpp.compiler.option.other.other" useByScannerDiscovery="true" value="-c -mcpu=cortex-m4 -mthumb -mfpu=fpv4-sp-d16 -mfloat-abi=hard -mfp16-format=ieee -ffunction-sections -fdata-sections -fno-threadsafe-statics -fno-rtti -fexceptions -nostdlib -Wundef -Wdouble-promotion -Werror=return-type -Wsuggest-override -fsingle-precision-constant &quot;-Wa,-ahl=$*.s&quot; -fstack-usage" valueType="string"/>
<option IS_BUILTIN_EMPTY="false" IS_VALUE_EMPTY="false" id="gnu.cpp.compiler.option.include.paths.1562950186" name="Include paths (-I)" superClass="gnu.cpp.compiler.option.include.paths" useByScannerDiscovery="false" valueType="includePath">
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/CoreN2G}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/RRFLibraries/src/ThirdParty/MQTT-C/include}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/RRFLibraries/MQTT}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/RRFLibraries}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/FreeRTOS/src/include}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/FreeRTOS/src/portable/GCC/ARM_CM4F}&quot;"/>
Expand All @@ -1237,6 +1241,7 @@
</option>
<option IS_BUILTIN_EMPTY="false" IS_VALUE_EMPTY="false" id="gnu.cpp.compiler.option.preprocessor.def.1275759816" name="Defined symbols (-D)" superClass="gnu.cpp.compiler.option.preprocessor.def" useByScannerDiscovery="false" valueType="definedSymbols">
<listOptionValue builtIn="false" value="__SAME54P20A__"/>
<listOptionValue builtIn="false" value="MQTTC_PAL_FILE=${workspace_loc:/RepRapFirmware/src/Networking/MqttClient.h}"/>
<listOptionValue builtIn="false" value="RTOS"/>
<listOptionValue builtIn="false" value="DUET3MINI_V04"/>
<listOptionValue builtIn="false" value="_XOPEN_SOURCE"/>
Expand Down
4 changes: 4 additions & 0 deletions src/Config/Pins.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@
# define SUPPORT_TELNET HAS_NETWORKING
#endif

#ifndef SUPPORT_MQTT
# define SUPPORT_MQTT HAS_NETWORKING
#endif

#ifndef SUPPORT_MULTICAST_DISCOVERY
# define SUPPORT_MULTICAST_DISCOVERY 0
#endif
Expand Down
1 change: 1 addition & 0 deletions src/Config/Pins_Duet3Mini.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ constexpr uint32_t IAP_IMAGE_START = 0x20038000;
#define SUPPORT_OBJECT_MODEL 1
#define SUPPORT_FTP 1
#define SUPPORT_TELNET 1
#define SUPPORT_MQTT 1
#define SUPPORT_ASYNC_MOVES 1
#define SUPPORT_PROBE_POINTS_FILE 1

Expand Down
14 changes: 12 additions & 2 deletions src/GCodes/GCodes2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2028,6 +2028,10 @@ bool GCodes::HandleMcode(GCodeBuffer& gb, const StringRef& reply) THROWS(GCodeEx
type = Aux2Message;
break;
#endif

case 6: // MQTT
type = MqttMessage;
break;
default:
reply.printf("Invalid message type: %" PRIi32, param);
result = GCodeResult::error;
Expand Down Expand Up @@ -3787,11 +3791,17 @@ bool GCodes::HandleMcode(GCodeBuffer& gb, const StringRef& reply) THROWS(GCodeEx
{
const int port = (gb.Seen('R')) ? gb.GetIValue() : -1;
const int secure = (gb.Seen('T')) ? gb.GetIValue() : -1;

IPAddress ip;
if (gb.Seen('H'))

if (protocol == MqttProtocol)
{
gb.GetIPAddress(ip);
gb.MustSee('H');
{
gb.GetIPAddress(ip);
}
}

result = reprap.GetNetwork().EnableProtocol(interface, protocol, port,
ip.GetV4LittleEndian(), secure, reply);
}
Expand Down
8 changes: 8 additions & 0 deletions src/Networking/ESP8266WiFi/WiFiInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Networking/HttpResponder.h>
#include <Networking/FtpResponder.h>
#include <Networking/TelnetResponder.h>
#include <Networking/MqttClient.h>
#include "WifiFirmwareUploader.h"
#include <General/IP4String.h>
#include "WiFiSocket.h"
Expand Down Expand Up @@ -444,6 +445,9 @@ void WiFiInterface::Connect(NetworkProtocol protocol) noexcept

switch(protocol)
{
case MqttProtocol:
SendConnectCommand(portNumbers[protocol], protocol, ipAddresses[protocol]);
break;

default:
break;
Expand Down Expand Up @@ -476,6 +480,10 @@ void WiFiInterface::ShutdownProtocol(NetworkProtocol protocol) noexcept
TerminateSockets(portNumbers[protocol]);
break;

case MqttProtocol:
TerminateSockets(portNumbers[protocol], false);
break;

default:
break;
}
Expand Down
276 changes: 276 additions & 0 deletions src/Networking/MqttClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
/*
* MqttClient.cpp
*/
#include "MqttClient.h"

#if SUPPORT_MQTT

#include <Platform/Platform.h>
#include <Networking/Network.h>

#include "Socket.h"

#include "mqtt.h"

MqttClient::MqttClient(NetworkResponder *n, NetworkClient *c) noexcept : NetworkClient(n, c),
sendBuf(nullptr),
recvBuf(nullptr),
client(nullptr),
username(nullptr),
password(nullptr),
clientId(nullptr),
publishTopic(nullptr),
willClient(nullptr),
willTopic(nullptr),
willMessage(nullptr),
willMessageSize(0),
keepAlive(DefaultKeepAlive),
qos(QOS::QOS0),
retain(false),
dup(false)
{
client = new mqtt_client;
publishMutex.Create("MqttPublishBuffer");

sendBuf = new uint8_t[SendBufferSize];
recvBuf = new uint8_t[ReceiveBufferSize];

mqtt_init(client, skt, sendBuf, SendBufferSize, recvBuf, ReceiveBufferSize, PublishCallback);
}

bool MqttClient::Subscribe(const char *topic, QOS maxQos)
{
enum MQTTErrors err = mqtt_subscribe(client, topic, static_cast<int>(maxQos));
return err == MQTT_OK;
}

bool MqttClient::Unsubscribe(const char *topic)
{
enum MQTTErrors err = mqtt_unsubscribe(client, topic);
return err == MQTT_OK;
}

bool MqttClient::Publish(const char *topic, const void *message, size_t messageSize, QOS qos, bool retain, bool dup)
{
uint8_t flags = 0;

switch (qos)
{
case QOS::QOS0:
flags |= MQTT_PUBLISH_QOS_0;
break;

case QOS::QOS1:
flags |= MQTT_PUBLISH_QOS_1;
break;

case QOS::QOS2:
flags |= MQTT_PUBLISH_QOS_2;
break;

default:
break;
}

flags |= (retain) ? MQTT_PUBLISH_RETAIN : 0;
flags |= (dup) ? MQTT_PUBLISH_DUP : 0;

enum MQTTErrors err = mqtt_publish(client, topic, message, messageSize, flags);
return err == MQTT_OK;
}

bool MqttClient::Spin() noexcept
{
bool res = false;

if (skt && skt->CanSend())
{
switch (responderState)
{
case ResponderState::connecting:
{
mqtt_sync(client);

if (client->typical_response_time >= 0.0f) // ack was received
{
if (client->error == MQTT_OK)
{
responderState = ResponderState::connected;
}
else
{
// Re-attempt MQTT connection
Connect();
}

// Since at this stage we only expect ack (or some error), only work is done
// when it is actually received and processed.
res = true;
}
}
break;

case ResponderState::connected:
{
// Save the current receive buffer, so it can be checked if
// it moved forward (received something).
uint8_t *curr = client->recv_buffer.curr;

// Process the socket receive buffer
mqtt_sync(client);

bool ok = true, sent = false;
{
MutexLocker lock(publishMutex);

while (outBuf && ok)
{
ok = Publish(publishTopic ? publishTopic : reprap.GetNetwork().GetHostname(),
outBuf->Data(),outBuf->DataLength(), qos, retain, dup);
if (ok)
{
outBuf = OutputBuffer::Release(outBuf);
sent = true;
}
}
}

if (sent)
{
// Buffer the messages in the socket for the next loop
mqtt_sync(client);
}

// Did work if processed or sent something
res = (client->recv_buffer.curr > curr || sent);
}
break;

default:
break;
}
}
else
{
if (responderState == ResponderState::connecting || responderState == ResponderState::connected)
{
// Lost the connection
debugPrintf("connection lost\n");
ConnectionLost();
}
}

return res;
}

void MqttClient::Connect()
{
mqtt_reinit(client, skt, sendBuf, SendBufferSize, recvBuf, ReceiveBufferSize);
mqtt_connect(client, clientId, willTopic, willMessage, willMessageSize, username, password, MQTT_CONNECT_CLEAN_SESSION, keepAlive);
// Buffer the CONNECT message, so the next mqtt_sync can process CONNACK if available.
mqtt_sync(client);
responderState = ResponderState::connecting;
}

bool MqttClient::Accept(Socket *s, NetworkProtocol protocol) noexcept
{
if (responderState == ResponderState::free && HandlesProtocol(protocol) && s->GetInterface() == interface)
{
skt = s;
Connect();
return true;
}

return false;
}

void MqttClient::Disable() noexcept
{
MutexLocker lock(MqttClient::Instance()->publishMutex);
OutputBuffer::ReleaseAll(MqttClient::Instance()->outBuf);
}

bool MqttClient::HandlesProtocol(NetworkProtocol protocol) noexcept
{
return protocol == MqttProtocol;
}

void MqttClient::Terminate(NetworkProtocol protocol, NetworkInterface *interface) noexcept
{
if (responderState != ResponderState::free && (HandlesProtocol(protocol) || protocol == AnyProtocol) && skt != nullptr && GetInterface() == interface)
{
ConnectionLost();
return;
}
}

void MqttClient::Diagnostics(MessageType mt) const noexcept
{
GetPlatform().MessageF(mt, " MQTT(%d)", (int)responderState);
}

/*static*/ void MqttClient::QueueMessage(const char *msg) noexcept
{
if (strlen(msg) < OUTPUT_BUFFER_SIZE - 1)
{
MutexLocker lock(MqttClient::Instance()->publishMutex);
OutputBuffer *buf = nullptr;
if (OutputBuffer::Allocate(buf))
{
buf->copy(msg);

if (MqttClient::Instance()->outBuf)
{
MqttClient::Instance()->outBuf->Append(buf);
}
else
{
MqttClient::Instance()->outBuf = buf;
}
}
};
}

/*static*/ void MqttClient::PublishCallback(void** state, struct mqtt_response_publish *published)
{
debugPrintf("Received message from topic '%s':\n %s \n",
(const char*) published->topic_name, (const char*) published->application_message);
}

/*static*/ MqttClient *MqttClient::instance;

/*
* MQTT-C PAL layer implementation
*/

ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags)
{
Socket* skt = static_cast<Socket*>(fd);

size_t res = 0;

if (len)
{
res = skt->Send((const uint8_t*)buf, len);
}

return res;
}

ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags)
{
Socket* skt = static_cast<Socket*>(fd);

size_t read = 0;
char c = 0;

while (read < bufsz && skt->ReadChar(c))
{
((char*) buf)[read] = c;
read++;
}

return read;
}

#endif
Loading

0 comments on commit f559125

Please sign in to comment.