diff --git a/src/Comms/LinkManager.cc b/src/Comms/LinkManager.cc index 790913f68d1..4abc8abd04a 100644 --- a/src/Comms/LinkManager.cc +++ b/src/Comms/LinkManager.cc @@ -229,6 +229,8 @@ void LinkManager::_linkDisconnected() for (auto it = _rgLinks.begin(); it != _rgLinks.end(); ++it) { if (it->get() == link) { qCDebug(LinkManagerLog) << "LinkManager::_linkDisconnected" << it->get()->linkConfiguration()->name() << it->use_count(); + SharedLinkConfigurationPtr config = it->get()->linkConfiguration(); + config->setLink(nullptr); (void) _rgLinks.erase(it); return; } diff --git a/src/Comms/TCPLink.cc b/src/Comms/TCPLink.cc index 1626b0b26b4..c0388f2bc1e 100644 --- a/src/Comms/TCPLink.cc +++ b/src/Comms/TCPLink.cc @@ -18,62 +18,25 @@ QGC_LOGGING_CATEGORY(TCPLinkLog, "qgc.comms.tcplink") namespace { - constexpr int SEND_BUFFER_SIZE = 1024; // >= MAVLINK_MAX_PACKET_LEN - constexpr int RECEIVE_BUFFER_SIZE = 1024; // >= MAVLINK_MAX_PACKET_LEN - constexpr int READ_BUFFER_SIZE = 1024; // >= MAVLINK_MAX_PACKET_LEN constexpr int CONNECT_TIMEOUT_MS = 1000; - constexpr int TYPE_OF_SERVICE = 32; // Optional: Set ToS for low delay - constexpr int MAX_RECONNECTION_ATTEMPTS = 5; + constexpr int TYPE_OF_SERVICE = 32; // Set ToS for low delay + constexpr int MAX_RECONNECTION_ATTEMPTS = 3; } -TCPLink::TCPLink(SharedLinkConfigurationPtr &config, QObject *parent) - : LinkInterface(config, parent) - , _tcpConfig(qobject_cast(config.get())) - , _socket(new QTcpSocket()) +TCPWorker::TCPWorker(const QString &host, quint16 port, QObject *parent) + : QObject(parent) + , _socket(new QTcpSocket(this)) + , _host(host) + , _port(port) { - // qCDebug(TCPLinkLog) << Q_FUNC_INFO << this; - - Q_CHECK_PTR(_tcpConfig); - if (!_tcpConfig) { - qCWarning(TCPLinkLog) << "Invalid TCPConfiguration provided."; - emit communicationError( - tr("Configuration Error"), - tr("Link %1: Invalid TCP configuration.").arg(config->name()) - ); - return; - } - - _socket->setSocketOption(QAbstractSocket::SendBufferSizeSocketOption, SEND_BUFFER_SIZE); - _socket->setSocketOption(QAbstractSocket::ReceiveBufferSizeSocketOption, RECEIVE_BUFFER_SIZE); - _socket->setReadBufferSize(READ_BUFFER_SIZE); _socket->setSocketOption(QAbstractSocket::LowDelayOption, 1); _socket->setSocketOption(QAbstractSocket::KeepAliveOption, 1); - // _socket->setSocketOption(QAbstractSocket::TypeOfServiceOption, TYPE_OF_SERVICE); - - (void) QObject::connect(_socket, &QTcpSocket::connected, this, [this]() { - _isConnected = true; - _reconnectionAttempts = 0; - qCDebug(TCPLinkLog) << "TCP connected to" << _tcpConfig->host() << ":" << _tcpConfig->port(); - emit connected(); - }, Qt::AutoConnection); + _socket->setSocketOption(QAbstractSocket::TypeOfServiceOption, TYPE_OF_SERVICE); - (void) QObject::connect(_socket, &QTcpSocket::disconnected, this, [this]() { - _isConnected = false; - qCDebug(TCPLinkLog) << "TCP disconnected from" << _tcpConfig->host() << ":" << _tcpConfig->port(); - emit disconnected(); - // TODO: Uncomment after threading changes - // _attemptReconnection(); - }, Qt::AutoConnection); - - (void) QObject::connect(_socket, &QTcpSocket::readyRead, this, &TCPLink::_readBytes, Qt::AutoConnection); - - (void) QObject::connect(_socket, &QTcpSocket::errorOccurred, this, [this](QTcpSocket::SocketError error) { - qCWarning(TCPLinkLog) << "TCP Link Error:" << error << _socket->errorString(); - emit communicationError( - tr("TCP Link Error"), - tr("Link %1: %2.").arg(_tcpConfig->name(), _socket->errorString()) - ); - }, Qt::AutoConnection); + (void) connect(_socket, &QTcpSocket::connected, this, &TCPWorker::_onSocketConnected); + (void) connect(_socket, &QTcpSocket::disconnected, this, &TCPWorker::_onSocketDisconnected); + (void) connect(_socket, &QTcpSocket::readyRead, this, &TCPWorker::_onSocketReadyRead); + (void) connect(_socket, &QTcpSocket::errorOccurred, this, &TCPWorker::_onSocketErrorOccurred); #ifdef QT_DEBUG (void) QObject::connect(_socket, &QTcpSocket::stateChanged, this, [](QTcpSocket::SocketState state) { @@ -86,132 +49,172 @@ TCPLink::TCPLink(SharedLinkConfigurationPtr &config, QObject *parent) #endif } -TCPLink::~TCPLink() +TCPWorker::~TCPWorker() { - if (_socket->isOpen()) { - _socket->disconnectFromHost(); - if (_socket->state() != QAbstractSocket::UnconnectedState) { - _socket->waitForDisconnected(CONNECT_TIMEOUT_MS); - } - } - - _socket->deleteLater(); - - // qCDebug(TCPLinkLog) << Q_FUNC_INFO << this; + disconnectFromHost(); } -void TCPLink::disconnect() +bool TCPWorker::isConnected() const { - if (isConnected()) { - _socket->disconnectFromHost(); - } else { - emit disconnected(); - } + return (_socket->isOpen() && (_socket->state() == QAbstractSocket::ConnectedState)); } -bool TCPLink::_connect() +void TCPWorker::connectToHost() { if (isConnected()) { - qCWarning(TCPLinkLog) << "Already connected to" << _tcpConfig->host() << ":" << _tcpConfig->port(); - return true; + qCWarning(TCPLinkLog) << "Already connected to" << _host << ":" << _port; + return; } QSignalSpy errorSpy(_socket, &QTcpSocket::errorOccurred); - qCDebug(TCPLinkLog) << "Attempting to connect to host:" << _tcpConfig->host() << "port:" << _tcpConfig->port(); - _socket->connectToHost(_tcpConfig->host(), _tcpConfig->port()); + qCDebug(TCPLinkLog) << "Attempting to connect to host:" << _host << "port:" << _port; + _socket->connectToHost(_host, _port); - // TODO: Switch Blocking to Signals after Threading Changes if (!_socket->waitForConnected(CONNECT_TIMEOUT_MS)) { - qCWarning(TCPLinkLog) << "Connection to" << _tcpConfig->host() << ":" << _tcpConfig->port() << "failed:" << _socket->errorString(); + qCWarning(TCPLinkLog) << "Connection to" << _host << ":" << _port << "failed:" << _socket->errorString(); if (errorSpy.count() == 0) { - emit communicationError( - tr("TCP Link Connect Error"), - tr("Link %1: %2.").arg(_tcpConfig->name(), tr("Connection Failed: %1").arg(_socket->errorString())) - ); + emit errorOccurred(tr("Connection Failed: %1").arg(_socket->errorString())); } - return false; + _onSocketDisconnected(); } - qCDebug(TCPLinkLog) << "Successfully connected to" << _tcpConfig->host() << ":" << _tcpConfig->port(); - return true; + qCDebug(TCPLinkLog) << "Successfully connected to" << _host.toString() << ":" << _port; } -void TCPLink::_writeBytes(const QByteArray &bytes) +void TCPWorker::disconnectFromHost() { - if (!_socket->isValid()) { - return; + if (isConnected()) { + _socket->disconnectFromHost(); } +} - static const QString title = tr("TCP Link Write Error"); +void TCPWorker::writeData(const QByteArray &data) +{ + if (isConnected()) { + qint64 totalBytesWritten = 0; + while (totalBytesWritten < data.size()) { + const qint64 bytesWritten = _socket->write(data.constData() + totalBytesWritten, data.size() - totalBytesWritten); + if (bytesWritten <= 0) { + break; + } + + totalBytesWritten += bytesWritten; + } - if (!isConnected()) { - emit communicationError( - title, - tr("Link %1: Could Not Send Data - Link is Disconnected!").arg(_tcpConfig->name()) - ); - return; + if (totalBytesWritten < 0) { + emit errorOccurred(tr("Could Not Send Data - Write Failed: %1").arg(_socket->errorString())); + } + } else { + emit errorOccurred(tr("Socket is not connected")); } +} - const qint64 bytesWritten = _socket->write(bytes); - if (bytesWritten < 0) { - emit communicationError( - title, - tr("Link %1: Could Not Send Data - Write Failed: %2").arg(_tcpConfig->name(), _socket->errorString()) - ); - return; - } +void TCPWorker::_onSocketConnected() +{ + emit connected(); +} - if (bytesWritten < bytes.size()) { - qCWarning(TCPLinkLog) << "Wrote" << bytesWritten << "Out of" << bytes.size() << "total bytes"; - const QByteArray remainingBytes = bytes.mid(bytesWritten); - writeBytesThreadSafe(remainingBytes.constData(), remainingBytes.size()); - } +void TCPWorker::_onSocketDisconnected() +{ + emit disconnected(); +} - emit bytesSent(this, bytes); +void TCPWorker::_onSocketReadyRead() +{ + const QByteArray data = _socket->readAll(); + emit dataReceived(data); } -void TCPLink::_readBytes() +void TCPWorker::_onSocketErrorOccurred(QAbstractSocket::SocketError socketError) { - if (!_socket->isValid()) { - return; - } + Q_UNUSED(socketError); + emit errorOccurred(_socket->errorString()); +} - if (!isConnected()) { +/*===========================================================================*/ + +TCPLink::TCPLink(SharedLinkConfigurationPtr &config, QObject *parent) + : LinkInterface(config, parent) + , _tcpConfig(qobject_cast(config.get())) + , _worker(nullptr) + , _workerThread(new QThread(this)) +{ + Q_CHECK_PTR(_tcpConfig); + if (!_tcpConfig) { + qCWarning(TCPLinkLog) << "Invalid TCPConfiguration provided."; emit communicationError( - tr("TCP Link Read Error"), - tr("Link %1: Could Not Read Data - Link is Disconnected!").arg(_tcpConfig->name()) + tr("Configuration Error"), + tr("Link %1: Invalid TCP configuration.").arg(config->name()) ); return; } - const QByteArray buffer = _socket->readAll(); + _worker = new TCPWorker(_tcpConfig->host(), _tcpConfig->port()); - if (buffer.isEmpty()) { - emit communicationError( - tr("TCP Link Read Error"), - tr("Link %1: Could Not Read Data - No Data Available!").arg(_tcpConfig->name()) - ); - return; - } + _worker->moveToThread(_workerThread); + + (void) connect(_workerThread, &QThread::finished, _worker, &QObject::deleteLater); + + (void) connect(_worker, &TCPWorker::connected, this, &TCPLink::_onConnected); + (void) connect(_worker, &TCPWorker::disconnected, this, &TCPLink::_onDisconnected); + (void) connect(_worker, &TCPWorker::errorOccurred, this, &TCPLink::_onErrorOccurred); + (void) connect(_worker, &TCPWorker::dataReceived, this, &TCPLink::_onDataReceived); + +#ifdef QT_DEBUG + _workerThread->setObjectName(QString("TCP_%1").arg(config->name())); +#endif + + _workerThread->start(); +} + +TCPLink::~TCPLink() +{ + disconnect(); - emit bytesReceived(this, buffer); + _workerThread->quit(); + _workerThread->wait(); } -void TCPLink::_attemptReconnection() +bool TCPLink::isConnected() const { - if (_reconnectionAttempts >= MAX_RECONNECTION_ATTEMPTS) { - qCWarning(TCPLinkLog) << "Max reconnection attempts reached for" << _tcpConfig->host() << ":" << _tcpConfig->port(); - emit communicationError(tr("TCP Link Reconnect Error"), tr("Link %1: Maximum reconnection attempts reached.").arg(_tcpConfig->name())); - return; - } + return (_worker && _worker->isConnected()); +} + +bool TCPLink::_connect() +{ + return QMetaObject::invokeMethod(_worker, "connectToHost", Qt::QueuedConnection); +} + +void TCPLink::disconnect() +{ + (void) QMetaObject::invokeMethod(_worker, "disconnectFromHost", Qt::QueuedConnection); +} + +void TCPLink::_onConnected() +{ + emit connected(); +} + +void TCPLink::_onDisconnected() +{ + emit disconnected(); +} - _reconnectionAttempts++; - const int delay = qPow(2, _reconnectionAttempts) * 1000; // Exponential backoff - qCDebug(TCPLinkLog) << "Attempting reconnection #" << _reconnectionAttempts << "in" << delay << "ms"; - QTimer::singleShot(delay, this, [this]() { - _connect(); - }); +void TCPLink::_onErrorOccurred(const QString &errorString) +{ + emit communicationError(tr("TCP Link Error"), tr("Link %1: %2").arg(_tcpConfig->name(), errorString)); +} + +void TCPLink::_onDataReceived(const QByteArray &data) +{ + // Process data or emit signal + emit bytesReceived(this, data); +} + +void TCPLink::_writeBytes(const QByteArray& bytes) +{ + (void) QMetaObject::invokeMethod(_worker, "writeData", Qt::QueuedConnection, Q_ARG(QByteArray, bytes)); } bool TCPLink::isSecureConnection() diff --git a/src/Comms/TCPLink.h b/src/Comms/TCPLink.h index 9c829a28000..a1e62fb74f5 100644 --- a/src/Comms/TCPLink.h +++ b/src/Comms/TCPLink.h @@ -21,6 +21,41 @@ class QTcpSocket; Q_DECLARE_LOGGING_CATEGORY(TCPLinkLog) +class TCPWorker : public QObject +{ + Q_OBJECT + +public: + TCPWorker(const QString &host, quint16 port, QObject *parent = nullptr); + ~TCPWorker(); + + bool isConnected() const; + +public slots: + void connectToHost(); + void disconnectFromHost(); + void writeData(const QByteArray &data); + +signals: + void connected(); + void disconnected(); + void errorOccurred(const QString &errorString); + void dataReceived(const QByteArray &data); + +private slots: + void _onSocketConnected(); + void _onSocketDisconnected(); + void _onSocketReadyRead(); + void _onSocketErrorOccurred(QAbstractSocket::SocketError socketError); + +private: + QTcpSocket *_socket = nullptr; + QHostAddress _host; + quint16 _port = 0; +}; + +/*===========================================================================*/ + class TCPConfiguration : public LinkConfiguration { Q_OBJECT @@ -65,21 +100,23 @@ class TCPLink : public LinkInterface explicit TCPLink(SharedLinkConfigurationPtr &config, QObject *parent = nullptr); virtual ~TCPLink(); - void run() override {}; - bool isConnected() const override { return _isConnected; } + bool isConnected() const override; void disconnect() override; bool isSecureConnection() override; private slots: - void _writeBytes(const QByteArray &bytes) override; - void _readBytes(); + bool _connect() override; + void _onConnected(); + void _onDisconnected(); + void _onErrorOccurred(const QString &errorString); + void _onDataReceived(const QByteArray &data); private: - bool _connect() override; + void _writeBytes(const QByteArray &bytes) override; + void _setupSocket(); void _attemptReconnection(); + TCPWorker *_worker = nullptr; + QThread *_workerThread = nullptr; const TCPConfiguration *_tcpConfig = nullptr; - QTcpSocket *_socket = nullptr; - bool _isConnected = false; - int _reconnectionAttempts = 0; };