From 455a26a3110d3a038da8d26082cd4b091328910a Mon Sep 17 00:00:00 2001 From: Josh Lind <josh.lind@hotmail.com> Date: Thu, 23 Jan 2025 17:13:15 -0500 Subject: [PATCH] [Consensus Observer] Fix new pipeline. --- .../src/config/consensus_observer_config.rs | 4 +- .../observer/consensus_observer.rs | 61 ++++++++++++------- consensus/src/epoch_manager.rs | 2 + consensus/src/pipeline/buffer_manager.rs | 9 ++- .../src/pipeline/decoupled_execution_utils.rs | 2 + consensus/src/pipeline/execution_client.rs | 6 ++ consensus/src/pipeline/pipeline_builder.rs | 7 ++- .../pipeline/tests/buffer_manager_tests.rs | 1 + .../src/test_utils/mock_execution_client.rs | 1 + 9 files changed, 65 insertions(+), 28 deletions(-) diff --git a/config/src/config/consensus_observer_config.rs b/config/src/config/consensus_observer_config.rs index 602542ba55ac1..fb82275b91d81 100644 --- a/config/src/config/consensus_observer_config.rs +++ b/config/src/config/consensus_observer_config.rs @@ -64,7 +64,7 @@ impl Default for ConsensusObserverConfig { Self { observer_enabled: false, publisher_enabled: false, - enable_pipeline: false, + enable_pipeline: true, max_network_channel_size: 1000, max_parallel_serialization_tasks: num_cpus::get(), // Default to the number of CPUs network_request_timeout_ms: 5_000, // 5 seconds @@ -77,7 +77,7 @@ impl Default for ConsensusObserverConfig { subscription_peer_change_interval_ms: 180_000, // 3 minutes subscription_refresh_interval_ms: 600_000, // 10 minutes observer_fallback_duration_ms: 600_000, // 10 minutes - observer_fallback_startup_period_ms: 60_000, // 60 seconds + observer_fallback_startup_period_ms: 90_000, // 90 seconds observer_fallback_progress_threshold_ms: 10_000, // 10 seconds observer_fallback_sync_lag_threshold_ms: 15_000, // 15 seconds } diff --git a/consensus/src/consensus_observer/observer/consensus_observer.rs b/consensus/src/consensus_observer/observer/consensus_observer.rs index 52617d044ff2e..b8539f97ead83 100644 --- a/consensus/src/consensus_observer/observer/consensus_observer.rs +++ b/consensus/src/consensus_observer/observer/consensus_observer.rs @@ -266,11 +266,38 @@ impl ConsensusObserver { async fn finalize_ordered_block(&mut self, ordered_block: OrderedBlock) { info!( LogSchema::new(LogEntry::ConsensusObserver).message(&format!( - "Forwarding ordered blocks to the execution pipeline: {}", - ordered_block.proof_block_info() + "Forwarding ordered blocks to the execution pipeline: {}. New pipeline enabled: {}", + ordered_block.proof_block_info(), + self.pipeline_enabled() )) ); + // If the pipeline is enabled, send the ordered blocks to the pipeline. + // Note: we still want to call finalize_order() below as the buffer manager + // differentiates based on whether the futures are present or not. + if self.pipeline_enabled() { + for block in ordered_block.blocks() { + // Create the commit callback (to be called after the execution pipeline) + let commit_callback = self.active_observer_state.create_commit_callback( + self.ordered_block_store.clone(), + self.block_payload_store.clone(), + ); + + // Send the ordered block to the execution pipeline + if let Some(pipeline_futs) = self.get_last_pipeline_futs() { + self.pipeline_builder() + .build(block, pipeline_futs, commit_callback); + } else { + warn!( + LogSchema::new(LogEntry::ConsensusObserver).message(&format!( + "Parent block's pipeline futures for ordered block is missing! Ignoring: {:?}", + ordered_block.proof_block_info() + )) + ); + } + } + } + // Create the commit callback (to be called after the execution pipeline) let commit_callback = self .active_observer_state @@ -354,17 +381,19 @@ impl ConsensusObserver { } } - /// Returns the last block's pipeline futures, should only be called when pipeline is enabled - fn get_last_pipeline_futs(&self) -> PipelineFutures { + /// Returns the last block's pipeline futures (should only be called when pipeline is enabled) + fn get_last_pipeline_futs(&self) -> Option<PipelineFutures> { if let Some(last_ordered_block) = self.ordered_block_store.lock().get_last_ordered_block() { - last_ordered_block - .pipeline_futs() - .expect("Pipeline futures should exist when enabled") + let result = last_ordered_block.pipeline_futs(); + if result.is_none() { + warn!("last_ordered_block: {:?}", last_ordered_block); + } + result } else { - self.pipeline_builder().build_root( + Some(self.pipeline_builder().build_root( StateComputeResult::new_dummy(), self.active_observer_state.root().clone(), - ) + )) } } @@ -779,19 +808,6 @@ impl ConsensusObserver { metrics::ORDERED_BLOCK_LABEL, ); - if self.pipeline_enabled() { - for block in ordered_block.blocks() { - let commit_callback = self.active_observer_state.create_commit_callback( - self.ordered_block_store.clone(), - self.block_payload_store.clone(), - ); - self.pipeline_builder().build( - block, - self.get_last_pipeline_futs(), - commit_callback, - ); - } - } // Insert the ordered block into the pending blocks self.ordered_block_store .lock() @@ -1008,6 +1024,7 @@ impl ConsensusObserver { None, rand_msg_rx, 0, + self.pipeline_enabled(), ) .await; if self.pipeline_enabled() { diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 14ed035334525..ba80f3d806e03 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -822,6 +822,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> { fast_rand_config.clone(), rand_msg_rx, recovery_data.root_block().round(), + self.config.enable_pipeline, ) .await; let consensus_sk = consensus_key; @@ -1377,6 +1378,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> { fast_rand_config, rand_msg_rx, highest_committed_round, + self.config.enable_pipeline, ) .await; diff --git a/consensus/src/pipeline/buffer_manager.rs b/consensus/src/pipeline/buffer_manager.rs index 633fb9780b6f3..eac32a2b32abe 100644 --- a/consensus/src/pipeline/buffer_manager.rs +++ b/consensus/src/pipeline/buffer_manager.rs @@ -174,6 +174,7 @@ pub struct BufferManager { // If the buffer manager receives a commit vote for a block that is not in buffer items, then // the vote will be cached. We can cache upto max_pending_rounds_in_commit_vote_cache (100) blocks. pending_commit_votes: BTreeMap<Round, HashMap<AccountAddress, CommitVote>>, + new_pipeline_enabled: bool, } impl BufferManager { @@ -205,6 +206,7 @@ impl BufferManager { consensus_observer_config: ConsensusObserverConfig, consensus_publisher: Option<Arc<ConsensusPublisher>>, max_pending_rounds_in_commit_vote_cache: u64, + new_pipeline_enabled: bool, ) -> Self { let buffer = Buffer::<BufferItem>::new(); @@ -269,6 +271,7 @@ impl BufferManager { max_pending_rounds_in_commit_vote_cache, pending_commit_votes: BTreeMap::new(), + new_pipeline_enabled, } } @@ -995,8 +998,10 @@ impl BufferManager { }}); }, _ = self.execution_schedule_retry_rx.next() => { - monitor!("buffer_manager_process_execution_schedule_retry", - self.retry_schedule_phase().await); + if !self.new_pipeline_enabled { + monitor!("buffer_manager_process_execution_schedule_retry", + self.retry_schedule_phase().await); + } }, Some(response) = self.signing_phase_rx.next() => { monitor!("buffer_manager_process_signing_response", { diff --git a/consensus/src/pipeline/decoupled_execution_utils.rs b/consensus/src/pipeline/decoupled_execution_utils.rs index 0fedc3eb5a82d..1f3488031f284 100644 --- a/consensus/src/pipeline/decoupled_execution_utils.rs +++ b/consensus/src/pipeline/decoupled_execution_utils.rs @@ -45,6 +45,7 @@ pub fn prepare_phases_and_buffer_manager( consensus_observer_config: ConsensusObserverConfig, consensus_publisher: Option<Arc<ConsensusPublisher>>, max_pending_rounds_in_commit_vote_cache: u64, + new_pipeline_enabled: bool, ) -> ( PipelinePhase<ExecutionSchedulePhase>, PipelinePhase<ExecutionWaitPhase>, @@ -137,6 +138,7 @@ pub fn prepare_phases_and_buffer_manager( consensus_observer_config, consensus_publisher, max_pending_rounds_in_commit_vote_cache, + new_pipeline_enabled, ), ) } diff --git a/consensus/src/pipeline/execution_client.rs b/consensus/src/pipeline/execution_client.rs index aa7aef606d1a9..cab4b9bf15cb3 100644 --- a/consensus/src/pipeline/execution_client.rs +++ b/consensus/src/pipeline/execution_client.rs @@ -70,6 +70,7 @@ pub trait TExecutionClient: Send + Sync { fast_rand_config: Option<RandConfig>, rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>, highest_committed_round: Round, + new_pipeline_enabled: bool, ); /// This is needed for some DAG tests. Clean this up as a TODO. @@ -208,6 +209,7 @@ impl ExecutionProxyClient { buffer_manager_back_pressure_enabled: bool, consensus_observer_config: ConsensusObserverConfig, consensus_publisher: Option<Arc<ConsensusPublisher>>, + new_pipeline_enabled: bool, ) { let network_sender = NetworkSender::new( self.author, @@ -295,6 +297,7 @@ impl ExecutionProxyClient { consensus_publisher, self.consensus_config .max_pending_rounds_in_commit_vote_cache, + new_pipeline_enabled, ); tokio::spawn(execution_schedule_phase.start()); @@ -320,6 +323,7 @@ impl TExecutionClient for ExecutionProxyClient { fast_rand_config: Option<RandConfig>, rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>, highest_committed_round: Round, + new_pipeline_enabled: bool, ) { let maybe_rand_msg_tx = self.spawn_decoupled_execution( maybe_consensus_key, @@ -333,6 +337,7 @@ impl TExecutionClient for ExecutionProxyClient { self.consensus_config.enable_pre_commit, self.consensus_observer_config, self.consensus_publisher.clone(), + new_pipeline_enabled, ); let transaction_shuffler = @@ -536,6 +541,7 @@ impl TExecutionClient for DummyExecutionClient { _fast_rand_config: Option<RandConfig>, _rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>, _highest_committed_round: Round, + _new_pipeline_enabled: bool, ) { } diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs index 85ddac2f87e37..7c5790e90273e 100644 --- a/consensus/src/pipeline/pipeline_builder.rs +++ b/consensus/src/pipeline/pipeline_builder.rs @@ -82,7 +82,10 @@ fn spawn_shared_fut< async move { match join_handle.await { Ok(Ok(res)) => Ok(res), - Ok(Err(e)) => Err(TaskError::PropagatedError(Box::new(e))), + Ok(e @ Err(TaskError::PropagatedError(_))) => e, + Ok(Err(e @ TaskError::InternalError(_) | e @ TaskError::JoinError(_))) => { + Err(TaskError::PropagatedError(Box::new(e))) + }, Err(e) => Err(TaskError::JoinError(Arc::new(e))), } } @@ -151,7 +154,7 @@ impl Tracker { .with_label_values(&[self.name, "work_time"]) .observe(work_time.as_secs_f64()); info!( - "[Pipeline] Block {} {} {} finishes {}, waits {}, takes {}", + "[Pipeline] Block {} {} {} finishes {}, waits {}ms, takes {}ms", self.block_id, self.epoch, self.round, diff --git a/consensus/src/pipeline/tests/buffer_manager_tests.rs b/consensus/src/pipeline/tests/buffer_manager_tests.rs index 574ec837054b9..d89c37cd56060 100644 --- a/consensus/src/pipeline/tests/buffer_manager_tests.rs +++ b/consensus/src/pipeline/tests/buffer_manager_tests.rs @@ -162,6 +162,7 @@ pub fn prepare_buffer_manager( ConsensusObserverConfig::default(), None, 100, + true, ); ( diff --git a/consensus/src/test_utils/mock_execution_client.rs b/consensus/src/test_utils/mock_execution_client.rs index d957449ed883f..dcc5aa648fdfd 100644 --- a/consensus/src/test_utils/mock_execution_client.rs +++ b/consensus/src/test_utils/mock_execution_client.rs @@ -108,6 +108,7 @@ impl TExecutionClient for MockExecutionClient { _fast_rand_config: Option<RandConfig>, _rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>, _highest_committed_round: Round, + _new_pipeline_enabled: bool, ) { }