Skip to content

Commit b14ff9e

Browse files
committed
Comms: TCPLink threading updates
1 parent 617b651 commit b14ff9e

File tree

3 files changed

+182
-140
lines changed

3 files changed

+182
-140
lines changed

src/Comms/LinkManager.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ void LinkManager::_linkDisconnected()
229229
for (auto it = _rgLinks.begin(); it != _rgLinks.end(); ++it) {
230230
if (it->get() == link) {
231231
qCDebug(LinkManagerLog) << "LinkManager::_linkDisconnected" << it->get()->linkConfiguration()->name() << it->use_count();
232+
SharedLinkConfigurationPtr config = it->get()->linkConfiguration();
233+
config->setLink(nullptr);
232234
(void) _rgLinks.erase(it);
233235
return;
234236
}

src/Comms/TCPLink.cc

Lines changed: 135 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -18,62 +18,25 @@
1818
QGC_LOGGING_CATEGORY(TCPLinkLog, "qgc.comms.tcplink")
1919

2020
namespace {
21-
constexpr int SEND_BUFFER_SIZE = 1024; // >= MAVLINK_MAX_PACKET_LEN
22-
constexpr int RECEIVE_BUFFER_SIZE = 1024; // >= MAVLINK_MAX_PACKET_LEN
23-
constexpr int READ_BUFFER_SIZE = 1024; // >= MAVLINK_MAX_PACKET_LEN
2421
constexpr int CONNECT_TIMEOUT_MS = 1000;
25-
constexpr int TYPE_OF_SERVICE = 32; // Optional: Set ToS for low delay
26-
constexpr int MAX_RECONNECTION_ATTEMPTS = 5;
22+
constexpr int TYPE_OF_SERVICE = 32; // Set ToS for low delay
23+
constexpr int MAX_RECONNECTION_ATTEMPTS = 3;
2724
}
2825

29-
TCPLink::TCPLink(SharedLinkConfigurationPtr &config, QObject *parent)
30-
: LinkInterface(config, parent)
31-
, _tcpConfig(qobject_cast<const TCPConfiguration*>(config.get()))
32-
, _socket(new QTcpSocket())
26+
TCPWorker::TCPWorker(const QString &host, quint16 port, QObject *parent)
27+
: QObject(parent)
28+
, _socket(new QTcpSocket(this))
29+
, _host(host)
30+
, _port(port)
3331
{
34-
// qCDebug(TCPLinkLog) << Q_FUNC_INFO << this;
35-
36-
Q_CHECK_PTR(_tcpConfig);
37-
if (!_tcpConfig) {
38-
qCWarning(TCPLinkLog) << "Invalid TCPConfiguration provided.";
39-
emit communicationError(
40-
tr("Configuration Error"),
41-
tr("Link %1: Invalid TCP configuration.").arg(config->name())
42-
);
43-
return;
44-
}
45-
46-
_socket->setSocketOption(QAbstractSocket::SendBufferSizeSocketOption, SEND_BUFFER_SIZE);
47-
_socket->setSocketOption(QAbstractSocket::ReceiveBufferSizeSocketOption, RECEIVE_BUFFER_SIZE);
48-
_socket->setReadBufferSize(READ_BUFFER_SIZE);
4932
_socket->setSocketOption(QAbstractSocket::LowDelayOption, 1);
5033
_socket->setSocketOption(QAbstractSocket::KeepAliveOption, 1);
51-
// _socket->setSocketOption(QAbstractSocket::TypeOfServiceOption, TYPE_OF_SERVICE);
52-
53-
(void) QObject::connect(_socket, &QTcpSocket::connected, this, [this]() {
54-
_isConnected = true;
55-
_reconnectionAttempts = 0;
56-
qCDebug(TCPLinkLog) << "TCP connected to" << _tcpConfig->host() << ":" << _tcpConfig->port();
57-
emit connected();
58-
}, Qt::AutoConnection);
34+
_socket->setSocketOption(QAbstractSocket::TypeOfServiceOption, TYPE_OF_SERVICE);
5935

60-
(void) QObject::connect(_socket, &QTcpSocket::disconnected, this, [this]() {
61-
_isConnected = false;
62-
qCDebug(TCPLinkLog) << "TCP disconnected from" << _tcpConfig->host() << ":" << _tcpConfig->port();
63-
emit disconnected();
64-
// TODO: Uncomment after threading changes
65-
// _attemptReconnection();
66-
}, Qt::AutoConnection);
67-
68-
(void) QObject::connect(_socket, &QTcpSocket::readyRead, this, &TCPLink::_readBytes, Qt::AutoConnection);
69-
70-
(void) QObject::connect(_socket, &QTcpSocket::errorOccurred, this, [this](QTcpSocket::SocketError error) {
71-
qCWarning(TCPLinkLog) << "TCP Link Error:" << error << _socket->errorString();
72-
emit communicationError(
73-
tr("TCP Link Error"),
74-
tr("Link %1: %2.").arg(_tcpConfig->name(), _socket->errorString())
75-
);
76-
}, Qt::AutoConnection);
36+
(void) connect(_socket, &QTcpSocket::connected, this, &TCPWorker::_onSocketConnected);
37+
(void) connect(_socket, &QTcpSocket::disconnected, this, &TCPWorker::_onSocketDisconnected);
38+
(void) connect(_socket, &QTcpSocket::readyRead, this, &TCPWorker::_onSocketReadyRead);
39+
(void) connect(_socket, &QTcpSocket::errorOccurred, this, &TCPWorker::_onSocketErrorOccurred);
7740

7841
#ifdef QT_DEBUG
7942
(void) QObject::connect(_socket, &QTcpSocket::stateChanged, this, [](QTcpSocket::SocketState state) {
@@ -86,132 +49,172 @@ TCPLink::TCPLink(SharedLinkConfigurationPtr &config, QObject *parent)
8649
#endif
8750
}
8851

89-
TCPLink::~TCPLink()
52+
TCPWorker::~TCPWorker()
9053
{
91-
if (_socket->isOpen()) {
92-
_socket->disconnectFromHost();
93-
if (_socket->state() != QAbstractSocket::UnconnectedState) {
94-
_socket->waitForDisconnected(CONNECT_TIMEOUT_MS);
95-
}
96-
}
97-
98-
_socket->deleteLater();
99-
100-
// qCDebug(TCPLinkLog) << Q_FUNC_INFO << this;
54+
disconnectFromHost();
10155
}
10256

103-
void TCPLink::disconnect()
57+
bool TCPWorker::isConnected() const
10458
{
105-
if (isConnected()) {
106-
_socket->disconnectFromHost();
107-
} else {
108-
emit disconnected();
109-
}
59+
return (_socket->isOpen() && (_socket->state() == QAbstractSocket::ConnectedState));
11060
}
11161

112-
bool TCPLink::_connect()
62+
void TCPWorker::connectToHost()
11363
{
11464
if (isConnected()) {
115-
qCWarning(TCPLinkLog) << "Already connected to" << _tcpConfig->host() << ":" << _tcpConfig->port();
116-
return true;
65+
qCWarning(TCPLinkLog) << "Already connected to" << _host << ":" << _port;
66+
return;
11767
}
11868

11969
QSignalSpy errorSpy(_socket, &QTcpSocket::errorOccurred);
12070

121-
qCDebug(TCPLinkLog) << "Attempting to connect to host:" << _tcpConfig->host() << "port:" << _tcpConfig->port();
122-
_socket->connectToHost(_tcpConfig->host(), _tcpConfig->port());
71+
qCDebug(TCPLinkLog) << "Attempting to connect to host:" << _host << "port:" << _port;
72+
_socket->connectToHost(_host, _port);
12373

124-
// TODO: Switch Blocking to Signals after Threading Changes
12574
if (!_socket->waitForConnected(CONNECT_TIMEOUT_MS)) {
126-
qCWarning(TCPLinkLog) << "Connection to" << _tcpConfig->host() << ":" << _tcpConfig->port() << "failed:" << _socket->errorString();
75+
qCWarning(TCPLinkLog) << "Connection to" << _host << ":" << _port << "failed:" << _socket->errorString();
12776
if (errorSpy.count() == 0) {
128-
emit communicationError(
129-
tr("TCP Link Connect Error"),
130-
tr("Link %1: %2.").arg(_tcpConfig->name(), tr("Connection Failed: %1").arg(_socket->errorString()))
131-
);
77+
emit errorOccurred(tr("Connection Failed: %1").arg(_socket->errorString()));
13278
}
133-
return false;
79+
_onSocketDisconnected();
13480
}
13581

136-
qCDebug(TCPLinkLog) << "Successfully connected to" << _tcpConfig->host() << ":" << _tcpConfig->port();
137-
return true;
82+
qCDebug(TCPLinkLog) << "Successfully connected to" << _host.toString() << ":" << _port;
13883
}
13984

140-
void TCPLink::_writeBytes(const QByteArray &bytes)
85+
void TCPWorker::disconnectFromHost()
14186
{
142-
if (!_socket->isValid()) {
143-
return;
87+
if (isConnected()) {
88+
_socket->disconnectFromHost();
14489
}
90+
}
14591

146-
static const QString title = tr("TCP Link Write Error");
92+
void TCPWorker::writeData(const QByteArray &data)
93+
{
94+
if (isConnected()) {
95+
qint64 totalBytesWritten = 0;
96+
while (totalBytesWritten < data.size()) {
97+
const qint64 bytesWritten = _socket->write(data.constData() + totalBytesWritten, data.size() - totalBytesWritten);
98+
if (bytesWritten <= 0) {
99+
break;
100+
}
101+
102+
totalBytesWritten += bytesWritten;
103+
}
147104

148-
if (!isConnected()) {
149-
emit communicationError(
150-
title,
151-
tr("Link %1: Could Not Send Data - Link is Disconnected!").arg(_tcpConfig->name())
152-
);
153-
return;
105+
if (totalBytesWritten < 0) {
106+
emit errorOccurred(tr("Could Not Send Data - Write Failed: %1").arg(_socket->errorString()));
107+
}
108+
} else {
109+
emit errorOccurred(tr("Socket is not connected"));
154110
}
111+
}
155112

156-
const qint64 bytesWritten = _socket->write(bytes);
157-
if (bytesWritten < 0) {
158-
emit communicationError(
159-
title,
160-
tr("Link %1: Could Not Send Data - Write Failed: %2").arg(_tcpConfig->name(), _socket->errorString())
161-
);
162-
return;
163-
}
113+
void TCPWorker::_onSocketConnected()
114+
{
115+
emit connected();
116+
}
164117

165-
if (bytesWritten < bytes.size()) {
166-
qCWarning(TCPLinkLog) << "Wrote" << bytesWritten << "Out of" << bytes.size() << "total bytes";
167-
const QByteArray remainingBytes = bytes.mid(bytesWritten);
168-
writeBytesThreadSafe(remainingBytes.constData(), remainingBytes.size());
169-
}
118+
void TCPWorker::_onSocketDisconnected()
119+
{
120+
emit disconnected();
121+
}
170122

171-
emit bytesSent(this, bytes);
123+
void TCPWorker::_onSocketReadyRead()
124+
{
125+
const QByteArray data = _socket->readAll();
126+
emit dataReceived(data);
172127
}
173128

174-
void TCPLink::_readBytes()
129+
void TCPWorker::_onSocketErrorOccurred(QAbstractSocket::SocketError socketError)
175130
{
176-
if (!_socket->isValid()) {
177-
return;
178-
}
131+
Q_UNUSED(socketError);
132+
emit errorOccurred(_socket->errorString());
133+
}
179134

180-
if (!isConnected()) {
135+
/*===========================================================================*/
136+
137+
TCPLink::TCPLink(SharedLinkConfigurationPtr &config, QObject *parent)
138+
: LinkInterface(config, parent)
139+
, _tcpConfig(qobject_cast<const TCPConfiguration*>(config.get()))
140+
, _worker(nullptr)
141+
, _workerThread(new QThread(this))
142+
{
143+
Q_CHECK_PTR(_tcpConfig);
144+
if (!_tcpConfig) {
145+
qCWarning(TCPLinkLog) << "Invalid TCPConfiguration provided.";
181146
emit communicationError(
182-
tr("TCP Link Read Error"),
183-
tr("Link %1: Could Not Read Data - Link is Disconnected!").arg(_tcpConfig->name())
147+
tr("Configuration Error"),
148+
tr("Link %1: Invalid TCP configuration.").arg(config->name())
184149
);
185150
return;
186151
}
187152

188-
const QByteArray buffer = _socket->readAll();
153+
_worker = new TCPWorker(_tcpConfig->host(), _tcpConfig->port());
189154

190-
if (buffer.isEmpty()) {
191-
emit communicationError(
192-
tr("TCP Link Read Error"),
193-
tr("Link %1: Could Not Read Data - No Data Available!").arg(_tcpConfig->name())
194-
);
195-
return;
196-
}
155+
_worker->moveToThread(_workerThread);
156+
157+
(void) connect(_workerThread, &QThread::finished, _worker, &QObject::deleteLater);
158+
159+
(void) connect(_worker, &TCPWorker::connected, this, &TCPLink::_onConnected);
160+
(void) connect(_worker, &TCPWorker::disconnected, this, &TCPLink::_onDisconnected);
161+
(void) connect(_worker, &TCPWorker::errorOccurred, this, &TCPLink::_onErrorOccurred);
162+
(void) connect(_worker, &TCPWorker::dataReceived, this, &TCPLink::_onDataReceived);
163+
164+
#ifdef QT_DEBUG
165+
_workerThread->setObjectName(QString("TCP_%1").arg(config->name()));
166+
#endif
167+
168+
_workerThread->start();
169+
}
170+
171+
TCPLink::~TCPLink()
172+
{
173+
disconnect();
197174

198-
emit bytesReceived(this, buffer);
175+
_workerThread->quit();
176+
_workerThread->wait();
199177
}
200178

201-
void TCPLink::_attemptReconnection()
179+
bool TCPLink::isConnected() const
202180
{
203-
if (_reconnectionAttempts >= MAX_RECONNECTION_ATTEMPTS) {
204-
qCWarning(TCPLinkLog) << "Max reconnection attempts reached for" << _tcpConfig->host() << ":" << _tcpConfig->port();
205-
emit communicationError(tr("TCP Link Reconnect Error"), tr("Link %1: Maximum reconnection attempts reached.").arg(_tcpConfig->name()));
206-
return;
207-
}
181+
return (_worker && _worker->isConnected());
182+
}
183+
184+
bool TCPLink::_connect()
185+
{
186+
return QMetaObject::invokeMethod(_worker, "connectToHost", Qt::QueuedConnection);
187+
}
188+
189+
void TCPLink::disconnect()
190+
{
191+
(void) QMetaObject::invokeMethod(_worker, "disconnectFromHost", Qt::QueuedConnection);
192+
}
193+
194+
void TCPLink::_onConnected()
195+
{
196+
emit connected();
197+
}
198+
199+
void TCPLink::_onDisconnected()
200+
{
201+
emit disconnected();
202+
}
208203

209-
_reconnectionAttempts++;
210-
const int delay = qPow(2, _reconnectionAttempts) * 1000; // Exponential backoff
211-
qCDebug(TCPLinkLog) << "Attempting reconnection #" << _reconnectionAttempts << "in" << delay << "ms";
212-
QTimer::singleShot(delay, this, [this]() {
213-
_connect();
214-
});
204+
void TCPLink::_onErrorOccurred(const QString &errorString)
205+
{
206+
emit communicationError(tr("TCP Link Error"), tr("Link %1: %2").arg(_tcpConfig->name(), errorString));
207+
}
208+
209+
void TCPLink::_onDataReceived(const QByteArray &data)
210+
{
211+
// Process data or emit signal
212+
emit bytesReceived(this, data);
213+
}
214+
215+
void TCPLink::_writeBytes(const QByteArray& bytes)
216+
{
217+
(void) QMetaObject::invokeMethod(_worker, "writeData", Qt::QueuedConnection, Q_ARG(QByteArray, bytes));
215218
}
216219

217220
bool TCPLink::isSecureConnection()

0 commit comments

Comments
 (0)