Generalize sync ActiveRequests #6398

Merged · 13 commits · Oct 17, 2024
18 changes: 11 additions & 7 deletions beacon_node/lighthouse_network/src/service/api_types.rs
@@ -22,11 +22,6 @@ pub struct SingleLookupReqId {
     pub req_id: Id,
 }
 
-/// Request ID for data_columns_by_root requests. Block lookup do not issue this requests directly.
-/// Wrapping this particular req_id, ensures not mixing this requests with a custody req_id.
-#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
-pub struct DataColumnsByRootRequestId(pub Id);
-
 /// Id of rpc requests sent by sync to the network.
 #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
 pub enum SyncRequestId {
@@ -35,11 +30,19 @@ pub enum SyncRequestId {
     /// Request searching for a set of blobs given a hash.
     SingleBlob { id: SingleLookupReqId },
     /// Request searching for a set of data columns given a hash and list of column indices.
-    DataColumnsByRoot(DataColumnsByRootRequestId, DataColumnsByRootRequester),
+    DataColumnsByRoot(DataColumnsByRootRequestId),
Review comment (author): Having the requester inside the ID just simplifies downstream code and makes it symmetrical with the other variants.
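A minimal, self-contained sketch of the symmetry this buys downstream (simplified stand-in types, not the actual Lighthouse definitions): every `SyncRequestId` variant now carries exactly one payload, so dispatch code destructures each arm the same way.

```rust
// Simplified stand-ins for illustration only; the real types live in
// beacon_node/lighthouse_network/src/service/api_types.rs.
type Id = u32;

#[derive(Debug, Clone, Copy)]
enum DataColumnsByRootRequester {
    Sampling,
    Custody,
}

#[derive(Debug, Clone, Copy)]
struct DataColumnsByRootRequestId {
    id: Id,
    requester: DataColumnsByRootRequester,
}

enum SyncRequestId {
    SingleBlock { id: Id },
    SingleBlob { id: Id },
    // The requester now travels inside the request id...
    DataColumnsByRoot(DataColumnsByRootRequestId),
    RangeBlockAndBlobs { id: Id },
}

fn dispatch(request_id: SyncRequestId) {
    match request_id {
        // ...so every arm binds a single id value, instead of
        // DataColumnsByRoot needing a second tuple field.
        SyncRequestId::SingleBlock { id } => println!("block lookup {id}"),
        SyncRequestId::SingleBlob { id } => println!("blob lookup {id}"),
        SyncRequestId::DataColumnsByRoot(req_id) => {
            println!("columns {} for {:?}", req_id.id, req_id.requester)
        }
        SyncRequestId::RangeBlockAndBlobs { id } => println!("range sync {id}"),
    }
}

fn main() {
    dispatch(SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId {
        id: 7,
        requester: DataColumnsByRootRequester::Custody,
    }));
}
```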

     /// Range request that is composed by both a block range request and a blob range request.
     RangeBlockAndBlobs { id: Id },
 }
 
+/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly.
+/// Wrapping this particular req_id ensures this request is not mixed with a custody req_id.
+#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
+pub struct DataColumnsByRootRequestId {
+    pub id: Id,
+    pub requester: DataColumnsByRootRequester,
+}
+
 #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
 pub enum DataColumnsByRootRequester {
     Sampling(SamplingId),
@@ -173,8 +176,9 @@ impl slog::Value for RequestId {
     }
 }
 
+// This custom impl reduces log boilerplate by not printing `DataColumnsByRootRequestId` on each id log
 impl std::fmt::Display for DataColumnsByRootRequestId {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{}", self.0)
+        write!(f, "{} {:?}", self.id, self.requester)
     }
 }
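As a usage note, here is a hypothetical illustration (invented stand-in types and values, not the real ones) of the log shape this `Display` impl produces: the numeric id followed by the requester's `Debug` form, with no wrapper-type name.

```rust
use std::fmt;

// Hypothetical stand-ins to show the output shape only.
#[derive(Debug)]
enum Requester {
    Sampling(u64),
}

struct ReqId {
    id: u32,
    requester: Requester,
}

impl fmt::Display for ReqId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{} {:?}", self.id, self.requester)
    }
}

fn main() {
    let req = ReqId { id: 42, requester: Requester::Sampling(3) };
    // The id log reads "42 Sampling(3)" rather than a full Debug dump.
    assert_eq!(req.to_string(), "42 Sampling(3)");
    println!("{req}");
}
```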
58 changes: 20 additions & 38 deletions beacon_node/network/src/sync/block_lookups/tests.rs
@@ -25,8 +25,8 @@ use beacon_chain::{
 use beacon_processor::WorkEvent;
 use lighthouse_network::rpc::{RPCError, RequestType, RpcErrorResponse};
 use lighthouse_network::service::api_types::{
-    AppRequestId, DataColumnsByRootRequester, Id, SamplingRequester, SingleLookupReqId,
-    SyncRequestId,
+    AppRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SamplingRequester,
+    SingleLookupReqId, SyncRequestId,
 };
 use lighthouse_network::types::SyncState;
 use lighthouse_network::NetworkConfig;
@@ -745,10 +745,10 @@ impl TestRig {
         let first_dc = data_columns.first().unwrap();
         let block_root = first_dc.block_root();
         let sampling_request_id = match id.0 {
-            SyncRequestId::DataColumnsByRoot(
-                _,
-                _requester @ DataColumnsByRootRequester::Sampling(sampling_id),
-            ) => sampling_id.sampling_request_id,
+            SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId {
+                requester: DataColumnsByRootRequester::Sampling(sampling_id),
+                ..
+            }) => sampling_id.sampling_request_id,
             _ => unreachable!(),
         };
         self.complete_data_columns_by_root_request(id, data_columns);
@@ -773,14 +773,15 @@
         data_columns: Vec<Arc<DataColumnSidecar<E>>>,
         missing_components: bool,
     ) {
-        let lookup_id =
-            if let SyncRequestId::DataColumnsByRoot(_, DataColumnsByRootRequester::Custody(id)) =
-                ids.first().unwrap().0
-            {
-                id.requester.0.lookup_id
-            } else {
-                panic!("not a custody requester")
-            };
+        let lookup_id = if let SyncRequestId::DataColumnsByRoot(DataColumnsByRootRequestId {
+            requester: DataColumnsByRootRequester::Custody(id),
+            ..
+        }) = ids.first().unwrap().0
+        {
+            id.requester.0.lookup_id
+        } else {
+            panic!("not a custody requester")
+        };
 
         let first_column = data_columns.first().cloned().unwrap();
 
@@ -1189,6 +1190,7 @@ impl TestRig {
             penalty_msg, expect_penalty_msg,
             "Unexpected penalty msg for {peer_id}"
         );
+        self.log(&format!("Found expected penalty {penalty_msg}"));
     }
 
     pub fn expect_single_penalty(&mut self, peer_id: PeerId, expect_penalty_msg: &'static str) {
@@ -1416,7 +1418,7 @@ fn test_single_block_lookup_empty_response() {
 
     // The peer does not have the block. It should be penalized.
     r.single_lookup_block_response(id, peer_id, None);
-    r.expect_penalty(peer_id, "NoResponseReturned");
+    r.expect_penalty(peer_id, "NotEnoughResponsesReturned");
     // it should be retried
     let id = r.expect_block_lookup_request(block_root);
     // Send the right block this time.
@@ -2160,7 +2162,7 @@ fn sampling_batch_requests_not_enough_responses_returned() {
     r.assert_sampling_request_ongoing(block_root, &column_indexes);
 
     // Split the indexes to simulate the case where the supernode doesn't have the requested column.
-    let (_column_indexes_supernode_does_not_have, column_indexes_to_complete) =
+    let (column_indexes_supernode_does_not_have, column_indexes_to_complete) =
         column_indexes.split_at(1);
 
     // Complete the requests but only partially, so a NotEnoughResponsesReturned error occurs.
@@ -2176,7 +2178,7 @@
 
     // The request status should be set to NoPeers since the supernode, the only peer, returned not enough responses.
     r.log_sampling_requests(block_root, &column_indexes);
-    r.assert_sampling_request_nopeers(block_root, &column_indexes);
+    r.assert_sampling_request_nopeers(block_root, column_indexes_supernode_does_not_have);
 
     // The sampling request stalls.
     r.expect_empty_network();
@@ -2721,11 +2723,6 @@ mod deneb_only {
             self.blobs.pop().expect("blobs");
             self
         }
-        fn invalidate_blobs_too_many(mut self) -> Self {
-            let first_blob = self.blobs.first().expect("blob").clone();
-            self.blobs.push(first_blob);
-            self
-        }
         fn expect_block_process(mut self) -> Self {
            self.rig.expect_block_process(ResponseType::Block);
             self
@@ -2814,21 +2811,6 @@ mod deneb_only {
             .expect_no_block_request();
     }
 
-    #[test]
-    fn single_block_response_then_too_many_blobs_response_attestation() {
-        let Some(tester) = DenebTester::new(RequestTrigger::AttestationUnknownBlock) else {
-            return;
-        };
-        tester
-            .block_response_triggering_process()
-            .invalidate_blobs_too_many()
-            .blobs_response()
-            .expect_penalty("TooManyResponses")
-            // Network context returns "download success" because the request has enough blobs + it
-            // downscores the peer for returning too many.
-            .expect_no_block_request();
-    }
-
Review comment (author): This test needs a scenario where sync receives more blob chunks than originally requested. This can never happen in practice, so I chose not to handle that scenario, to reduce complexity. The code now documents the invariant:

```rust
// Should never happen: the ReqResp network behaviour enforces a max count of chunks.
// When `max_remaining_chunks <= 1` the inbound stream is terminated in
// `rpc/handler.rs`. Handling this case adds complexity for no gain. Even if an
// attacker could abuse this, there's no gain in sending garbage chunks that
// will be ignored anyway.
State::CompletedEarly => None,
```
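To make that invariant concrete, here is a minimal, hypothetical sketch (invented names, not the actual `rpc/handler.rs` logic) of a stream that terminates once the last allowed chunk arrives, so a "too many responses" state is unreachable downstream:

```rust
// Hypothetical model of the chunk cap; the real enforcement lives in the
// ReqResp handler, not in sync.
#[derive(Debug, PartialEq)]
enum StreamState {
    Active { remaining_chunks: usize },
    CompletedEarly,
}

fn on_chunk(state: StreamState) -> StreamState {
    match state {
        // Last allowed chunk: terminate the inbound stream so no further
        // chunks can ever reach the sync layer.
        StreamState::Active { remaining_chunks } if remaining_chunks <= 1 => {
            StreamState::CompletedEarly
        }
        StreamState::Active { remaining_chunks } => StreamState::Active {
            remaining_chunks: remaining_chunks - 1,
        },
        // Chunks after early completion are dropped, never forwarded.
        StreamState::CompletedEarly => StreamState::CompletedEarly,
    }
}

fn main() {
    let mut state = StreamState::Active { remaining_chunks: 2 };
    for _ in 0..5 {
        state = on_chunk(state);
    }
    assert_eq!(state, StreamState::CompletedEarly);
}
```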


     // Test peer returning block that has unknown parent, and a new lookup is created
     #[test]
     fn parent_block_unknown_parent() {
@@ -2869,7 +2851,7 @@ mod deneb_only {
         };
         tester
             .empty_block_response()
-            .expect_penalty("NoResponseReturned")
+            .expect_penalty("NotEnoughResponsesReturned")
             .expect_block_request()
             .expect_no_blobs_request()
             .block_response_and_expect_blob_request()
16 changes: 5 additions & 11 deletions beacon_node/network/src/sync/manager.rs
@@ -472,13 +472,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
             SyncRequestId::SingleBlob { id } => {
                 self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error))
             }
-            SyncRequestId::DataColumnsByRoot(req_id, requester) => self
-                .on_data_columns_by_root_response(
-                    req_id,
-                    requester,
-                    peer_id,
-                    RpcEvent::RPCError(error),
-                ),
+            SyncRequestId::DataColumnsByRoot(req_id) => {
+                self.on_data_columns_by_root_response(req_id, peer_id, RpcEvent::RPCError(error))
+            }
             SyncRequestId::RangeBlockAndBlobs { id } => {
                 if let Some(sender_id) = self.network.range_request_failed(id) {
                     match sender_id {
@@ -1104,10 +1100,9 @@ impl<T: BeaconChainTypes> SyncManager<T> {
         seen_timestamp: Duration,
     ) {
         match request_id {
-            SyncRequestId::DataColumnsByRoot(req_id, requester) => {
+            SyncRequestId::DataColumnsByRoot(req_id) => {
                 self.on_data_columns_by_root_response(
                     req_id,
-                    requester,
                     peer_id,
                     match data_column {
                         Some(data_column) => RpcEvent::Response(data_column, seen_timestamp),
@@ -1149,15 +1144,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
     fn on_data_columns_by_root_response(
         &mut self,
         req_id: DataColumnsByRootRequestId,
-        requester: DataColumnsByRootRequester,
         peer_id: PeerId,
         data_column: RpcEvent<Arc<DataColumnSidecar<T::EthSpec>>>,
     ) {
         if let Some(resp) =
             self.network
                 .on_data_columns_by_root_response(req_id, peer_id, data_column)
         {
-            match requester {
+            match req_id.requester {
                 DataColumnsByRootRequester::Sampling(id) => {
                     if let Some((requester, result)) =
                         self.sampling