Skip to content

Commit

Permalink
Remove duplicate NetworkGlobals
Browse files Browse the repository at this point in the history
  • Loading branch information
paulhauner committed Jul 10, 2023
1 parent fa4770e commit d2e5f9d
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 30 deletions.
1 change: 0 additions & 1 deletion beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ impl<T: BeaconChainTypes> Router<T> {
crate::sync::manager::spawn(
executor.clone(),
beacon_chain.clone(),
network_globals.clone(),
network_send.clone(),
network_beacon_processor.clone(),
sync_recv,
Expand Down
3 changes: 1 addition & 2 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl TestRig {
let (network_tx, network_rx) = mpsc::unbounded_channel();
let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log));
let (network_beacon_processor, beacon_processor_rx) =
NetworkBeaconProcessor::null_for_testing(globals.clone());
NetworkBeaconProcessor::null_for_testing(globals);
let rng = XorShiftRng::from_seed([42; 16]);
let rig = TestRig {
beacon_processor_rx,
Expand All @@ -56,7 +56,6 @@ impl TestRig {
let cx = {
SyncNetworkContext::new(
network_tx,
globals,
Arc::new(network_beacon_processor),
log.new(slog::o!("component" => "network_context")),
)
Expand Down
35 changes: 15 additions & 20 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,6 @@ pub struct SyncManager<T: BeaconChainTypes> {
/// A reference to the underlying beacon chain.
chain: Arc<BeaconChain<T>>,

/// A reference to the network globals and peer-db.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,

/// A receiving channel sent by the message processor thread.
input_channel: mpsc::UnboundedReceiver<SyncMessage<T::EthSpec>>,

Expand All @@ -186,7 +183,6 @@ pub struct SyncManager<T: BeaconChainTypes> {
pub fn spawn<T: BeaconChainTypes>(
executor: task_executor::TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
beacon_processor: Arc<NetworkBeaconProcessor<T>>,
sync_recv: mpsc::UnboundedReceiver<SyncMessage<T::EthSpec>>,
Expand All @@ -198,16 +194,11 @@ pub fn spawn<T: BeaconChainTypes>(
);

// create an instance of the SyncManager
let network_globals = beacon_processor.network_globals.clone();
let mut sync_manager = SyncManager {
chain: beacon_chain.clone(),
network_globals: network_globals.clone(),
input_channel: sync_recv,
network: SyncNetworkContext::new(
network_send,
network_globals.clone(),
beacon_processor,
log.clone(),
),
network: SyncNetworkContext::new(network_send, beacon_processor, log.clone()),
range_sync: RangeSync::new(beacon_chain.clone(), log.clone()),
backfill_sync: BackFillSync::new(beacon_chain, network_globals, log.clone()),
block_lookups: BlockLookups::new(log.clone()),
Expand All @@ -220,6 +211,10 @@ pub fn spawn<T: BeaconChainTypes>(
}

impl<T: BeaconChainTypes> SyncManager<T> {
fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
self.network.network_globals()
}

/* Input Handling Functions */

/// A peer has connected which has blocks that are unknown to us.
Expand Down Expand Up @@ -320,12 +315,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
let rpr = new_state.as_str();
// Drop the write lock
let update_sync_status = self
.network_globals
.network_globals()
.peers
.write()
.update_sync_status(peer_id, new_state.clone());
if let Some(was_updated) = update_sync_status {
let is_connected = self.network_globals.peers.read().is_connected(peer_id);
let is_connected = self.network_globals().peers.read().is_connected(peer_id);
if was_updated {
debug!(
self.log,
Expand Down Expand Up @@ -381,7 +376,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
let head = self.chain.best_slot();
let current_slot = self.chain.slot().unwrap_or_else(|_| Slot::new(0));

let peers = self.network_globals.peers.read();
let peers = self.network_globals().peers.read();
if current_slot >= head
&& current_slot.sub(head) <= (SLOT_IMPORT_TOLERANCE as u64)
&& head > 0
Expand Down Expand Up @@ -443,8 +438,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
},
};

let old_state = self.network_globals.set_sync_state(new_state);
let new_state = self.network_globals.sync_state.read();
let old_state = self.network_globals().set_sync_state(new_state);
let new_state = self.network_globals().sync_state.read().clone();
if !new_state.eq(&old_state) {
info!(self.log, "Sync state updated"; "old_state" => %old_state, "new_state" => %new_state);
// If we have become synced - Subscribe to all the core subnet topics
Expand Down Expand Up @@ -503,7 +498,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
SyncMessage::UnknownBlock(peer_id, block, block_root) => {
// If we are not synced or within SLOT_IMPORT_TOLERANCE of the block, ignore
if !self.network_globals.sync_state.read().is_synced() {
if !self.network_globals().sync_state.read().is_synced() {
let head_slot = self.chain.canonical_head.cached_head().head_slot();
let unknown_block_slot = block.slot();

Expand All @@ -517,7 +512,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
return;
}
}
if self.network_globals.peers.read().is_connected(&peer_id)
if self.network_globals().peers.read().is_connected(&peer_id)
&& self.network.is_execution_engine_online()
{
self.block_lookups
Expand All @@ -526,8 +521,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
SyncMessage::UnknownBlockHash(peer_id, block_hash) => {
// If we are not synced, ignore this block.
if self.network_globals.sync_state.read().is_synced()
&& self.network_globals.peers.read().is_connected(&peer_id)
if self.network_globals().sync_state.read().is_synced()
&& self.network_globals().peers.read().is_connected(&peer_id)
&& self.network.is_execution_engine_online()
{
self.block_lookups
Expand Down
11 changes: 5 additions & 6 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
/// The network channel to relay messages to the Network service.
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,

/// Access to the network global vars.
network_globals: Arc<NetworkGlobals<T::EthSpec>>,

/// A sequential ID for all RPC requests.
request_id: Id,

Expand All @@ -46,14 +43,12 @@ pub struct SyncNetworkContext<T: BeaconChainTypes> {
impl<T: BeaconChainTypes> SyncNetworkContext<T> {
pub fn new(
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_beacon_processor: Arc<NetworkBeaconProcessor<T>>,
log: slog::Logger,
) -> Self {
Self {
network_send,
execution_engine_state: EngineState::Online, // always assume `Online` at the start
network_globals,
request_id: 1,
range_requests: FnvHashMap::default(),
backfill_requests: FnvHashMap::default(),
Expand All @@ -62,9 +57,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
}

pub fn network_globals(&self) -> &NetworkGlobals<T::EthSpec> {
&self.network_beacon_processor.network_globals
}

/// Returns the Client type of the peer if known
pub fn client_type(&self, peer_id: &PeerId) -> Client {
self.network_globals
self.network_globals()
.peers
.read()
.peer_info(peer_id)
Expand Down
1 change: 0 additions & 1 deletion beacon_node/network/src/sync/range_sync/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,6 @@ mod tests {
NetworkBeaconProcessor::null_for_testing(globals.clone());
let cx = SyncNetworkContext::new(
network_tx,
globals.clone(),
Arc::new(network_beacon_processor),
log.new(o!("component" => "network_context")),
);
Expand Down

0 comments on commit d2e5f9d

Please sign in to comment.