Skip to content

Commit

Permalink
[Consensus Observer] Fix new pipeline.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jan 24, 2025
1 parent 3ddcf7f commit 920f86f
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 48 deletions.
4 changes: 2 additions & 2 deletions config/src/config/consensus_observer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Default for ConsensusObserverConfig {
Self {
observer_enabled: false,
publisher_enabled: false,
enable_pipeline: false,
enable_pipeline: true,
max_network_channel_size: 1000,
max_parallel_serialization_tasks: num_cpus::get(), // Default to the number of CPUs
network_request_timeout_ms: 5_000, // 5 seconds
Expand All @@ -77,7 +77,7 @@ impl Default for ConsensusObserverConfig {
subscription_peer_change_interval_ms: 180_000, // 3 minutes
subscription_refresh_interval_ms: 600_000, // 10 minutes
observer_fallback_duration_ms: 600_000, // 10 minutes
observer_fallback_startup_period_ms: 60_000, // 60 seconds
observer_fallback_startup_period_ms: 90_000, // 90 seconds
observer_fallback_progress_threshold_ms: 10_000, // 10 seconds
observer_fallback_sync_lag_threshold_ms: 15_000, // 15 seconds
}
Expand Down
104 changes: 59 additions & 45 deletions consensus/src/consensus_observer/observer/consensus_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,35 +266,60 @@ impl ConsensusObserver {
async fn finalize_ordered_block(&mut self, ordered_block: OrderedBlock) {
info!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Forwarding ordered blocks to the execution pipeline: {}",
ordered_block.proof_block_info()
"Forwarding ordered blocks to the execution pipeline: {}. New pipeline enabled: {}",
ordered_block.proof_block_info(),
self.pipeline_enabled()
))
);

// Create the commit callback (to be called after the execution pipeline)
let commit_callback = self
.active_observer_state
.create_commit_callback_deprecated(
self.ordered_block_store.clone(),
self.block_payload_store.clone(),
);
// Finalize the ordered block using the appropriate execution pipeline
if self.pipeline_enabled() {
for block in ordered_block.blocks() {
// Create the commit callback (to be called after the execution pipeline)
let commit_callback = self.active_observer_state.create_commit_callback(
self.ordered_block_store.clone(),
self.block_payload_store.clone(),
);

// Send the ordered block to the execution pipeline
if let Err(error) = self
.execution_client
.finalize_order(
ordered_block.blocks(),
ordered_block.ordered_proof().clone(),
commit_callback,
)
.await
{
error!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Failed to finalize ordered block! Error: {:?}",
error
))
);
// Send the ordered block to the execution pipeline
if let Some(pipeline_futs) = self.get_last_pipeline_futs() {
self.pipeline_builder()
.build(block, pipeline_futs, commit_callback);
} else {
warn!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Parent block's pipeline futures for ordered block is missing! Ignoring: {:?}",
ordered_block.proof_block_info()
))
);
}
}
} else {
// Create the commit callback (to be called after the execution pipeline)
let commit_callback = self
.active_observer_state
.create_commit_callback_deprecated(
self.ordered_block_store.clone(),
self.block_payload_store.clone(),
);

// Send the ordered block to the execution pipeline
if let Err(error) = self
.execution_client
.finalize_order(
ordered_block.blocks(),
ordered_block.ordered_proof().clone(),
commit_callback,
)
.await
{
error!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Failed to finalize ordered block! Error: {:?}",
error
))
);
}
}
}

Expand Down Expand Up @@ -354,17 +379,19 @@ impl ConsensusObserver {
}
}

/// Returns the last block's pipeline futures, should only be called when pipeline is enabled
fn get_last_pipeline_futs(&self) -> PipelineFutures {
/// Returns the last block's pipeline futures (should only be called when pipeline is enabled)
fn get_last_pipeline_futs(&self) -> Option<PipelineFutures> {
if let Some(last_ordered_block) = self.ordered_block_store.lock().get_last_ordered_block() {
last_ordered_block
.pipeline_futs()
.expect("Pipeline futures should exist when enabled")
let result = last_ordered_block.pipeline_futs();
if result.is_none() {
println!("last_ordered_block: {:?}", last_ordered_block);
}
result
} else {
self.pipeline_builder().build_root(
Some(self.pipeline_builder().build_root(
StateComputeResult::new_dummy(),
self.active_observer_state.root().clone(),
)
))
}
}

Expand Down Expand Up @@ -779,19 +806,6 @@ impl ConsensusObserver {
metrics::ORDERED_BLOCK_LABEL,
);

if self.pipeline_enabled() {
for block in ordered_block.blocks() {
let commit_callback = self.active_observer_state.create_commit_callback(
self.ordered_block_store.clone(),
self.block_payload_store.clone(),
);
self.pipeline_builder().build(
block,
self.get_last_pipeline_futs(),
commit_callback,
);
}
}
// Insert the ordered block into the pending blocks
self.ordered_block_store
.lock()
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl Tracker {
.with_label_values(&[self.name, "work_time"])
.observe(work_time.as_secs_f64());
info!(
"[Pipeline] Block {} {} {} finishes {}, waits {}, takes {}",
"[Pipeline] Block {} {} {} finishes {}, waits {}ms, takes {}ms",
self.block_id,
self.epoch,
self.round,
Expand Down

0 comments on commit 920f86f

Please sign in to comment.