Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get stake snapshots using pallas observer #3

Merged
merged 12 commits into from
Feb 22, 2024
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

176 changes: 172 additions & 4 deletions mithril-common/src/chain_observer/pallas_observer.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use bech32::{self, ToBase32, Variant};
use pallas_addresses::Address;
use pallas_codec::utils::{Bytes, CborWrap, TagWrap};
use pallas_network::{
facades::NodeClient,
miniprotocols::localstate::{
queries_v16::{
self, Addr, Addrs, PostAlonsoTransactionOutput, TransactionOutput, UTxOByAddress,
self, Addr, Addrs, PostAlonsoTransactionOutput, StakeSnapshot, Stakes,
Dismissed Show dismissed Hide dismissed
Dismissed Show dismissed Hide dismissed
TransactionOutput, UTxOByAddress,
},
Client,
},
};

use pallas_primitives::ToCanonicalJson;
use std::path::{Path, PathBuf};
use std::{
collections::BTreeSet,
path::{Path, PathBuf},
};

use crate::chain_observer::interface::*;
use crate::chain_observer::{ChainAddress, TxDatum};
Expand Down Expand Up @@ -175,6 +181,62 @@ impl PallasChainObserver {
Ok(utxo)
}

/// Fetches the current stake distribution using the provided `statequery` client.
async fn do_stake_snapshots_state_query(
&self,
statequery: &mut Client,
) -> StdResult<StakeSnapshot> {
statequery
.acquire(None)
.await
.map_err(|err| anyhow!(err))
.with_context(|| "PallasChainObserver failed to acquire statequery")?;

let era = queries_v16::get_current_era(statequery)
.await
.map_err(|err| anyhow!(err))
.with_context(|| "PallasChainObserver failed to get current era")?;

let state_snapshot = queries_v16::get_stake_snapshots(statequery, era, BTreeSet::new())
.await
.map_err(|err| anyhow!(err))
.with_context(|| "PallasChainObserver failed to get stake snapshot")?;

Ok(state_snapshot)
}

fn get_stake_pool_hash(
&self,
key: &Bytes,
stakes: &Stakes,
) -> Result<(String, u64), ChainObserverError> {
let pool_hash = bech32::encode("pool", key.to_base32(), Variant::Bech32)
.map_err(|err| anyhow!(err))
.with_context(|| "PallasChainObserver failed to encode stake pool hash")?;

Ok((pool_hash, stakes.snapshot_mark_pool))
}

async fn get_stake_distribution_snapshot(
&self,
client: &mut NodeClient,
) -> Result<Option<StakeDistribution>, ChainObserverError> {
let statequery = client.statequery();

let stake_snapshot = self.do_stake_snapshots_state_query(statequery).await?;

let mut stake_distribution = StakeDistribution::new();

for (key, stakes) in stake_snapshot.snapshots.stake_snapshots.iter() {
if stakes.snapshot_mark_pool > 0 {
let (pool_hash, stake) = self.get_stake_pool_hash(key, stakes)?;
stake_distribution.insert(pool_hash, stake);
}
}

Ok(Some(stake_distribution))
falcucci marked this conversation as resolved.
Show resolved Hide resolved
}

/// Processes a state query with the `NodeClient`, releasing the state query.
async fn process_statequery(&self, client: &mut NodeClient) -> StdResult<()> {
let statequery = client.statequery();
Expand Down Expand Up @@ -245,8 +307,15 @@ impl ChainObserver for PallasChainObserver {
async fn get_current_stake_distribution(
&self,
) -> Result<Option<StakeDistribution>, ChainObserverError> {
let fallback = self.get_fallback();
fallback.get_current_stake_distribution().await
let mut client = self.get_client().await?;

let stake_pools = self.get_stake_distribution_snapshot(&mut client).await?;

self.post_process_statequery(&mut client).await?;

client.abort().await;

Ok(stake_pools)
}

async fn get_current_kes_period(
Expand Down Expand Up @@ -304,6 +373,64 @@ mod tests {
localstate::queries_v16::UTxOByAddress { utxo }
}

fn get_fake_stake_snapshot() -> StakeSnapshot {
let stake_snapshots = KeyValuePairs::from(vec![
(
Bytes::from(
hex::decode("00000036d515e12e18cd3c88c74f09a67984c2c279a5296aa96efe89")
.unwrap(),
),
localstate::queries_v16::Stakes {
snapshot_mark_pool: 300000000001,
snapshot_set_pool: 300000000002,
snapshot_go_pool: 300000000000,
},
),
(
Bytes::from(
hex::decode("000000f66e28b0f18aef20555f4c4954234e3270dfbbdcc13f54e799")
.unwrap(),
),
localstate::queries_v16::Stakes {
snapshot_mark_pool: 600000000001,
snapshot_set_pool: 600000000002,
snapshot_go_pool: 600000000000,
},
),
(
Bytes::from(
hex::decode("00000110093effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
.unwrap(),
),
localstate::queries_v16::Stakes {
snapshot_mark_pool: 1200000000001,
snapshot_set_pool: 1200000000002,
snapshot_go_pool: 1200000000000,
},
),
(
Bytes::from(
hex::decode("00000ffff93effbf3ce788aebd3e7506b80322bd3995ad432e61fad5")
.unwrap(),
),
localstate::queries_v16::Stakes {
snapshot_mark_pool: 0,
snapshot_set_pool: 1300000000002,
snapshot_go_pool: 0,
},
),
]);

localstate::queries_v16::StakeSnapshot {
snapshots: localstate::queries_v16::Snapshots {
stake_snapshots,
snapshot_stake_mark_total: 2100000000003,
snapshot_stake_set_total: 2100000000006,
snapshot_stake_go_total: 2100000000000,
},
}
}

/// pallas responses mock server.
async fn mock_server(server: &mut pallas_network::facades::NodeServer) -> AnyCbor {
let query: localstate::queries_v16::Request =
Expand All @@ -330,6 +457,12 @@ mod tests {
localstate::queries_v16::BlockQuery::GetUTxOByAddress(_),
),
) => AnyCbor::from_encode(get_fake_utxo_by_address()),
localstate::queries_v16::Request::LedgerQuery(
localstate::queries_v16::LedgerQuery::BlockQuery(
_,
localstate::queries_v16::BlockQuery::GetStakeSnapshots(_),
),
) => AnyCbor::from_encode(get_fake_stake_snapshot()),
_ => panic!("unexpected query from client: {query:?}"),
}
}
Expand Down Expand Up @@ -410,4 +543,39 @@ mod tests {
let datums = client_res.expect("Client failed");
assert_eq!(vec![TxDatum(r#"{"constructor":0,"fields":[{"bytes":"7b226d61726b657273223a5b7b226e616d65223a227468616c6573222c2265706f6368223a307d5d2c227369676e6174757265223a2238356632326562626164"},{"bytes":"33333537633865613264663036323039376639613138306464333564396633626131643236383263373263386431323238386661643863623864306365656562"},{"bytes":"366134643665383465653865353631376164323037313836366363313930373466326137366538373864663166393733346438343061227d"}]}"#.to_string())], datums);
}

#[tokio::test]
async fn get_current_stake_distribution_fallback() {
let server = setup_server().await;

let client = tokio::spawn(async move {
let socket_path = std::env::temp_dir().join("pallas_chain_observer_test/node.socket");
let fallback = CardanoCliChainObserver::new(Box::<TestCliRunner>::default());
let observer = super::PallasChainObserver::new(
socket_path.as_path(),
CardanoNetwork::TestNet(10),
fallback,
);
observer.get_current_stake_distribution().await.unwrap()
});

let (_, client_res) = tokio::join!(server, client);
let computed_stake_distribution = client_res.unwrap().unwrap();

let mut expected_stake_distribution = StakeDistribution::new();
expected_stake_distribution.insert(
"pool1qqqqqdk4zhsjuxxd8jyvwncf5eucfskz0xjjj64fdmlgj735lr9".to_string(),
300000000001,
);
expected_stake_distribution.insert(
"pool1qqqqpanw9zc0rzh0yp247nzf2s35uvnsm7aaesfl2nnejaev0uc".to_string(),
600000000001,
);
expected_stake_distribution.insert(
"pool1qqqqzyqf8mlm70883zht60n4q6uqxg4a8x266sewv8ad2grkztl".to_string(),
1200000000001,
);

assert_eq!(expected_stake_distribution, computed_stake_distribution);
}
}
Loading