From 043d52b7dfe0d270efe4c7bc36615279f849dad6 Mon Sep 17 00:00:00 2001
From: dapplion <35266934+dapplion@users.noreply.github.com>
Date: Mon, 16 Sep 2024 13:39:47 +0200
Subject: [PATCH] Generalize sync ActiveRequests

---
 .../src/service/api_types.rs                  |  17 +-
 .../network/src/sync/block_lookups/tests.rs   |  33 +--
 beacon_node/network/src/sync/manager.rs       |  16 +-
 .../network/src/sync/network_context.rs       | 237 +++++++-----------
 .../src/sync/network_context/requests.rs      | 168 ++++++++++++-
 .../network_context/requests/blobs_by_root.rs |  59 ++---
 .../requests/blocks_by_root.rs                |  43 ++--
 .../requests/data_columns_by_root.rs          |  58 +----
 8 files changed, 327 insertions(+), 304 deletions(-)

diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs
index 30400db3b6..3e7d19bbdd 100644
--- a/beacon_node/lighthouse_network/src/service/api_types.rs
+++ b/beacon_node/lighthouse_network/src/service/api_types.rs
@@ -29,11 +29,6 @@ pub struct SingleLookupReqId {
     pub req_id: Id,
 }

-/// Request ID for data_columns_by_root requests. Block lookup do not issue this requests directly.
-/// Wrapping this particular req_id, ensures not mixing this requests with a custody req_id.
-#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
-pub struct DataColumnsByRootRequestId(pub Id);
-
 /// Id of rpc requests sent by sync to the network.
 #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
 pub enum SyncRequestId {
@@ -42,11 +37,19 @@ pub enum SyncRequestId {
     /// Request searching for a set of blobs given a hash.
     SingleBlob { id: SingleLookupReqId },
     /// Request searching for a set of data columns given a hash and list of column indices.
-    DataColumnsByRoot(DataColumnsByRootRequestId, DataColumnsByRootRequester),
+    DataColumnsByRoot(DataColumnsByRootRequestId),
     /// Range request that is composed by both a block range request and a blob range request.
     RangeBlockAndBlobs { id: Id },
 }

+/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly.
+/// Wrapping this particular req_id ensures these requests are not mixed with custody req_ids.
+#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
+pub struct DataColumnsByRootRequestId {
+    pub id: Id,
+    pub requester: DataColumnsByRootRequester,
+}
+
 #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
 pub enum DataColumnsByRootRequester {
     Sampling(SamplingId),
@@ -247,6 +250,6 @@ impl slog::Value for RequestId {

 impl std::fmt::Display for DataColumnsByRootRequestId {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{}", self.0)
+        write!(f, "{}", self.id)
     }
 }
diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs
index 5b4f17ac0d..f50bd888c0 100644
--- a/beacon_node/network/src/sync/block_lookups/tests.rs
+++ b/beacon_node/network/src/sync/block_lookups/tests.rs
@@ -24,8 +24,8 @@ use beacon_chain::{
 use beacon_processor::WorkEvent;
 use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode};
 use lighthouse_network::service::api_types::{
-    AppRequestId, DataColumnsByRootRequester, Id, SamplingRequester, SingleLookupReqId,
-    SyncRequestId,
+    AppRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SamplingRequester,
+    SingleLookupReqId, SyncRequestId,
 };
 use lighthouse_network::types::SyncState;
 use lighthouse_network::{NetworkGlobals, Request};
@@ -713,10 +713,10 @@ impl TestRig {
         let first_dc = data_columns.first().unwrap();
         let block_root = first_dc.block_root();
         let sampling_request_id = match id.0 {
-            SyncRequestId::DataColumnsByRoot(
-                _,
-                _requester @ DataColumnsByRootRequester::Sampling(sampling_id),
-            ) => sampling_id.sampling_request_id,
+            SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId {
+                requester: DataColumnsByRootRequester::Sampling(sampling_id),
+                ..
+            }) => sampling_id.sampling_request_id,
             _ => unreachable!(),
         };
         self.complete_data_columns_by_root_request(id, data_columns);
@@ -741,14 +741,15 @@ impl TestRig {
         data_columns: Vec<Arc<DataColumnSidecar<E>>>,
         missing_components: bool,
     ) {
-        let lookup_id =
-            if let SyncRequestId::DataColumnsByRoot(_, DataColumnsByRootRequester::Custody(id)) =
-                ids.first().unwrap().0
-            {
-                id.requester.0.lookup_id
-            } else {
-                panic!("not a custody requester")
-            };
+        let lookup_id = if let SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId {
+            requester: DataColumnsByRootRequester::Custody(id),
+            ..
+        }) = ids.first().unwrap().0
+        {
+            id.requester.0.lookup_id
+        } else {
+            panic!("not a custody requester")
+        };

         let first_column = data_columns.first().cloned().unwrap();

@@ -1339,7 +1340,7 @@ fn test_single_block_lookup_empty_response() {

     // The peer does not have the block. It should be penalized.
     r.single_lookup_block_response(id, peer_id, None);
-    r.expect_penalty(peer_id, "NoResponseReturned");
+    r.expect_penalty(peer_id, "NotEnoughResponsesReturned");
     // it should be retried
     let id = r.expect_block_lookup_request(block_root);
     // Send the right block this time.
@@ -2698,7 +2699,7 @@ mod deneb_only {
         };
         tester
             .empty_block_response()
-            .expect_penalty("NoResponseReturned")
+            .expect_penalty("NotEnoughResponsesReturned")
            .expect_block_request()
             .expect_no_blobs_request()
             .block_response_and_expect_blob_request()
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs
index ed91c73d8b..718206873b 100644
--- a/beacon_node/network/src/sync/manager.rs
+++ b/beacon_node/network/src/sync/manager.rs
@@ -380,13 +380,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
             SyncRequestId::SingleBlob { id } => {
                 self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error))
             }
-            SyncRequestId::DataColumnsByRoot(req_id, requester) => self
-                .on_data_columns_by_root_response(
-                    req_id,
-                    requester,
-                    peer_id,
-                    RpcEvent::RPCError(error),
-                ),
+            SyncRequestId::DataColumnsByRoot(req_id) => {
+                self.on_data_columns_by_root_response(req_id, peer_id, RpcEvent::RPCError(error))
+            }
             SyncRequestId::RangeBlockAndBlobs { id } => {
                 if let Some(sender_id) = self.network.range_request_failed(id) {
                     match sender_id {
@@ -991,10 +987,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         seen_timestamp: Duration,
     ) {
         match request_id {
-            SyncRequestId::DataColumnsByRoot(req_id, requester) => {
+            SyncRequestId::DataColumnsByRoot(req_id) => {
                 self.on_data_columns_by_root_response(
                     req_id,
-                    requester,
                     peer_id,
                     match data_column {
                         Some(data_column) => RpcEvent::Response(data_column, seen_timestamp),
@@ -1036,7 +1031,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
     fn on_data_columns_by_root_response(
         &mut self,
         req_id: DataColumnsByRootRequestId,
-        requester: DataColumnsByRootRequester,
         peer_id: PeerId,
         data_column: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
     ) {
@@ -1044,7 +1038,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
             .network
             .on_data_columns_by_root_response(req_id, peer_id, data_column)
         {
-            match requester {
+            match req_id.requester {
                 DataColumnsByRootRequester::Sampling(id) => {
                     if let Some((requester, result)) =
                         self.sampling
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index b9f6d180c1..bfb9bd44a6 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -2,7 +2,6 @@
 //! channel and stores a global RPC ID to perform requests.

 use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError};
-use self::requests::{ActiveBlobsByRootRequest, ActiveBlocksByRootRequest};
 pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest};
 use super::block_sidecar_coupling::RangeBlockComponentsRequest;
 use super::manager::BlockProcessType;
@@ -25,8 +24,11 @@ use lighthouse_network::service::api_types::{
 use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
 use rand::seq::SliceRandom;
 use rand::thread_rng;
-use requests::ActiveDataColumnsByRootRequest;
 pub use requests::LookupVerifyError;
+use requests::{
+    ActiveRequests, BlobsByRootRequestItems, BlocksByRootRequestItems,
+    DataColumnsByRootRequestItems,
+};
 use slog::{debug, error, warn};
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
@@ -164,18 +166,17 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
     request_id: Id,

     /// A mapping of active BlocksByRoot requests, including both current slot and parent lookups.
-    blocks_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveBlocksByRootRequest>,
-
+    blocks_by_root_requests:
+        ActiveRequests<SingleLookupReqId, BlocksByRootRequestItems<T::EthSpec>>,
     /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups.
-    blobs_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveBlobsByRootRequest<T::EthSpec>>,
+    blobs_by_root_requests: ActiveRequests<SingleLookupReqId, BlobsByRootRequestItems<T::EthSpec>>,
+    /// A mapping of active DataColumnsByRoot requests
+    data_columns_by_root_requests:
+        ActiveRequests<DataColumnsByRootRequestId, DataColumnsByRootRequestItems<T::EthSpec>>,

     /// Mapping of active custody column requests for a block root
     custody_by_root_requests: FnvHashMap<CustodyRequester, ActiveCustodyRequest<T>>,

-    /// A mapping of active DataColumnsByRoot requests
-    data_columns_by_root_requests:
-        FnvHashMap<DataColumnsByRootRequestId, ActiveDataColumnsByRootRequest<T::EthSpec>>,
-
     /// BlocksByRange requests paired with BlobsByRange
     range_block_components_requests:
         FnvHashMap<Id, (RangeRequestId, RangeBlockComponentsRequest<T::EthSpec>)>,
@@ -223,9 +224,17 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             network_send,
             execution_engine_state: EngineState::Online, // always assume `Online` at the start
             request_id: 1,
-            blocks_by_root_requests: <_>::default(),
-            blobs_by_root_requests: <_>::default(),
-            data_columns_by_root_requests: <_>::default(),
+            // true = enforce max_requests are returned for blocks_by_root. We always request a single
+            // block and the peer must have it.
+            blocks_by_root_requests: ActiveRequests::new(true, "blocks_by_root"),
+            // true = enforce max_requests are returned for blobs_by_root. We only issue requests for
+            // blocks after we know the block has data, and only request peers after they claim to
+            // have imported the block+blobs.
+            blobs_by_root_requests: ActiveRequests::new(true, "blobs_by_root"),
+            // true = enforce max_requests are returned for data_columns_by_root. We only issue requests
+            // for blocks after we know the block has data, and only request peers after they claim to
+            // have imported the block+columns and claim to be custodians.
+            data_columns_by_root_requests: ActiveRequests::new(true, "data_columns_by_root"),
             custody_by_root_requests: <_>::default(),
             range_block_components_requests: FnvHashMap::default(),
             network_beacon_processor,
@@ -249,34 +258,19 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         let failed_block_ids = self
             .blocks_by_root_requests
-            .iter()
-            .filter_map(|(id, request)| {
-                if request.peer_id == *peer_id {
-                    Some(SyncRequestId::SingleBlock { id: *id })
-                } else {
-                    None
-                }
-            });
+            .active_requests_of_peer(peer_id)
+            .into_iter()
+            .map(|id| SyncRequestId::SingleBlock { id: *id });
         let failed_blob_ids = self
             .blobs_by_root_requests
-            .iter()
-            .filter_map(|(id, request)| {
-                if request.peer_id == *peer_id {
-                    Some(SyncRequestId::SingleBlob { id: *id })
-                } else {
-                    None
-                }
-            });
-        let failed_data_column_by_root_ids =
-            self.data_columns_by_root_requests
-                .iter()
-                .filter_map(|(req_id, request)| {
-                    if request.peer_id == *peer_id {
-                        Some(SyncRequestId::DataColumnsByRoot(*req_id, request.requester))
-                    } else {
-                        None
-                    }
-                });
+            .active_requests_of_peer(peer_id)
+            .into_iter()
+            .map(|id| SyncRequestId::SingleBlob { id: *id });
+        let failed_data_column_by_root_ids = self
+            .data_columns_by_root_requests
+            .active_requests_of_peer(peer_id)
+            .into_iter()
+            .map(|req_id| SyncRequestId::DataColumnsByRoot(*req_id));

         failed_range_ids
             .chain(failed_block_ids)
@@ -579,7 +573,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             .map_err(|_| RpcRequestSendError::NetworkSendError)?;

         self.blocks_by_root_requests
-            .insert(id, ActiveBlocksByRootRequest::new(request, peer_id));
+            .insert(id, peer_id, BlocksByRootRequestItems::new(request));

         Ok(LookupRequestResult::RequestSent(req_id))
     }
@@ -677,7 +671,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             .map_err(|_| RpcRequestSendError::NetworkSendError)?;

         self.blobs_by_root_requests
-            .insert(id, ActiveBlobsByRootRequest::new(request, peer_id));
+            .insert(id, peer_id, BlobsByRootRequestItems::new(request));

         Ok(LookupRequestResult::RequestSent(req_id))
     }
@@ -689,7 +683,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         peer_id: PeerId,
         request: DataColumnsByRootSingleBlockRequest,
     ) -> Result<LookupRequestResult<DataColumnsByRootRequestId>, &'static str> {
-        let req_id = DataColumnsByRootRequestId(self.next_id());
+        let req_id = DataColumnsByRootRequestId {
+            id: self.next_id(),
+            requester,
+        };
         debug!(
             self.log,
             "Sending DataColumnsByRoot Request";
@@ -704,12 +701,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         self.send_network_msg(NetworkMessage::SendRequest {
             peer_id,
             request: Request::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)),
-            request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id, requester)),
+            request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id)),
         })?;
         self.data_columns_by_root_requests.insert(
             req_id,
-            ActiveDataColumnsByRootRequest::new(request, peer_id, requester),
+            peer_id,
+            DataColumnsByRootRequestItems::new(request),
         );

         Ok(LookupRequestResult::RequestSent(req_id))
     }
@@ -917,142 +915,77 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {

     // Request handlers

-    pub fn on_single_block_response(
+    pub(crate) fn on_single_block_response(
         &mut self,
-        request_id: SingleLookupReqId,
+        id: SingleLookupReqId,
         peer_id: PeerId,
         rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
     ) -> Option<RpcResponseResult<Arc<SignedBeaconBlock<T::EthSpec>>>> {
-        let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else {
-            metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blocks_by_root"]);
-            return None;
-        };
-
-        let resp = match rpc_event {
-            RpcEvent::Response(block, seen_timestamp) => {
-                match request.get_mut().add_response(block) {
-                    Ok(block) => Ok((block, seen_timestamp)),
-                    Err(e) => {
-                        // The request must be dropped after receiving an error.
-                        request.remove();
-                        Err(e.into())
-                    }
+        let r = self.blocks_by_root_requests.on_response(id, rpc_event);
+        let r = match r {
+            // Enforce that exactly one chunk = one block is returned. ReqResp behavior limits the
+            // response count to at most 1.
+            Some(Ok((mut blocks, seen_timestamp))) => match blocks.pop() {
+                Some(block) => Some(Ok((block, seen_timestamp))),
+                // Should never happen, `blocks_by_root_requests` enforces that we receive at least
+                // 1 chunk.
+                None => Some(Err(LookupVerifyError::NotEnoughResponsesReturned {
+                    actual: 0,
                 }
-            }
-            RpcEvent::StreamTermination => match request.remove().terminate() {
-                Ok(_) => return None,
-                Err(e) => Err(e.into()),
+                .into())),
             },
-            RpcEvent::RPCError(e) => {
-                request.remove();
-                Err(e.into())
-            }
+            Some(Err(e)) => Some(Err(e)),
+            None => None,
         };
-
-        if let Err(RpcResponseError::VerifyError(e)) = &resp {
+        if let Some(Err(RpcResponseError::VerifyError(e))) = &r {
             self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
         }
-        Some(resp)
+        r
     }

-    pub fn on_single_blob_response(
+    pub(crate) fn on_single_blob_response(
         &mut self,
-        request_id: SingleLookupReqId,
+        id: SingleLookupReqId,
         peer_id: PeerId,
         rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
     ) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
-        let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else {
-            metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blobs_by_root"]);
-            return None;
-        };
-
-        let resp = match rpc_event {
-            RpcEvent::Response(blob, seen_timestamp) => {
-                let request = request.get_mut();
-                match request.add_response(blob) {
-                    Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs)
-                        .map(|blobs| (blobs, seen_timestamp))
-                        .map_err(|e| (e.into(), request.resolve())),
-                    Ok(None) => return None,
-                    Err(e) => Err((e.into(), request.resolve())),
-                }
-            }
-            RpcEvent::StreamTermination => match request.remove().terminate() {
-                Ok(_) => return None,
-                // (err, false = not resolved) because terminate returns Ok() if resolved
-                Err(e) => Err((e.into(), false)),
+        let r = self.blobs_by_root_requests.on_response(id, rpc_event);
+        let r = match r {
+            Some(Ok((blobs, seen_timestamp))) => match to_fixed_blob_sidecar_list(blobs) {
+                Ok(blobs) => Some(Ok((blobs, seen_timestamp))),
+                Err(e) => Some(Err(e.into())),
             },
-            RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())),
+            Some(Err(e)) => Some(Err(e)),
+            None => None,
         };
-
-        match resp {
-            Ok(resp) => Some(Ok(resp)),
-            // Track if this request has already returned some value downstream. Ensure that
-            // downstream code only receives a single Result per request. If the serving peer does
-            // multiple penalizable actions per request, downscore and return None. This allows to
-            // catch if a peer is returning more blobs than requested or if the excess blobs are
-            // invalid.
-            Err((e, resolved)) => {
-                if let RpcResponseError::VerifyError(e) = &e {
-                    self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
-                }
-                if resolved {
-                    None
-                } else {
-                    Some(Err(e))
-                }
-            }
+        if let Some(Err(RpcResponseError::VerifyError(e))) = &r {
+            self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
         }
+        r
     }

     #[allow(clippy::type_complexity)]
-    pub fn on_data_columns_by_root_response(
+    pub(crate) fn on_data_columns_by_root_response(
         &mut self,
         id: DataColumnsByRootRequestId,
-        _peer_id: PeerId,
+        peer_id: PeerId,
         rpc_event: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
     ) -> Option<RpcResponseResult<Vec<Arc<DataColumnSidecar<T::EthSpec>>>>> {
-        let Entry::Occupied(mut request) = self.data_columns_by_root_requests.entry(id) else {
-            return None;
-        };
-
-        let resp = match rpc_event {
-            RpcEvent::Response(data_column, seen_timestamp) => {
-                let request = request.get_mut();
-                match request.add_response(data_column) {
-                    Ok(Some(data_columns)) => Ok((data_columns, seen_timestamp)),
-                    Ok(None) => return None,
-                    Err(e) => Err((e.into(), request.resolve())),
-                }
-            }
-            RpcEvent::StreamTermination => match request.remove().terminate() {
-                Ok(_) => return None,
-                // (err, false = not resolved) because terminate returns Ok() if resolved
-                Err(e) => Err((e.into(), false)),
-            },
-            RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())),
-        };
+        let resp = self
+            .data_columns_by_root_requests
+            .on_response(id, rpc_event);
+        self.report_rpc_response_errors(resp, peer_id)
+    }

-        match resp {
-            Ok(resp) => Some(Ok(resp)),
-            // Track if this request has already returned some value downstream. Ensure that
-            // downstream code only receives a single Result per request. If the serving peer does
-            // multiple penalizable actions per request, downscore and return None. This allows to
-            // catch if a peer is returning more columns than requested or if the excess blobs are
-            // invalid.
-            Err((e, resolved)) => {
-                if let RpcResponseError::VerifyError(_e) = &e {
-                    // TODO(das): this is a bug, we should not penalise peer in this case.
-                    // confirm this can be removed.
-                    // self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
-                }
-                if resolved {
-                    None
-                } else {
-                    Some(Err(e))
-                }
-            }
+    fn report_rpc_response_errors<R>(
+        &mut self,
+        resp: Option<Result<R, RpcResponseError>>,
+        peer_id: PeerId,
+    ) -> Option<Result<R, RpcResponseError>> {
+        if let Some(Err(RpcResponseError::VerifyError(e))) = &resp {
+            self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
         }
+        resp
     }

     /// Insert a downloaded column into an active custody request. Then make progress on the
diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs
index 0c2f59d143..16c4cde44d 100644
--- a/beacon_node/network/src/sync/network_context/requests.rs
+++ b/beacon_node/network/src/sync/network_context/requests.rs
@@ -1,23 +1,181 @@
+use std::{collections::hash_map::Entry, hash::Hash};
+
+use beacon_chain::validator_monitor::timestamp_now;
+use fnv::FnvHashMap;
+use lighthouse_network::PeerId;
 use strum::IntoStaticStr;
 use types::Hash256;

-pub use blobs_by_root::{ActiveBlobsByRootRequest, BlobsByRootSingleBlockRequest};
-pub use blocks_by_root::{ActiveBlocksByRootRequest, BlocksByRootSingleRequest};
+pub use blobs_by_root::{BlobsByRootRequestItems, BlobsByRootSingleBlockRequest};
+pub use blocks_by_root::{BlocksByRootRequestItems, BlocksByRootSingleRequest};
 pub use data_columns_by_root::{
-    ActiveDataColumnsByRootRequest, DataColumnsByRootSingleBlockRequest,
+    DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest,
 };

+use crate::metrics;
+
+use super::{RpcEvent, RpcResponseResult};
+
 mod blobs_by_root;
 mod blocks_by_root;
 mod data_columns_by_root;

 #[derive(Debug, PartialEq, Eq, IntoStaticStr)]
 pub enum LookupVerifyError {
-    NoResponseReturned,
-    NotEnoughResponsesReturned { expected: usize, actual: usize },
+    NotEnoughResponsesReturned { actual: usize },
     TooManyResponses,
     UnrequestedBlockRoot(Hash256),
     UnrequestedIndex(u64),
     InvalidInclusionProof,
     DuplicateData,
 }
+
+/// Collection of active requests of a single ReqResp method, e.g. `blocks_by_root`.
+pub struct ActiveRequests<K: Eq + Hash, T: ActiveRequestItems> {
+    requests: FnvHashMap<K, ActiveRequest<T>>,
+    name: &'static str,
+    expect_max_responses: bool,
+}
+
+/// Stateful container for a single active ReqResp request
+struct ActiveRequest<T: ActiveRequestItems> {
+    state: State<T>,
+    peer_id: PeerId,
+}
+
+enum State<T> {
+    Active(T),
+    CompletedEarly,
+    Errored,
+}
+
+impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
+    pub fn new(expect_max_responses: bool, name: &'static str) -> Self {
+        Self {
+            requests: <_>::default(),
+            name,
+            expect_max_responses,
+        }
+    }
+
+    pub fn insert(&mut self, id: K, peer_id: PeerId, items: T) {
+        self.requests.insert(
+            id,
+            ActiveRequest {
+                state: State::Active(items),
+                peer_id,
+            },
+        );
+    }
+
+    /// Handle an `RpcEvent` for the specific request indexed by `id`.
+    ///
+    /// The Lighthouse ReqResp protocol API promises to send 0 or more `RpcEvent::Response` chunks,
+    /// and EITHER a single `RpcEvent::RPCError` or a single `RpcEvent::StreamTermination`.
+    ///
+    /// Downstream code expects to receive a single `Result` value per request ID. However,
+    /// `ActiveRequestItems::add` may convert ReqResp success chunks into errors. This function
+    /// handles the multiple errors / stream termination internally, ensuring that a single
+    /// `Some` is returned.
+    pub fn on_response(
+        &mut self,
+        id: K,
+        rpc_event: RpcEvent<T::Item>,
+    ) -> Option<RpcResponseResult<Vec<T::Item>>> {
+        let Entry::Occupied(mut request) = self.requests.entry(id) else {
+            metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &[self.name]);
+            return None;
+        };
+
+        match rpc_event {
+            // Handles a successful ReqResp chunk, adding the item to the request accumulator.
+            // `ActiveRequestItems` validates the item before appending to its internal state.
+            RpcEvent::Response(item, seen_timestamp) => {
+                let request = &mut request.get_mut();
+                match &mut request.state {
+                    State::Active(items) => {
+                        match items.add(item) {
+                            // Received all items we are expecting, return early, but keep the request
+                            // struct to handle the stream termination gracefully.
+                            Ok(true) => {
+                                let items = items.consume();
+                                request.state = State::CompletedEarly;
+                                Some(Ok((items, seen_timestamp)))
+                            }
+                            // Received item, but we are still expecting more
+                            Ok(false) => None,
+                            // Received an invalid item
+                            Err(e) => {
+                                request.state = State::Errored;
+                                Some(Err(e.into()))
+                            }
+                        }
+                    }
+                    // Should never happen, ReqResp network behaviour enforces a max count of chunks
+                    State::CompletedEarly => None,
+                    // Ignore items after errors. We may want to penalize repeated invalid chunks
+                    // for the same response. But that's an optimization to ban peers sending
+                    // invalid data faster, which we choose not to adopt for now.
+                    State::Errored => None,
+                }
+            }
+            RpcEvent::StreamTermination => {
+                // After stream termination we must forget about this request, as there will be no
+                // more messages coming from the network.
+                match request.remove().state {
+                    // Received a stream termination in a valid sequence, consume items
+                    State::Active(mut items) => {
+                        if self.expect_max_responses {
+                            Some(Err(LookupVerifyError::NotEnoughResponsesReturned {
+                                actual: items.consume().len(),
+                            }
+                            .into()))
+                        } else {
+                            Some(Ok((items.consume(), timestamp_now())))
+                        }
+                    }
+                    // Items already returned, ignore stream termination
+                    State::CompletedEarly => None,
+                    // Returned an error earlier, ignore stream termination
+                    State::Errored => None,
+                }
+            }
+            RpcEvent::RPCError(e) => {
+                // After an Error event from the network we must forget about this request, as this
+                // may be the last message for this request.
+                match request.remove().state {
+                    // Received error while request is still active, propagate error.
+                    State::Active(_) => Some(Err(e.into())),
+                    // Received error after completing the request, ignore the error. This is okay
+                    // because the network has already registered a downscore event if necessary for
+                    // this message.
+                    State::CompletedEarly => None,
+                    // Received a network error after a validity error. Okay to ignore, see above
+                    State::Errored => None,
+                }
+            }
+        }
+    }
+
+    pub fn active_requests_of_peer(&self, peer_id: &PeerId) -> Vec<&K> {
+        self.requests
+            .iter()
+            .filter(|(_, request)| &request.peer_id == peer_id)
+            .map(|(id, _)| id)
+            .collect()
+    }
+
+    pub fn len(&self) -> usize {
+        self.requests.len()
+    }
+}
+
+pub trait ActiveRequestItems {
+    type Item;
+
+    /// Add a new item into the accumulator. Returns true if all expected items have been received.
+    fn add(&mut self, item: Self::Item) -> Result<bool, LookupVerifyError>;
+
+    /// Return all accumulated items, consuming them.
+    fn consume(&mut self) -> Vec<Self::Item>;
+}
diff --git a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs
index cb2b1a42ec..fefb27a5ef 100644
--- a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs
+++ b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs
@@ -1,8 +1,8 @@
-use lighthouse_network::{rpc::methods::BlobsByRootRequest, PeerId};
+use lighthouse_network::rpc::methods::BlobsByRootRequest;
 use std::sync::Arc;
 use types::{blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256};

-use super::LookupVerifyError;
+use super::{ActiveRequestItems, LookupVerifyError};

 #[derive(Debug, Clone)]
 pub struct BlobsByRootSingleBlockRequest {
@@ -25,34 +25,27 @@ impl BlobsByRootSingleBlockRequest {
     }
 }

-pub struct ActiveBlobsByRootRequest<E: EthSpec> {
+pub struct BlobsByRootRequestItems<E: EthSpec> {
     request: BlobsByRootSingleBlockRequest,
-    blobs: Vec<Arc<BlobSidecar<E>>>,
-    resolved: bool,
-    pub(crate) peer_id: PeerId,
+    items: Vec<Arc<BlobSidecar<E>>>,
 }

-impl<E: EthSpec> ActiveBlobsByRootRequest<E> {
-    pub fn new(request: BlobsByRootSingleBlockRequest, peer_id: PeerId) -> Self {
+impl<E: EthSpec> BlobsByRootRequestItems<E> {
+    pub fn new(request: BlobsByRootSingleBlockRequest) -> Self {
         Self {
             request,
-            blobs: vec![],
-            resolved: false,
-            peer_id,
+            items: vec![],
         }
     }
+}
+
+impl<E: EthSpec> ActiveRequestItems for BlobsByRootRequestItems<E> {
+    type Item = Arc<BlobSidecar<E>>;

     /// Appends a chunk to this multi-item request. If all expected chunks are received, this
     /// method returns `Some`, resolving the request before the stream terminator.
     /// The active request SHOULD be dropped after `add_response` returns an error
-    pub fn add_response(
-        &mut self,
-        blob: Arc<BlobSidecar<E>>,
-    ) -> Result<Option<Vec<Arc<BlobSidecar<E>>>>, LookupVerifyError> {
-        if self.resolved {
-            return Err(LookupVerifyError::TooManyResponses);
-        }
-
+    fn add(&mut self, blob: Self::Item) -> Result<bool, LookupVerifyError> {
         let block_root = blob.block_root();
         if self.request.block_root != block_root {
             return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
@@ -63,34 +56,16 @@ impl<E: EthSpec> ActiveBlobsByRootRequest<E> {
         if !self.request.indices.contains(&blob.index) {
             return Err(LookupVerifyError::UnrequestedIndex(blob.index));
         }
-        if self.blobs.iter().any(|b| b.index == blob.index) {
+        if self.items.iter().any(|b| b.index == blob.index) {
             return Err(LookupVerifyError::DuplicateData);
         }

-        self.blobs.push(blob);
-        if self.blobs.len() >= self.request.indices.len() {
-            // All expected chunks received, return result early
-            self.resolved = true;
-            Ok(Some(std::mem::take(&mut self.blobs)))
-        } else {
-            Ok(None)
-        }
-    }
+        self.items.push(blob);

-    pub fn terminate(self) -> Result<(), LookupVerifyError> {
-        if self.resolved {
-            Ok(())
-        } else {
-            Err(LookupVerifyError::NotEnoughResponsesReturned {
-                expected: self.request.indices.len(),
-                actual: self.blobs.len(),
-            })
-        }
+        Ok(self.items.len() >= self.request.indices.len())
     }

-    /// Mark request as resolved (= has returned something downstream) while marking this status as
-    /// true for future calls.
-    pub fn resolve(&mut self) -> bool {
-        std::mem::replace(&mut self.resolved, true)
+    fn consume(&mut self) -> Vec<Self::Item> {
+        std::mem::take(&mut self.items)
     }
 }
diff --git a/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs
index a15d4e3935..f3cdcbe714 100644
--- a/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs
+++ b/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs
@@ -1,9 +1,9 @@
 use beacon_chain::get_block_root;
-use lighthouse_network::{rpc::BlocksByRootRequest, PeerId};
+use lighthouse_network::rpc::BlocksByRootRequest;
 use std::sync::Arc;
 use types::{ChainSpec, EthSpec, Hash256, SignedBeaconBlock};

-use super::LookupVerifyError;
+use super::{ActiveRequestItems, LookupVerifyError};

 #[derive(Debug, Copy, Clone)]
 pub struct BlocksByRootSingleRequest(pub Hash256);
@@ -14,47 +14,38 @@ impl BlocksByRootSingleRequest {
     }
 }

-pub struct ActiveBlocksByRootRequest {
+pub struct BlocksByRootRequestItems<E: EthSpec> {
     request: BlocksByRootSingleRequest,
-    resolved: bool,
-    pub(crate) peer_id: PeerId,
+    items: Vec<Arc<SignedBeaconBlock<E>>>,
 }

-impl ActiveBlocksByRootRequest {
-    pub fn new(request: BlocksByRootSingleRequest, peer_id: PeerId) -> Self {
+impl<E: EthSpec> BlocksByRootRequestItems<E> {
+    pub fn new(request: BlocksByRootSingleRequest) -> Self {
         Self {
             request,
-            resolved: false,
-            peer_id,
+            items: vec![],
         }
     }
+}
+
+impl<E: EthSpec> ActiveRequestItems for BlocksByRootRequestItems<E> {
+    type Item = Arc<SignedBeaconBlock<E>>;

     /// Append a response to the single chunk request. If the chunk is valid, the request is
     /// resolved immediately.
     /// The active request SHOULD be dropped after `add_response` returns an error
-    pub fn add_response<E: EthSpec>(
-        &mut self,
-        block: Arc<SignedBeaconBlock<E>>,
-    ) -> Result<Arc<SignedBeaconBlock<E>>, LookupVerifyError> {
-        if self.resolved {
-            return Err(LookupVerifyError::TooManyResponses);
-        }
-
+    fn add(&mut self, block: Self::Item) -> Result<bool, LookupVerifyError> {
         let block_root = get_block_root(&block);
         if self.request.0 != block_root {
             return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
         }

-        // Valid data, blocks by root expects a single response
-        self.resolved = true;
-        Ok(block)
+        self.items.push(block);
+        // Always returns true, blocks by root expects a single response
+        Ok(true)
     }

-    pub fn terminate(self) -> Result<(), LookupVerifyError> {
-        if self.resolved {
-            Ok(())
-        } else {
-            Err(LookupVerifyError::NoResponseReturned)
-        }
+    fn consume(&mut self) -> Vec<Self::Item> {
+        std::mem::take(&mut self.items)
     }
 }
diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs
index a42ae7ca41..1b8d46ff07 100644
--- a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs
+++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs
@@ -1,9 +1,8 @@
-use lighthouse_network::service::api_types::DataColumnsByRootRequester;
-use lighthouse_network::{rpc::methods::DataColumnsByRootRequest, PeerId};
+use lighthouse_network::rpc::methods::DataColumnsByRootRequest;
 use std::sync::Arc;
 use types::{ChainSpec, DataColumnIdentifier, DataColumnSidecar, EthSpec, Hash256};

-use super::LookupVerifyError;
+use super::{ActiveRequestItems, LookupVerifyError};

 #[derive(Debug, Clone)]
 pub struct DataColumnsByRootSingleBlockRequest {
@@ -26,40 +25,27 @@ impl DataColumnsByRootSingleBlockRequest {
     }
 }

-pub struct ActiveDataColumnsByRootRequest<E: EthSpec> {
+pub struct DataColumnsByRootRequestItems<E: EthSpec> {
     request: DataColumnsByRootSingleBlockRequest,
     items: Vec<Arc<DataColumnSidecar<E>>>,
-    resolved: bool,
-    pub(crate) peer_id: PeerId,
-    pub(crate) requester: DataColumnsByRootRequester,
 }

-impl<E: EthSpec> ActiveDataColumnsByRootRequest<E> {
-    pub fn new(
-        request: DataColumnsByRootSingleBlockRequest,
-        peer_id: PeerId,
-        requester: DataColumnsByRootRequester,
-    ) -> Self {
+impl<E: EthSpec> DataColumnsByRootRequestItems<E> {
+    pub fn new(request: DataColumnsByRootSingleBlockRequest) -> Self {
         Self {
             request,
             items: vec![],
-            resolved: false,
-            peer_id,
-            requester,
         }
     }
+}
+
+impl<E: EthSpec> ActiveRequestItems for DataColumnsByRootRequestItems<E> {
+    type Item = Arc<DataColumnSidecar<E>>;

     /// Appends a chunk to this multi-item request. If all expected chunks are received, this
     /// method returns `Some`, resolving the request before the stream terminator.
     /// The active request SHOULD be dropped after `add_response` returns an error
-    pub fn add_response(
-        &mut self,
-        data_column: Arc<DataColumnSidecar<E>>,
-    ) -> Result<Option<Vec<Arc<DataColumnSidecar<E>>>>, LookupVerifyError> {
-        if self.resolved {
-            return Err(LookupVerifyError::TooManyResponses);
-        }
-
+    fn add(&mut self, data_column: Self::Item) -> Result<bool, LookupVerifyError> {
         let block_root = data_column.block_root();
         if self.request.block_root != block_root {
             return Err(LookupVerifyError::UnrequestedBlockRoot(block_root));
@@ -75,29 +61,11 @@ impl<E: EthSpec> ActiveDataColumnsByRootRequest<E> {
         }

         self.items.push(data_column);
-        if self.items.len() >= self.request.indices.len() {
-            // All expected chunks received, return result early
-            self.resolved = true;
-            Ok(Some(std::mem::take(&mut self.items)))
-        } else {
-            Ok(None)
-        }
-    }

-    pub fn terminate(self) -> Result<(), LookupVerifyError> {
-        if self.resolved {
-            Ok(())
-        } else {
-            Err(LookupVerifyError::NotEnoughResponsesReturned {
-                expected: self.request.indices.len(),
-                actual: self.items.len(),
-            })
-        }
+        Ok(self.items.len() >= self.request.indices.len())
     }

-    /// Mark request as resolved (= has returned something downstream) while marking this status as
-    /// true for future calls.
-    pub fn resolve(&mut self) -> bool {
-        std::mem::replace(&mut self.resolved, true)
+    fn consume(&mut self) -> Vec<Self::Item> {
+        std::mem::take(&mut self.items)
     }
 }
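
The accumulator contract this patch introduces via `ActiveRequestItems` can be exercised in isolation. The following sketch is illustrative only: `VerifyError`, `IndexedItems`, and the `u64` item type are hypothetical stand-ins, while the real implementations are the `*RequestItems` types in this patch, which validate block roots, indices, and inclusion proofs before accumulating.

    // Hypothetical, minimal model of the `ActiveRequestItems` accumulator pattern.
    #[derive(Debug, PartialEq)]
    enum VerifyError {
        UnrequestedIndex(u64),
        DuplicateData,
    }

    trait ActiveRequestItems {
        type Item;
        /// Add a new item into the accumulator. Returns true if all expected items were received.
        fn add(&mut self, item: Self::Item) -> Result<bool, VerifyError>;
        /// Return all accumulated items, consuming them.
        fn consume(&mut self) -> Vec<Self::Item>;
    }

    /// Stand-in for a multi-item request keyed by expected indices, analogous to
    /// `BlobsByRootRequestItems` / `DataColumnsByRootRequestItems`.
    struct IndexedItems {
        expected_indices: Vec<u64>,
        items: Vec<u64>,
    }

    impl ActiveRequestItems for IndexedItems {
        type Item = u64;

        fn add(&mut self, index: u64) -> Result<bool, VerifyError> {
            // Reject chunks we never asked for.
            if !self.expected_indices.contains(&index) {
                return Err(VerifyError::UnrequestedIndex(index));
            }
            // Reject duplicate chunks.
            if self.items.contains(&index) {
                return Err(VerifyError::DuplicateData);
            }
            self.items.push(index);
            // Complete once every expected index has arrived.
            Ok(self.items.len() >= self.expected_indices.len())
        }

        fn consume(&mut self) -> Vec<u64> {
            std::mem::take(&mut self.items)
        }
    }

    fn main() {
        let mut req = IndexedItems {
            expected_indices: vec![0, 1],
            items: vec![],
        };
        assert_eq!(req.add(0), Ok(false)); // valid chunk, still expecting more
        assert_eq!(req.add(0), Err(VerifyError::DuplicateData));
        assert_eq!(req.add(1), Ok(true)); // all expected chunks received
        assert_eq!(req.consume(), vec![0, 1]);
    }

Returning a `bool` (complete / not complete) from `add` rather than the accumulated vector lets `ActiveRequests::on_response` own the decision of when to `consume` the items, which is what allows the three per-method implementations to share one state machine.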
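The per-request lifecycle that `ActiveRequests::on_response` manages can likewise be summarized as a small state machine. This is a simplified model, not the patch's code: the real version keys requests by ID in an `FnvHashMap`, drops the entry entirely on stream termination or RPC error, and consults `expect_max_responses`; `Event`, `drive`, and the string errors below are hypothetical.

    // Hypothetical model of the state transitions inside `ActiveRequests::on_response`.
    enum State {
        Active(Vec<u64>), // still accumulating validated chunks
        CompletedEarly,   // all expected chunks returned before the stream terminator
        Errored,          // an error was already surfaced; later events are ignored
    }

    enum Event {
        Chunk(u64),
        StreamTermination,
        RpcError,
    }

    /// Returns at most one `Some(Result)` over the lifetime of a request, matching the
    /// "single Result per request ID" contract documented on `on_response`.
    fn drive(
        state: &mut State,
        event: Event,
        expected: usize,
    ) -> Option<Result<Vec<u64>, &'static str>> {
        match event {
            Event::Chunk(chunk) => match state {
                State::Active(items) => {
                    items.push(chunk);
                    if items.len() >= expected {
                        // Return early but remember completion, so the later stream
                        // terminator is ignored instead of producing a second Result.
                        let items = std::mem::take(items);
                        *state = State::CompletedEarly;
                        Some(Ok(items))
                    } else {
                        None
                    }
                }
                // Chunks after completion or an error are ignored.
                State::CompletedEarly | State::Errored => None,
            },
            Event::StreamTermination => match state {
                // Terminated while still expecting chunks: not enough responses.
                State::Active(_) => {
                    *state = State::Errored;
                    Some(Err("NotEnoughResponsesReturned"))
                }
                State::CompletedEarly | State::Errored => None,
            },
            Event::RpcError => match state {
                State::Active(_) => {
                    *state = State::Errored;
                    Some(Err("rpc error"))
                }
                State::CompletedEarly | State::Errored => None,
            },
        }
    }

    fn main() {
        let mut state = State::Active(Vec::new());
        assert_eq!(drive(&mut state, Event::Chunk(1), 2), None);
        assert_eq!(drive(&mut state, Event::Chunk(2), 2), Some(Ok(vec![1, 2])));
        // The terminator after early completion yields nothing further.
        assert_eq!(drive(&mut state, Event::StreamTermination, 2), None);
    }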