Skip to content

Commit

Permalink
Add P1 queue to BP
Browse files Browse the repository at this point in the history
  • Loading branch information
paulhauner committed Jul 4, 2023
1 parent be09f9e commit 1328e72
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
19 changes: 18 additions & 1 deletion beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024;
/// they begin to be dropped.
const MAX_API_REQUEST_P0_QUEUE_LEN: usize = 1_024;

/// The maximum number of priority-1 (second-highest priority) messages that will be queued before
/// they begin to be dropped.
const MAX_API_REQUEST_P1_QUEUE_LEN: usize = 1_024;

/// The name of the manager tokio task.
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";

Expand Down Expand Up @@ -220,6 +224,7 @@ pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
pub const API_REQUEST_P0: &str = "api_request_p0";
pub const API_REQUEST_P1: &str = "api_request_p1";

/// A simple first-in-first-out queue with a maximum length.
struct FifoQueue<T> {
Expand Down Expand Up @@ -533,6 +538,7 @@ pub enum Work<E: EthSpec> {
GossipBlsToExecutionChange(BlockingFn),
LightClientBootstrapRequest(BlockingFn),
ApiRequestP0(BlockingOrAsync),
ApiRequestP1(BlockingOrAsync),
}

impl<E: EthSpec> fmt::Debug for Work<E> {
Expand Down Expand Up @@ -571,6 +577,7 @@ impl<E: EthSpec> Work<E> {
Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE,
Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
Work::ApiRequestP0 { .. } => API_REQUEST_P0,
Work::ApiRequestP1 { .. } => API_REQUEST_P1,
}
}
}
Expand Down Expand Up @@ -725,6 +732,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN);

let mut api_request_p0_queue = FifoQueue::new(MAX_API_REQUEST_P0_QUEUE_LEN);
let mut api_request_p1_queue = FifoQueue::new(MAX_API_REQUEST_P1_QUEUE_LEN);

// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
// receive them back once they are ready (`ready_work_rx`).
Expand Down Expand Up @@ -1018,6 +1026,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = gossip_bls_to_execution_change_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check the priority 1 API requests after we've
// processed all the interesting things from the network
// and things required for us to stay in good repute
// with our P2P peers.
} else if let Some(item) = api_request_p0_queue.pop() {
self.spawn_worker(item, idle_tx);
// Handle backfill sync chain segments.
} else if let Some(item) = backfill_chain_segment.pop() {
self.spawn_worker(item, idle_tx);
Expand Down Expand Up @@ -1143,6 +1157,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::ApiRequestP0 { .. } => {
api_request_p0_queue.push(work, work_id, &self.log)
}
Work::ApiRequestP1 { .. } => {
api_request_p1_queue.push(work, work_id, &self.log)
}
}
}
}
Expand Down Expand Up @@ -1314,7 +1331,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
task_spawner.spawn_blocking_with_manual_send_idle(work)
}
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
Work::ApiRequestP0(process_fn) => match process_fn {
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
},
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/http_api/src/task_spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl Priority {
fn work_event<E: EthSpec>(&self, process_fn: BlockingOrAsync) -> WorkEvent<E> {
let work = match self {
Priority::P0 => Work::ApiRequestP0(process_fn),
Priority::P1 => unimplemented!("P1"),
Priority::P1 => Work::ApiRequestP1(process_fn),
};
WorkEvent {
drop_during_sync: false,
Expand Down

0 comments on commit 1328e72

Please sign in to comment.