Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Display HRMP/XCM channels for parachains #162

Merged
merged 8 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 184 additions & 3 deletions src/core/api/subxt_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeMap;
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of polkadot-introspector.
//
Expand Down Expand Up @@ -27,7 +28,8 @@ use codec::{Compact, Decode, Encode};
use log::error;

use crate::core::subxt_subscription::polkadot::{
runtime_types as subxt_runtime_types, runtime_types::polkadot_primitives as polkadot_rt_primitives,
runtime_types as subxt_runtime_types,
runtime_types::{polkadot_primitives as polkadot_rt_primitives, polkadot_runtime_parachains::hrmp::HrmpChannel},
};

pub use subxt_runtime_types::polkadot_runtime::Call as SubxtCall;
Expand Down Expand Up @@ -67,6 +69,12 @@ pub enum RequestType {
GetSessionInfo(u32),
/// Get information about validators account keys in some session.
GetSessionAccountKeys(u32),
/// Get information about inbound HRMP channels, accepts block hash and destination ParaId
GetInboundHRMPChannels(<DefaultConfig as subxt::Config>::Hash, u32),
/// Get data from a specific inbound HRMP channel
GetHRMPData(<DefaultConfig as subxt::Config>::Hash, u32, u32),
/// Get information about inbound HRMP channels, accepts block hash and destination ParaId
GetOutboundHRMPChannels(<DefaultConfig as subxt::Config>::Hash, u32),
}

/// The `InherentData` constructed with the subxt API.
Expand Down Expand Up @@ -98,8 +106,12 @@ pub enum Response {
SessionIndex(u32),
/// Session info
SessionInfo(Option<polkadot_rt_primitives::v2::SessionInfo>),
/// Session info
/// Session keys
SessionAccountKeys(Option<Vec<AccountId32>>),
/// HRMP channels for some parachain (e.g. who are sending messages to us)
HRMPChannels(BTreeMap<u32, SubxtHrmpChannel>),
/// HRMP content for a specific channel
HRMPContent(Vec<Vec<u8>>),
}

#[derive(Debug)]
Expand Down Expand Up @@ -261,7 +273,68 @@ impl RequestExecutor {

match receiver.await {
Ok(Response::SessionAccountKeys(maybe_keys)) => maybe_keys,
_ => panic!("Expected SessionInfo, got something else."),
_ => panic!("Expected AccountKeys, got something else."),
}
}

pub async fn get_inbound_hrmp_channels(
&self,
url: String,
block_hash: <DefaultConfig as subxt::Config>::Hash,
para_id: u32,
) -> BTreeMap<u32, SubxtHrmpChannel> {
let (sender, receiver) = oneshot::channel::<Response>();
let request = Request {
url,
request_type: RequestType::GetInboundHRMPChannels(block_hash, para_id),
response_sender: sender,
};
self.to_api.send(request).await.expect("Channel closed");

match receiver.await {
Ok(Response::HRMPChannels(channels)) => channels,
_ => panic!("Expected HRMPChannels, got something else."),
}
}

pub async fn get_outbound_hrmp_channels(
&self,
url: String,
block_hash: <DefaultConfig as subxt::Config>::Hash,
para_id: u32,
) -> BTreeMap<u32, SubxtHrmpChannel> {
let (sender, receiver) = oneshot::channel::<Response>();
let request = Request {
url,
request_type: RequestType::GetOutboundHRMPChannels(block_hash, para_id),
response_sender: sender,
};
self.to_api.send(request).await.expect("Channel closed");

match receiver.await {
Ok(Response::HRMPChannels(channels)) => channels,
_ => panic!("Expected HRMPChannels, got something else."),
}
}

pub async fn get_hrmp_content(
&self,
url: String,
block_hash: <DefaultConfig as subxt::Config>::Hash,
receiver_id: u32,
sender_id: u32,
) -> Vec<Vec<u8>> {
let (sender, receiver) = oneshot::channel::<Response>();
let request = Request {
url,
request_type: RequestType::GetHRMPData(block_hash, receiver_id, sender_id),
response_sender: sender,
};
self.to_api.send(request).await.expect("Channel closed");

match receiver.await {
Ok(Response::HRMPContent(data)) => data,
_ => panic!("Expected HRMPInboundContent, got something else."),
}
}
}
Expand Down Expand Up @@ -335,6 +408,12 @@ pub(crate) async fn api_handler_task(mut api: Receiver<Request>) {
RequestType::GetSessionInfo(session_index) => subxt_get_session_info(api, session_index).await,
RequestType::GetSessionAccountKeys(session_index) =>
subxt_get_session_account_keys(api, session_index).await,
RequestType::GetInboundHRMPChannels(hash, para_id) =>
subxt_get_inbound_hrmp_channels(api, hash, para_id).await,
RequestType::GetOutboundHRMPChannels(hash, para_id) =>
subxt_get_outbound_hrmp_channels(api, hash, para_id).await,
RequestType::GetHRMPData(hash, para_id, sender) =>
subxt_get_hrmp_content(api, hash, para_id, sender).await,
}
} else {
// Remove the faulty websocket from connection pool.
Expand Down Expand Up @@ -501,6 +580,108 @@ async fn subxt_get_session_account_keys(
Ok(Response::SessionAccountKeys(session_keys))
}

/// A wrapper over subxt HRMP channel configuration
#[derive(Debug, Clone)]
pub struct SubxtHrmpChannel {
pub max_capacity: u32,
pub max_total_size: u32,
pub max_message_size: u32,
pub msg_count: u32,
pub total_size: u32,
pub mqc_head: Option<H256>,
pub sender_deposit: u128,
pub recipient_deposit: u128,
}

impl From<HrmpChannel> for SubxtHrmpChannel {
fn from(channel: HrmpChannel) -> Self {
SubxtHrmpChannel {
max_capacity: channel.max_capacity,
max_total_size: channel.max_total_size,
max_message_size: channel.max_message_size,
msg_count: channel.msg_count,
total_size: channel.total_size,
mqc_head: channel.mqc_head,
sender_deposit: channel.sender_deposit,
recipient_deposit: channel.recipient_deposit,
}
}
}

async fn subxt_get_inbound_hrmp_channels(
api: &polkadot::RuntimeApi<DefaultConfig, PolkadotExtrinsicParams<DefaultConfig>>,
block_hash: H256,
para_id: u32,
) -> Result {
use subxt_runtime_types::polkadot_parachain::primitives::{HrmpChannelId, Id};

let hrmp_channels = api
.storage()
.hrmp()
.hrmp_ingress_channels_index(&Id(para_id), Some(block_hash))
.await
.map_err(Error::SubxtError)?;
let mut channels_configuration: BTreeMap<u32, SubxtHrmpChannel> = BTreeMap::new();
for peer_parachain_id in hrmp_channels.into_iter().map(|id| id.0) {
let id = HrmpChannelId { sender: Id(peer_parachain_id), recipient: Id(para_id) };
api.storage()
.hrmp()
.hrmp_channels(&id, Some(block_hash))
.await
.map_err(Error::SubxtError)?
.map(|hrmp_channel_configuration| {
channels_configuration.insert(peer_parachain_id, hrmp_channel_configuration.into())
});
}
Ok(Response::HRMPChannels(channels_configuration))
}

async fn subxt_get_outbound_hrmp_channels(
api: &polkadot::RuntimeApi<DefaultConfig, PolkadotExtrinsicParams<DefaultConfig>>,
block_hash: H256,
para_id: u32,
) -> Result {
use subxt_runtime_types::polkadot_parachain::primitives::{HrmpChannelId, Id};

let hrmp_channels = api
.storage()
.hrmp()
.hrmp_egress_channels_index(&Id(para_id), Some(block_hash))
.await
.map_err(Error::SubxtError)?;
let mut channels_configuration: BTreeMap<u32, SubxtHrmpChannel> = BTreeMap::new();
for peer_parachain_id in hrmp_channels.into_iter().map(|id| id.0) {
let id = HrmpChannelId { sender: Id(para_id), recipient: Id(peer_parachain_id) };
api.storage()
.hrmp()
.hrmp_channels(&id, Some(block_hash))
.await
.map_err(Error::SubxtError)?
.map(|hrmp_channel_configuration| {
channels_configuration.insert(peer_parachain_id, hrmp_channel_configuration.into())
});
}
Ok(Response::HRMPChannels(channels_configuration))
}

async fn subxt_get_hrmp_content(
api: &polkadot::RuntimeApi<DefaultConfig, PolkadotExtrinsicParams<DefaultConfig>>,
block_hash: H256,
receiver: u32,
sender: u32,
) -> Result {
use subxt_runtime_types::polkadot_parachain::primitives::{HrmpChannelId, Id};

let id = HrmpChannelId { sender: Id(sender), recipient: Id(receiver) };
let hrmp_content = api
.storage()
.hrmp()
.hrmp_channel_contents(&id, Some(block_hash))
.await
.map_err(Error::SubxtError)?;
Ok(Response::HRMPContent(hrmp_content.into_iter().map(|hrmp_content| hrmp_content.data).collect()))
}

fn subxt_extract_parainherent(block: &subxt::rpc::ChainBlock<DefaultConfig>) -> Result {
// `ParaInherent` data is always at index #1.
let bytes = block.block.extrinsics[1].encode();
Expand Down
101 changes: 100 additions & 1 deletion src/pc/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ use crate::core::{
RequestExecutor, ValidatorIndex,
},
polkadot::runtime_types::polkadot_primitives::v2::{DisputeStatement, DisputeStatementSet},
SubxtHrmpChannel,
};
use codec::{Decode, Encode};
use color_eyre::owo_colors::OwoColorize;
use crossterm::style::Stylize;
use log::error;
use log::{debug, error};
use std::{
collections::BTreeMap,
fmt,
fmt::{Debug, Display},
};
Expand Down Expand Up @@ -61,6 +63,38 @@ struct SubxtSessionTracker {
prev_keys: Option<Vec<AccountId32>>,
}

/// A structure that tracks messages (UMP, HRMP, DMP etc)
pub struct SubxtMessageQueuesTracker {
/// Known inbound HRMP channels, indexed by source parachain id
pub inbound_hrmp_channels: BTreeMap<u32, SubxtHrmpChannel>,
/// Known outbound HRMP channels, indexed by source parachain id
pub outbound_hrmp_channels: BTreeMap<u32, SubxtHrmpChannel>,
}

impl SubxtMessageQueuesTracker {
/// Create new message queues tracker
pub fn new() -> Self {
Self { inbound_hrmp_channels: BTreeMap::new(), outbound_hrmp_channels: BTreeMap::new() }
}

/// Update the content of hrmp channels
pub fn update_hrmp_channels(
&mut self,
inbound_channels: BTreeMap<u32, SubxtHrmpChannel>,
outbound_channels: BTreeMap<u32, SubxtHrmpChannel>,
) {
debug!("hrmp channels configured: {:?} in, {:?} out", &inbound_channels, &outbound_channels);
self.inbound_hrmp_channels = inbound_channels;
self.outbound_hrmp_channels = outbound_channels;
}

/// Returns if there ae
pub fn has_hrmp_messages(&self) -> bool {
self.inbound_hrmp_channels.values().any(|channel| channel.total_size > 0) ||
self.outbound_hrmp_channels.values().any(|channel| channel.total_size > 0)
}
}

/// A subxt based parachain candidate tracker.
pub struct SubxtTracker {
/// Parachain ID to track.
Expand All @@ -85,6 +119,8 @@ pub struct SubxtTracker {
last_relay_block_ts: Option<u64>,
/// Session tracker
session_data: Option<SubxtSessionTracker>,
/// Messages queues status
message_queues: SubxtMessageQueuesTracker,
}

impl Display for SubxtTracker {
Expand Down Expand Up @@ -183,6 +219,17 @@ impl ParachainBlockTracker for SubxtTracker {
.await
{
self.set_relay_block(header.number, block_hash);
let inbound_hrmp_channels = self
.executor
.get_inbound_hrmp_channels(self.node_rpc_url.clone(), block_hash, self.para_id)
.await;
let outbound_hrmp_channels = self
.executor
.get_outbound_hrmp_channels(self.node_rpc_url.clone(), block_hash, self.para_id)
.await;
self.message_queues
.update_hrmp_channels(inbound_hrmp_channels, outbound_hrmp_channels);

let cur_session = self.executor.get_session_index(self.node_rpc_url.clone(), block_hash).await;
if let Some(stored_session) = self.get_current_session_index() {
if cur_session != stored_session {
Expand Down Expand Up @@ -230,6 +277,7 @@ impl SubxtTracker {
current_relay_block_ts: None,
last_relay_block_ts: None,
session_data: None,
message_queues: SubxtMessageQueuesTracker::new(),
}
}

Expand Down Expand Up @@ -403,6 +451,7 @@ impl SubxtTracker {

/// Updates cashed session with a new one, storing the previous session if needed
fn new_session(&mut self, session_index: u32, account_keys: Vec<AccountId32>) {
debug!("new session: {}", session_index);
if let Some(session_data) = &mut self.session_data {
let old_current = std::mem::replace(&mut session_data.current_keys, account_keys);
session_data.prev_keys.replace(old_current);
Expand Down Expand Up @@ -514,6 +563,52 @@ impl SubxtTracker {
Ok(())
}

fn display_hrmp(&self, f: &mut fmt::Formatter) -> fmt::Result {
let total: u32 = self
.message_queues
.inbound_hrmp_channels
.values()
.map(|channel| channel.total_size)
.sum();

if total > 0 {
writeln!(f, "\t👉 Inbound HRMP messages, received {} bytes in total", total)?;

for (peer_parachain, channel) in &self.message_queues.inbound_hrmp_channels {
if channel.total_size > 0 {
writeln!(
f,
"\t\t📩 From parachain: {}, {} bytes / {} max",
peer_parachain, channel.total_size, channel.max_message_size
)?;
}
}
}

let total: u32 = self
.message_queues
.outbound_hrmp_channels
.values()
.map(|channel| channel.total_size)
.sum();

if total > 0 {
writeln!(f, "\t👈 Outbound HRMP messages, sent {} bytes in total", total)?;

for (peer_parachain, channel) in &self.message_queues.outbound_hrmp_channels {
if channel.total_size > 0 {
writeln!(
f,
"\t\t📩 To parachain: {}, {} bytes / {} max",
peer_parachain, channel.total_size, channel.max_message_size
)?;
}
}
}

Ok(())
}

fn display_block_info(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(backed_candidate) = self.current_candidate.candidate.as_ref() {
let commitments_hash = BlakeTwo256::hash_of(&backed_candidate.candidate.commitments);
Expand All @@ -530,6 +625,10 @@ impl SubxtTracker {
self.display_disputes(f)?;
}

if self.message_queues.has_hrmp_messages() {
self.display_hrmp(f)?;
}

Ok(())
}

Expand Down