From 455a26a3110d3a038da8d26082cd4b091328910a Mon Sep 17 00:00:00 2001
From: Josh Lind <josh.lind@hotmail.com>
Date: Thu, 23 Jan 2025 17:13:15 -0500
Subject: [PATCH] [Consensus Observer] Fix new pipeline.

---
 .../src/config/consensus_observer_config.rs   |  4 +-
 .../observer/consensus_observer.rs            | 61 ++++++++++++-------
 consensus/src/epoch_manager.rs                |  2 +
 consensus/src/pipeline/buffer_manager.rs      |  9 ++-
 .../src/pipeline/decoupled_execution_utils.rs |  2 +
 consensus/src/pipeline/execution_client.rs    |  6 ++
 consensus/src/pipeline/pipeline_builder.rs    |  7 ++-
 .../pipeline/tests/buffer_manager_tests.rs    |  1 +
 .../src/test_utils/mock_execution_client.rs   |  1 +
 9 files changed, 65 insertions(+), 28 deletions(-)

diff --git a/config/src/config/consensus_observer_config.rs b/config/src/config/consensus_observer_config.rs
index 602542ba55ac1..fb82275b91d81 100644
--- a/config/src/config/consensus_observer_config.rs
+++ b/config/src/config/consensus_observer_config.rs
@@ -64,7 +64,7 @@ impl Default for ConsensusObserverConfig {
         Self {
             observer_enabled: false,
             publisher_enabled: false,
-            enable_pipeline: false,
+            enable_pipeline: true,
             max_network_channel_size: 1000,
             max_parallel_serialization_tasks: num_cpus::get(), // Default to the number of CPUs
             network_request_timeout_ms: 5_000,                 // 5 seconds
@@ -77,7 +77,7 @@ impl Default for ConsensusObserverConfig {
             subscription_peer_change_interval_ms: 180_000,     // 3 minutes
             subscription_refresh_interval_ms: 600_000,         // 10 minutes
             observer_fallback_duration_ms: 600_000,            // 10 minutes
-            observer_fallback_startup_period_ms: 60_000,       // 60 seconds
+            observer_fallback_startup_period_ms: 90_000,       // 90 seconds
             observer_fallback_progress_threshold_ms: 10_000,   // 10 seconds
             observer_fallback_sync_lag_threshold_ms: 15_000,   // 15 seconds
         }
diff --git a/consensus/src/consensus_observer/observer/consensus_observer.rs b/consensus/src/consensus_observer/observer/consensus_observer.rs
index 52617d044ff2e..b8539f97ead83 100644
--- a/consensus/src/consensus_observer/observer/consensus_observer.rs
+++ b/consensus/src/consensus_observer/observer/consensus_observer.rs
@@ -266,11 +266,38 @@ impl ConsensusObserver {
     async fn finalize_ordered_block(&mut self, ordered_block: OrderedBlock) {
         info!(
             LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
-                "Forwarding ordered blocks to the execution pipeline: {}",
-                ordered_block.proof_block_info()
+                "Forwarding ordered blocks to the execution pipeline: {}. New pipeline enabled: {}",
+                ordered_block.proof_block_info(),
+                self.pipeline_enabled()
             ))
         );
 
+        // If the pipeline is enabled, send the ordered blocks to the pipeline.
+        // Note: we still want to call finalize_order() below as the buffer manager
+        // differentiates based on whether the futures are present or not.
+        if self.pipeline_enabled() {
+            for block in ordered_block.blocks() {
+                // Create the commit callback (to be called after the execution pipeline)
+                let commit_callback = self.active_observer_state.create_commit_callback(
+                    self.ordered_block_store.clone(),
+                    self.block_payload_store.clone(),
+                );
+
+                // Send the ordered block to the execution pipeline
+                if let Some(pipeline_futs) = self.get_last_pipeline_futs() {
+                    self.pipeline_builder()
+                        .build(block, pipeline_futs, commit_callback);
+                } else {
+                    warn!(
+                        LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
+                            "Parent block's pipeline futures for ordered block is missing! Ignoring: {:?}",
+                            ordered_block.proof_block_info()
+                        ))
+                    );
+                }
+            }
+        }
+
         // Create the commit callback (to be called after the execution pipeline)
         let commit_callback = self
             .active_observer_state
@@ -354,17 +381,19 @@ impl ConsensusObserver {
         }
     }
 
-    /// Returns the last block's pipeline futures, should only be called when pipeline is enabled
-    fn get_last_pipeline_futs(&self) -> PipelineFutures {
+    /// Returns the last block's pipeline futures (should only be called when pipeline is enabled)
+    fn get_last_pipeline_futs(&self) -> Option<PipelineFutures> {
         if let Some(last_ordered_block) = self.ordered_block_store.lock().get_last_ordered_block() {
-            last_ordered_block
-                .pipeline_futs()
-                .expect("Pipeline futures should exist when enabled")
+            let result = last_ordered_block.pipeline_futs();
+            if result.is_none() {
+                warn!("last_ordered_block: {:?}", last_ordered_block);
+            }
+            result
         } else {
-            self.pipeline_builder().build_root(
+            Some(self.pipeline_builder().build_root(
                 StateComputeResult::new_dummy(),
                 self.active_observer_state.root().clone(),
-            )
+            ))
         }
     }
 
@@ -779,19 +808,6 @@ impl ConsensusObserver {
                 metrics::ORDERED_BLOCK_LABEL,
             );
 
-            if self.pipeline_enabled() {
-                for block in ordered_block.blocks() {
-                    let commit_callback = self.active_observer_state.create_commit_callback(
-                        self.ordered_block_store.clone(),
-                        self.block_payload_store.clone(),
-                    );
-                    self.pipeline_builder().build(
-                        block,
-                        self.get_last_pipeline_futs(),
-                        commit_callback,
-                    );
-                }
-            }
             // Insert the ordered block into the pending blocks
             self.ordered_block_store
                 .lock()
@@ -1008,6 +1024,7 @@ impl ConsensusObserver {
                 None,
                 rand_msg_rx,
                 0,
+                self.pipeline_enabled(),
             )
             .await;
         if self.pipeline_enabled() {
diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs
index 14ed035334525..ba80f3d806e03 100644
--- a/consensus/src/epoch_manager.rs
+++ b/consensus/src/epoch_manager.rs
@@ -822,6 +822,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
                 fast_rand_config.clone(),
                 rand_msg_rx,
                 recovery_data.root_block().round(),
+                self.config.enable_pipeline,
             )
             .await;
         let consensus_sk = consensus_key;
@@ -1377,6 +1378,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
                 fast_rand_config,
                 rand_msg_rx,
                 highest_committed_round,
+                self.config.enable_pipeline,
             )
             .await;
 
diff --git a/consensus/src/pipeline/buffer_manager.rs b/consensus/src/pipeline/buffer_manager.rs
index 633fb9780b6f3..eac32a2b32abe 100644
--- a/consensus/src/pipeline/buffer_manager.rs
+++ b/consensus/src/pipeline/buffer_manager.rs
@@ -174,6 +174,7 @@ pub struct BufferManager {
     // If the buffer manager receives a commit vote for a block that is not in buffer items, then
     // the vote will be cached. We can cache upto max_pending_rounds_in_commit_vote_cache (100) blocks.
     pending_commit_votes: BTreeMap<Round, HashMap<AccountAddress, CommitVote>>,
+    new_pipeline_enabled: bool,
 }
 
 impl BufferManager {
@@ -205,6 +206,7 @@ impl BufferManager {
         consensus_observer_config: ConsensusObserverConfig,
         consensus_publisher: Option<Arc<ConsensusPublisher>>,
         max_pending_rounds_in_commit_vote_cache: u64,
+        new_pipeline_enabled: bool,
     ) -> Self {
         let buffer = Buffer::<BufferItem>::new();
 
@@ -269,6 +271,7 @@ impl BufferManager {
 
             max_pending_rounds_in_commit_vote_cache,
             pending_commit_votes: BTreeMap::new(),
+            new_pipeline_enabled,
         }
     }
 
@@ -995,8 +998,10 @@ impl BufferManager {
                     }});
                 },
                 _ = self.execution_schedule_retry_rx.next() => {
-                    monitor!("buffer_manager_process_execution_schedule_retry",
-                        self.retry_schedule_phase().await);
+                    if !self.new_pipeline_enabled {
+                        monitor!("buffer_manager_process_execution_schedule_retry",
+                            self.retry_schedule_phase().await);
+                    }
                 },
                 Some(response) = self.signing_phase_rx.next() => {
                     monitor!("buffer_manager_process_signing_response", {
diff --git a/consensus/src/pipeline/decoupled_execution_utils.rs b/consensus/src/pipeline/decoupled_execution_utils.rs
index 0fedc3eb5a82d..1f3488031f284 100644
--- a/consensus/src/pipeline/decoupled_execution_utils.rs
+++ b/consensus/src/pipeline/decoupled_execution_utils.rs
@@ -45,6 +45,7 @@ pub fn prepare_phases_and_buffer_manager(
     consensus_observer_config: ConsensusObserverConfig,
     consensus_publisher: Option<Arc<ConsensusPublisher>>,
     max_pending_rounds_in_commit_vote_cache: u64,
+    new_pipeline_enabled: bool,
 ) -> (
     PipelinePhase<ExecutionSchedulePhase>,
     PipelinePhase<ExecutionWaitPhase>,
@@ -137,6 +138,7 @@ pub fn prepare_phases_and_buffer_manager(
             consensus_observer_config,
             consensus_publisher,
             max_pending_rounds_in_commit_vote_cache,
+            new_pipeline_enabled,
         ),
     )
 }
diff --git a/consensus/src/pipeline/execution_client.rs b/consensus/src/pipeline/execution_client.rs
index aa7aef606d1a9..cab4b9bf15cb3 100644
--- a/consensus/src/pipeline/execution_client.rs
+++ b/consensus/src/pipeline/execution_client.rs
@@ -70,6 +70,7 @@ pub trait TExecutionClient: Send + Sync {
         fast_rand_config: Option<RandConfig>,
         rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
         highest_committed_round: Round,
+        new_pipeline_enabled: bool,
     );
 
     /// This is needed for some DAG tests. Clean this up as a TODO.
@@ -208,6 +209,7 @@ impl ExecutionProxyClient {
         buffer_manager_back_pressure_enabled: bool,
         consensus_observer_config: ConsensusObserverConfig,
         consensus_publisher: Option<Arc<ConsensusPublisher>>,
+        new_pipeline_enabled: bool,
     ) {
         let network_sender = NetworkSender::new(
             self.author,
@@ -295,6 +297,7 @@ impl ExecutionProxyClient {
             consensus_publisher,
             self.consensus_config
                 .max_pending_rounds_in_commit_vote_cache,
+            new_pipeline_enabled,
         );
 
         tokio::spawn(execution_schedule_phase.start());
@@ -320,6 +323,7 @@ impl TExecutionClient for ExecutionProxyClient {
         fast_rand_config: Option<RandConfig>,
         rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
         highest_committed_round: Round,
+        new_pipeline_enabled: bool,
     ) {
         let maybe_rand_msg_tx = self.spawn_decoupled_execution(
             maybe_consensus_key,
@@ -333,6 +337,7 @@ impl TExecutionClient for ExecutionProxyClient {
             self.consensus_config.enable_pre_commit,
             self.consensus_observer_config,
             self.consensus_publisher.clone(),
+            new_pipeline_enabled,
         );
 
         let transaction_shuffler =
@@ -536,6 +541,7 @@ impl TExecutionClient for DummyExecutionClient {
         _fast_rand_config: Option<RandConfig>,
         _rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
         _highest_committed_round: Round,
+        _new_pipeline_enabled: bool,
     ) {
     }
 
diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs
index 85ddac2f87e37..7c5790e90273e 100644
--- a/consensus/src/pipeline/pipeline_builder.rs
+++ b/consensus/src/pipeline/pipeline_builder.rs
@@ -82,7 +82,10 @@ fn spawn_shared_fut<
     async move {
         match join_handle.await {
             Ok(Ok(res)) => Ok(res),
-            Ok(Err(e)) => Err(TaskError::PropagatedError(Box::new(e))),
+            Ok(e @ Err(TaskError::PropagatedError(_))) => e,
+            Ok(Err(e @ TaskError::InternalError(_) | e @ TaskError::JoinError(_))) => {
+                Err(TaskError::PropagatedError(Box::new(e)))
+            },
             Err(e) => Err(TaskError::JoinError(Arc::new(e))),
         }
     }
@@ -151,7 +154,7 @@ impl Tracker {
             .with_label_values(&[self.name, "work_time"])
             .observe(work_time.as_secs_f64());
         info!(
-            "[Pipeline] Block {} {} {} finishes {}, waits {}, takes {}",
+            "[Pipeline] Block {} {} {} finishes {}, waits {}ms, takes {}ms",
             self.block_id,
             self.epoch,
             self.round,
diff --git a/consensus/src/pipeline/tests/buffer_manager_tests.rs b/consensus/src/pipeline/tests/buffer_manager_tests.rs
index 574ec837054b9..d89c37cd56060 100644
--- a/consensus/src/pipeline/tests/buffer_manager_tests.rs
+++ b/consensus/src/pipeline/tests/buffer_manager_tests.rs
@@ -162,6 +162,7 @@ pub fn prepare_buffer_manager(
         ConsensusObserverConfig::default(),
         None,
         100,
+        true,
     );
 
     (
diff --git a/consensus/src/test_utils/mock_execution_client.rs b/consensus/src/test_utils/mock_execution_client.rs
index d957449ed883f..dcc5aa648fdfd 100644
--- a/consensus/src/test_utils/mock_execution_client.rs
+++ b/consensus/src/test_utils/mock_execution_client.rs
@@ -108,6 +108,7 @@ impl TExecutionClient for MockExecutionClient {
         _fast_rand_config: Option<RandConfig>,
         _rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
         _highest_committed_round: Round,
+        _new_pipeline_enabled: bool,
     ) {
     }