Skip to content

Commit caa1ddf

Browse files
committed
Merge remote-tracking branch 'origin/candidate-9.0.x' into candidate-9.2.x
Signed-off-by: Gavin Halliday <[email protected]> # Conflicts: # helm/hpcc/Chart.yaml # helm/hpcc/templates/_helpers.tpl # helm/hpcc/templates/dafilesrv.yaml # helm/hpcc/templates/dali.yaml # helm/hpcc/templates/dfuserver.yaml # helm/hpcc/templates/eclagent.yaml # helm/hpcc/templates/eclccserver.yaml # helm/hpcc/templates/eclscheduler.yaml # helm/hpcc/templates/esp.yaml # helm/hpcc/templates/localroxie.yaml # helm/hpcc/templates/roxie.yaml # helm/hpcc/templates/sasha.yaml # helm/hpcc/templates/thor.yaml # version.cmake
2 parents d10611a + 7a2d946 commit caa1ddf

File tree

2 files changed

+34
-9
lines changed

2 files changed

+34
-9
lines changed

roxie/udplib/udptrr.cpp

+30-6
Original file line numberDiff line numberDiff line change
@@ -1303,7 +1303,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
13031303
Thread::start();
13041304
started.wait();
13051305
}
1306-
1306+
13071307
~receive_data()
13081308
{
13091309
DBGLOG("Total data packets seen = %u OOO(%u) Requests(%u) Permits(%u)", dataPacketsReceived.load(), packetsOOO.load(), flowRequestsReceived.load(), flowRequestsSent.load());
@@ -1336,20 +1336,44 @@ class CReceiveManager : implements IReceiveManager, public CInterface
13361336
try
13371337
{
13381338
unsigned int res;
1339+
UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
13391340
while (true)
13401341
{
1341-
receive_socket->readtms(b->data, 1, DATA_PAYLOAD, res, timeout);
1342-
if (res!=sizeof(UdpRequestToSendMsg))
1343-
break;
1342+
//Read at least the size of the smallest packet we can receive
1343+
//static assert to check we are reading the smaller of the two possible packet types
1344+
static_assert(sizeof(UdpRequestToSendMsg) <= sizeof(UdpPacketHeader));
1345+
receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout);
1346+
1347+
//Even if a UDP packet is not split, very occasionally only some of the data may be present for the read.
1348+
//Slightly horribly this packet could be one of two different formats(!)
1349+
// a UdpRequestToSendMsg, which has a 2 byte command at the start of the header, with a maximum value of max_flow_cmd
1350+
// a UdpPacketHeader which has a 2 byte length. This length must be > sizeof(UdpPacketHeader).
1351+
//Since max_flow_cmd < sizeof(UdpPacketHeader) this can be used to distinguish a true data packet(!)
1352+
static_assert(flowType::max_flow_cmd < sizeof(UdpPacketHeader)); // assert to check the above comment is correct
1353+
1354+
if (hdr.length >= sizeof(UdpPacketHeader))
1355+
{
1356+
if (res == hdr.length)
1357+
break;
1358+
1359+
//Very rare situation - log it so that there is some evidence that it is occurring
1360+
OWARNLOG("Received partial network packet - %u bytes out of %u received", res, hdr.length);
1361+
1362+
//Because we are reading UDP datgrams rather than tcp packets, if we failed to read the whole datagram
1363+
//the rest of the datgram is lost - you cannot call readtms to read the rest of the datagram.
1364+
//Therefore throw this incomplete datagram away and allow the resend mechanism to retransmit it.
1365+
continue;
1366+
}
1367+
1368+
//Sanity check
1369+
assertex(res == sizeof(UdpRequestToSendMsg));
13441370

13451371
//Sending flow packets (eg send_completed) to the data thread ensures they do not overtake the data
13461372
//Redirect them to the flow thread to process them.
13471373
selfFlowSocket->write(b->data, res);
13481374
}
13491375

13501376
dataPacketsReceived++;
1351-
UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
1352-
assert(hdr.length == res && hdr.length > sizeof(hdr));
13531377
UdpSenderEntry *sender = &parent.sendersTable[hdr.node];
13541378
if (sender->noteSeen(hdr))
13551379
{

roxie/udplib/udptrs.cpp

+4-3
Original file line numberDiff line numberDiff line change
@@ -572,9 +572,10 @@ class UdpReceiverEntry : public IUdpReceiverEntry
572572
const char *data = buffer->data + sizeof(UdpPacketHeader);
573573
const MemoryAttr &udpkey = getSecretUdpKey(true);
574574
aesEncrypt(udpkey.get(), udpkey.length(), data, length, encryptBuffer);
575-
header->length = encryptBuffer.length();
576-
encryptBuffer.writeDirect(0, sizeof(UdpPacketHeader), header); // Only really need length updating
577-
assert(length <= DATA_PAYLOAD);
575+
UdpPacketHeader newHeader;
576+
newHeader.length = encryptBuffer.length();
577+
encryptBuffer.writeDirect(offsetof(UdpPacketHeader, length), sizeof(newHeader.length), &newHeader.length); // Only need to update the length - rest is the same
578+
assertex(encryptBuffer.length() <= DATA_PAYLOAD);
578579
if (udpTraceLevel > 5)
579580
DBGLOG("ENCRYPT: Writing %u bytes to data socket", encryptBuffer.length());
580581
data_socket->write(encryptBuffer.toByteArray(), encryptBuffer.length());

0 commit comments

Comments
 (0)