network: remove tier3_request_queue (#12353)
Incoming state part requests are [throttled by the view client
actor](https://github.com/near/nearcore/blob/8e30ccdd425ecbbeeec8d96bfc9a7e02bc35c2d3/chain/client/src/view_client_actor.rs#L1346-L1349),
which drops excess requests to avoid overloading the node. This PR
removes the additional layer of queueing on the network side. Queueing
provides little benefit:

- State part requests are made at a steady rate over a long period of
time and are well distributed across nodes; serving nodes have no
spikes in demand to smooth out
- If the steady flow of incoming requests exceeds the view client's
throttling rate, queueing changes _which_ requests are served but does
not increase _how many_ requests are served; there is no benefit in that

and a major downside:

- Queueing makes response time less predictable, forcing the requesting
node to wait longer before deciding that a request was
dropped/failed/lost

The queue served a secondary purpose: it let PeerActors receiving state
part requests hand them off to the PeerManagerActor. Instead, we set up
an adapter so that the requests can be passed as actix messages
(sketched below).
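
To make the handoff concrete, here is a minimal, self-contained sketch of the
pattern, assuming simplified stand-in types (a `Tier3Request` with only
`shard_id` and `part_id`, and a bare `PeerManager` actor) rather than
nearcore's real ones; the actual types, adapter wiring, and handler are in the
diff below.

```rust
// Minimal sketch, not nearcore's actual definitions: the peer side forwards a
// (simplified) Tier3Request straight to a peer-manager actor as an actix
// message, with no intermediate queue.
use actix::prelude::*;

/// Simplified stand-in for nearcore's Tier3Request / StatePartRequestBody.
#[derive(Message)]
#[rtype(result = "()")]
struct Tier3Request {
    shard_id: u64,
    part_id: u64,
}

/// Stand-in for PeerManagerActor: it serves the request as soon as the
/// message is delivered, so the requester never waits behind a queue.
struct PeerManager;

impl Actor for PeerManager {
    type Context = Context<Self>;
}

impl Handler<Tier3Request> for PeerManager {
    type Result = ();

    fn handle(&mut self, req: Tier3Request, _ctx: &mut Self::Context) {
        // In nearcore this is where the state part is fetched from the client
        // and a TIER3 connection is established to send the response back.
        println!("serving part {} of shard {}", req.part_id, req.shard_id);
    }
}

#[actix::main]
async fn main() {
    // The "adapter" amounts to a sender the peer side can push messages into;
    // in this sketch it is simply the actor's address.
    let peer_manager = PeerManager.start();

    // A PeerActor that decoded a StatePartRequest would do the equivalent of:
    peer_manager
        .send(Tier3Request { shard_id: 0, part_id: 3 })
        .await
        .expect("peer manager mailbox closed");
}
```

The real handler added to `peer_manager_actor.rs` additionally asks the client
for the part via `StateRequestPart` and opens a TIER3 connection to the
requester if one is not already established.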
saketh-are authored Nov 8, 2024
1 parent 22259d0 commit 6ccbd62
Showing 15 changed files with 138 additions and 93 deletions.
6 changes: 6 additions & 0 deletions chain/client/src/test_utils/peer_manager_mock.rs
@@ -1,5 +1,6 @@
use near_network::types::{
PeerManagerMessageRequest, PeerManagerMessageResponse, SetChainInfo, StateSyncEvent,
Tier3Request,
};

pub struct PeerManagerMock {
@@ -43,3 +44,8 @@ impl actix::Handler<StateSyncEvent> for PeerManagerMock {
type Result = ();
fn handle(&mut self, _msg: StateSyncEvent, _ctx: &mut Self::Context) {}
}

impl actix::Handler<Tier3Request> for PeerManagerMock {
type Result = ();
fn handle(&mut self, _msg: Tier3Request, _ctx: &mut Self::Context) {}
}
2 changes: 1 addition & 1 deletion chain/network/src/peer/peer_actor.rs
@@ -974,7 +974,7 @@ impl PeerActor {
)]
async fn receive_routed_message(
clock: &time::Clock,
network_state: &NetworkState,
network_state: &Arc<NetworkState>,
peer_id: PeerId,
msg_hash: CryptoHash,
body: RoutedMessageBody,
9 changes: 9 additions & 0 deletions chain/network/src/peer/testonly.rs
@@ -18,6 +18,7 @@ use crate::state_witness::{
use crate::store;
use crate::tcp;
use crate::testonly::actix::ActixSystem;
use crate::types::{PeerManagerSenderForNetworkInput, PeerManagerSenderForNetworkMessage};
use near_async::messaging::{IntoMultiSender, Sender};
use near_async::time;
use near_o11y::WithSpanContextExt;
@@ -47,6 +48,7 @@ pub(crate) enum Event {
Client(ClientSenderForNetworkInput),
Network(peer_manager_actor::Event),
PartialWitness(PartialWitnessSenderForNetworkInput),
PeerManager(PeerManagerSenderForNetworkInput),
}

pub(crate) struct PeerHandle {
@@ -117,6 +119,12 @@ impl PeerHandle {
send.send(Event::Client(event.into_input()));
}
});
let peer_manager_sender = Sender::from_fn({
let send = send.clone();
move |event: PeerManagerSenderForNetworkMessage| {
send.send(Event::PeerManager(event.into_input()));
}
});
let shards_manager_sender = Sender::from_fn({
let send = send.clone();
move |event| {
@@ -136,6 +144,7 @@ impl PeerHandle {
network_cfg.verify().unwrap(),
cfg.chain.genesis_id.clone(),
client_sender.break_apart().into_multi_sender(),
peer_manager_sender.break_apart().into_multi_sender(),
shards_manager_sender,
state_witness_sender.break_apart().into_multi_sender(),
vec![],
39 changes: 14 additions & 25 deletions chain/network/src/peer_manager/network_state/mod.rs
@@ -31,7 +31,8 @@ use crate::stats::metrics;
use crate::store;
use crate::tcp;
use crate::types::{
ChainInfo, PeerType, ReasonForBan, StatePartRequestBody, Tier3Request, Tier3RequestBody,
ChainInfo, PeerManagerSenderForNetwork, PeerType, ReasonForBan, StatePartRequestBody,
Tier3Request, Tier3RequestBody,
};
use anyhow::Context;
use arc_swap::ArcSwap;
@@ -42,7 +43,6 @@ use near_primitives::hash::CryptoHash;
use near_primitives::network::PeerId;
use near_primitives::types::AccountId;
use parking_lot::{Mutex, RwLock};
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::num::NonZeroUsize;
use std::sync::atomic::AtomicUsize;
@@ -69,9 +69,6 @@ pub const PRUNE_EDGES_AFTER: time::Duration = time::Duration::minutes(30);
/// How long to wait between reconnection attempts to the same peer
pub(crate) const RECONNECT_ATTEMPT_INTERVAL: time::Duration = time::Duration::seconds(10);

/// Limit number of pending tier3 requests to avoid OOM.
pub(crate) const LIMIT_TIER3_REQUESTS: usize = 60;

impl WhitelistNode {
pub fn from_peer_info(pi: &PeerInfo) -> anyhow::Result<Self> {
Ok(Self {
@@ -110,6 +107,7 @@ pub(crate) struct NetworkState {
/// GenesisId of the chain.
pub genesis_id: GenesisId,
pub client: ClientSenderForNetwork,
pub peer_manager_adapter: PeerManagerSenderForNetwork,
pub shards_manager_adapter: Sender<ShardsManagerRequestFromNetwork>,
pub partial_witness_adapter: PartialWitnessSenderForNetwork,

@@ -153,9 +151,6 @@ pub(crate) struct NetworkState {
/// TODO(gprusak): consider removing it altogether.
pub tier1_route_back: Mutex<RouteBackCache>,

/// Queue of received requests to which a response should be made over TIER3.
pub tier3_requests: Mutex<VecDeque<Tier3Request>>,

/// Shared counter across all PeerActors, which counts number of `RoutedMessageBody::ForwardTx`
/// messages since last block
pub txns_since_last_block: AtomicUsize,
@@ -185,6 +180,7 @@ impl NetworkState {
config: config::VerifiedConfig,
genesis_id: GenesisId,
client: ClientSenderForNetwork,
peer_manager_adapter: PeerManagerSenderForNetwork,
shards_manager_adapter: Sender<ShardsManagerRequestFromNetwork>,
partial_witness_adapter: PartialWitnessSenderForNetwork,
whitelist_nodes: Vec<WhitelistNode>,
@@ -202,6 +198,7 @@ impl NetworkState {
})),
genesis_id,
client,
peer_manager_adapter,
shards_manager_adapter,
partial_witness_adapter,
chain_info: Default::default(),
@@ -218,7 +215,6 @@ impl NetworkState {
account_announcements: Arc::new(AnnounceAccountCache::new(store)),
tier2_route_back: Mutex::new(RouteBackCache::default()),
tier1_route_back: Mutex::new(RouteBackCache::default()),
tier3_requests: Mutex::new(VecDeque::<Tier3Request>::new()),
recent_routed_messages: Mutex::new(lru::LruCache::new(
NonZeroUsize::new(RECENT_ROUTED_MESSAGES_CACHE_SIZE).unwrap(),
)),
@@ -696,7 +692,7 @@ impl NetworkState {
}

pub async fn receive_routed_message(
&self,
self: &Arc<Self>,
clock: &time::Clock,
peer_id: PeerId,
msg_hash: CryptoHash,
@@ -775,21 +771,14 @@ impl NetworkState {
None
}
RoutedMessageBody::StatePartRequest(request) => {
let mut queue = self.tier3_requests.lock();
if queue.len() < LIMIT_TIER3_REQUESTS {
queue.push_back(Tier3Request {
peer_info: PeerInfo {
id: peer_id,
addr: Some(request.addr),
account_id: None,
},
body: Tier3RequestBody::StatePart(StatePartRequestBody {
shard_id: request.shard_id,
sync_hash: request.sync_hash,
part_id: request.part_id,
}),
});
}
self.peer_manager_adapter.send(Tier3Request {
peer_info: PeerInfo { id: peer_id, addr: Some(request.addr), account_id: None },
body: Tier3RequestBody::StatePart(StatePartRequestBody {
shard_id: request.shard_id,
sync_hash: request.sync_hash,
part_id: request.part_id,
}),
});
None
}
RoutedMessageBody::ChunkContractAccesses(accesses) => {
123 changes: 63 additions & 60 deletions chain/network/src/peer_manager/peer_manager_actor.rs
@@ -18,8 +18,9 @@ use crate::store;
use crate::tcp;
use crate::types::{
ConnectedPeerInfo, HighestHeightPeerInfo, KnownProducer, NetworkInfo, NetworkRequests,
NetworkResponses, PeerInfo, PeerManagerMessageRequest, PeerManagerMessageResponse, PeerType,
SetChainInfo, SnapshotHostInfo, StatePartRequestBody, StateSyncEvent, Tier3RequestBody,
NetworkResponses, PeerInfo, PeerManagerMessageRequest, PeerManagerMessageResponse,
PeerManagerSenderForNetwork, PeerType, SetChainInfo, SnapshotHostInfo, StatePartRequestBody,
StateSyncEvent, Tier3Request, Tier3RequestBody,
};
use ::time::ext::InstantExt as _;
use actix::fut::future::wrap_future;
@@ -87,8 +88,6 @@ pub(crate) const UPDATE_CONNECTION_STORE_INTERVAL: time::Duration = time::Durati
/// How often to poll the NetworkState for closed connections we'd like to re-establish.
pub(crate) const POLL_CONNECTION_STORE_INTERVAL: time::Duration = time::Duration::minutes(1);

/// How often we check for and process pending Tier3 requests
const PROCESS_TIER3_REQUESTS_INTERVAL: time::Duration = time::Duration::seconds(1);
/// The length of time that a Tier3 connection is allowed to idle before it is stopped
const TIER3_IDLE_TIMEOUT: time::Duration = time::Duration::seconds(15);

@@ -215,6 +214,7 @@ impl PeerManagerActor {
store: Arc<dyn near_store::db::Database>,
config: config::NetworkConfig,
client: ClientSenderForNetwork,
peer_manager_adapter: PeerManagerSenderForNetwork,
shards_manager_adapter: Sender<ShardsManagerRequestFromNetwork>,
partial_witness_adapter: PartialWitnessSenderForNetwork,
genesis_id: GenesisId,
@@ -246,6 +246,7 @@ impl PeerManagerActor {
config,
genesis_id,
client,
peer_manager_adapter,
shards_manager_adapter,
partial_witness_adapter,
whitelist_nodes,
@@ -344,62 +345,6 @@ impl PeerManagerActor {
}
}
});
// Periodically process pending Tier3 requests.
arbiter.spawn({
let clock = clock.clone();
let state = state.clone();
let arbiter = arbiter.clone();
let mut interval = time::Interval::new(clock.now(), PROCESS_TIER3_REQUESTS_INTERVAL);
async move {
loop {
interval.tick(&clock).await;

if let Some(request) = state.tier3_requests.lock().pop_front() {
arbiter.spawn({
let clock = clock.clone();
let state = state.clone();
async move {
let tier3_response = match request.body {
Tier3RequestBody::StatePart(StatePartRequestBody { shard_id, sync_hash, part_id }) => {
match state.client.send_async(StateRequestPart { shard_id, sync_hash, part_id }).await {
Ok(Some(client_response)) => {
PeerMessage::VersionedStateResponse(*client_response.0)
}
Ok(None) => {
tracing::debug!(target: "network", "client declined to respond to {:?}", request);
return;
}
Err(err) => {
tracing::error!(target: "network", ?err, "client failed to respond to {:?}", request);
return;
}
}
}
};

if !state.tier3.load().ready.contains_key(&request.peer_info.id) {
let result = async {
let stream = tcp::Stream::connect(
&request.peer_info,
tcp::Tier::T3,
&state.config.socket_options
).await.context("tcp::Stream::connect()")?;
PeerActor::spawn_and_handshake(clock.clone(),stream,None,state.clone()).await.context("PeerActor::spawn()")?;
anyhow::Ok(())
}.await;

if let Err(ref err) = result {
tracing::info!(target: "network", err = format!("{:#}", err), "tier3 failed to connect to {}", request.peer_info);
}
}

state.tier3.send_message(request.peer_info.id, Arc::new(tier3_response));
}
});
}
}
}
});
}
});
Ok(Self::start_in_arbiter(&arbiter, move |_ctx| Self {
@@ -1317,6 +1262,64 @@ impl actix::Handler<WithSpanContext<StateSyncEvent>> for PeerManagerActor {
}
}

impl actix::Handler<WithSpanContext<Tier3Request>> for PeerManagerActor {
type Result = ();
#[perf]
fn handle(
&mut self,
request: WithSpanContext<Tier3Request>,
ctx: &mut Self::Context,
) -> Self::Result {
let (_span, request) = handler_debug_span!(target: "network", request);
let _timer = metrics::PEER_MANAGER_TIER3_REQUEST_TIME
.with_label_values(&[(&request.body).into()])
.start_timer();

let state = self.state.clone();
let clock = self.clock.clone();
ctx.spawn(wrap_future(
async move {
let tier3_response = match request.body {
Tier3RequestBody::StatePart(StatePartRequestBody { shard_id, sync_hash, part_id }) => {
match state.client.send_async(StateRequestPart { shard_id, sync_hash, part_id }).await {
Ok(Some(client_response)) => {
PeerMessage::VersionedStateResponse(*client_response.0)
}
Ok(None) => {
tracing::debug!(target: "network", "client declined to respond to {:?}", request);
return;
}
Err(err) => {
tracing::error!(target: "network", ?err, "client failed to respond to {:?}", request);
return;
}
}
}
};

// Establish a tier3 connection if we don't have one already
if !state.tier3.load().ready.contains_key(&request.peer_info.id) {
let result = async {
let stream = tcp::Stream::connect(
&request.peer_info,
tcp::Tier::T3,
&state.config.socket_options
).await.context("tcp::Stream::connect()")?;
PeerActor::spawn_and_handshake(clock.clone(),stream,None,state.clone()).await.context("PeerActor::spawn()")?;
anyhow::Ok(())
}.await;

if let Err(ref err) = result {
tracing::info!(target: "network", err = format!("{:#}", err), "tier3 failed to connect to {}", request.peer_info);
}
}

state.tier3.send_message(request.peer_info.id, Arc::new(tier3_response));
}
));
}
}

impl actix::Handler<GetDebugStatus> for PeerManagerActor {
type Result = DebugStatus;
#[perf]
10 changes: 9 additions & 1 deletion chain/network/src/peer_manager/testonly.rs
@@ -26,7 +26,7 @@ use crate::test_utils;
use crate::testonly::actix::ActixSystem;
use crate::types::{
AccountKeys, ChainInfo, KnownPeerStatus, NetworkRequests, PeerManagerMessageRequest,
ReasonForBan,
PeerManagerSenderForNetworkInput, PeerManagerSenderForNetworkMessage, ReasonForBan,
};
use crate::PeerManagerActor;
use futures::FutureExt;
@@ -74,6 +74,7 @@ pub enum Event {
ShardsManager(ShardsManagerRequestFromNetwork),
Client(ClientSenderForNetworkInput),
PeerManager(PME),
PeerManagerSender(PeerManagerSenderForNetworkInput),
PartialWitness(PartialWitnessSenderForNetworkInput),
}

@@ -628,6 +629,12 @@ pub(crate) async fn start(
}
}
});
let peer_manager_sender = Sender::from_fn({
let send = send.clone();
move |event: PeerManagerSenderForNetworkMessage| {
send.send(Event::PeerManagerSender(event.into_input()));
}
});
let shards_manager_sender = Sender::from_fn({
let send = send.clone();
move |event| {
@@ -645,6 +652,7 @@ pub(crate) async fn start(
store,
cfg,
client_sender.break_apart().into_multi_sender(),
peer_manager_sender.break_apart().into_multi_sender(),
shards_manager_sender,
state_witness_sender.break_apart().into_multi_sender(),
genesis_id,
9 changes: 9 additions & 0 deletions chain/network/src/stats/metrics.rs
@@ -297,6 +297,15 @@ pub(crate) static PEER_MANAGER_MESSAGES_TIME: LazyLock<HistogramVec> = LazyLock:
)
.unwrap()
});
pub(crate) static PEER_MANAGER_TIER3_REQUEST_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
"near_peer_manager_tier3_request_time",
"Time that PeerManagerActor spends on handling tier3 requests",
&["request"],
Some(exponential_buckets(0.0001, 2., 15).unwrap()),
)
.unwrap()
});
pub(crate) static ROUTED_MESSAGE_DROPPED: LazyLock<IntCounterVec> = LazyLock::new(|| {
try_create_int_counter_vec(
"near_routed_message_dropped",