Skip to content

Commit 677f96a

Browse files
authored
Add data columns by root sync request (#6274)
* Add data columns by root sync request
1 parent 56d1c8c commit 677f96a

File tree

5 files changed

+287
-32
lines changed

5 files changed

+287
-32
lines changed

beacon_node/lighthouse_network/src/service/api_types.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,3 +212,9 @@ impl slog::Value for RequestId {
212212
}
213213
}
214214
}
215+
216+
impl std::fmt::Display for DataColumnsByRootRequestId {
217+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218+
write!(f, "{}", self.0)
219+
}
220+
}

beacon_node/network/src/sync/manager.rs

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ use beacon_chain::{
5353
};
5454
use futures::StreamExt;
5555
use lighthouse_network::rpc::RPCError;
56-
use lighthouse_network::service::api_types::{Id, SingleLookupReqId, SyncRequestId};
56+
use lighthouse_network::service::api_types::{
57+
DataColumnsByRootRequestId, Id, SingleLookupReqId, SyncRequestId,
58+
};
5759
use lighthouse_network::types::{NetworkGlobals, SyncState};
5860
use lighthouse_network::SyncInfo;
5961
use lighthouse_network::{PeerAction, PeerId};
@@ -345,9 +347,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
345347
SyncRequestId::SingleBlob { id } => {
346348
self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error))
347349
}
348-
SyncRequestId::DataColumnsByRoot { .. } => {
349-
// TODO(das)
350-
}
350+
SyncRequestId::DataColumnsByRoot(req_id, requester) => self
351+
.on_data_columns_by_root_response(
352+
req_id,
353+
requester,
354+
peer_id,
355+
RpcEvent::RPCError(error),
356+
),
351357
SyncRequestId::RangeBlockAndBlobs { id } => {
352358
if let Some(sender_id) = self.network.range_request_failed(id) {
353359
match sender_id {
@@ -860,15 +866,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
860866
None => RpcEvent::StreamTermination,
861867
},
862868
),
863-
SyncRequestId::SingleBlob { .. } => {
864-
crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id );
865-
}
866-
SyncRequestId::DataColumnsByRoot { .. } => {
867-
// TODO(das)
868-
}
869869
SyncRequestId::RangeBlockAndBlobs { id } => {
870870
self.range_block_and_blobs_response(id, peer_id, block.into())
871871
}
872+
_ => {
873+
crit!(self.log, "bad request id for block"; "peer_id" => %peer_id );
874+
}
872875
}
873876
}
874877

@@ -897,9 +900,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
897900
seen_timestamp: Duration,
898901
) {
899902
match request_id {
900-
SyncRequestId::SingleBlock { .. } => {
901-
crit!(self.log, "Single blob received during block request"; "peer_id" => %peer_id );
902-
}
903903
SyncRequestId::SingleBlob { id } => self.on_single_blob_response(
904904
id,
905905
peer_id,
@@ -908,23 +908,41 @@ impl<T: BeaconChainTypes> SyncManager<T> {
908908
None => RpcEvent::StreamTermination,
909909
},
910910
),
911-
SyncRequestId::DataColumnsByRoot { .. } => {
912-
// TODO(das)
913-
}
914911
SyncRequestId::RangeBlockAndBlobs { id } => {
915912
self.range_block_and_blobs_response(id, peer_id, blob.into())
916913
}
914+
_ => {
915+
crit!(self.log, "bad request id for blob"; "peer_id" => %peer_id);
916+
}
917917
}
918918
}
919919

920920
fn rpc_data_column_received(
921921
&mut self,
922-
_request_id: SyncRequestId,
923-
_peer_id: PeerId,
924-
_data_column: Option<Arc<DataColumnSidecar<T::EthSpec>>>,
925-
_seen_timestamp: Duration,
922+
request_id: SyncRequestId,
923+
peer_id: PeerId,
924+
data_column: Option<Arc<DataColumnSidecar<T::EthSpec>>>,
925+
seen_timestamp: Duration,
926926
) {
927-
// TODO(das): implement handler
927+
match request_id {
928+
SyncRequestId::DataColumnsByRoot(req_id, requester) => {
929+
self.on_data_columns_by_root_response(
930+
req_id,
931+
requester,
932+
peer_id,
933+
match data_column {
934+
Some(data_column) => RpcEvent::Response(data_column, seen_timestamp),
935+
None => RpcEvent::StreamTermination,
936+
},
937+
);
938+
}
939+
SyncRequestId::RangeBlockAndBlobs { id: _ } => {
940+
// TODO(das): implement custody range sync
941+
}
942+
_ => {
943+
crit!(self.log, "bad request id for data_column"; "peer_id" => %peer_id);
944+
}
945+
}
928946
}
929947

930948
fn on_single_blob_response(
@@ -944,6 +962,21 @@ impl<T: BeaconChainTypes> SyncManager<T> {
944962
}
945963
}
946964

965+
fn on_data_columns_by_root_response(
966+
&mut self,
967+
req_id: DataColumnsByRootRequestId,
968+
_requester: SingleLookupReqId,
969+
peer_id: PeerId,
970+
rpc_event: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
971+
) {
972+
if let Some(_resp) = self
973+
.network
974+
.on_data_columns_by_root_response(req_id, peer_id, rpc_event)
975+
{
976+
// TODO(das): pass data_columns_by_root result to consumer
977+
}
978+
}
979+
947980
/// Handles receiving a response for a range sync request that should have both blocks and
948981
/// blobs.
949982
fn range_block_and_blobs_response(

beacon_node/network/src/sync/network_context.rs

Lines changed: 115 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,21 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineStat
1616
use fnv::FnvHashMap;
1717
use lighthouse_network::rpc::methods::BlobsByRangeRequest;
1818
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
19-
use lighthouse_network::service::api_types::{AppRequestId, Id, SingleLookupReqId, SyncRequestId};
19+
use lighthouse_network::service::api_types::{
20+
AppRequestId, DataColumnsByRootRequestId, Id, SingleLookupReqId, SyncRequestId,
21+
};
2022
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
2123
pub use requests::LookupVerifyError;
24+
use requests::{ActiveDataColumnsByRootRequest, DataColumnsByRootSingleBlockRequest};
2225
use slog::{debug, error, trace, warn};
2326
use std::collections::hash_map::Entry;
2427
use std::sync::Arc;
2528
use std::time::Duration;
2629
use tokio::sync::mpsc;
2730
use types::blob_sidecar::FixedBlobSidecarList;
28-
use types::{BlobSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock};
31+
use types::{
32+
BlobSidecar, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock,
33+
};
2934

3035
mod requests;
3136

@@ -96,10 +101,10 @@ impl From<LookupVerifyError> for RpcResponseError {
96101
/// Sequential ID that uniquely identifies ReqResp outgoing requests
97102
pub type ReqId = u32;
98103

99-
pub enum LookupRequestResult {
104+
pub enum LookupRequestResult<I = ReqId> {
100105
/// A request is sent. Sync MUST receive an event from the network in the future for either:
101106
/// completed response or failed request
102-
RequestSent(ReqId),
107+
RequestSent(I),
103108
/// No request is sent, and no further action is necessary to consider this request completed
104109
NoRequestNeeded,
105110
/// No request is sent, but the request is not completed. Sync MUST receive some future event
@@ -123,6 +128,10 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
123128
/// A mapping of active BlobsByRoot requests, including both current slot and parent lookups.
124129
blobs_by_root_requests: FnvHashMap<SingleLookupReqId, ActiveBlobsByRootRequest<T::EthSpec>>,
125130

131+
/// A mapping of active DataColumnsByRoot requests
132+
data_columns_by_root_requests:
133+
FnvHashMap<DataColumnsByRootRequestId, ActiveDataColumnsByRootRequest<T::EthSpec>>,
134+
126135
/// BlocksByRange requests paired with BlobsByRange
127136
range_blocks_and_blobs_requests:
128137
FnvHashMap<Id, (RangeRequestId, BlocksAndBlobsRequestInfo<T::EthSpec>)>,
@@ -171,6 +180,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
171180
request_id: 1,
172181
blocks_by_root_requests: <_>::default(),
173182
blobs_by_root_requests: <_>::default(),
183+
data_columns_by_root_requests: <_>::default(),
174184
range_blocks_and_blobs_requests: FnvHashMap::default(),
175185
network_beacon_processor,
176186
chain,
@@ -211,10 +221,21 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
211221
None
212222
}
213223
});
224+
let failed_data_column_by_root_ids =
225+
self.data_columns_by_root_requests
226+
.iter()
227+
.filter_map(|(req_id, request)| {
228+
if request.peer_id == *peer_id {
229+
Some(SyncRequestId::DataColumnsByRoot(*req_id, request.requester))
230+
} else {
231+
None
232+
}
233+
});
214234

215235
failed_range_ids
216236
.chain(failed_block_ids)
217237
.chain(failed_blob_ids)
238+
.chain(failed_data_column_by_root_ids)
218239
.collect()
219240
}
220241

@@ -529,6 +550,43 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
529550
Ok(LookupRequestResult::RequestSent(req_id))
530551
}
531552

553+
/// Request to send a single `data_columns_by_root` request to the network.
554+
pub fn data_column_lookup_request(
555+
&mut self,
556+
requester: SingleLookupReqId,
557+
peer_id: PeerId,
558+
request: DataColumnsByRootSingleBlockRequest,
559+
) -> Result<LookupRequestResult<DataColumnsByRootRequestId>, &'static str> {
560+
let req_id = DataColumnsByRootRequestId(self.next_id());
561+
debug!(
562+
self.log,
563+
"Sending DataColumnsByRoot Request";
564+
"method" => "DataColumnsByRoot",
565+
"block_root" => ?request.block_root,
566+
"indices" => ?request.indices,
567+
"peer" => %peer_id,
568+
"requester" => ?requester,
569+
"req_id" => %req_id,
570+
);
571+
572+
self.send_network_msg(NetworkMessage::SendRequest {
573+
peer_id,
574+
request: Request::DataColumnsByRoot(request.clone().into_request(&self.chain.spec)),
575+
request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(req_id, requester)),
576+
})?;
577+
578+
self.data_columns_by_root_requests.insert(
579+
req_id,
580+
ActiveDataColumnsByRootRequest::new(request, peer_id, requester),
581+
);
582+
583+
Ok(LookupRequestResult::RequestSent(req_id))
584+
}
585+
586+
/// Request to fetch all needed custody columns of a specific block. This function may not send
587+
/// any request to the network if no columns have to be fetched based on the import state of the
588+
/// node. A custody request is a "super request" that may trigger 0 or more `data_columns_by_root`
589+
/// requests.
532590
pub fn custody_lookup_request(
533591
&mut self,
534592
lookup_id: SingleLookupId,
@@ -707,14 +765,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
707765
&mut self,
708766
request_id: SingleLookupReqId,
709767
peer_id: PeerId,
710-
block: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
768+
rpc_event: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
711769
) -> Option<RpcResponseResult<Arc<SignedBeaconBlock<T::EthSpec>>>> {
712770
let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else {
713771
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blocks_by_root"]);
714772
return None;
715773
};
716774

717-
let resp = match block {
775+
let resp = match rpc_event {
718776
RpcEvent::Response(block, seen_timestamp) => {
719777
match request.get_mut().add_response(block) {
720778
Ok(block) => Ok((block, seen_timestamp)),
@@ -745,14 +803,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
745803
&mut self,
746804
request_id: SingleLookupReqId,
747805
peer_id: PeerId,
748-
blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
806+
rpc_event: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
749807
) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
750808
let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else {
751809
metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["blobs_by_root"]);
752810
return None;
753811
};
754812

755-
let resp = match blob {
813+
let resp = match rpc_event {
756814
RpcEvent::Response(blob, seen_timestamp) => {
757815
let request = request.get_mut();
758816
match request.add_response(blob) {
@@ -791,6 +849,54 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
791849
}
792850
}
793851

852+
#[allow(clippy::type_complexity)]
853+
pub fn on_data_columns_by_root_response(
854+
&mut self,
855+
id: DataColumnsByRootRequestId,
856+
peer_id: PeerId,
857+
rpc_event: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
858+
) -> Option<RpcResponseResult<Vec<Arc<DataColumnSidecar<T::EthSpec>>>>> {
859+
let Entry::Occupied(mut request) = self.data_columns_by_root_requests.entry(id) else {
860+
return None;
861+
};
862+
863+
let resp = match rpc_event {
864+
RpcEvent::Response(data_column, seen_timestamp) => {
865+
let request = request.get_mut();
866+
match request.add_response(data_column) {
867+
Ok(Some(data_columns)) => Ok((data_columns, seen_timestamp)),
868+
Ok(None) => return None,
869+
Err(e) => Err((e.into(), request.resolve())),
870+
}
871+
}
872+
RpcEvent::StreamTermination => match request.remove().terminate() {
873+
Ok(_) => return None,
874+
// (err, false = not resolved) because terminate returns Ok() if resolved
875+
Err(e) => Err((e.into(), false)),
876+
},
877+
RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())),
878+
};
879+
880+
match resp {
881+
Ok(resp) => Some(Ok(resp)),
882+
// Track if this request has already returned some value downstream. Ensure that
883+
// downstream code only receives a single Result per request. If the serving peer does
884+
// multiple penalizable actions per request, downscore and return None. This allows to
885+
// catch if a peer is returning more columns than requested or if the excess blobs are
886+
// invalid.
887+
Err((e, resolved)) => {
888+
if let RpcResponseError::VerifyError(e) = &e {
889+
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
890+
}
891+
if resolved {
892+
None
893+
} else {
894+
Some(Err(e))
895+
}
896+
}
897+
}
898+
}
899+
794900
pub fn send_block_for_processing(
795901
&self,
796902
id: Id,
@@ -900,7 +1006,7 @@ fn to_fixed_blob_sidecar_list<E: EthSpec>(
9001006
let index = blob.index as usize;
9011007
*fixed_list
9021008
.get_mut(index)
903-
.ok_or(LookupVerifyError::UnrequestedBlobIndex(index as u64))? = Some(blob)
1009+
.ok_or(LookupVerifyError::UnrequestedIndex(index as u64))? = Some(blob)
9041010
}
9051011
Ok(fixed_list)
9061012
}

beacon_node/network/src/sync/network_context/requests.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,19 @@ use types::{
99
blob_sidecar::BlobIdentifier, BlobSidecar, ChainSpec, EthSpec, Hash256, SignedBeaconBlock,
1010
};
1111

12+
pub use data_columns_by_root::{
13+
ActiveDataColumnsByRootRequest, DataColumnsByRootSingleBlockRequest,
14+
};
15+
16+
mod data_columns_by_root;
17+
1218
#[derive(Debug, PartialEq, Eq, IntoStaticStr)]
1319
pub enum LookupVerifyError {
1420
NoResponseReturned,
1521
NotEnoughResponsesReturned { expected: usize, actual: usize },
1622
TooManyResponses,
1723
UnrequestedBlockRoot(Hash256),
18-
UnrequestedBlobIndex(u64),
24+
UnrequestedIndex(u64),
1925
InvalidInclusionProof,
2026
DuplicateData,
2127
}
@@ -131,7 +137,7 @@ impl<E: EthSpec> ActiveBlobsByRootRequest<E> {
131137
return Err(LookupVerifyError::InvalidInclusionProof);
132138
}
133139
if !self.request.indices.contains(&blob.index) {
134-
return Err(LookupVerifyError::UnrequestedBlobIndex(blob.index));
140+
return Err(LookupVerifyError::UnrequestedIndex(blob.index));
135141
}
136142
if self.blobs.iter().any(|b| b.index == blob.index) {
137143
return Err(LookupVerifyError::DuplicateData);

0 commit comments

Comments
 (0)