Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Consensus Observer] Fix new pipeline. #15803

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/src/config/consensus_observer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Default for ConsensusObserverConfig {
Self {
observer_enabled: false,
publisher_enabled: false,
enable_pipeline: false,
enable_pipeline: true,
max_network_channel_size: 1000,
max_parallel_serialization_tasks: num_cpus::get(), // Default to the number of CPUs
network_request_timeout_ms: 5_000, // 5 seconds
Expand All @@ -77,7 +77,7 @@ impl Default for ConsensusObserverConfig {
subscription_peer_change_interval_ms: 180_000, // 3 minutes
subscription_refresh_interval_ms: 600_000, // 10 minutes
observer_fallback_duration_ms: 600_000, // 10 minutes
observer_fallback_startup_period_ms: 60_000, // 60 seconds
observer_fallback_startup_period_ms: 90_000, // 90 seconds
observer_fallback_progress_threshold_ms: 10_000, // 10 seconds
observer_fallback_sync_lag_threshold_ms: 15_000, // 15 seconds
}
Expand Down
61 changes: 39 additions & 22 deletions consensus/src/consensus_observer/observer/consensus_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,38 @@ impl ConsensusObserver {
async fn finalize_ordered_block(&mut self, ordered_block: OrderedBlock) {
info!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Forwarding ordered blocks to the execution pipeline: {}",
ordered_block.proof_block_info()
"Forwarding ordered blocks to the execution pipeline: {}. New pipeline enabled: {}",
ordered_block.proof_block_info(),
self.pipeline_enabled()
))
);

// If the pipeline is enabled, send the ordered blocks to the pipeline.
// Note: we still want to call finalize_order() below as the buffer manager
// differentiates based on whether the futures are present or not.
if self.pipeline_enabled() {
for block in ordered_block.blocks() {
// Create the commit callback (to be called after the execution pipeline)
let commit_callback = self.active_observer_state.create_commit_callback(
self.ordered_block_store.clone(),
self.block_payload_store.clone(),
);

// Send the ordered block to the execution pipeline
if let Some(pipeline_futs) = self.get_last_pipeline_futs() {
self.pipeline_builder()
.build(block, pipeline_futs, commit_callback);
} else {
warn!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Parent block's pipeline futures for ordered block is missing! Ignoring: {:?}",
ordered_block.proof_block_info()
))
);
}
}
}

// Create the commit callback (to be called after the execution pipeline)
let commit_callback = self
.active_observer_state
Expand Down Expand Up @@ -354,17 +381,19 @@ impl ConsensusObserver {
}
}

/// Returns the last block's pipeline futures, should only be called when pipeline is enabled
fn get_last_pipeline_futs(&self) -> PipelineFutures {
/// Returns the last block's pipeline futures (should only be called when pipeline is enabled)
fn get_last_pipeline_futs(&self) -> Option<PipelineFutures> {
if let Some(last_ordered_block) = self.ordered_block_store.lock().get_last_ordered_block() {
last_ordered_block
.pipeline_futs()
.expect("Pipeline futures should exist when enabled")
let result = last_ordered_block.pipeline_futs();
if result.is_none() {
warn!("last_ordered_block: {:?}", last_ordered_block);
}
result
} else {
self.pipeline_builder().build_root(
Some(self.pipeline_builder().build_root(
StateComputeResult::new_dummy(),
self.active_observer_state.root().clone(),
)
))
}
}

Expand Down Expand Up @@ -779,19 +808,6 @@ impl ConsensusObserver {
metrics::ORDERED_BLOCK_LABEL,
);

if self.pipeline_enabled() {
for block in ordered_block.blocks() {
let commit_callback = self.active_observer_state.create_commit_callback(
self.ordered_block_store.clone(),
self.block_payload_store.clone(),
);
self.pipeline_builder().build(
block,
self.get_last_pipeline_futs(),
commit_callback,
);
}
}
// Insert the ordered block into the pending blocks
self.ordered_block_store
.lock()
Expand Down Expand Up @@ -1008,6 +1024,7 @@ impl ConsensusObserver {
None,
rand_msg_rx,
0,
self.pipeline_enabled(),
)
.await;
if self.pipeline_enabled() {
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
fast_rand_config.clone(),
rand_msg_rx,
recovery_data.root_block().round(),
self.config.enable_pipeline,
)
.await;
let consensus_sk = consensus_key;
Expand Down Expand Up @@ -1377,6 +1378,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
fast_rand_config,
rand_msg_rx,
highest_committed_round,
self.config.enable_pipeline,
)
.await;

Expand Down
9 changes: 7 additions & 2 deletions consensus/src/pipeline/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub struct BufferManager {
// If the buffer manager receives a commit vote for a block that is not in buffer items, then
// the vote will be cached. We can cache up to max_pending_rounds_in_commit_vote_cache (100) blocks.
pending_commit_votes: BTreeMap<Round, HashMap<AccountAddress, CommitVote>>,
new_pipeline_enabled: bool,
}

impl BufferManager {
Expand Down Expand Up @@ -205,6 +206,7 @@ impl BufferManager {
consensus_observer_config: ConsensusObserverConfig,
consensus_publisher: Option<Arc<ConsensusPublisher>>,
max_pending_rounds_in_commit_vote_cache: u64,
new_pipeline_enabled: bool,
) -> Self {
let buffer = Buffer::<BufferItem>::new();

Expand Down Expand Up @@ -269,6 +271,7 @@ impl BufferManager {

max_pending_rounds_in_commit_vote_cache,
pending_commit_votes: BTreeMap::new(),
new_pipeline_enabled,
}
}

Expand Down Expand Up @@ -995,8 +998,10 @@ impl BufferManager {
}});
},
_ = self.execution_schedule_retry_rx.next() => {
monitor!("buffer_manager_process_execution_schedule_retry",
self.retry_schedule_phase().await);
if !self.new_pipeline_enabled {
monitor!("buffer_manager_process_execution_schedule_retry",
self.retry_schedule_phase().await);
}
},
Some(response) = self.signing_phase_rx.next() => {
monitor!("buffer_manager_process_signing_response", {
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/pipeline/decoupled_execution_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub fn prepare_phases_and_buffer_manager(
consensus_observer_config: ConsensusObserverConfig,
consensus_publisher: Option<Arc<ConsensusPublisher>>,
max_pending_rounds_in_commit_vote_cache: u64,
new_pipeline_enabled: bool,
) -> (
PipelinePhase<ExecutionSchedulePhase>,
PipelinePhase<ExecutionWaitPhase>,
Expand Down Expand Up @@ -137,6 +138,7 @@ pub fn prepare_phases_and_buffer_manager(
consensus_observer_config,
consensus_publisher,
max_pending_rounds_in_commit_vote_cache,
new_pipeline_enabled,
),
)
}
6 changes: 6 additions & 0 deletions consensus/src/pipeline/execution_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub trait TExecutionClient: Send + Sync {
fast_rand_config: Option<RandConfig>,
rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
highest_committed_round: Round,
new_pipeline_enabled: bool,
);

/// This is needed for some DAG tests. Clean this up as a TODO.
Expand Down Expand Up @@ -208,6 +209,7 @@ impl ExecutionProxyClient {
buffer_manager_back_pressure_enabled: bool,
consensus_observer_config: ConsensusObserverConfig,
consensus_publisher: Option<Arc<ConsensusPublisher>>,
new_pipeline_enabled: bool,
) {
let network_sender = NetworkSender::new(
self.author,
Expand Down Expand Up @@ -295,6 +297,7 @@ impl ExecutionProxyClient {
consensus_publisher,
self.consensus_config
.max_pending_rounds_in_commit_vote_cache,
new_pipeline_enabled,
);

tokio::spawn(execution_schedule_phase.start());
Expand All @@ -320,6 +323,7 @@ impl TExecutionClient for ExecutionProxyClient {
fast_rand_config: Option<RandConfig>,
rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
highest_committed_round: Round,
new_pipeline_enabled: bool,
) {
let maybe_rand_msg_tx = self.spawn_decoupled_execution(
maybe_consensus_key,
Expand All @@ -333,6 +337,7 @@ impl TExecutionClient for ExecutionProxyClient {
self.consensus_config.enable_pre_commit,
self.consensus_observer_config,
self.consensus_publisher.clone(),
new_pipeline_enabled,
);

let transaction_shuffler =
Expand Down Expand Up @@ -536,6 +541,7 @@ impl TExecutionClient for DummyExecutionClient {
_fast_rand_config: Option<RandConfig>,
_rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
_highest_committed_round: Round,
_new_pipeline_enabled: bool,
) {
}

Expand Down
7 changes: 5 additions & 2 deletions consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ fn spawn_shared_fut<
async move {
match join_handle.await {
Ok(Ok(res)) => Ok(res),
Ok(Err(e)) => Err(TaskError::PropagatedError(Box::new(e))),
Ok(e @ Err(TaskError::PropagatedError(_))) => e,
Ok(Err(e @ TaskError::InternalError(_) | e @ TaskError::JoinError(_))) => {
Err(TaskError::PropagatedError(Box::new(e)))
},
Err(e) => Err(TaskError::JoinError(Arc::new(e))),
}
}
Expand Down Expand Up @@ -151,7 +154,7 @@ impl Tracker {
.with_label_values(&[self.name, "work_time"])
.observe(work_time.as_secs_f64());
info!(
"[Pipeline] Block {} {} {} finishes {}, waits {}, takes {}",
"[Pipeline] Block {} {} {} finishes {}, waits {}ms, takes {}ms",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log message appends ms to the time values but as_secs_f64() returns seconds. To avoid misleading logs, either convert the values to milliseconds (multiply by 1000) before logging, or remove the ms suffix to reflect the actual units in seconds.

Spotted by Graphite Reviewer

Is this helpful? React 👍 or 👎 to let us know.

self.block_id,
self.epoch,
self.round,
Expand Down
1 change: 1 addition & 0 deletions consensus/src/pipeline/tests/buffer_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pub fn prepare_buffer_manager(
ConsensusObserverConfig::default(),
None,
100,
true,
);

(
Expand Down
1 change: 1 addition & 0 deletions consensus/src/test_utils/mock_execution_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl TExecutionClient for MockExecutionClient {
_fast_rand_config: Option<RandConfig>,
_rand_msg_rx: aptos_channel::Receiver<AccountAddress, IncomingRandGenRequest>,
_highest_committed_round: Round,
_new_pipeline_enabled: bool,
) {
}

Expand Down
Loading