Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Comms: TCPLink Threading Updates #12051

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Comms/LinkManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ void LinkManager::_linkDisconnected()
for (auto it = _rgLinks.begin(); it != _rgLinks.end(); ++it) {
if (it->get() == link) {
qCDebug(LinkManagerLog) << "LinkManager::_linkDisconnected" << it->get()->linkConfiguration()->name() << it->use_count();
SharedLinkConfigurationPtr config = it->get()->linkConfiguration();
config->setLink(nullptr);
(void) _rgLinks.erase(it);
return;
}
Expand Down
267 changes: 135 additions & 132 deletions src/Comms/TCPLink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,62 +18,25 @@
QGC_LOGGING_CATEGORY(TCPLinkLog, "qgc.comms.tcplink")

namespace {
constexpr int SEND_BUFFER_SIZE = 1024; // >= MAVLINK_MAX_PACKET_LEN
constexpr int RECEIVE_BUFFER_SIZE = 1024; // >= MAVLINK_MAX_PACKET_LEN
constexpr int READ_BUFFER_SIZE = 1024; // >= MAVLINK_MAX_PACKET_LEN
constexpr int CONNECT_TIMEOUT_MS = 1000;
constexpr int TYPE_OF_SERVICE = 32; // Optional: Set ToS for low delay
constexpr int MAX_RECONNECTION_ATTEMPTS = 5;
constexpr int TYPE_OF_SERVICE = 32; // Set ToS for low delay
constexpr int MAX_RECONNECTION_ATTEMPTS = 3;
}

TCPLink::TCPLink(SharedLinkConfigurationPtr &config, QObject *parent)
: LinkInterface(config, parent)
, _tcpConfig(qobject_cast<const TCPConfiguration*>(config.get()))
, _socket(new QTcpSocket())
TCPWorker::TCPWorker(const QString &host, quint16 port, QObject *parent)
: QObject(parent)
, _socket(new QTcpSocket(this))
, _host(host)
, _port(port)
{
// qCDebug(TCPLinkLog) << Q_FUNC_INFO << this;

Q_CHECK_PTR(_tcpConfig);
if (!_tcpConfig) {
qCWarning(TCPLinkLog) << "Invalid TCPConfiguration provided.";
emit communicationError(
tr("Configuration Error"),
tr("Link %1: Invalid TCP configuration.").arg(config->name())
);
return;
}

_socket->setSocketOption(QAbstractSocket::SendBufferSizeSocketOption, SEND_BUFFER_SIZE);
_socket->setSocketOption(QAbstractSocket::ReceiveBufferSizeSocketOption, RECEIVE_BUFFER_SIZE);
_socket->setReadBufferSize(READ_BUFFER_SIZE);
_socket->setSocketOption(QAbstractSocket::LowDelayOption, 1);
_socket->setSocketOption(QAbstractSocket::KeepAliveOption, 1);
// _socket->setSocketOption(QAbstractSocket::TypeOfServiceOption, TYPE_OF_SERVICE);

(void) QObject::connect(_socket, &QTcpSocket::connected, this, [this]() {
_isConnected = true;
_reconnectionAttempts = 0;
qCDebug(TCPLinkLog) << "TCP connected to" << _tcpConfig->host() << ":" << _tcpConfig->port();
emit connected();
}, Qt::AutoConnection);
_socket->setSocketOption(QAbstractSocket::TypeOfServiceOption, TYPE_OF_SERVICE);

(void) QObject::connect(_socket, &QTcpSocket::disconnected, this, [this]() {
_isConnected = false;
qCDebug(TCPLinkLog) << "TCP disconnected from" << _tcpConfig->host() << ":" << _tcpConfig->port();
emit disconnected();
// TODO: Uncomment after threading changes
// _attemptReconnection();
}, Qt::AutoConnection);

(void) QObject::connect(_socket, &QTcpSocket::readyRead, this, &TCPLink::_readBytes, Qt::AutoConnection);

(void) QObject::connect(_socket, &QTcpSocket::errorOccurred, this, [this](QTcpSocket::SocketError error) {
qCWarning(TCPLinkLog) << "TCP Link Error:" << error << _socket->errorString();
emit communicationError(
tr("TCP Link Error"),
tr("Link %1: %2.").arg(_tcpConfig->name(), _socket->errorString())
);
}, Qt::AutoConnection);
(void) connect(_socket, &QTcpSocket::connected, this, &TCPWorker::_onSocketConnected);
(void) connect(_socket, &QTcpSocket::disconnected, this, &TCPWorker::_onSocketDisconnected);
(void) connect(_socket, &QTcpSocket::readyRead, this, &TCPWorker::_onSocketReadyRead);
(void) connect(_socket, &QTcpSocket::errorOccurred, this, &TCPWorker::_onSocketErrorOccurred);

#ifdef QT_DEBUG
(void) QObject::connect(_socket, &QTcpSocket::stateChanged, this, [](QTcpSocket::SocketState state) {
Expand All @@ -86,132 +49,172 @@ TCPLink::TCPLink(SharedLinkConfigurationPtr &config, QObject *parent)
#endif
}

TCPLink::~TCPLink()
TCPWorker::~TCPWorker()
{
if (_socket->isOpen()) {
_socket->disconnectFromHost();
if (_socket->state() != QAbstractSocket::UnconnectedState) {
_socket->waitForDisconnected(CONNECT_TIMEOUT_MS);
}
}

_socket->deleteLater();

// qCDebug(TCPLinkLog) << Q_FUNC_INFO << this;
disconnectFromHost();
}

void TCPLink::disconnect()
bool TCPWorker::isConnected() const
{
if (isConnected()) {
_socket->disconnectFromHost();
} else {
emit disconnected();
}
return (_socket->isOpen() && (_socket->state() == QAbstractSocket::ConnectedState));
}

bool TCPLink::_connect()
void TCPWorker::connectToHost()
{
if (isConnected()) {
qCWarning(TCPLinkLog) << "Already connected to" << _tcpConfig->host() << ":" << _tcpConfig->port();
return true;
qCWarning(TCPLinkLog) << "Already connected to" << _host << ":" << _port;
return;
}

QSignalSpy errorSpy(_socket, &QTcpSocket::errorOccurred);

qCDebug(TCPLinkLog) << "Attempting to connect to host:" << _tcpConfig->host() << "port:" << _tcpConfig->port();
_socket->connectToHost(_tcpConfig->host(), _tcpConfig->port());
qCDebug(TCPLinkLog) << "Attempting to connect to host:" << _host << "port:" << _port;
_socket->connectToHost(_host, _port);

// TODO: Switch Blocking to Signals after Threading Changes
if (!_socket->waitForConnected(CONNECT_TIMEOUT_MS)) {
qCWarning(TCPLinkLog) << "Connection to" << _tcpConfig->host() << ":" << _tcpConfig->port() << "failed:" << _socket->errorString();
qCWarning(TCPLinkLog) << "Connection to" << _host << ":" << _port << "failed:" << _socket->errorString();
if (errorSpy.count() == 0) {
emit communicationError(
tr("TCP Link Connect Error"),
tr("Link %1: %2.").arg(_tcpConfig->name(), tr("Connection Failed: %1").arg(_socket->errorString()))
);
emit errorOccurred(tr("Connection Failed: %1").arg(_socket->errorString()));
}
return false;
_onSocketDisconnected();
}

qCDebug(TCPLinkLog) << "Successfully connected to" << _tcpConfig->host() << ":" << _tcpConfig->port();
return true;
qCDebug(TCPLinkLog) << "Successfully connected to" << _host.toString() << ":" << _port;
}

void TCPLink::_writeBytes(const QByteArray &bytes)
void TCPWorker::disconnectFromHost()
{
if (!_socket->isValid()) {
return;
if (isConnected()) {
_socket->disconnectFromHost();
}
}

static const QString title = tr("TCP Link Write Error");
void TCPWorker::writeData(const QByteArray &data)
{
if (isConnected()) {
qint64 totalBytesWritten = 0;
while (totalBytesWritten < data.size()) {
const qint64 bytesWritten = _socket->write(data.constData() + totalBytesWritten, data.size() - totalBytesWritten);
if (bytesWritten <= 0) {
break;
}

totalBytesWritten += bytesWritten;
}

if (!isConnected()) {
emit communicationError(
title,
tr("Link %1: Could Not Send Data - Link is Disconnected!").arg(_tcpConfig->name())
);
return;
if (totalBytesWritten < 0) {
emit errorOccurred(tr("Could Not Send Data - Write Failed: %1").arg(_socket->errorString()));
}
} else {
emit errorOccurred(tr("Socket is not connected"));
}
}

const qint64 bytesWritten = _socket->write(bytes);
if (bytesWritten < 0) {
emit communicationError(
title,
tr("Link %1: Could Not Send Data - Write Failed: %2").arg(_tcpConfig->name(), _socket->errorString())
);
return;
}
void TCPWorker::_onSocketConnected()
{
emit connected();
}

if (bytesWritten < bytes.size()) {
qCWarning(TCPLinkLog) << "Wrote" << bytesWritten << "Out of" << bytes.size() << "total bytes";
const QByteArray remainingBytes = bytes.mid(bytesWritten);
writeBytesThreadSafe(remainingBytes.constData(), remainingBytes.size());
}
void TCPWorker::_onSocketDisconnected()
{
emit disconnected();
}

emit bytesSent(this, bytes);
void TCPWorker::_onSocketReadyRead()
{
const QByteArray data = _socket->readAll();
emit dataReceived(data);
}

void TCPLink::_readBytes()
void TCPWorker::_onSocketErrorOccurred(QAbstractSocket::SocketError socketError)
{
if (!_socket->isValid()) {
return;
}
Q_UNUSED(socketError);
emit errorOccurred(_socket->errorString());
}

if (!isConnected()) {
/*===========================================================================*/

TCPLink::TCPLink(SharedLinkConfigurationPtr &config, QObject *parent)
: LinkInterface(config, parent)
, _tcpConfig(qobject_cast<const TCPConfiguration*>(config.get()))
, _worker(nullptr)
, _workerThread(new QThread(this))
{
Q_CHECK_PTR(_tcpConfig);
if (!_tcpConfig) {
qCWarning(TCPLinkLog) << "Invalid TCPConfiguration provided.";
emit communicationError(
tr("TCP Link Read Error"),
tr("Link %1: Could Not Read Data - Link is Disconnected!").arg(_tcpConfig->name())
tr("Configuration Error"),
tr("Link %1: Invalid TCP configuration.").arg(config->name())
);
return;
}

const QByteArray buffer = _socket->readAll();
_worker = new TCPWorker(_tcpConfig->host(), _tcpConfig->port());

if (buffer.isEmpty()) {
emit communicationError(
tr("TCP Link Read Error"),
tr("Link %1: Could Not Read Data - No Data Available!").arg(_tcpConfig->name())
);
return;
}
_worker->moveToThread(_workerThread);

(void) connect(_workerThread, &QThread::finished, _worker, &QObject::deleteLater);

(void) connect(_worker, &TCPWorker::connected, this, &TCPLink::_onConnected);
(void) connect(_worker, &TCPWorker::disconnected, this, &TCPLink::_onDisconnected);
(void) connect(_worker, &TCPWorker::errorOccurred, this, &TCPLink::_onErrorOccurred);
(void) connect(_worker, &TCPWorker::dataReceived, this, &TCPLink::_onDataReceived);

#ifdef QT_DEBUG
_workerThread->setObjectName(QString("TCP_%1").arg(config->name()));
#endif

_workerThread->start();
}

TCPLink::~TCPLink()
{
disconnect();

emit bytesReceived(this, buffer);
_workerThread->quit();
_workerThread->wait();
}

void TCPLink::_attemptReconnection()
bool TCPLink::isConnected() const
{
if (_reconnectionAttempts >= MAX_RECONNECTION_ATTEMPTS) {
qCWarning(TCPLinkLog) << "Max reconnection attempts reached for" << _tcpConfig->host() << ":" << _tcpConfig->port();
emit communicationError(tr("TCP Link Reconnect Error"), tr("Link %1: Maximum reconnection attempts reached.").arg(_tcpConfig->name()));
return;
}
return (_worker && _worker->isConnected());
}

bool TCPLink::_connect()
{
return QMetaObject::invokeMethod(_worker, "connectToHost", Qt::QueuedConnection);
}

void TCPLink::disconnect()
{
(void) QMetaObject::invokeMethod(_worker, "disconnectFromHost", Qt::QueuedConnection);
}

void TCPLink::_onConnected()
{
emit connected();
}

void TCPLink::_onDisconnected()
{
emit disconnected();
}

_reconnectionAttempts++;
const int delay = qPow(2, _reconnectionAttempts) * 1000; // Exponential backoff
qCDebug(TCPLinkLog) << "Attempting reconnection #" << _reconnectionAttempts << "in" << delay << "ms";
QTimer::singleShot(delay, this, [this]() {
_connect();
});
void TCPLink::_onErrorOccurred(const QString &errorString)
{
emit communicationError(tr("TCP Link Error"), tr("Link %1: %2").arg(_tcpConfig->name(), errorString));
}

void TCPLink::_onDataReceived(const QByteArray &data)
{
// Process data or emit signal
emit bytesReceived(this, data);
}

void TCPLink::_writeBytes(const QByteArray& bytes)
{
(void) QMetaObject::invokeMethod(_worker, "writeData", Qt::QueuedConnection, Q_ARG(QByteArray, bytes));
}

bool TCPLink::isSecureConnection()
Expand Down
Loading
Loading