diff --git a/src/vscp/common/vscp-client-mqtt.cpp b/src/vscp/common/vscp-client-mqtt.cpp index 755f92c77..bdcadac41 100644 --- a/src/vscp/common/vscp-client-mqtt.cpp +++ b/src/vscp/common/vscp-client-mqtt.cpp @@ -148,6 +148,11 @@ mqtt_on_connect(struct mosquitto *mosq, void *pData, int rv) vscpClientMqtt *pClient = reinterpret_cast(pData); pClient->m_bConnected = true; + if (nullptr != pClient) { + pClient->sendWillPayload(); + pClient->doSubscribe(); + } + spdlog::debug("VSCP MQTT CLIENT: v3.11 connect: rv={0:X} flags={1:X} {2}", rv, mosquitto_strerror(rv)); if (nullptr != pClient->m_parentCallbackConnect) { @@ -178,6 +183,11 @@ mqtt_on_connect_flags(struct mosquitto *mosq, void *pData, int rv, int flags) spdlog::debug("VSCP MQTT CLIENT: v3.11 connect: rv={0:X} flags={1:X} {2}", rv, flags, mosquitto_strerror(rv)); + if (nullptr != pClient) { + pClient->sendWillPayload(); + pClient->doSubscribe(); + } + if (nullptr != pClient->m_parentCallbackConnect) { pClient->m_parentCallbackConnect(mosq, pClient->m_pParent, rv); } @@ -621,6 +631,10 @@ vscpClientMqtt::vscpClientMqtt(void) m_keepAlive = 30; // 30 seconds for keepalive // m_bCleanSession = false; // Do not start with a clean session + // Defaults for timing + m_timeoutConnection = 5000; // 5 seconds + m_timeoutResponse = 200; + m_bTLS = false; m_tls_cafile = ""; m_tls_capath = ""; @@ -847,7 +861,8 @@ vscpClientMqtt::initFromJson(const std::string &config) if (jj.contains("tls-kpass-sha1") && j["tls-kpass-sha1"].is_string()) { m_mapMqttStringOptions["tls-kpass-sha1"] = jj["tls-kpass-sha1"].get(); - spdlog::debug("VSCP MQTT CLIENT: json mqtt init: tls-kpass-sha1 set to {}.", m_mapMqttIntOptions["tls-kpass-sha1"]); + spdlog::debug("VSCP MQTT CLIENT: json mqtt init: tls-kpass-sha1 set to {}.", + m_mapMqttIntOptions["tls-kpass-sha1"]); } if (jj.contains("tls-alpn") && j["tls-alpn"].is_string()) { @@ -1053,7 +1068,8 @@ vscpClientMqtt::initFromJson(const std::string &config) if (jj.contains("no-hostname-checking") && j["no-hostname-checking"].is_boolean()) { m_tls_bNoHostNameCheck = jj["no-hostname-checking"].get(); - spdlog::debug("VSCP MQTT CLIENT: json mqtt init: 'tls no-hostname-checking' Set to {}.", m_tls_bNoHostNameCheck); + spdlog::debug("VSCP MQTT CLIENT: json mqtt init: 'tls no-hostname-checking' Set to {}.", + m_tls_bNoHostNameCheck); } if (jj.contains("cert-reqs") && j["cert-reqs"].is_number()) { @@ -1089,17 +1105,19 @@ vscpClientMqtt::initFromJson(const std::string &config) // If certfile == NULL, keyfile must also be NULL and no client certificate will be used. if (!m_tls_certfile.length()) { - spdlog::warn("VSCP MQTT CLIENT: json mqtt init: 'TLS' If certfile == NULL, keyfile must also be NULL and no client " - "certificate will be " - "used. keyfile set to NULL."); + spdlog::warn( + "VSCP MQTT CLIENT: json mqtt init: 'TLS' If certfile == NULL, keyfile must also be NULL and no client " + "certificate will be " + "used. keyfile set to NULL."); m_tls_keyfile = ""; } // If m_tls_keyfile == NULL, certfile must also be NULL and no client certificate will be used. if (!m_tls_keyfile.length()) { - spdlog::warn("VSCP MQTT CLIENT: json mqtt init: 'TLS' If m_tls_keyfile == NULL, certfile must also be NULL and no " - "client certificate " - "will be used. certfile set to NULL."); + spdlog::warn( + "VSCP MQTT CLIENT: json mqtt init: 'TLS' If m_tls_keyfile == NULL, certfile must also be NULL and no " + "client certificate " + "will be used. certfile set to NULL."); m_tls_certfile = ""; } } @@ -1859,7 +1877,9 @@ vscpClientMqtt::init(void) #if LIBMOSQUITTO_MAJOR >= 2 if (MOSQ_ERR_SUCCESS != (rv = mosquitto_int_option(m_mosq, MOSQ_OPT_TCP_NODELAY, m_mapMqttIntOptions["tcp-nodelay"]))) { - spdlog::error("VSCP MQTT CLIENT: Failed to set option MOSQ_OPT_TCP_NODELAY. rv={0} {1}", rv, mosquitto_strerror(rv)); + spdlog::error("VSCP MQTT CLIENT: Failed to set option MOSQ_OPT_TCP_NODELAY. rv={0} {1}", + rv, + mosquitto_strerror(rv)); } #endif @@ -1892,13 +1912,17 @@ vscpClientMqtt::init(void) // receive-maximum if (MOSQ_ERR_SUCCESS != (rv = mosquitto_int_option(m_mosq, MOSQ_OPT_RECEIVE_MAXIMUM, m_mapMqttIntOptions["receive-maximum"]))) { - spdlog::error("VSCP MQTT CLIENT: Failed to set option MOSQ_OPT_RECEIVE_MAXIMUM. rv={0} {1}", rv, mosquitto_strerror(rv)); + spdlog::error("VSCP MQTT CLIENT: Failed to set option MOSQ_OPT_RECEIVE_MAXIMUM. rv={0} {1}", + rv, + mosquitto_strerror(rv)); } // send-maximum if (MOSQ_ERR_SUCCESS != (rv = mosquitto_int_option(m_mosq, MOSQ_OPT_SEND_MAXIMUM, m_mapMqttIntOptions["send-maximum"]))) { - spdlog::error("VSCP MQTT CLIENT: Failed to set option MOSQ_OPT_SEND_MAXIMUM. rv={0} {1}", rv, mosquitto_strerror(rv)); + spdlog::error("VSCP MQTT CLIENT: Failed to set option MOSQ_OPT_SEND_MAXIMUM. rv={0} {1}", + rv, + mosquitto_strerror(rv)); } #endif @@ -2014,19 +2038,13 @@ vscpClientMqtt::connect(void) } else { rv = mosquitto_connect(m_mosq, m_host.c_str(), m_port, m_keepAlive); - spdlog::trace("VSCP MQTT CLIENT: Waiting for connection to be established."); -#ifdef WIN32 - Sleep(1000); // Wait for connection to be established TODO -#else - sleep(1); // Wait for connection to be established TODO -#endif - } - - if (MOSQ_ERR_SUCCESS != rv) { - spdlog::error("VSCP MQTT CLIENT: Failed to connect to MQTT remote host. rv={0} {1}", rv, mosquitto_strerror(rv)); - return VSCP_ERROR_NOT_CONNECTED; + if (MOSQ_ERR_SUCCESS != rv) { + spdlog::error("VSCP MQTT CLIENT: Failed to connect to MQTT remote host. rv={0} {1}", rv, mosquitto_strerror(rv)); + return VSCP_ERROR_NOT_CONNECTED; + } } + // Set the mosquitto to use threaded mode // mosquitto_threaded_set(m_mosq, true); // Start the worker loop @@ -2037,6 +2055,28 @@ vscpClientMqtt::connect(void) return VSCP_ERROR_ERROR; } + spdlog::trace("VSCP MQTT CLIENT: Waiting for connection to be established."); + uint32_t now = vscp_getMsTimeStamp(); + while (!m_bConnected) { + // Wait for connection to be established m_timeoutConnection + if ((vscp_getMsTimeStamp() - now) > m_timeoutConnection) { + spdlog::error("VSCP MQTT CLIENT: Connection timeout."); + return VSCP_ERROR_TIMEOUT; + } + } + + return VSCP_ERROR_SUCCESS; +} + +/////////////////////////////////////////////////////////////////////////////// +// sendWillPayload +// + +int +vscpClientMqtt::sendWillPayload(void) +{ + int rv; + // We send an empty payload if will is defined if (m_will_bretain && m_will_payload.length() && m_will_topic.length()) { @@ -2091,6 +2131,18 @@ vscpClientMqtt::connect(void) } } + return VSCP_ERROR_SUCCESS; +} + +/////////////////////////////////////////////////////////////////////////////// +// doSubscribe +// + +int +vscpClientMqtt::doSubscribe(void) +{ + int rv; + // Only subscribe if subscription topic is defined for (std::list::const_iterator it = m_mqtt_subscribeTopicList.begin(); it != m_mqtt_subscribeTopicList.end(); @@ -2428,7 +2480,9 @@ vscpClientMqtt::send(vscpEvent &ev) payload, ppublish->getQos(), ppublish->getRetain()))) { - spdlog::error("VSCP MQTT CLIENT: sendEvent: mosquitto_publish (ev) failed. rv={0} {1}", rv, mosquitto_strerror(rv)); + spdlog::error("VSCP MQTT CLIENT: sendEvent: mosquitto_publish (ev) failed. rv={0} {1}", + rv, + mosquitto_strerror(rv)); // printf("mosquitto_publish: %s\n", mosquitto_strerror(rv)); } @@ -2693,7 +2747,9 @@ vscpClientMqtt::send(vscpEventEx &ex) payload, ppublish->getQos(), ppublish->getRetain()))) { - spdlog::error("VSCP MQTT CLIENT: sendEvent: mosquitto_publish (ex) failed. rv={0} {1}", rv, mosquitto_strerror(rv)); + spdlog::error("VSCP MQTT CLIENT: sendEvent: mosquitto_publish (ex) failed. rv={0} {1}", + rv, + mosquitto_strerror(rv)); } } // for each topic diff --git a/src/vscp/common/vscp-client-mqtt.h b/src/vscp/common/vscp-client-mqtt.h index 690e49042..ffe50e57c 100644 --- a/src/vscp/common/vscp-client-mqtt.h +++ b/src/vscp/common/vscp-client-mqtt.h @@ -698,6 +698,18 @@ class vscpClientMqtt : public CVscpClient { */ int clearRetain4Topic(std::string &strTopic); + /*! + We send an empty payload if will is defined + !!! Must be done after we are confirmed to be connected (connect callback). + */ + int sendWillPayload(void); + + /*! + Do the actual subscribe + !!! Must be done after we are confirmed to be connected (connect callback). + */ + int doSubscribe(void); + public: // Timeout in milliseconds for host connection. uint32_t m_timeoutConnection;