Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Display HRMP/XCM channels for parachains #162

Merged
merged 8 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 129 additions & 2 deletions src/core/api/subxt_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ pub enum RequestType {
GetSessionInfo(u32),
/// Get information about validators account keys in some session.
GetSessionAccountKeys(u32),
/// Get information about inbound HRMP channels, accepts block hash and destination ParaId
GetInboundHRMPChannels(<DefaultConfig as subxt::Config>::Hash, u32),
/// Get data from a specific inbound HRMP channel
GetHRMPData(<DefaultConfig as subxt::Config>::Hash, u32, u32),
/// Get information about inbound HRMP channels, accepts block hash and destination ParaId
GetOutboundHRMPChannels(<DefaultConfig as subxt::Config>::Hash, u32),
}

/// The `InherentData` constructed with the subxt API.
Expand Down Expand Up @@ -98,8 +104,12 @@ pub enum Response {
SessionIndex(u32),
/// Session info
SessionInfo(Option<polkadot_rt_primitives::v2::SessionInfo>),
/// Session info
/// Session keys
SessionAccountKeys(Option<Vec<AccountId32>>),
/// HRMP channels for some parachain (e.g. who are sending messages to us)
HRMPChannels(Vec<u32>),
/// HRMP content for a specific channel
HRMPContent(Vec<Vec<u8>>),
}

#[derive(Debug)]
Expand Down Expand Up @@ -261,7 +271,68 @@ impl RequestExecutor {

match receiver.await {
Ok(Response::SessionAccountKeys(maybe_keys)) => maybe_keys,
_ => panic!("Expected SessionInfo, got something else."),
_ => panic!("Expected AccountKeys, got something else."),
}
}

pub async fn get_inbound_hrmp_channels(
&self,
url: String,
block_hash: <DefaultConfig as subxt::Config>::Hash,
para_id: u32,
) -> Vec<u32> {
let (sender, receiver) = oneshot::channel::<Response>();
let request = Request {
url,
request_type: RequestType::GetInboundHRMPChannels(block_hash, para_id),
response_sender: sender,
};
self.to_api.send(request).await.expect("Channel closed");

match receiver.await {
Ok(Response::HRMPChannels(channels)) => channels,
_ => panic!("Expected HRMPChannels, got something else."),
}
}

pub async fn get_outbound_hrmp_channels(
&self,
url: String,
block_hash: <DefaultConfig as subxt::Config>::Hash,
para_id: u32,
) -> Vec<u32> {
let (sender, receiver) = oneshot::channel::<Response>();
let request = Request {
url,
request_type: RequestType::GetOutboundHRMPChannels(block_hash, para_id),
response_sender: sender,
};
self.to_api.send(request).await.expect("Channel closed");

match receiver.await {
Ok(Response::HRMPChannels(channels)) => channels,
_ => panic!("Expected HRMPChannels, got something else."),
}
}

pub async fn get_hrmp_content(
&self,
url: String,
block_hash: <DefaultConfig as subxt::Config>::Hash,
receiver_id: u32,
sender_id: u32,
) -> Vec<Vec<u8>> {
let (sender, receiver) = oneshot::channel::<Response>();
let request = Request {
url,
request_type: RequestType::GetHRMPData(block_hash, receiver_id, sender_id),
response_sender: sender,
};
self.to_api.send(request).await.expect("Channel closed");

match receiver.await {
Ok(Response::HRMPContent(data)) => data,
_ => panic!("Expected HRMPInboundContent, got something else."),
}
}
}
Expand Down Expand Up @@ -335,6 +406,12 @@ pub(crate) async fn api_handler_task(mut api: Receiver<Request>) {
RequestType::GetSessionInfo(session_index) => subxt_get_session_info(api, session_index).await,
RequestType::GetSessionAccountKeys(session_index) =>
subxt_get_session_account_keys(api, session_index).await,
RequestType::GetInboundHRMPChannels(hash, para_id) =>
subxt_get_inbound_hrmp_channels(api, hash, para_id).await,
RequestType::GetOutboundHRMPChannels(hash, para_id) =>
subxt_get_outbound_hrmp_channels(api, hash, para_id).await,
RequestType::GetHRMPData(hash, para_id, sender) =>
subxt_get_hrmp_content(api, hash, para_id, sender).await,
}
} else {
// Remove the faulty websocket from connection pool.
Expand Down Expand Up @@ -501,6 +578,56 @@ async fn subxt_get_session_account_keys(
Ok(Response::SessionAccountKeys(session_keys))
}

async fn subxt_get_inbound_hrmp_channels(
api: &polkadot::RuntimeApi<DefaultConfig, PolkadotExtrinsicParams<DefaultConfig>>,
block_hash: H256,
para_id: u32,
) -> Result {
use crate::core::api::subxt_wrapper::subxt_runtime_types::polkadot_parachain::primitives::Id;

let hrmp_channels = api
.storage()
.hrmp()
.hrmp_ingress_channels_index(&Id(para_id), Some(block_hash))
.await
.map_err(Error::SubxtError)?;
Ok(Response::HRMPChannels(hrmp_channels.into_iter().map(|id| id.0).collect()))
}

async fn subxt_get_outbound_hrmp_channels(
api: &polkadot::RuntimeApi<DefaultConfig, PolkadotExtrinsicParams<DefaultConfig>>,
block_hash: H256,
para_id: u32,
) -> Result {
use crate::core::api::subxt_wrapper::subxt_runtime_types::polkadot_parachain::primitives::Id;

let hrmp_channels = api
.storage()
.hrmp()
.hrmp_egress_channels_index(&Id(para_id), Some(block_hash))
.await
.map_err(Error::SubxtError)?;
Ok(Response::HRMPChannels(hrmp_channels.into_iter().map(|id| id.0).collect()))
}

async fn subxt_get_hrmp_content(
api: &polkadot::RuntimeApi<DefaultConfig, PolkadotExtrinsicParams<DefaultConfig>>,
block_hash: H256,
receiver: u32,
sender: u32,
) -> Result {
use crate::core::api::subxt_wrapper::subxt_runtime_types::polkadot_parachain::primitives::{HrmpChannelId, Id};

let id = HrmpChannelId { sender: Id(sender), recipient: Id(receiver) };
let hrmp_content = api
.storage()
.hrmp()
.hrmp_channel_contents(&id, Some(block_hash))
.await
.map_err(Error::SubxtError)?;
Ok(Response::HRMPContent(hrmp_content.into_iter().map(|hrmp_content| hrmp_content.data).collect()))
}

fn subxt_extract_parainherent(block: &subxt::rpc::ChainBlock<DefaultConfig>) -> Result {
// `ParaInherent` data is always at index #1.
let bytes = block.block.extrinsics[1].encode();
Expand Down
81 changes: 80 additions & 1 deletion src/pc/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ use crate::core::{
use codec::{Decode, Encode};
use color_eyre::owo_colors::OwoColorize;
use crossterm::style::Stylize;
use log::error;
use log::{debug, error};
use std::{
collections::BTreeMap,
fmt,
fmt::{Debug, Display},
};
Expand Down Expand Up @@ -85,6 +86,14 @@ pub struct SubxtTracker {
last_relay_block_ts: Option<u64>,
/// Session tracker
session_data: Option<SubxtSessionTracker>,
/// Known inbound HRMP channels, indexed by source parachain id
inbound_hrmp_channels: Vec<u32>,
vstakhov marked this conversation as resolved.
Show resolved Hide resolved
/// Inbound hrmp content for the parachain, indexed by sender parachain id
inbound_hrmp_messages: BTreeMap<u32, Vec<u8>>,
/// Known outbound HRMP channels, indexed by source parachain id
outbound_hrmp_channels: Vec<u32>,
/// Inbound hrmp content for the parachain, indexed by sender parachain id
outbound_hrmp_messages: BTreeMap<u32, Vec<u8>>,
vstakhov marked this conversation as resolved.
Show resolved Hide resolved
}

impl Display for SubxtTracker {
Expand Down Expand Up @@ -183,6 +192,33 @@ impl ParachainBlockTracker for SubxtTracker {
.await
{
self.set_relay_block(header.number, block_hash);
let inbound_hrmp_channels = self
.executor
.get_inbound_hrmp_channels(self.node_rpc_url.clone(), block_hash, self.para_id)
.await;
let outbound_hrmp_channels = self
.executor
.get_outbound_hrmp_channels(self.node_rpc_url.clone(), block_hash, self.para_id)
.await;
self.update_hrmp_channels(inbound_hrmp_channels, outbound_hrmp_channels);

for channel in &self.inbound_hrmp_channels {
let content = self
.executor
.get_hrmp_content(self.node_rpc_url.clone(), block_hash, self.para_id, *channel)
.await;
self.inbound_hrmp_messages
.insert(*channel, content.into_iter().flatten().collect());
}
for channel in &self.outbound_hrmp_channels {
let content = self
.executor
.get_hrmp_content(self.node_rpc_url.clone(), block_hash, *channel, self.para_id)
.await;
self.outbound_hrmp_messages
.insert(*channel, content.into_iter().flatten().collect());
}

let cur_session = self.executor.get_session_index(self.node_rpc_url.clone(), block_hash).await;
if let Some(stored_session) = self.get_current_session_index() {
if cur_session != stored_session {
Expand Down Expand Up @@ -230,6 +266,10 @@ impl SubxtTracker {
current_relay_block_ts: None,
last_relay_block_ts: None,
session_data: None,
inbound_hrmp_channels: vec![],
inbound_hrmp_messages: BTreeMap::new(),
outbound_hrmp_channels: vec![],
outbound_hrmp_messages: BTreeMap::new(),
}
}

Expand Down Expand Up @@ -403,6 +443,7 @@ impl SubxtTracker {

/// Updates cashed session with a new one, storing the previous session if needed
fn new_session(&mut self, session_index: u32, account_keys: Vec<AccountId32>) {
debug!("new session: {}", session_index);
if let Some(session_data) = &mut self.session_data {
let old_current = std::mem::replace(&mut session_data.current_keys, account_keys);
session_data.prev_keys.replace(old_current);
Expand All @@ -412,6 +453,14 @@ impl SubxtTracker {
}
}

fn update_hrmp_channels(&mut self, inbound_channels: Vec<u32>, outbound_channels: Vec<u32>) {
debug!("hrmp channels configured: {:?} in, {:?} out", &inbound_channels, &outbound_channels);
self.inbound_hrmp_channels = inbound_channels;
self.outbound_hrmp_channels = outbound_channels;
self.inbound_hrmp_messages.clear();
self.outbound_hrmp_messages.clear();
}

fn display_core_assignment(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(assigned_core) = self.last_assignment {
writeln!(f, "\t- Parachain {} assigned to core index {}", self.para_id, assigned_core)
Expand Down Expand Up @@ -514,6 +563,30 @@ impl SubxtTracker {
Ok(())
}

fn display_hrmp(&self, f: &mut fmt::Formatter) -> fmt::Result {
let total: usize = self.inbound_hrmp_messages.values().map(|data| data.len()).sum();

if total > 0 {
writeln!(f, "\t👉 Inbound HRMP messages, received {} bytes in total", total)?;

for (input_parachain, data) in self.inbound_hrmp_messages.iter() {
writeln!(f, "\t\t📩 From parachain: {}, {} bytes", input_parachain, data.len())?;
}
}

let total: usize = self.outbound_hrmp_messages.values().map(|data| data.len()).sum();

if total > 0 {
writeln!(f, "\t👈 Outbound HRMP messages, sent {} bytes in total", total)?;

for (output, data) in self.outbound_hrmp_messages.iter() {
writeln!(f, "\t\t📩 To parachain: {}, {} bytes", output, data.len())?;
}
}

Ok(())
}

fn display_block_info(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(backed_candidate) = self.current_candidate.candidate.as_ref() {
let commitments_hash = BlakeTwo256::hash_of(&backed_candidate.candidate.commitments);
Expand All @@ -530,6 +603,12 @@ impl SubxtTracker {
self.display_disputes(f)?;
}

if self.inbound_hrmp_messages.values().any(|data| !data.is_empty()) ||
self.outbound_hrmp_messages.values().any(|data| !data.is_empty())
{
self.display_hrmp(f)?;
}

Ok(())
}

Expand Down