Skip to content

Commit

Permalink
Fixed connection timeout problem for VSCP MQTT client on slow connect…
Browse files Browse the repository at this point in the history
…ions
  • Loading branch information
grodansparadis committed Feb 10, 2025
1 parent 3bc434b commit d59ae90
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 24 deletions.
104 changes: 80 additions & 24 deletions src/vscp/common/vscp-client-mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ mqtt_on_connect(struct mosquitto *mosq, void *pData, int rv)
vscpClientMqtt *pClient = reinterpret_cast<vscpClientMqtt *>(pData);
pClient->m_bConnected = true;

if (nullptr != pClient) {
pClient->sendWillPayload();
pClient->doSubscribe();
}

spdlog::debug("VSCP MQTT CLIENT: v3.11 connect: rv={0:X} flags={1:X} {2}", rv, mosquitto_strerror(rv));

if (nullptr != pClient->m_parentCallbackConnect) {
Expand Down Expand Up @@ -178,6 +183,11 @@ mqtt_on_connect_flags(struct mosquitto *mosq, void *pData, int rv, int flags)

spdlog::debug("VSCP MQTT CLIENT: v3.11 connect: rv={0:X} flags={1:X} {2}", rv, flags, mosquitto_strerror(rv));

if (nullptr != pClient) {
pClient->sendWillPayload();
pClient->doSubscribe();
}

if (nullptr != pClient->m_parentCallbackConnect) {
pClient->m_parentCallbackConnect(mosq, pClient->m_pParent, rv);
}
Expand Down Expand Up @@ -621,6 +631,10 @@ vscpClientMqtt::vscpClientMqtt(void)
m_keepAlive = 30; // 30 seconds for keepalive
// m_bCleanSession = false; // Do not start with a clean session

// Defaults for timing
m_timeoutConnection = 5000; // 5 seconds
m_timeoutResponse = 200;

m_bTLS = false;
m_tls_cafile = "";
m_tls_capath = "";
Expand Down Expand Up @@ -847,7 +861,8 @@ vscpClientMqtt::initFromJson(const std::string &config)

if (jj.contains("tls-kpass-sha1") && j["tls-kpass-sha1"].is_string()) {
m_mapMqttStringOptions["tls-kpass-sha1"] = jj["tls-kpass-sha1"].get<std::string>();
spdlog::debug("VSCP MQTT CLIENT: json mqtt init: tls-kpass-sha1 set to {}.", m_mapMqttIntOptions["tls-kpass-sha1"]);
spdlog::debug("VSCP MQTT CLIENT: json mqtt init: tls-kpass-sha1 set to {}.",
m_mapMqttIntOptions["tls-kpass-sha1"]);
}

if (jj.contains("tls-alpn") && j["tls-alpn"].is_string()) {
Expand Down Expand Up @@ -1053,7 +1068,8 @@ vscpClientMqtt::initFromJson(const std::string &config)

if (jj.contains("no-hostname-checking") && j["no-hostname-checking"].is_boolean()) {
m_tls_bNoHostNameCheck = jj["no-hostname-checking"].get<bool>();
spdlog::debug("VSCP MQTT CLIENT: json mqtt init: 'tls no-hostname-checking' Set to {}.", m_tls_bNoHostNameCheck);
spdlog::debug("VSCP MQTT CLIENT: json mqtt init: 'tls no-hostname-checking' Set to {}.",
m_tls_bNoHostNameCheck);
}

if (jj.contains("cert-reqs") && j["cert-reqs"].is_number()) {
Expand Down Expand Up @@ -1089,17 +1105,19 @@ vscpClientMqtt::initFromJson(const std::string &config)

// If certfile == NULL, keyfile must also be NULL and no client certificate will be used.
if (!m_tls_certfile.length()) {
spdlog::warn("VSCP MQTT CLIENT: json mqtt init: 'TLS' If certfile == NULL, keyfile must also be NULL and no client "
"certificate will be "
"used. keyfile set to NULL.");
spdlog::warn(
"VSCP MQTT CLIENT: json mqtt init: 'TLS' If certfile == NULL, keyfile must also be NULL and no client "
"certificate will be "
"used. keyfile set to NULL.");
m_tls_keyfile = "";
}

// If m_tls_keyfile == NULL, certfile must also be NULL and no client certificate will be used.
if (!m_tls_keyfile.length()) {
spdlog::warn("VSCP MQTT CLIENT: json mqtt init: 'TLS' If m_tls_keyfile == NULL, certfile must also be NULL and no "
"client certificate "
"will be used. certfile set to NULL.");
spdlog::warn(
"VSCP MQTT CLIENT: json mqtt init: 'TLS' If m_tls_keyfile == NULL, certfile must also be NULL and no "
"client certificate "
"will be used. certfile set to NULL.");
m_tls_certfile = "";
}
}
Expand Down Expand Up @@ -1859,7 +1877,9 @@ vscpClientMqtt::init(void)
#if LIBMOSQUITTO_MAJOR >= 2
if (MOSQ_ERR_SUCCESS !=
(rv = mosquitto_int_option(m_mosq, MOSQ_OPT_TCP_NODELAY, m_mapMqttIntOptions["tcp-nodelay"]))) {
spdlog::error("VSCP MQTT CLIENT: Failed to set option MOSQ_OPT_TCP_NODELAY. rv={0} {1}", rv, mosquitto_strerror(rv));
spdlog::error("VSCP MQTT CLIENT: Failed to set option MOSQ_OPT_TCP_NODELAY. rv={0} {1}",
rv,
mosquitto_strerror(rv));
}
#endif

Expand Down Expand Up @@ -1892,13 +1912,17 @@ vscpClientMqtt::init(void)
// receive-maximum
if (MOSQ_ERR_SUCCESS !=
(rv = mosquitto_int_option(m_mosq, MOSQ_OPT_RECEIVE_MAXIMUM, m_mapMqttIntOptions["receive-maximum"]))) {
spdlog::error("VSCP MQTT CLIENT: Failed to set option MOSQ_OPT_RECEIVE_MAXIMUM. rv={0} {1}", rv, mosquitto_strerror(rv));
spdlog::error("VSCP MQTT CLIENT: Failed to set option MOSQ_OPT_RECEIVE_MAXIMUM. rv={0} {1}",
rv,
mosquitto_strerror(rv));
}

// send-maximum
if (MOSQ_ERR_SUCCESS !=
(rv = mosquitto_int_option(m_mosq, MOSQ_OPT_SEND_MAXIMUM, m_mapMqttIntOptions["send-maximum"]))) {
spdlog::error("VSCP MQTT CLIENT: Failed to set option MOSQ_OPT_SEND_MAXIMUM. rv={0} {1}", rv, mosquitto_strerror(rv));
spdlog::error("VSCP MQTT CLIENT: Failed to set option MOSQ_OPT_SEND_MAXIMUM. rv={0} {1}",
rv,
mosquitto_strerror(rv));
}
#endif

Expand Down Expand Up @@ -2014,19 +2038,13 @@ vscpClientMqtt::connect(void)
}
else {
rv = mosquitto_connect(m_mosq, m_host.c_str(), m_port, m_keepAlive);
spdlog::trace("VSCP MQTT CLIENT: Waiting for connection to be established.");
#ifdef WIN32
Sleep(1000); // Wait for connection to be established TODO
#else
sleep(1); // Wait for connection to be established TODO
#endif
}

if (MOSQ_ERR_SUCCESS != rv) {
spdlog::error("VSCP MQTT CLIENT: Failed to connect to MQTT remote host. rv={0} {1}", rv, mosquitto_strerror(rv));
return VSCP_ERROR_NOT_CONNECTED;
if (MOSQ_ERR_SUCCESS != rv) {
spdlog::error("VSCP MQTT CLIENT: Failed to connect to MQTT remote host. rv={0} {1}", rv, mosquitto_strerror(rv));
return VSCP_ERROR_NOT_CONNECTED;
}
}

// Set the mosquitto to use threaded mode
// mosquitto_threaded_set(m_mosq, true);

// Start the worker loop
Expand All @@ -2037,6 +2055,28 @@ vscpClientMqtt::connect(void)
return VSCP_ERROR_ERROR;
}

spdlog::trace("VSCP MQTT CLIENT: Waiting for connection to be established.");
uint32_t now = vscp_getMsTimeStamp();
while (!m_bConnected) {
// Wait for connection to be established m_timeoutConnection
if ((vscp_getMsTimeStamp() - now) > m_timeoutConnection) {
spdlog::error("VSCP MQTT CLIENT: Connection timeout.");
return VSCP_ERROR_TIMEOUT;
}
}

return VSCP_ERROR_SUCCESS;
}

///////////////////////////////////////////////////////////////////////////////
// sendWillPayload
//

int
vscpClientMqtt::sendWillPayload(void)
{
int rv;

// We send an empty payload if will is defined
if (m_will_bretain && m_will_payload.length() && m_will_topic.length()) {

Expand Down Expand Up @@ -2091,6 +2131,18 @@ vscpClientMqtt::connect(void)
}
}

return VSCP_ERROR_SUCCESS;
}

///////////////////////////////////////////////////////////////////////////////
// doSubscribe
//

int
vscpClientMqtt::doSubscribe(void)
{
int rv;

// Only subscribe if subscription topic is defined
for (std::list<subscribeTopic *>::const_iterator it = m_mqtt_subscribeTopicList.begin();
it != m_mqtt_subscribeTopicList.end();
Expand Down Expand Up @@ -2428,7 +2480,9 @@ vscpClientMqtt::send(vscpEvent &ev)
payload,
ppublish->getQos(),
ppublish->getRetain()))) {
spdlog::error("VSCP MQTT CLIENT: sendEvent: mosquitto_publish (ev) failed. rv={0} {1}", rv, mosquitto_strerror(rv));
spdlog::error("VSCP MQTT CLIENT: sendEvent: mosquitto_publish (ev) failed. rv={0} {1}",
rv,
mosquitto_strerror(rv));
// printf("mosquitto_publish: %s\n", mosquitto_strerror(rv));
}

Expand Down Expand Up @@ -2693,7 +2747,9 @@ vscpClientMqtt::send(vscpEventEx &ex)
payload,
ppublish->getQos(),
ppublish->getRetain()))) {
spdlog::error("VSCP MQTT CLIENT: sendEvent: mosquitto_publish (ex) failed. rv={0} {1}", rv, mosquitto_strerror(rv));
spdlog::error("VSCP MQTT CLIENT: sendEvent: mosquitto_publish (ex) failed. rv={0} {1}",
rv,
mosquitto_strerror(rv));
}

} // for each topic
Expand Down
12 changes: 12 additions & 0 deletions src/vscp/common/vscp-client-mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,18 @@ class vscpClientMqtt : public CVscpClient {
*/
int clearRetain4Topic(std::string &strTopic);

/*!
We send an empty payload if will is defined
!!! Must be done after we are confirmed to be connected (connect callback).
*/
int sendWillPayload(void);

/*!
Do the actual subscribe
!!! Must be done after we are confirmed to be connected (connect callback).
*/
int doSubscribe(void);

public:
// Timeout in milliseconds for host connection.
uint32_t m_timeoutConnection;
Expand Down

0 comments on commit d59ae90

Please sign in to comment.