Skip to content

Commit

Permalink
Merge pull request #162 from paritytech/vstakhov-hrmp-pc
Browse files Browse the repository at this point in the history
Display HRMP/XCM channels for parachains
  • Loading branch information
sandreim authored Nov 25, 2022
2 parents 33f2e1f + 25f20a6 commit 756d2fe
Show file tree
Hide file tree
Showing 2 changed files with 284 additions and 4 deletions.
187 changes: 184 additions & 3 deletions src/core/api/subxt_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeMap;
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of polkadot-introspector.
//
Expand Down Expand Up @@ -27,7 +28,8 @@ use codec::{Compact, Decode, Encode};
use log::error;

use crate::core::subxt_subscription::polkadot::{
runtime_types as subxt_runtime_types, runtime_types::polkadot_primitives as polkadot_rt_primitives,
runtime_types as subxt_runtime_types,
runtime_types::{polkadot_primitives as polkadot_rt_primitives, polkadot_runtime_parachains::hrmp::HrmpChannel},
};

pub use subxt_runtime_types::polkadot_runtime::Call as SubxtCall;
Expand Down Expand Up @@ -67,6 +69,12 @@ pub enum RequestType {
GetSessionInfo(u32),
/// Get information about validators account keys in some session.
GetSessionAccountKeys(u32),
/// Get information about inbound HRMP channels, accepts block hash and destination ParaId
GetInboundHRMPChannels(<DefaultConfig as subxt::Config>::Hash, u32),
/// Get data from a specific inbound HRMP channel
GetHRMPData(<DefaultConfig as subxt::Config>::Hash, u32, u32),
/// Get information about inbound HRMP channels, accepts block hash and destination ParaId
GetOutboundHRMPChannels(<DefaultConfig as subxt::Config>::Hash, u32),
}

/// The `InherentData` constructed with the subxt API.
Expand Down Expand Up @@ -98,8 +106,12 @@ pub enum Response {
SessionIndex(u32),
/// Session info
SessionInfo(Option<polkadot_rt_primitives::v2::SessionInfo>),
/// Session info
/// Session keys
SessionAccountKeys(Option<Vec<AccountId32>>),
/// HRMP channels for some parachain (e.g. who are sending messages to us)
HRMPChannels(BTreeMap<u32, SubxtHrmpChannel>),
/// HRMP content for a specific channel
HRMPContent(Vec<Vec<u8>>),
}

#[derive(Debug)]
Expand Down Expand Up @@ -261,7 +273,68 @@ impl RequestExecutor {

match receiver.await {
Ok(Response::SessionAccountKeys(maybe_keys)) => maybe_keys,
_ => panic!("Expected SessionInfo, got something else."),
_ => panic!("Expected AccountKeys, got something else."),
}
}

pub async fn get_inbound_hrmp_channels(
&self,
url: String,
block_hash: <DefaultConfig as subxt::Config>::Hash,
para_id: u32,
) -> BTreeMap<u32, SubxtHrmpChannel> {
let (sender, receiver) = oneshot::channel::<Response>();
let request = Request {
url,
request_type: RequestType::GetInboundHRMPChannels(block_hash, para_id),
response_sender: sender,
};
self.to_api.send(request).await.expect("Channel closed");

match receiver.await {
Ok(Response::HRMPChannels(channels)) => channels,
_ => panic!("Expected HRMPChannels, got something else."),
}
}

pub async fn get_outbound_hrmp_channels(
&self,
url: String,
block_hash: <DefaultConfig as subxt::Config>::Hash,
para_id: u32,
) -> BTreeMap<u32, SubxtHrmpChannel> {
let (sender, receiver) = oneshot::channel::<Response>();
let request = Request {
url,
request_type: RequestType::GetOutboundHRMPChannels(block_hash, para_id),
response_sender: sender,
};
self.to_api.send(request).await.expect("Channel closed");

match receiver.await {
Ok(Response::HRMPChannels(channels)) => channels,
_ => panic!("Expected HRMPChannels, got something else."),
}
}

pub async fn get_hrmp_content(
&self,
url: String,
block_hash: <DefaultConfig as subxt::Config>::Hash,
receiver_id: u32,
sender_id: u32,
) -> Vec<Vec<u8>> {
let (sender, receiver) = oneshot::channel::<Response>();
let request = Request {
url,
request_type: RequestType::GetHRMPData(block_hash, receiver_id, sender_id),
response_sender: sender,
};
self.to_api.send(request).await.expect("Channel closed");

match receiver.await {
Ok(Response::HRMPContent(data)) => data,
_ => panic!("Expected HRMPInboundContent, got something else."),
}
}
}
Expand Down Expand Up @@ -335,6 +408,12 @@ pub(crate) async fn api_handler_task(mut api: Receiver<Request>) {
RequestType::GetSessionInfo(session_index) => subxt_get_session_info(api, session_index).await,
RequestType::GetSessionAccountKeys(session_index) =>
subxt_get_session_account_keys(api, session_index).await,
RequestType::GetInboundHRMPChannels(hash, para_id) =>
subxt_get_inbound_hrmp_channels(api, hash, para_id).await,
RequestType::GetOutboundHRMPChannels(hash, para_id) =>
subxt_get_outbound_hrmp_channels(api, hash, para_id).await,
RequestType::GetHRMPData(hash, para_id, sender) =>
subxt_get_hrmp_content(api, hash, para_id, sender).await,
}
} else {
// Remove the faulty websocket from connection pool.
Expand Down Expand Up @@ -501,6 +580,108 @@ async fn subxt_get_session_account_keys(
Ok(Response::SessionAccountKeys(session_keys))
}

/// A wrapper over subxt HRMP channel configuration
#[derive(Debug, Clone)]
pub struct SubxtHrmpChannel {
pub max_capacity: u32,
pub max_total_size: u32,
pub max_message_size: u32,
pub msg_count: u32,
pub total_size: u32,
pub mqc_head: Option<H256>,
pub sender_deposit: u128,
pub recipient_deposit: u128,
}

impl From<HrmpChannel> for SubxtHrmpChannel {
fn from(channel: HrmpChannel) -> Self {
SubxtHrmpChannel {
max_capacity: channel.max_capacity,
max_total_size: channel.max_total_size,
max_message_size: channel.max_message_size,
msg_count: channel.msg_count,
total_size: channel.total_size,
mqc_head: channel.mqc_head,
sender_deposit: channel.sender_deposit,
recipient_deposit: channel.recipient_deposit,
}
}
}

async fn subxt_get_inbound_hrmp_channels(
api: &polkadot::RuntimeApi<DefaultConfig, PolkadotExtrinsicParams<DefaultConfig>>,
block_hash: H256,
para_id: u32,
) -> Result {
use subxt_runtime_types::polkadot_parachain::primitives::{HrmpChannelId, Id};

let hrmp_channels = api
.storage()
.hrmp()
.hrmp_ingress_channels_index(&Id(para_id), Some(block_hash))
.await
.map_err(Error::SubxtError)?;
let mut channels_configuration: BTreeMap<u32, SubxtHrmpChannel> = BTreeMap::new();
for peer_parachain_id in hrmp_channels.into_iter().map(|id| id.0) {
let id = HrmpChannelId { sender: Id(peer_parachain_id), recipient: Id(para_id) };
api.storage()
.hrmp()
.hrmp_channels(&id, Some(block_hash))
.await
.map_err(Error::SubxtError)?
.map(|hrmp_channel_configuration| {
channels_configuration.insert(peer_parachain_id, hrmp_channel_configuration.into())
});
}
Ok(Response::HRMPChannels(channels_configuration))
}

async fn subxt_get_outbound_hrmp_channels(
api: &polkadot::RuntimeApi<DefaultConfig, PolkadotExtrinsicParams<DefaultConfig>>,
block_hash: H256,
para_id: u32,
) -> Result {
use subxt_runtime_types::polkadot_parachain::primitives::{HrmpChannelId, Id};

let hrmp_channels = api
.storage()
.hrmp()
.hrmp_egress_channels_index(&Id(para_id), Some(block_hash))
.await
.map_err(Error::SubxtError)?;
let mut channels_configuration: BTreeMap<u32, SubxtHrmpChannel> = BTreeMap::new();
for peer_parachain_id in hrmp_channels.into_iter().map(|id| id.0) {
let id = HrmpChannelId { sender: Id(para_id), recipient: Id(peer_parachain_id) };
api.storage()
.hrmp()
.hrmp_channels(&id, Some(block_hash))
.await
.map_err(Error::SubxtError)?
.map(|hrmp_channel_configuration| {
channels_configuration.insert(peer_parachain_id, hrmp_channel_configuration.into())
});
}
Ok(Response::HRMPChannels(channels_configuration))
}

async fn subxt_get_hrmp_content(
api: &polkadot::RuntimeApi<DefaultConfig, PolkadotExtrinsicParams<DefaultConfig>>,
block_hash: H256,
receiver: u32,
sender: u32,
) -> Result {
use subxt_runtime_types::polkadot_parachain::primitives::{HrmpChannelId, Id};

let id = HrmpChannelId { sender: Id(sender), recipient: Id(receiver) };
let hrmp_content = api
.storage()
.hrmp()
.hrmp_channel_contents(&id, Some(block_hash))
.await
.map_err(Error::SubxtError)?;
Ok(Response::HRMPContent(hrmp_content.into_iter().map(|hrmp_content| hrmp_content.data).collect()))
}

fn subxt_extract_parainherent(block: &subxt::rpc::ChainBlock<DefaultConfig>) -> Result {
// `ParaInherent` data is always at index #1.
let bytes = block.block.extrinsics[1].encode();
Expand Down
101 changes: 100 additions & 1 deletion src/pc/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ use crate::core::{
RequestExecutor, ValidatorIndex,
},
polkadot::runtime_types::polkadot_primitives::v2::{DisputeStatement, DisputeStatementSet},
SubxtHrmpChannel,
};
use codec::{Decode, Encode};
use color_eyre::owo_colors::OwoColorize;
use crossterm::style::Stylize;
use log::error;
use log::{debug, error};
use std::{
collections::BTreeMap,
fmt,
fmt::{Debug, Display},
};
Expand Down Expand Up @@ -61,6 +63,38 @@ struct SubxtSessionTracker {
prev_keys: Option<Vec<AccountId32>>,
}

/// A structure that tracks messages (UMP, HRMP, DMP etc)
pub struct SubxtMessageQueuesTracker {
/// Known inbound HRMP channels, indexed by source parachain id
pub inbound_hrmp_channels: BTreeMap<u32, SubxtHrmpChannel>,
/// Known outbound HRMP channels, indexed by source parachain id
pub outbound_hrmp_channels: BTreeMap<u32, SubxtHrmpChannel>,
}

impl SubxtMessageQueuesTracker {
/// Create new message queues tracker
pub fn new() -> Self {
Self { inbound_hrmp_channels: BTreeMap::new(), outbound_hrmp_channels: BTreeMap::new() }
}

/// Update the content of hrmp channels
pub fn update_hrmp_channels(
&mut self,
inbound_channels: BTreeMap<u32, SubxtHrmpChannel>,
outbound_channels: BTreeMap<u32, SubxtHrmpChannel>,
) {
debug!("hrmp channels configured: {:?} in, {:?} out", &inbound_channels, &outbound_channels);
self.inbound_hrmp_channels = inbound_channels;
self.outbound_hrmp_channels = outbound_channels;
}

/// Returns if there ae
pub fn has_hrmp_messages(&self) -> bool {
self.inbound_hrmp_channels.values().any(|channel| channel.total_size > 0) ||
self.outbound_hrmp_channels.values().any(|channel| channel.total_size > 0)
}
}

/// A subxt based parachain candidate tracker.
pub struct SubxtTracker {
/// Parachain ID to track.
Expand All @@ -85,6 +119,8 @@ pub struct SubxtTracker {
last_relay_block_ts: Option<u64>,
/// Session tracker
session_data: Option<SubxtSessionTracker>,
/// Messages queues status
message_queues: SubxtMessageQueuesTracker,
}

impl Display for SubxtTracker {
Expand Down Expand Up @@ -183,6 +219,17 @@ impl ParachainBlockTracker for SubxtTracker {
.await
{
self.set_relay_block(header.number, block_hash);
let inbound_hrmp_channels = self
.executor
.get_inbound_hrmp_channels(self.node_rpc_url.clone(), block_hash, self.para_id)
.await;
let outbound_hrmp_channels = self
.executor
.get_outbound_hrmp_channels(self.node_rpc_url.clone(), block_hash, self.para_id)
.await;
self.message_queues
.update_hrmp_channels(inbound_hrmp_channels, outbound_hrmp_channels);

let cur_session = self.executor.get_session_index(self.node_rpc_url.clone(), block_hash).await;
if let Some(stored_session) = self.get_current_session_index() {
if cur_session != stored_session {
Expand Down Expand Up @@ -230,6 +277,7 @@ impl SubxtTracker {
current_relay_block_ts: None,
last_relay_block_ts: None,
session_data: None,
message_queues: SubxtMessageQueuesTracker::new(),
}
}

Expand Down Expand Up @@ -403,6 +451,7 @@ impl SubxtTracker {

/// Updates cashed session with a new one, storing the previous session if needed
fn new_session(&mut self, session_index: u32, account_keys: Vec<AccountId32>) {
debug!("new session: {}", session_index);
if let Some(session_data) = &mut self.session_data {
let old_current = std::mem::replace(&mut session_data.current_keys, account_keys);
session_data.prev_keys.replace(old_current);
Expand Down Expand Up @@ -514,6 +563,52 @@ impl SubxtTracker {
Ok(())
}

fn display_hrmp(&self, f: &mut fmt::Formatter) -> fmt::Result {
let total: u32 = self
.message_queues
.inbound_hrmp_channels
.values()
.map(|channel| channel.total_size)
.sum();

if total > 0 {
writeln!(f, "\t👉 Inbound HRMP messages, received {} bytes in total", total)?;

for (peer_parachain, channel) in &self.message_queues.inbound_hrmp_channels {
if channel.total_size > 0 {
writeln!(
f,
"\t\t📩 From parachain: {}, {} bytes / {} max",
peer_parachain, channel.total_size, channel.max_message_size
)?;
}
}
}

let total: u32 = self
.message_queues
.outbound_hrmp_channels
.values()
.map(|channel| channel.total_size)
.sum();

if total > 0 {
writeln!(f, "\t👈 Outbound HRMP messages, sent {} bytes in total", total)?;

for (peer_parachain, channel) in &self.message_queues.outbound_hrmp_channels {
if channel.total_size > 0 {
writeln!(
f,
"\t\t📩 To parachain: {}, {} bytes / {} max",
peer_parachain, channel.total_size, channel.max_message_size
)?;
}
}
}

Ok(())
}

fn display_block_info(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(backed_candidate) = self.current_candidate.candidate.as_ref() {
let commitments_hash = BlakeTwo256::hash_of(&backed_candidate.candidate.commitments);
Expand All @@ -530,6 +625,10 @@ impl SubxtTracker {
self.display_disputes(f)?;
}

if self.message_queues.has_hrmp_messages() {
self.display_hrmp(f)?;
}

Ok(())
}

Expand Down

0 comments on commit 756d2fe

Please sign in to comment.