Skip to content

Commit

Permalink
Merge pull request #2867 from subspace/xdm_relayer_client_broadcast
Browse files Browse the repository at this point in the history
XDM: Chain channel state broadcast and use channel state to filter messages
  • Loading branch information
vedhavyas committed Jun 26, 2024
2 parents 06241a9 + 592d55f commit ee13cb8
Show file tree
Hide file tree
Showing 33 changed files with 2,301 additions and 721 deletions.
1,486 changes: 963 additions & 523 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/pallet-domains/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2523,7 +2523,7 @@ impl<T: Config> Pallet<T> {
Ok(code)
}

pub fn is_domain_runtime_updraded_since(
pub fn is_domain_runtime_upgraded_since(
domain_id: DomainId,
at: BlockNumberFor<T>,
) -> Option<bool> {
Expand Down
2 changes: 1 addition & 1 deletion crates/sp-domains/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1496,7 +1496,7 @@ sp_api::decl_runtime_apis! {
fn storage_fund_account_balance(operator_id: OperatorId) -> Balance;

/// Return if the domain runtime code is upgraded since `at`
fn is_domain_runtime_updraded_since(domain_id: DomainId, at: NumberFor<Block>) -> Option<bool>;
fn is_domain_runtime_upgraded_since(domain_id: DomainId, at: NumberFor<Block>) -> Option<bool>;
}

pub trait BundleProducerElectionApi<Balance: Encode + Decode> {
Expand Down
20 changes: 20 additions & 0 deletions crates/sp-domains/src/proof_provider_and_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ extern crate alloc;
#[cfg(not(feature = "std"))]
use alloc::collections::BTreeSet;
#[cfg(not(feature = "std"))]
use alloc::fmt;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use frame_support::PalletError;
use hash_db::Hasher;
Expand All @@ -22,6 +24,8 @@ use sp_trie::{read_trie_value, LayoutV1, StorageProof};
#[cfg(feature = "std")]
use std::collections::BTreeSet;
#[cfg(feature = "std")]
use std::fmt;
#[cfg(feature = "std")]
use trie_db::{DBValue, TrieDBMutBuilder, TrieLayout, TrieMut};

/// Verification error.
Expand All @@ -37,6 +41,22 @@ pub enum VerificationError {
UnusedNodesInTheProof,
}

impl fmt::Display for VerificationError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
VerificationError::InvalidProof => write!(f, "Given storage proof is invalid"),
VerificationError::MissingValue => {
write!(f, "Value doesn't exist in the Db for this key")
}
VerificationError::FailedToDecode => write!(f, "Failed to decode value"),
VerificationError::UnusedNodesInTheProof => write!(
f,
"Storage proof contains unused nodes after reading the necessary keys"
),
}
}
}

/// Type that provides utilities to verify the storage proof.
pub struct StorageProofVerifier<H: Hasher>(PhantomData<H>);

Expand Down
13 changes: 11 additions & 2 deletions crates/subspace-fake-runtime-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ use sp_domains::{
use sp_domains_fraud_proof::fraud_proof::FraudProof;
use sp_domains_fraud_proof::storage_proof::FraudProofStorageKeyRequest;
use sp_messenger::messages::{
BlockMessagesWithStorageKey, ChainId, CrossDomainMessage, MessageId, MessageKey,
BlockMessagesWithStorageKey, ChainId, ChannelId, CrossDomainMessage, MessageId, MessageKey,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use sp_runtime::transaction_validity::{TransactionSource, TransactionValidity};
use sp_runtime::{ApplyExtrinsicResult, ExtrinsicInclusionMode};
use sp_version::RuntimeVersion;
use std::collections::btree_map::BTreeMap;
use std::collections::btree_set::BTreeSet;
use subspace_core_primitives::objects::BlockObjectMapping;
use subspace_core_primitives::{
HistorySize, Randomness, SegmentCommitment, SegmentHeader, SegmentIndex, U256,
Expand Down Expand Up @@ -301,7 +302,7 @@ sp_api::impl_runtime_apis! {
unreachable!()
}

fn is_domain_runtime_updraded_since(_domain_id: DomainId, _at: NumberFor<Block>) -> Option<bool> {
fn is_domain_runtime_upgraded_since(_domain_id: DomainId, _at: NumberFor<Block>) -> Option<bool> {
unreachable!()
}
}
Expand Down Expand Up @@ -399,6 +400,14 @@ sp_api::impl_runtime_apis! {
fn should_relay_inbox_message_response(_dst_chain_id: ChainId, _msg_id: MessageId) -> bool {
unreachable!()
}

fn updated_channels() -> BTreeSet<(ChainId, ChannelId)> {
unreachable!()
}

fn channel_storage_key(_chain_id: ChainId, _channel_id: ChannelId) -> Vec<u8> {
unreachable!()
}
}

impl sp_domains_fraud_proof::FraudProofApi<Block, DomainHeader> for Runtime {
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-malicious-operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ sc-cli = { git = "https://github.com/subspace/polkadot-sdk", rev = "98914adb256f
sc-client-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "98914adb256fed32c13ce251c5b4c9972af8ea0f" }
sc-consensus-slots = { git = "https://github.com/subspace/polkadot-sdk", rev = "98914adb256fed32c13ce251c5b4c9972af8ea0f" }
sc-consensus-subspace = { version = "0.1.0", path = "../sc-consensus-subspace" }
sc-domains = { version = "0.1.0", path = "../sc-domains" }
sc-network = { git = "https://github.com/subspace/polkadot-sdk", rev = "98914adb256fed32c13ce251c5b4c9972af8ea0f" }
sc-service = { git = "https://github.com/subspace/polkadot-sdk", rev = "98914adb256fed32c13ce251c5b4c9972af8ea0f", default-features = false }
sc-storage-monitor = { git = "https://github.com/subspace/polkadot-sdk", rev = "98914adb256fed32c13ce251c5b4c9972af8ea0f", default-features = false }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ fn main() -> Result<(), Error> {

// start relayer for consensus chain
let mut xdm_gossip_worker_builder = GossipWorkerBuilder::new();
{
let consensus_msg_receiver = {
let span = sc_tracing::tracing::info_span!(
sc_tracing::logging::PREFIX_LOG_SPAN,
name = "Consensus"
Expand Down Expand Up @@ -294,37 +294,35 @@ fn main() -> Result<(), Error> {
Box::pin(relayer_worker),
);

let (consensus_msg_sink, consensus_msg_receiver) =
tracing_unbounded("consensus_message_channel", 100);

// Start cross domain message listener for Consensus chain to receive messages from domains in the network
let consensus_listener =
cross_domain_message_gossip::start_cross_chain_message_listener(
let channel_update_worker =
domain_client_message_relayer::worker::gossip_channel_updates::<_, _, Block, _>(
ChainId::Consensus,
consensus_chain_node.client.clone(),
consensus_chain_node.transaction_pool.clone(),
consensus_chain_node.network_service.clone(),
consensus_msg_receiver,
consensus_chain_node.sync_service.clone(),
xdm_gossip_worker_builder.gossip_msg_sink(),
);

consensus_chain_node
.task_manager
.spawn_essential_handle()
.spawn_essential_blocking(
"consensus-message-listener",
"consensus-chain-channel-update-worker",
None,
Box::pin(consensus_listener),
Box::pin(channel_update_worker),
);

xdm_gossip_worker_builder
.push_chain_tx_pool_sink(ChainId::Consensus, consensus_msg_sink);
}
let (consensus_msg_sink, consensus_msg_receiver) =
tracing_unbounded("consensus_message_channel", 100);

xdm_gossip_worker_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);
consensus_msg_receiver
};

let (domain_message_sink, domain_message_receiver) =
tracing_unbounded("domain_message_channel", 100);

xdm_gossip_worker_builder
.push_chain_tx_pool_sink(ChainId::Domain(domain_id), domain_message_sink);
.push_chain_sink(ChainId::Domain(domain_id), domain_message_sink);

let domain_starter = DomainInstanceStarter {
domain_cli,
Expand All @@ -348,6 +346,8 @@ fn main() -> Result<(), Error> {
consensus_state_pruning,
};

let consensus_network_service = consensus_chain_node.network_service.clone();
let consensus_task_spawn_essential_handler = consensus_chain_node.task_manager.spawn_essential_handle();
consensus_chain_node
.task_manager
.spawn_essential_handle()
Expand All @@ -366,10 +366,44 @@ fn main() -> Result<(), Error> {
return;
}
};
if let Err(error) =
domain_starter.start(bootstrap_result, sudo_account).await
{
log::error!("Domain starter exited with an error {error:?}");

match domain_starter.start(bootstrap_result, sudo_account).await {
Ok(domain_code_executor) => {
let span = sc_tracing::tracing::info_span!(
sc_tracing::logging::PREFIX_LOG_SPAN,
name = "Consensus"
);
let _enter = span.enter();
// Start cross domain message listener for Consensus chain to receive messages from domains in the network
let consensus_listener =
cross_domain_message_gossip::start_cross_chain_message_listener::<
_,
_,
_,
_,
_,
DomainBlock,
_,
>(
ChainId::Consensus,
consensus_chain_node.client.clone(),
consensus_chain_node.client.clone(),
consensus_chain_node.transaction_pool.clone(),
consensus_network_service,
consensus_msg_receiver,
domain_code_executor
);

consensus_task_spawn_essential_handler
.spawn_essential_blocking(
"consensus-message-listener",
None,
Box::pin(consensus_listener),
);
}
Err(err) => {
log::error!("Domain starter exited with an error {err:?}");
}
}
}),
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::malicious_bundle_producer::MaliciousBundleProducer;
use crate::{create_malicious_operator_configuration, DomainCli};
use cross_domain_message_gossip::{ChainTxPoolMsg, Message};
use cross_domain_message_gossip::{ChainMsg, Message};
use domain_client_operator::{BootstrapResult, OperatorStreams};
use domain_eth_service::provider::EthProvider;
use domain_eth_service::DefaultEthConfig;
Expand Down Expand Up @@ -44,7 +44,7 @@ pub struct DomainInstanceStarter {
SubspaceNotificationStream<BlockImportingNotification<CBlock>>,
pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
pub consensus_sync_service: Arc<sc_network_sync::SyncingService<CBlock>>,
pub domain_message_receiver: TracingUnboundedReceiver<ChainTxPoolMsg>,
pub domain_message_receiver: TracingUnboundedReceiver<ChainMsg>,
pub gossip_message_sink: TracingUnboundedSender<Message>,
pub consensus_network: Arc<dyn NetworkPeers + Send + Sync>,
pub consensus_state_pruning: PruningMode,
Expand All @@ -55,7 +55,7 @@ impl DomainInstanceStarter {
self,
bootstrap_result: BootstrapResult<CBlock>,
sudo_account: AccountId,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<Arc<sc_domains::RuntimeExecutor>, Box<dyn std::error::Error>> {
let BootstrapResult {
domain_instance_data,
domain_created_at,
Expand Down Expand Up @@ -203,7 +203,7 @@ impl DomainInstanceStarter {

domain_node.task_manager.future().await?;

Ok(())
Ok(domain_node.code_executor.clone())
}
RuntimeType::AutoId => {
let domain_params = domain_service::DomainParams {
Expand Down Expand Up @@ -262,7 +262,7 @@ impl DomainInstanceStarter {

domain_node.task_manager.future().await?;

Ok(())
Ok(domain_node.code_executor.clone())
}
}
}
Expand Down
73 changes: 57 additions & 16 deletions crates/subspace-node/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
tracing_unbounded("domain_message_channel", 100);

// Start relayer for consensus chain
{
let consensus_msg_receiver = {
let span = info_span!("Consensus");
let _enter = span.enter();
let consensus_best_hash = consensus_chain_node.client.info().best_hash;
Expand All @@ -251,30 +251,32 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
),
);

let (consensus_msg_sink, consensus_msg_receiver) =
tracing_unbounded("consensus_message_channel", 100);

// Start cross domain message listener for Consensus chain to receive messages from domains in the network
consensus_chain_node
.task_manager
.spawn_essential_handle()
.spawn_essential_blocking(
"consensus-message-listener",
"consensus-chain-channel-update-worker",
None,
Box::pin(
cross_domain_message_gossip::start_cross_chain_message_listener(
domain_client_message_relayer::worker::gossip_channel_updates::<
_,
_,
Block,
_,
>(
ChainId::Consensus,
consensus_chain_node.client.clone(),
consensus_chain_node.transaction_pool.clone(),
consensus_chain_node.network_service.clone(),
consensus_msg_receiver,
consensus_chain_node.sync_service.clone(),
xdm_gossip_worker_builder.gossip_msg_sink(),
),
),
);

xdm_gossip_worker_builder
.push_chain_tx_pool_sink(ChainId::Consensus, consensus_msg_sink);
xdm_gossip_worker_builder.push_chain_tx_pool_sink(
let (consensus_msg_sink, consensus_msg_receiver) =
tracing_unbounded("consensus_message_channel", 100);

xdm_gossip_worker_builder.push_chain_sink(ChainId::Consensus, consensus_msg_sink);
xdm_gossip_worker_builder.push_chain_sink(
ChainId::Domain(domain_configuration.domain_id),
domain_message_sink,
);
Expand All @@ -294,7 +296,15 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
None,
Box::pin(cross_domain_message_gossip_worker.run()),
);
}

consensus_msg_receiver
};

let consensus_client = consensus_chain_node.client.clone();
let consensus_network_service = consensus_chain_node.network_service.clone();
let consensus_tx_pool = consensus_chain_node.transaction_pool.clone();
let consensus_task_essential_handler =
consensus_chain_node.task_manager.spawn_essential_handle();

let domain_start_options = DomainStartOptions {
consensus_client: consensus_chain_node.client,
Expand Down Expand Up @@ -339,8 +349,39 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
domain_start_options,
);

if let Err(error) = start_domain.await {
error!(%error, "Domain starter exited with an error");
match start_domain.await {
Ok(domain_code_executor) => {
let span = info_span!("Consensus");
let _enter = span.enter();
// Start cross domain message listener for Consensus chain to receive messages from domains in the network
consensus_task_essential_handler
.spawn_essential_blocking(
"consensus-message-listener",
None,
Box::pin(
cross_domain_message_gossip::start_cross_chain_message_listener::<
_,
_,
_,
_,
_,
DomainBlock,
_,
>(
ChainId::Consensus,
consensus_client.clone(),
consensus_client.clone(),
consensus_tx_pool,
consensus_network_service,
consensus_msg_receiver,
domain_code_executor
),
),
);
}
Err(err) => {
error!(%err, "Domain starter exited with an error");
}
}
}),
);
Expand Down
Loading

0 comments on commit ee13cb8

Please sign in to comment.