Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

collation-generation: resolve mismatch between descriptor and commitments core index #7104

Merged
merged 14 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 74 additions & 11 deletions polkadot/node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ use polkadot_primitives::{
node_features::FeatureIndex,
vstaging::{
transpose_claim_queue, CandidateDescriptorV2, CandidateReceiptV2 as CandidateReceipt,
CommittedCandidateReceiptV2, TransposedClaimQueue,
ClaimQueueOffset, CommittedCandidateReceiptV2, TransposedClaimQueue,
},
CandidateCommitments, CandidateDescriptor, CollatorPair, CoreIndex, Hash, Id as ParaId,
NodeFeatures, OccupiedCoreAssumption, PersistedValidationData, SessionIndex,
ValidationCodeHash,
};
use schnellru::{ByLength, LruMap};
use sp_core::crypto::Pair;
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};

mod error;

Expand Down Expand Up @@ -276,13 +276,15 @@ impl CollationGenerationSubsystem {
let claim_queue =
ClaimQueueSnapshot::from(request_claim_queue(relay_parent, ctx.sender()).await.await??);

let cores_to_build_on = claim_queue
.iter_claims_at_depth(0)
.filter_map(|(core_idx, para_id)| (para_id == config.para_id).then_some(core_idx))
.collect::<Vec<_>>();
let assigned_cores = claim_queue
.iter_all_claims()
.filter_map(|(core_idx, para_ids)| {
para_ids.iter().any(|&para_id| para_id == config.para_id).then_some(*core_idx)
})
.collect::<HashSet<_>>();
sw10pa marked this conversation as resolved.
Show resolved Hide resolved

// Nothing to do if no core assigned to us.
if cores_to_build_on.is_empty() {
// Nothing to do if no core is assigned to us at any depth.
if assigned_cores.is_empty() {
return Ok(())
}

Expand Down Expand Up @@ -342,9 +344,13 @@ impl CollationGenerationSubsystem {
ctx.spawn(
"chained-collation-builder",
Box::pin(async move {
let transposed_claim_queue = transpose_claim_queue(claim_queue.0);
let transposed_claim_queue = transpose_claim_queue(claim_queue.0.clone());

for core_index in cores_to_build_on {
// Track used core indexes not to submit collations on the same core.
let mut used_cores = HashSet::new();

for i in 0..assigned_cores.len() {
// Get the collation.
let collator_fn = match task_config.collator.as_ref() {
Some(x) => x,
None => return,
Expand All @@ -363,6 +369,63 @@ impl CollationGenerationSubsystem {
},
};

// Use the core_selector method from CandidateCommitments to extract
// CoreSelector and ClaimQueueOffset.
let mut commitments = CandidateCommitments::default();
commitments.upward_messages = collation.upward_messages.clone();

let (core_selector, cq_offset) = match commitments.core_selector() {
Ok(Some((sel, off))) => (Some(sel), off),
Ok(None) => (None, ClaimQueueOffset(0)),
Err(err) => {
gum::debug!(
target: LOG_TARGET,
?para_id,
"error processing UMP signals: {}",
err
);
return
},
};

// Identify the cores to build collations on using the given claim queue offset.
let cores_to_build_on = claim_queue
sandreim marked this conversation as resolved.
Show resolved Hide resolved
.iter_claims_at_depth(cq_offset.0 as usize)
.filter_map(|(core_idx, para_id)| {
(para_id == task_config.para_id).then_some(core_idx)
})
.collect::<Vec<_>>();

if cores_to_build_on.is_empty() {
gum::debug!(
sandreim marked this conversation as resolved.
Show resolved Hide resolved
target: LOG_TARGET,
?para_id,
"no core is assigned to para at depth {}",
cq_offset.0,
);
return
}

let index = match core_selector {
sw10pa marked this conversation as resolved.
Show resolved Hide resolved
// Use the CoreSelector's index if provided.
Some(core_selector) => core_selector.0 as usize,
// Fallback to the sequential index if no CoreSelector is provided.
None => i,
};
let descriptor_core_index = cores_to_build_on[index % cores_to_build_on.len()];

// Ensure the core index has not been used before.
if used_cores.contains(&descriptor_core_index.0) {
gum::debug!(
sw10pa marked this conversation as resolved.
Show resolved Hide resolved
target: LOG_TARGET,
?para_id,
"parachain repeatedly selected the same core index",
sw10pa marked this conversation as resolved.
Show resolved Hide resolved
);
return
}
used_cores.insert(descriptor_core_index.0);
sw10pa marked this conversation as resolved.
Show resolved Hide resolved

// Distribute the collation.
let parent_head = collation.head_data.clone();
if let Err(err) = construct_and_distribute_receipt(
PreparedCollation {
Expand All @@ -372,7 +435,7 @@ impl CollationGenerationSubsystem {
validation_data: validation_data.clone(),
validation_code_hash,
n_validators,
core_index,
core_index: descriptor_core_index,
session_index,
},
task_config.key.clone(),
Expand Down
153 changes: 134 additions & 19 deletions polkadot/node/collation-generation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,27 @@

use super::*;
use assert_matches::assert_matches;
use futures::{
task::{Context as FuturesContext, Poll},
Future, StreamExt,
use futures::{self, Future, StreamExt};
use polkadot_node_primitives::{
BlockData, Collation, CollationResult, CollatorFn, MaybeCompressedPoV, PoV,
};
use polkadot_node_primitives::{BlockData, Collation, CollationResult, MaybeCompressedPoV, PoV};
use polkadot_node_subsystem::{
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest},
ActivatedLeaf,
};
use polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::{
node_features, vstaging::CandidateDescriptorVersion, CollatorPair, PersistedValidationData,
node_features,
vstaging::{CandidateDescriptorVersion, CoreSelector, UMPSignal, UMP_SEPARATOR},
CollatorPair, PersistedValidationData,
};
use polkadot_primitives_test_helpers::dummy_head_data;
use rstest::rstest;
use sp_keyring::sr25519::Keyring as Sr25519Keyring;
use std::{
collections::{BTreeMap, VecDeque},
pin::Pin,
sync::Mutex,
};

type VirtualOverseer = TestSubsystemContextHandle<CollationGenerationMessage>;
Expand Down Expand Up @@ -79,17 +80,49 @@ fn test_collation() -> Collation {
}
}

struct TestCollator;

impl Future for TestCollator {
type Output = Option<CollationResult>;
struct State {
core_selector_index: Option<u8>,
cq_offset: u8,
}

fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll<Self::Output> {
Poll::Ready(Some(CollationResult { collation: test_collation(), result_sender: None }))
impl State {
fn new(core_selector_index: Option<u8>, cq_offset: u8) -> Self {
sw10pa marked this conversation as resolved.
Show resolved Hide resolved
Self { core_selector_index, cq_offset }
}
}

impl Unpin for TestCollator {}
struct TestCollator {
state: Arc<Mutex<State>>,
}

impl TestCollator {
fn new(core_selector_index: Option<u8>, cq_offset: u8) -> Self {
Self { state: Arc::new(Mutex::new(State::new(core_selector_index, cq_offset))) }
}

pub fn create_collation_function(&self) -> CollatorFn {
let state = Arc::clone(&self.state);

Box::new(move |_relay_parent: Hash, _validation_data: &PersistedValidationData| {
let mut state_guard = state.lock().unwrap();
let mut collation = test_collation();

if let Some(index) = state_guard.core_selector_index {
collation.upward_messages.force_push(UMP_SEPARATOR);
collation.upward_messages.force_push(
UMPSignal::SelectCore(
CoreSelector(index),
ClaimQueueOffset(state_guard.cq_offset),
)
.encode(),
);
state_guard.core_selector_index = Some(index + 1);
}

async move { Some(CollationResult { collation, result_sender: None }) }.boxed()
})
}
}

const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2000);

Expand All @@ -101,10 +134,15 @@ async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages {
.expect(&format!("{:?} is long enough to receive messages", TIMEOUT))
}

fn test_config<Id: Into<ParaId>>(para_id: Id) -> CollationGenerationConfig {
fn test_config<Id: Into<ParaId>>(
para_id: Id,
core_selector_index: Option<u8>,
cq_offset: u8,
) -> CollationGenerationConfig {
let test_collator = TestCollator::new(core_selector_index, cq_offset);
CollationGenerationConfig {
key: CollatorPair::generate().0,
collator: Some(Box::new(|_: Hash, _vd: &PersistedValidationData| TestCollator.boxed())),
collator: Some(test_collator.create_collation_function()),
para_id: para_id.into(),
}
}
Expand Down Expand Up @@ -219,7 +257,7 @@ fn distribute_collation_only_for_assigned_para_id_at_offset_0() {
.collect::<BTreeMap<_, _>>();

test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
helpers::initialize_collator(&mut virtual_overseer, para_id, None, 0).await;
helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
helpers::handle_runtime_calls_on_new_head_activation(
&mut virtual_overseer,
Expand Down Expand Up @@ -259,7 +297,7 @@ fn distribute_collation_with_elastic_scaling(#[case] total_cores: u32) {
.collect::<BTreeMap<_, _>>();

test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(&mut virtual_overseer, para_id).await;
helpers::initialize_collator(&mut virtual_overseer, para_id, None, 0).await;
helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
helpers::handle_runtime_calls_on_new_head_activation(
&mut virtual_overseer,
Expand All @@ -281,6 +319,74 @@ fn distribute_collation_with_elastic_scaling(#[case] total_cores: u32) {
});
}

// Tests when submission core indexes need to be selected using the core selectors provided in the
// UMP signals. The core selector index is an increasing number that can start with a non-negative
// value (even greater than the core index), but the collation generation protocol uses the
// remainder to select the core. UMP signals may also contain a claim queue offset, based on which
// we need to select the assigned core indexes for the para from that offset in the claim queue.
#[rstest]
#[case(0, 0, 0)]
#[case(1, 0, 0)]
#[case(1, 5, 0)]
#[case(2, 0, 1)]
#[case(4, 2, 2)]
fn distribute_collation_with_core_selectors(
sw10pa marked this conversation as resolved.
Show resolved Hide resolved
#[case] total_cores: u32,
// The core selector index that will be obtained from the first collation.
#[case] init_core_selector_index: u8,
// Claim queue offset where the assigned cores will be stored.
#[case] cq_offset: u8,
) {
let activated_hash: Hash = [1; 32].into();
let para_id = ParaId::from(5);
let other_para_id = ParaId::from(10);

let claim_queue = (0..total_cores)
.into_iter()
.map(|idx| {
// Set all cores assigned to para_id 5 at the cq_offset depth.
let mut vec = VecDeque::from(vec![other_para_id; cq_offset as usize]);
vec.push_back(para_id);
(CoreIndex(idx), vec)
})
.collect::<BTreeMap<_, _>>();

test_harness(|mut virtual_overseer| async move {
helpers::initialize_collator(
&mut virtual_overseer,
para_id,
Some(init_core_selector_index),
cq_offset,
)
.await;
helpers::activate_new_head(&mut virtual_overseer, activated_hash).await;
helpers::handle_runtime_calls_on_new_head_activation(
&mut virtual_overseer,
activated_hash,
claim_queue,
NodeFeatures::EMPTY,
sw10pa marked this conversation as resolved.
Show resolved Hide resolved
)
.await;

let mut cores_assigned = (0..total_cores).collect::<Vec<_>>();
if total_cores > 1 && init_core_selector_index > 0 {
// We need to rotate the list of cores because the first core selector index was
// non-zero, which should change the sequence of submissions. However, collations should
// still be submitted on all cores.
cores_assigned.rotate_left((init_core_selector_index as u32 % total_cores) as usize);
}
helpers::handle_cores_processing_for_a_leaf(
&mut virtual_overseer,
activated_hash,
para_id,
cores_assigned,
)
.await;

virtual_overseer
});
}

#[rstest]
#[case(true)]
#[case(false)]
Expand Down Expand Up @@ -405,10 +511,19 @@ mod helpers {
use std::collections::{BTreeMap, VecDeque};

// Sends `Initialize` with a collator config
pub async fn initialize_collator(virtual_overseer: &mut VirtualOverseer, para_id: ParaId) {
pub async fn initialize_collator(
virtual_overseer: &mut VirtualOverseer,
para_id: ParaId,
core_selector_index: Option<u8>,
cq_offset: u8,
) {
virtual_overseer
.send(FromOrchestra::Communication {
msg: CollationGenerationMessage::Initialize(test_config(para_id)),
msg: CollationGenerationMessage::Initialize(test_config(
para_id,
core_selector_index,
cq_offset,
)),
})
.await;
}
Expand Down
20 changes: 20 additions & 0 deletions prdoc/pr_7104.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
title: "collation-generation: resolve mismatch between descriptor and commitments core index"

doc:
- audience: Node Dev
description: |
This PR resolves a bug where normal collators failed to generate and submit collations,
sw10pa marked this conversation as resolved.
Show resolved Hide resolved
resulting in the following error:

```
ERROR tokio-runtime-worker parachain::collation-generation: Failed to construct and
distribute collation: V2 core index check failed: The core index in commitments doesn't
match the one in descriptor.
```

When core selectors are provided in the UMP signals, core indexes will be chosen using them.
sw10pa marked this conversation as resolved.
Show resolved Hide resolved
The fix ensures that functionality remains unchanged for parachains not using UMP signals.
sw10pa marked this conversation as resolved.
Show resolved Hide resolved

crates:
- name: polkadot-node-collation-generation
bump: patch
Loading