Skip to content

Commit

Permalink
net: Use mockable time for tx download
Browse files Browse the repository at this point in the history
  • Loading branch information
codablock authored and Duddino committed Feb 14, 2025
1 parent 8d68f12 commit 33f6090
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 50 deletions.
153 changes: 104 additions & 49 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
/** Maximum number of announced transactions from a peer */
static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
/** How many microseconds to delay requesting transactions from inbound peers */
static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000;
static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}};
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000;
static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{std::chrono::seconds{60}};
/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
/** How long to wait (expiry * factor microseconds) before expiring an in-flight getdata request to a peer */
static constexpr int64_t TX_EXPIRY_INTERVAL_FACTOR = 10;
/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000;
static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{std::chrono::seconds{2}};
static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY");
/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
Expand Down Expand Up @@ -298,13 +299,16 @@ struct CNodeState {
/* Track when to attempt download of announced transactions (process
* time in micros -> txid)
*/
std::multimap<int64_t, CInv> m_tx_process_time;
std::multimap<std::chrono::microseconds, CInv> m_tx_process_time;

//! Store all the transactions a peer has recently announced
std::set<CInv> m_tx_announced;

//! Store transactions which were requested by us
std::set<CInv> m_tx_in_flight;
//! Store transactions which were requested by us, with timestamp
std::map<CInv, std::chrono::microseconds> m_tx_in_flight;

//! Periodically check for stuck getdata requests
std::chrono::microseconds m_check_expiry_timer{0};
};

TxDownloadState m_tx_download;
Expand All @@ -325,8 +329,8 @@ struct CNodeState {
};

// Keeps track of the time (in microseconds) when transactions were requested last time
limitedmap<uint256, int64_t> g_already_asked_for(MAX_INV_SZ);
limitedmap<uint256, int64_t> g_erased_object_requests(MAX_INV_SZ);
limitedmap<uint256, std::chrono::microseconds> g_already_asked_for(MAX_INV_SZ);
limitedmap<uint256, std::chrono::microseconds> g_erased_object_requests(MAX_INV_SZ);

/** Map maintaining per-node state. Requires cs_main. */
std::map<NodeId, CNodeState> mapNodeState;
Expand Down Expand Up @@ -544,7 +548,7 @@ void EraseObjectRequest(CNodeState* nodestate, const CInv& inv) EXCLUSIVE_LOCKS_
AssertLockHeld(cs_main);
LogPrint(BCLog::NET, "%s -- inv=(%s)\n", __func__, inv.ToString());
g_already_asked_for.erase(inv.hash);
g_erased_object_requests.insert(std::make_pair(inv.hash, GetTimeMillis()));
g_erased_object_requests.insert(std::make_pair(inv.hash, GetTime<std::chrono::milliseconds>()));

if (nodestate) {
nodestate->m_tx_download.m_tx_announced.erase(inv);
Expand All @@ -562,17 +566,17 @@ void EraseObjectRequest(NodeId nodeId, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED
EraseObjectRequest(state, inv);
}

int64_t GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
std::chrono::microseconds GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
AssertLockHeld(cs_main);
auto it = g_already_asked_for.find(hash);
if (it != g_already_asked_for.end()) {
return it->second;
}
return 0;
return {};
}

void UpdateObjectRequestTime(const uint256& hash, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
void UpdateObjectRequestTime(const uint256& hash, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
auto it = g_already_asked_for.find(hash);

Expand All @@ -583,40 +587,39 @@ void UpdateObjectRequestTime(const uint256& hash, int64_t request_time) EXCLUSIV
}
}


int64_t GetObjectInterval(int invType)
std::chrono::microseconds GetObjectInterval(int invType)
{
// some messages need to be re-requested faster when the first announcing peer did not answer to GETDATA
switch (invType) {
case MSG_QUORUM_RECOVERED_SIG:
return 15 * 1000000;
return std::chrono::seconds(15);
case MSG_CLSIG:
return 5 * 1000000;
return std::chrono::seconds(5);
default:
return GETDATA_TX_INTERVAL;
}
}

int64_t GetObjectExpiryInterval(int invType)
std::chrono::microseconds GetObjectExpiryInterval(int invType)
{
return GetObjectInterval(invType) * TX_EXPIRY_INTERVAL_FACTOR;
}

int64_t GetObjectRandomDelay(int invType)
std::chrono::microseconds GetObjectRandomDelay(int invType)
{
if (invType == MSG_TX) {
return GetRand(MAX_GETDATA_RANDOM_DELAY);
return GetRandMicros(MAX_GETDATA_RANDOM_DELAY);
}
return 0;
return {};
}

int64_t CalculateObjectGetDataTime(const CInv& inv, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
std::chrono::microseconds CalculateObjectGetDataTime(const CInv& inv, std::chrono::microseconds current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
AssertLockHeld(cs_main);
int64_t process_time;
int64_t last_request_time = GetObjectRequestTime(inv.hash);
std::chrono::microseconds process_time;
const auto last_request_time = GetObjectRequestTime(inv.hash);
// First time requesting this tx
if (last_request_time == 0) {
if (last_request_time.count() == 0) {
process_time = current_time;
} else {
// Randomize the delay to avoid biasing some peers over others (such as due to
Expand All @@ -629,7 +632,7 @@ int64_t CalculateObjectGetDataTime(const CInv& inv, int64_t current_time, bool u
return process_time;
}

void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
void RequestObject(CNodeState* state, const CInv& inv, std::chrono::microseconds current_time, bool fForce = false) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(inv)) {
Expand All @@ -640,24 +643,48 @@ void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow) EXCLUSIVE_L
peer_download_state.m_tx_announced.insert(inv);
// Calculate the time to try requesting this transaction. Use
// fPreferredDownload as a proxy for outbound peers.
int64_t process_time = CalculateObjectGetDataTime(inv, nNow, !state->fPreferredDownload);
std::chrono::microseconds process_time = CalculateObjectGetDataTime(inv, current_time, !state->fPreferredDownload);

peer_download_state.m_tx_process_time.emplace(process_time, inv);
if (fForce) {
// make sure this object is actually requested ASAP
g_erased_object_requests.erase(inv.hash);
g_already_asked_for.erase(inv.hash);
}

LogPrint(BCLog::NET, "%s -- inv=(%s), nNow=%d, process_time=%d, delta=%d\n", __func__, inv.ToString(), nNow, process_time, process_time - nNow);
LogPrint(BCLog::NET, "%s -- inv=(%s), current_time=%d, process_time=%d, delta=%d\n", __func__, inv.ToString(), current_time.count(), process_time.count(), (process_time - current_time).count());
}

void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
void RequestObject(NodeId nodeId, const CInv& inv, std::chrono::microseconds current_time, bool fForce) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
AssertLockHeld(cs_main);
auto* state = State(nodeId);
if (!state) {
return;
}
RequestObject(state, inv, nNow);

RequestObject(state, inv, current_time, fForce);
}

void PeerLogicValidation::InitializeNode(CNode *pnode) {
size_t GetRequestedObjectCount(NodeId nodeId)
{
AssertLockHeld(cs_main);
auto* state = State(nodeId);
if (!state) {
return 0;
}
return state->m_tx_download.m_tx_process_time.size();
}

// Returns true for outbound peers, excluding manual connections, feelers, and
// one-shots
bool IsOutboundDisconnectionCandidate(const CNode* node)
{
return !(node->fInbound || node->fFeeler || node->fOneShot);
}

void PeerLogicValidation::InitializeNode(CNode* pnode)
{
CAddress addr = pnode->addr;
std::string addrName = pnode->GetAddrName();
NodeId nodeid = pnode->GetId();
Expand Down Expand Up @@ -1788,7 +1815,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
}

LOCK(cs_main);
int64_t nNow = GetTimeMicros();
const auto current_time = GetTime<std::chrono::microseconds>();

std::vector<CInv> vToFetch;

Expand Down Expand Up @@ -1836,8 +1863,8 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
// wait until we are sync
if (!fAlreadyHave) {
bool allowWhileInIBD = allowWhileInIBDObjs.count(inv.type);
if (allowWhileInIBD || !IsInitialBlockDownload()) {
RequestObject(State(pfrom->GetId()), inv, nNow);
if (allowWhileInIBD || (!fImporting && !fReindex && !IsInitialBlockDownload())) {
RequestObject(State(pfrom->GetId()), inv, current_time);
}
}
}
Expand Down Expand Up @@ -1869,7 +1896,6 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR


else if (strCommand == NetMsgType::GETBLOCKS || strCommand == NetMsgType::GETHEADERS) {

// Don't relay blocks inv to masternode-only connections
if (!pfrom->CanRelay()) {
LogPrint(BCLog::NET, "getblocks, don't relay blocks inv to masternode connection. peer=%d\n", pfrom->GetId());
Expand Down Expand Up @@ -2062,11 +2088,16 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
}
}
if (!fRejectedParents) {
int64_t nNow = GetTimeMicros();
for (const uint256& parent_txid : unique_parents) {
CInv _inv(MSG_TX, parent_txid);
const auto current_time = GetTime<std::chrono::microseconds>();

for (const CTxIn& txin : tx.vin) {
CInv _inv(MSG_TX, txin.prevout.hash);
pfrom->AddInventoryKnown(_inv);
if (!AlreadyHave(_inv)) RequestObject(State(pfrom->GetId()), _inv, nNow);
if (!AlreadyHave(_inv)) RequestObject(State(pfrom->GetId()), _inv, current_time);
// We don't know if the previous tx was a regular or a mixing one, try both
CInv _inv2(MSG_DSTX, txin.prevout.hash);
pfrom->AddInventoryKnown(_inv2);
if (!AlreadyHave(_inv2)) RequestObject(State(pfrom->GetId()), _inv2, current_time);
}
AddOrphanTx(ptx, pfrom->GetId());

Expand All @@ -2076,7 +2107,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
if (nEvicted > 0)
LogPrint(BCLog::MEMPOOL, "mapOrphan overflow, removed %u tx\n", nEvicted);
} else {
LogPrint(BCLog::MEMPOOL, "not keeping orphan with rejected parents %s\n",tx.GetHash().ToString());
LogPrint(BCLog::MEMPOOL, "not keeping orphan with rejected parents %s\n", tx.GetHash().ToString());
}
} else {
// AcceptToMemoryPool() returned false, possibly because the tx is
Expand Down Expand Up @@ -2225,7 +2256,6 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR


else if (strCommand == NetMsgType::MEMPOOL) {

if (!(pfrom->GetLocalServices() & NODE_BLOOM) && !pfrom->fWhitelisted) {
LogPrint(BCLog::NET, "mempool request with bloom filters disabled, disconnect peer=%d\n", pfrom->GetId());
pfrom->fDisconnect = true;
Expand Down Expand Up @@ -2793,7 +2823,9 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptM

// Detect whether we're stalling
current_time = GetTime<std::chrono::microseconds>();
nNow = GetTimeMicros();
// nNow is the current system time (GetTimeMicros is not mockable) and
// should be replaced by the mockable current_time eventually

if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) {
// Stalling only triggers when the block download window cannot move. During normal steady state,
// the download window should be much larger than the to-be-downloaded set of blocks, so disconnection
Expand Down Expand Up @@ -2838,9 +2870,31 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptM
//
// Message: getdata (non-blocks)
//

// For robustness, expire old requests after a long timeout, so that
// we can resume downloading transactions from a peer even if they
// were unresponsive in the past.
// Eventually we should consider disconnecting peers, but this is
// conservative.
if (state.m_tx_download.m_check_expiry_timer <= current_time) {
for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) {
if (it->second <= current_time - GetObjectExpiryInterval(it->first.type)) {
LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId());
state.m_tx_download.m_tx_announced.erase(it->first);
state.m_tx_download.m_tx_in_flight.erase(it++);
} else {
++it;
}
}
// On average, we do this check every TX_EXPIRY_INTERVAL. Randomize
// so that we're not doing this for all peers at the same time.
state.m_tx_download.m_check_expiry_timer = current_time + GetObjectExpiryInterval(MSG_TX) / 2 + GetRandMicros(GetObjectExpiryInterval(MSG_TX));
}

// DASH this code also handles non-TXs (Dash specific messages)
auto& tx_process_time = state.m_tx_download.m_tx_process_time;
while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
const auto& inv = tx_process_time.begin()->second;
while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
const CInv inv = tx_process_time.begin()->second;
// Erase this entry from tx_process_time (it may be added back for
// processing at a later time, see below)
tx_process_time.erase(tx_process_time.begin());
Expand All @@ -2853,24 +2907,25 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptM
if (!AlreadyHave(inv)) {
// If this transaction was last requested more than 1 minute ago,
// then request.
int64_t last_request_time = GetObjectRequestTime(inv.hash);
if (last_request_time <= nNow - GetObjectExpiryInterval(inv.type)) {
const auto last_request_time = GetObjectRequestTime(inv.hash);
if (last_request_time <= current_time - GetObjectInterval(inv.type)) {
LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
vGetData.push_back(inv);
if (vGetData.size() >= MAX_GETDATA_SZ) {
connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
vGetData.clear();
}
UpdateObjectRequestTime(inv.hash, nNow);
state.m_tx_download.m_tx_in_flight.insert(inv);

UpdateObjectRequestTime(inv.hash, current_time);
state.m_tx_download.m_tx_in_flight.emplace(inv, current_time);
} else {
// This transaction is in flight from someone else; queue
// up processing to happen after the download times out
// (with a slight delay for inbound peers, to prefer
// requests to outbound peers).
int64_t next_process_time = CalculateObjectGetDataTime(inv, nNow, !state.fPreferredDownload);
const auto next_process_time = CalculateObjectGetDataTime(inv, current_time, !state.fPreferredDownload);
tx_process_time.emplace(next_process_time, inv);
LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time, next_process_time - nNow, pto->GetId());
LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time.count(), (next_process_time - current_time).count(), pto->GetId());
}
} else {
// We have already seen this transaction, no need to download.
Expand Down
6 changes: 6 additions & 0 deletions src/random.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -720,3 +720,9 @@ void RandomInit()

ReportHardwareRand();
}


std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept
{
return std::chrono::microseconds{GetRand(duration_max.count())};
}
5 changes: 4 additions & 1 deletion src/random.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
#include "crypto/common.h"
#include "uint256.h"

#include <stdint.h>
#include <chrono> // For std::chrono::microseconds
#include <limits>
#include <stdint.h>

/**
* Overall design of the RNG and entropy sources.
Expand Down Expand Up @@ -251,4 +252,6 @@ bool Random_SanityCheck();
*/
void RandomInit();

std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept;

#endif // PIVX_RANDOM_H

0 comments on commit 33f6090

Please sign in to comment.