Skip to content

Commit

Permalink
[core] Refactor CRcvQueue::storePkt(..) for better resource management.
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Aug 8, 2023
1 parent 256244f commit e4a7b0e
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 24 deletions.
2 changes: 1 addition & 1 deletion srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4817,7 +4817,7 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous,
// XXX Problem around CONN_CONFUSED!
// If some too-eager packets were received from a listener
// that thinks it's connected, but his last handshake was missed,
// they are collected by CRcvQueue::storePkt. The removeConnector
// they are collected by CRcvQueue::storePktClone. The removeConnector
// function will want to delete them all, so it would be nice
// if these packets can be re-delivered. Of course the listener
// should be prepared to resend them (as every packet can be lost
Expand Down
8 changes: 3 additions & 5 deletions srtcore/packet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ void CPacket::deallocate()
if (m_data_owned)
delete[](char*) m_PacketVector[PV_DATA].data();
m_PacketVector[PV_DATA].set(NULL, 0);
m_data_owned = false;
}

char* CPacket::release()
Expand All @@ -241,8 +242,7 @@ CPacket::~CPacket()
{
// PV_HEADER is always owned, PV_DATA may use a "borrowed" buffer.
// Delete the internal buffer only if it was declared as owned.
if (m_data_owned)
delete[](char*) m_PacketVector[PV_DATA].data();
deallocate();
}

size_t CPacket::getLength() const
Expand Down Expand Up @@ -561,10 +561,8 @@ CPacket* CPacket::clone() const
{
CPacket* pkt = new CPacket;
memcpy((pkt->m_nHeader), m_nHeader, HDR_SIZE);
pkt->m_pcData = new char[m_PacketVector[PV_DATA].size()];
pkt->allocate(m_PacketVector[PV_DATA].size());
memcpy((pkt->m_pcData), m_pcData, m_PacketVector[PV_DATA].size());
pkt->m_PacketVector[PV_DATA].setLength(m_PacketVector[PV_DATA].size());

pkt->m_DestAddr = m_DestAddr;

return pkt;
Expand Down
25 changes: 8 additions & 17 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,6 @@ srt::CRcvQueue::~CRcvQueue()
while (!i->second.empty())
{
CPacket* pkt = i->second.front();
delete[] pkt->m_pcData;
delete pkt;
i->second.pop();
}
Expand Down Expand Up @@ -1365,14 +1364,12 @@ srt::EReadStatus srt::CRcvQueue::worker_RetrieveUnit(int32_t& w_id, CUnit*& w_un
{
// no space, skip this packet
CPacket temp;
temp.m_pcData = new char[m_szPayloadSize];
temp.setLength(m_szPayloadSize);
temp.allocate(m_szPayloadSize);
THREAD_PAUSED();
EReadStatus rst = m_pChannel->recvfrom((w_addr), (temp));
THREAD_RESUMED();
// Note: this will print nothing about the packet details unless heavy logging is on.
LOGC(qrlog.Error, log << CONID() << "LOCAL STORAGE DEPLETED. Dropping 1 packet: " << temp.Info());
delete[] temp.m_pcData;

// Be transparent for RST_ERROR, but ignore the correct
// data read and fake that the packet was dropped.
Expand Down Expand Up @@ -1541,7 +1538,7 @@ srt::EConnectStatus srt::CRcvQueue::worker_TryAsyncRend_OrStore(int32_t id, CUni
if (cst == CONN_CONFUSED)
{
LOGC(cnlog.Warn, log << "AsyncOrRND: PACKET NOT HANDSHAKE - re-requesting handshake from peer");
storePkt(id, unit->m_Packet.clone());
storePktClone(id, unit->m_Packet);
if (!u->processAsyncConnectRequest(RST_AGAIN, CONN_CONTINUE, &unit->m_Packet, u->m_PeerAddr))
{
// Reuse previous behavior to reject a packet
Expand Down Expand Up @@ -1616,7 +1613,7 @@ srt::EConnectStatus srt::CRcvQueue::worker_TryAsyncRend_OrStore(int32_t id, CUni
log << "AsyncOrRND: packet RESOLVED TO ID=" << id << " -- continuing through CENTRAL PACKET QUEUE");
// This is where also the packets for rendezvous connection will be landing,
// in case of a synchronous connection.
storePkt(id, unit->m_Packet.clone());
storePktClone(id, unit->m_Packet);

return CONN_CONTINUE;
}
Expand Down Expand Up @@ -1680,7 +1677,6 @@ int srt::CRcvQueue::recvfrom(int32_t id, CPacket& w_packet)
w_packet.setLength(newpkt->getLength());
w_packet.m_DestAddr = newpkt->m_DestAddr;

delete[] newpkt->m_pcData;
delete newpkt;

// remove this message from queue,
Expand Down Expand Up @@ -1735,7 +1731,6 @@ void srt::CRcvQueue::removeConnector(const SRTSOCKET& id)
log << "removeConnector: ... and its packet queue with " << i->second.size() << " packets collected");
while (!i->second.empty())
{
delete[] i->second.front()->m_pcData;
delete i->second.front();
i->second.pop();
}
Expand Down Expand Up @@ -1768,34 +1763,30 @@ srt::CUDT* srt::CRcvQueue::getNewEntry()
return u;
}

void srt::CRcvQueue::storePkt(int32_t id, CPacket* pkt)
void srt::CRcvQueue::storePktClone(int32_t id, const CPacket& pkt)
{
CUniqueSync passcond(m_BufferLock, m_BufferCond);

map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);

if (i == m_mBuffer.end())
{
m_mBuffer[id].push(pkt);
m_mBuffer[id].push(pkt.clone());
passcond.notify_one();
}
else
{
// avoid storing too many packets, in case of malfunction or attack
// Avoid storing too many packets, in case of malfunction or attack.
if (i->second.size() > 16)
{
delete[] pkt->m_pcData;
delete pkt;
return;
}

i->second.push(pkt);
i->second.push(pkt.clone());
}
}

void srt::CMultiplexer::destroy()
{
// Reverse order of the assigned
// Reverse order of the assigned.
delete m_pRcvQueue;
delete m_pSndQueue;
delete m_pTimer;
Expand Down
2 changes: 1 addition & 1 deletion srtcore/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ class CRcvQueue
bool ifNewEntry();
CUDT* getNewEntry();

void storePkt(int32_t id, CPacket* pkt);
void storePktClone(int32_t id, const CPacket& pkt);

private:
sync::Mutex m_LSLock;
Expand Down

0 comments on commit e4a7b0e

Please sign in to comment.