From 72fa7ee6e2c5ca929c0f27e2a8df334653435913 Mon Sep 17 00:00:00 2001 From: Alex Coats Date: Thu, 27 Apr 2023 15:16:51 -0400 Subject: [PATCH] Add richest addresses measurements to analytics --- docker/docker-compose.yml | 130 +++++++++--------- src/analytics/influx.rs | 95 +++++++++---- src/analytics/ledger/address_balance.rs | 34 ++++- src/analytics/ledger/mod.rs | 2 +- src/analytics/mod.rs | 44 +++--- src/bin/inx-chronicle/cli/analytics.rs | 6 +- src/bin/inx-chronicle/inx/influx/analytics.rs | 6 +- .../payload/transaction/output/address/mod.rs | 7 + src/model/mod.rs | 2 +- tests/ledger_updates.rs | 2 +- 10 files changed, 194 insertions(+), 134 deletions(-) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 07433abf7..21afcfb5e 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -7,40 +7,40 @@ services: command: ["--quiet", "--logpath", "/dev/null"] volumes: - ./data/chronicle/mongodb:/data/db - environment: - - MONGO_INITDB_ROOT_USERNAME=${MONGODB_USERNAME} - - MONGO_INITDB_ROOT_PASSWORD=${MONGODB_PASSWORD} + # environment: + # - MONGO_INITDB_ROOT_USERNAME=${MONGODB_USERNAME} + # - MONGO_INITDB_ROOT_PASSWORD=${MONGODB_PASSWORD} ports: - 27017:27017 - inx-chronicle: - container_name: inx-chronicle - depends_on: - influx: - condition: service_started - hornet: - condition: service_healthy - build: - context: .. - dockerfile: docker/Dockerfile.debug - image: inx-chronicle:dev - ports: - - "8042:8042/tcp" # REST API - - "9100:9100/tcp" # Metrics - tty: true - deploy: - restart_policy: - condition: on-failure - delay: 5s - max_attempts: 3 - command: - - "--mongodb-conn-str=${MONGODB_CONN_STR}" - - "--influxdb-url=http://influx:8086" - - "--influxdb-username=${INFLUXDB_USERNAME}" - - "--influxdb-password=${INFLUXDB_PASSWORD}" - - "--inx-url=http://hornet:9029" - - "--jwt-password=${JWT_PASSWORD}" - - "--jwt-salt=${JWT_SALT}" + # inx-chronicle: + # container_name: inx-chronicle + # depends_on: + # influx: + # condition: service_started + # hornet: + # condition: service_healthy + # build: + # context: .. + # dockerfile: docker/Dockerfile.debug + # image: inx-chronicle:dev + # ports: + # - "8042:8042/tcp" # REST API + # - "9100:9100/tcp" # Metrics + # tty: true + # deploy: + # restart_policy: + # condition: on-failure + # delay: 5s + # max_attempts: 3 + # command: + # - "--mongodb-conn-str=${MONGODB_CONN_STR}" + # - "--influxdb-url=http://influx:8086" + # - "--influxdb-username=${INFLUXDB_USERNAME}" + # - "--influxdb-password=${INFLUXDB_PASSWORD}" + # - "--inx-url=http://hornet:9029" + # - "--jwt-password=${JWT_PASSWORD}" + # - "--jwt-salt=${JWT_SALT}" influx: image: influxdb:1.8 @@ -55,39 +55,39 @@ services: ports: - 8086:8086 - hornet: - image: iotaledger/hornet:2.0-rc - container_name: hornet - ulimits: - nofile: - soft: 8192 - hard: 8192 - stop_grace_period: 5m - ports: - - "15600:15600/tcp" # Gossip - - "14626:14626/udp" # Autopeering - - "14265:14265/tcp" # REST API - - "8081:8081/tcp" # Dashboard - - "8091:8091/tcp" # Faucet - - "9311:9311/tcp" # Prometheus - - "9029:9029/tcp" # INX - cap_drop: - - ALL - volumes: - - ./data/hornet/alphanet/:/app/alphanet - - ./data/hornet/testnet/:/app/testnet - - ./data/hornet/shimmer/:/app/shimmer - - ./config.testnet.hornet.json:/app/config_testnet.json:ro - - ./config.alphanet.hornet.json:/app/config_alphanet.json:ro - command: - # We can connect to the non-default networks by choosing a different Hornet configuration file. 
- # - "-c" - # - "config_testnet.json" - # - "config_alphanet.json" - - "--inx.enabled=true" - - "--inx.bindAddress=hornet:9029" - - "--prometheus.enabled=true" - - "--prometheus.bindAddress=0.0.0.0:9311" + # hornet: + # image: iotaledger/hornet:2.0-rc + # container_name: hornet + # ulimits: + # nofile: + # soft: 8192 + # hard: 8192 + # stop_grace_period: 5m + # ports: + # - "15600:15600/tcp" # Gossip + # - "14626:14626/udp" # Autopeering + # - "14265:14265/tcp" # REST API + # - "8081:8081/tcp" # Dashboard + # - "8091:8091/tcp" # Faucet + # - "9311:9311/tcp" # Prometheus + # - "9029:9029/tcp" # INX + # cap_drop: + # - ALL + # volumes: + # - ./data/hornet/alphanet/:/app/alphanet + # - ./data/hornet/testnet/:/app/testnet + # - ./data/hornet/shimmer/:/app/shimmer + # - ./config.testnet.hornet.json:/app/config_testnet.json:ro + # - ./config.alphanet.hornet.json:/app/config_alphanet.json:ro + # command: + # # We can connect to the non-default networks by choosing a different Hornet configuration file. + # # - "-c" + # # - "config_testnet.json" + # # - "config_alphanet.json" + # - "--inx.enabled=true" + # - "--inx.bindAddress=hornet:9029" + # - "--prometheus.enabled=true" + # - "--prometheus.bindAddress=0.0.0.0:9311" ################################################################################ # The following services can be enabled by setting the `debug` profile. @@ -139,7 +139,7 @@ services: ports: - 9216:9261 command: - - "--mongodb.uri=mongodb://${MONGODB_USERNAME}:${MONGODB_PASSWORD}@mongo:27017" + - "--mongodb.uri=mongodb://mongo:27017" - "--mongodb.direct-connect=true" - "--web.listen-address=:9216" - "--log.level=info" diff --git a/src/analytics/influx.rs b/src/analytics/influx.rs index 165a79b04..f6770fa64 100644 --- a/src/analytics/influx.rs +++ b/src/analytics/influx.rs @@ -7,14 +7,17 @@ use influxdb::{InfluxDbWriteable, WriteQuery}; use super::{ ledger::{ - AddressActivityMeasurement, AddressBalanceMeasurement, BaseTokenActivityMeasurement, LedgerOutputMeasurement, - LedgerSizeMeasurement, OutputActivityMeasurement, TransactionSizeMeasurement, UnclaimedTokenMeasurement, - UnlockConditionMeasurement, + AddressActivityMeasurement, AddressBalanceMeasurement, AddressesWithBalanceMeasurement, + BaseTokenActivityMeasurement, LedgerOutputMeasurement, LedgerSizeMeasurement, OutputActivityMeasurement, + TransactionSizeMeasurement, UnclaimedTokenMeasurement, UnlockConditionMeasurement, }, tangle::{BlockActivityMeasurement, MilestoneSizeMeasurement}, AnalyticsInterval, PerInterval, PerMilestone, }; -use crate::{db::influxdb::InfluxDb, model::ProtocolParameters}; +use crate::{ + db::influxdb::InfluxDb, + model::{payload::milestone::MilestoneIndexTimestamp, ProtocolParameters}, +}; /// A trait that defines an InfluxDb measurement. 
trait Measurement { @@ -46,45 +49,69 @@ impl AddFields for WriteQuery { } } -pub trait PrepareQuery: Send + Sync { - fn prepare_query(&self) -> Vec; +pub trait PerMilestoneQuery: Send + Sync { + fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec; } -impl PrepareQuery for Box { - fn prepare_query(&self) -> Vec { - (**self).prepare_query() +impl PerMilestoneQuery for Box { + fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec { + (&**self).per_milestone_query(at) } } -impl PrepareQuery for PerMilestone -where - M: Measurement, -{ - fn prepare_query(&self) -> Vec { +impl PerMilestoneQuery for M { + fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec { vec![ - influxdb::Timestamp::from(self.at.milestone_timestamp) + influxdb::Timestamp::from(at.milestone_timestamp) .into_query(M::NAME) - .add_field("milestone_index", self.at.milestone_index) - .add_fields(&self.inner), + .add_field("milestone_index", at.milestone_index) + .add_fields(self), ] } } -impl PrepareQuery for PerMilestone> { +impl PerMilestoneQuery for Vec { + fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec { + self.iter().flat_map(|inner| inner.per_milestone_query(at)).collect() + } +} + +impl PerMilestoneQuery for Option { + fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec { + self.iter().flat_map(|inner| inner.per_milestone_query(at)).collect() + } +} + +macro_rules! impl_per_milestone_query_tuple { + ($($idx:tt $t:tt),+) => { + impl<$($t: PerMilestoneQuery + Send + Sync),*> PerMilestoneQuery for ($($t),*,) + { + fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec { + let mut queries = Vec::new(); + $( + queries.extend(self.$idx.per_milestone_query(at)); + )* + queries + } + } + }; +} +// Right now we only need this one +impl_per_milestone_query_tuple!(0 T0, 1 T1); + +pub(crate) trait PrepareQuery: Send + Sync { + fn prepare_query(&self) -> Vec; +} + +impl PrepareQuery for Box { fn prepare_query(&self) -> Vec { - self.inner.iter().flat_map(|inner| inner.prepare_query()).collect() + (**self).prepare_query() } } -impl PrepareQuery for PerMilestone> -where - M: Measurement, -{ +impl PrepareQuery for PerMilestone { fn prepare_query(&self) -> Vec { - self.inner - .iter() - .flat_map(|inner| PerMilestone { at: self.at, inner }.prepare_query()) - .collect() + self.inner.per_milestone_query(self.at) } } @@ -101,8 +128,8 @@ where } } -impl Measurement for AddressBalanceMeasurement { - const NAME: &'static str = "stardust_addresses"; +impl Measurement for AddressesWithBalanceMeasurement { + const NAME: &'static str = "stardust_addresses_with_balance"; fn add_fields(&self, query: WriteQuery) -> WriteQuery { let mut query = query.add_field("address_with_balance_count", self.address_with_balance_count as u64); @@ -115,6 +142,16 @@ impl Measurement for AddressBalanceMeasurement { } } +impl Measurement for AddressBalanceMeasurement { + const NAME: &'static str = "stardust_address_balance"; + + fn add_fields(&self, query: WriteQuery) -> WriteQuery { + query + .add_tag("address", self.address.clone()) + .add_field("balance", self.balance.0) + } +} + impl Measurement for BaseTokenActivityMeasurement { const NAME: &'static str = "stardust_base_token_activity"; diff --git a/src/analytics/ledger/address_balance.rs b/src/analytics/ledger/address_balance.rs index afc4a41b2..fcb402ea5 100644 --- a/src/analytics/ledger/address_balance.rs +++ b/src/analytics/ledger/address_balance.rs @@ -7,7 +7,7 @@ use super::*; use crate::model::utxo::{Address, TokenAmount}; 
#[derive(Debug)] -pub(crate) struct AddressBalanceMeasurement { +pub(crate) struct AddressesWithBalanceMeasurement { pub(crate) address_with_balance_count: usize, pub(crate) token_distribution: Vec, } @@ -21,6 +21,13 @@ pub(crate) struct DistributionStat { pub(crate) total_amount: TokenAmount, } +/// Statistics for an address's balance. +#[derive(Clone, Debug)] +pub(crate) struct AddressBalanceMeasurement { + pub(crate) address: String, + pub(crate) balance: TokenAmount, +} + /// Computes the number of addresses the currently hold a balance. #[derive(Serialize, Deserialize)] pub(crate) struct AddressBalancesAnalytics { @@ -41,7 +48,7 @@ impl AddressBalancesAnalytics { } impl Analytics for AddressBalancesAnalytics { - type Measurement = AddressBalanceMeasurement; + type Measurement = (AddressesWithBalanceMeasurement, Vec); fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], _ctx: &dyn AnalyticsContext) { for output in consumed { @@ -74,9 +81,24 @@ impl Analytics for AddressBalancesAnalytics { token_distribution[index].address_count += 1; token_distribution[index].total_amount += *amount; } - AddressBalanceMeasurement { - address_with_balance_count: self.balances.len(), - token_distribution, - } + + let mut sorted_addresses = self.balances.iter().collect::>(); + sorted_addresses.sort_unstable_by_key(|(_, v)| v.0); + let bech32_hrp = &ctx.protocol_params().bech32_hrp; + ( + AddressesWithBalanceMeasurement { + address_with_balance_count: self.balances.len(), + token_distribution, + }, + sorted_addresses + .iter() + .rev() + .take(100) + .map(|(a, v)| AddressBalanceMeasurement { + address: a.to_bech32(bech32_hrp), + balance: **v, + }) + .collect(), + ) } } diff --git a/src/analytics/ledger/mod.rs b/src/analytics/ledger/mod.rs index 1a6131024..fa779d121 100644 --- a/src/analytics/ledger/mod.rs +++ b/src/analytics/ledger/mod.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; pub(super) use self::{ active_addresses::{AddressActivityAnalytics, AddressActivityMeasurement}, - address_balance::{AddressBalanceMeasurement, AddressBalancesAnalytics}, + address_balance::{AddressBalanceMeasurement, AddressBalancesAnalytics, AddressesWithBalanceMeasurement}, base_token::BaseTokenActivityMeasurement, ledger_outputs::LedgerOutputMeasurement, ledger_size::{LedgerSizeAnalytics, LedgerSizeMeasurement}, diff --git a/src/analytics/mod.rs b/src/analytics/mod.rs index 98c986a07..ce1d63789 100644 --- a/src/analytics/mod.rs +++ b/src/analytics/mod.rs @@ -7,7 +7,7 @@ use futures::TryStreamExt; use thiserror::Error; use self::{ - influx::PrepareQuery, + influx::{PerMilestoneQuery, PrepareQuery}, ledger::{ AddressActivityAnalytics, AddressActivityMeasurement, AddressBalancesAnalytics, BaseTokenActivityMeasurement, LedgerOutputMeasurement, LedgerSizeAnalytics, OutputActivityMeasurement, TransactionSizeMeasurement, @@ -75,12 +75,12 @@ pub trait Analytics { trait DynAnalytics: Send { fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext); fn handle_block(&mut self, block_data: &BlockData, ctx: &dyn AnalyticsContext); - fn take_measurement(&mut self, ctx: &dyn AnalyticsContext) -> Box; + fn take_measurement(&mut self, ctx: &dyn AnalyticsContext) -> Box; } impl DynAnalytics for T where - PerMilestone: 'static + PrepareQuery, + T::Measurement: 'static + PerMilestoneQuery, { fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) { Analytics::handle_transaction(self, consumed, 
created, ctx) @@ -90,11 +90,8 @@ where Analytics::handle_block(self, block_data, ctx) } - fn take_measurement(&mut self, ctx: &dyn AnalyticsContext) -> Box { - Box::new(PerMilestone { - at: *ctx.at(), - inner: Analytics::take_measurement(self, ctx), - }) as _ + fn take_measurement(&mut self, ctx: &dyn AnalyticsContext) -> Box { + Box::new(Analytics::take_measurement(self, ctx)) as _ } } @@ -172,26 +169,19 @@ impl Analytic { } } -impl> Analytics for T { - type Measurement = Vec>; +impl Analytics for Analytic { + type Measurement = Box; fn handle_block(&mut self, block_data: &BlockData, ctx: &dyn AnalyticsContext) { - for analytic in self.as_mut().iter_mut() { - analytic.0.handle_block(block_data, ctx); - } + self.0.handle_block(block_data, ctx); } fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) { - for analytic in self.as_mut().iter_mut() { - analytic.0.handle_transaction(consumed, created, ctx); - } + self.0.handle_transaction(consumed, created, ctx); } fn take_measurement(&mut self, ctx: &dyn AnalyticsContext) -> Self::Measurement { - self.as_mut() - .iter_mut() - .map(|analytic| analytic.0.take_measurement(ctx)) - .collect() + self.0.take_measurement(ctx) } } @@ -230,7 +220,7 @@ impl<'a, I: InputSource> Milestone<'a, I> { influxdb: &InfluxDb, ) -> eyre::Result<()> where - PerMilestone: 'static + PrepareQuery, + A::Measurement: 'static + PerMilestoneQuery, { let mut cone_stream = self.cone_stream().await?; @@ -239,7 +229,10 @@ impl<'a, I: InputSource> Milestone<'a, I> { } influxdb - .insert_measurement((analytics as &mut dyn DynAnalytics).take_measurement(self)) + .insert_measurement(PerMilestone { + at: self.at, + inner: (analytics as &mut dyn DynAnalytics).take_measurement(self), + }) .await?; Ok(()) @@ -380,7 +373,8 @@ mod test { use super::{ ledger::{ AddressActivityAnalytics, AddressActivityMeasurement, AddressBalanceMeasurement, - BaseTokenActivityMeasurement, LedgerSizeMeasurement, OutputActivityMeasurement, TransactionSizeMeasurement, + AddressesWithBalanceMeasurement, BaseTokenActivityMeasurement, LedgerSizeMeasurement, + OutputActivityMeasurement, TransactionSizeMeasurement, }, tangle::{BlockActivityMeasurement, MilestoneSizeMeasurement}, Analytics, AnalyticsContext, @@ -463,7 +457,7 @@ mod test { #[derive(Debug)] struct TestMeasurements { active_addresses: AddressActivityMeasurement, - address_balance: AddressBalanceMeasurement, + address_balance: (AddressesWithBalanceMeasurement, Vec), base_tokens: BaseTokenActivityMeasurement, ledger_outputs: LedgerOutputMeasurement, ledger_size: LedgerSizeMeasurement, @@ -543,7 +537,7 @@ mod test { } assert_expected!(analytics.active_addresses.count); - assert_expected!(analytics.address_balance.address_with_balance_count); + assert_expected!(analytics.address_balance.0.address_with_balance_count); assert_expected!(analytics.base_tokens.booked_amount.0); assert_expected!(analytics.base_tokens.transferred_amount.0); diff --git a/src/bin/inx-chronicle/cli/analytics.rs b/src/bin/inx-chronicle/cli/analytics.rs index 3e29efea0..9c69b2bf8 100644 --- a/src/bin/inx-chronicle/cli/analytics.rs +++ b/src/bin/inx-chronicle/cli/analytics.rs @@ -273,9 +273,9 @@ pub async fn fill_analytics( } // Unwrap: safe because we guarantee it is initialized above - milestone - .update_analytics(&mut state.as_mut().unwrap().analytics, &influx_db) - .await?; + for analytic in &mut state.as_mut().unwrap().analytics { + milestone.update_analytics(analytic, &influx_db).await?; + } let elapsed = 
start_time.elapsed(); #[cfg(feature = "metrics")] diff --git a/src/bin/inx-chronicle/inx/influx/analytics.rs b/src/bin/inx-chronicle/inx/influx/analytics.rs index 02f76be58..77e467058 100644 --- a/src/bin/inx-chronicle/inx/influx/analytics.rs +++ b/src/bin/inx-chronicle/inx/influx/analytics.rs @@ -81,9 +81,9 @@ impl InxWorker { } // Unwrap: safe because we guarantee it is initialized above - milestone - .update_analytics(&mut state.as_mut().unwrap().analytics, influx_db) - .await?; + for analytic in &mut state.as_mut().unwrap().analytics { + milestone.update_analytics(analytic, influx_db).await?; + } } } diff --git a/src/model/block/payload/transaction/output/address/mod.rs b/src/model/block/payload/transaction/output/address/mod.rs index e1cbfb68d..fe6d5d9de 100644 --- a/src/model/block/payload/transaction/output/address/mod.rs +++ b/src/model/block/payload/transaction/output/address/mod.rs @@ -27,6 +27,13 @@ pub enum Address { Nft(NftAddress), } +impl Address { + /// Encodes this address to a bech32 string with the given Human Readable Part as prefix. + pub fn to_bech32(&self, bech32_hrp: &str) -> String { + iota::Address::from(*self).to_bech32(bech32_hrp) + } +} + impl From for Address { fn from(value: iota::Address) -> Self { match value { diff --git a/src/model/mod.rs b/src/model/mod.rs index b2b8630d2..5daf1b108 100644 --- a/src/model/mod.rs +++ b/src/model/mod.rs @@ -19,7 +19,7 @@ pub mod utxo { //! A logical grouping of UTXO types for convenience. pub use super::block::payload::transaction::{ input::*, - output::{address::*, unlock_condition::*, *}, + output::{address::*, unlock_condition, *}, unlock::*, }; } diff --git a/tests/ledger_updates.rs b/tests/ledger_updates.rs index 61d44a6f1..8497d67d7 100644 --- a/tests/ledger_updates.rs +++ b/tests/ledger_updates.rs @@ -18,7 +18,7 @@ mod test_rand { ledger::{LedgerOutput, LedgerSpent, RentStructureBytes}, metadata::SpentMetadata, tangle::MilestoneIndexTimestamp, - utxo::{AddressUnlockCondition, BasicOutput, Output, OutputId}, + utxo::{unlock_condition::AddressUnlockCondition, BasicOutput, Output, OutputId}, BlockId, }, };
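Note on the query plumbing in src/analytics/influx.rs: measurements no longer prepare their own PerMilestone queries; instead a PerMilestoneQuery trait fans a measurement out into write queries for a given milestone, with impls covering plain measurements, Vec, Option, and (via the impl_per_milestone_query_tuple! macro) tuples, while PerMilestone only supplies the milestone index and timestamp at write time. Below is a simplified, self-contained sketch of that composition pattern; Query is a stand-in for influxdb::WriteQuery, the leaf impls are written out by hand rather than derived from the Measurement trait, and all names and field layouts here are illustrative only, not the patch's exact code.

// Simplified model of the PerMilestoneQuery composition in src/analytics/influx.rs.

#[derive(Clone, Copy)]
struct MilestoneIndexTimestamp {
    milestone_index: u32,
    milestone_timestamp: u32,
}

struct Query(String);

trait PerMilestoneQuery {
    fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec<Query>;
}

struct AddressesWithBalance {
    address_with_balance_count: usize,
}

struct AddressBalance {
    address: String,
    balance: u64,
}

// A single measurement becomes one query stamped with the milestone it was taken at.
impl PerMilestoneQuery for AddressesWithBalance {
    fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec<Query> {
        vec![Query(format!(
            "stardust_addresses_with_balance index={} count={} ts={}",
            at.milestone_index, self.address_with_balance_count, at.milestone_timestamp
        ))]
    }
}

impl PerMilestoneQuery for AddressBalance {
    fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec<Query> {
        vec![Query(format!(
            "stardust_address_balance,address={} index={} balance={} ts={}",
            self.address, at.milestone_index, self.balance, at.milestone_timestamp
        ))]
    }
}

// Collections, optional measurements, and pairs flatten into zero or more queries,
// which is what lets AddressBalancesAnalytics return the pair
// (AddressesWithBalanceMeasurement, Vec<AddressBalanceMeasurement>) as one measurement.
impl<T: PerMilestoneQuery> PerMilestoneQuery for Vec<T> {
    fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec<Query> {
        self.iter().flat_map(|m| m.per_milestone_query(at)).collect()
    }
}

impl<T: PerMilestoneQuery> PerMilestoneQuery for Option<T> {
    fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec<Query> {
        self.iter().flat_map(|m| m.per_milestone_query(at)).collect()
    }
}

impl<A: PerMilestoneQuery, B: PerMilestoneQuery> PerMilestoneQuery for (A, B) {
    fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec<Query> {
        let mut queries = self.0.per_milestone_query(at);
        queries.extend(self.1.per_milestone_query(at));
        queries
    }
}

// The PerMilestone wrapper supplies the milestone once, at write time.
struct PerMilestone<M> {
    at: MilestoneIndexTimestamp,
    inner: M,
}

impl<M: PerMilestoneQuery> PerMilestone<M> {
    fn prepare_query(&self) -> Vec<Query> {
        self.inner.per_milestone_query(self.at)
    }
}

fn main() {
    let per_milestone = PerMilestone {
        at: MilestoneIndexTimestamp { milestone_index: 42, milestone_timestamp: 1_682_000_000 },
        inner: (
            AddressesWithBalance { address_with_balance_count: 2 },
            vec![
                AddressBalance { address: "rms1...a".into(), balance: 1_000 },
                AddressBalance { address: "rms1...b".into(), balance: 250 },
            ],
        ),
    };
    // Prints three line-protocol-like strings: one summary point and two address points.
    for query in per_milestone.prepare_query() {
        println!("{}", query.0);
    }
}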
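Note on the richest-addresses selection in src/analytics/ledger/address_balance.rs: take_measurement now also returns the 100 largest balances, sorted largest first and converted to bech32 with the protocol parameters' HRP. A minimal sketch of that selection step over the in-memory balance map, with plain String addresses and u64 amounts standing in for Address and TokenAmount (addresses and numbers below are placeholders):

use std::collections::HashMap;

// One "richest address" entry, mirroring AddressBalanceMeasurement.
struct AddressBalance {
    address: String,
    balance: u64,
}

// Select the `top` largest balances, largest first, the way
// AddressBalancesAnalytics::take_measurement does for the top 100.
fn richest_addresses(balances: &HashMap<String, u64>, top: usize) -> Vec<AddressBalance> {
    let mut sorted: Vec<_> = balances.iter().collect();
    // Ascending sort by amount, then walk from the back for the largest entries.
    sorted.sort_unstable_by_key(|(_, amount)| **amount);
    sorted
        .into_iter()
        .rev()
        .take(top)
        .map(|(address, amount)| AddressBalance {
            address: address.clone(),
            balance: *amount,
        })
        .collect()
}

fn main() {
    // Placeholder addresses and amounts.
    let balances = HashMap::from([
        ("rms1...a".to_string(), 1_000_000_u64),
        ("rms1...b".to_string(), 50),
        ("rms1...c".to_string(), 7_500),
    ]);
    for entry in richest_addresses(&balances, 2) {
        println!("{} holds {}", entry.address, entry.balance);
    }
}

Sorting the whole map is O(n log n) in the number of addresses holding a balance; the sketch mirrors the approach in the patch rather than using a bounded heap.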
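Note on the resulting InfluxDB schema: each of those top balances is written as one point in the new stardust_address_balance measurement, with the bech32 address as a tag and the balance as a field, alongside the milestone_index field added by the per-milestone wrapper; the summary count and token distribution go to stardust_addresses_with_balance. A rough sketch of a single such point follows, using only influxdb crate calls that already appear in the diff (Timestamp, into_query, add_tag, add_field); the u64 parameters, the second precision, and the helper function itself are simplifying assumptions, not part of the patch.

use influxdb::{InfluxDbWriteable, Timestamp, WriteQuery};

// Illustrative helper (not part of the patch): build the write query for one
// richest-address entry, mirroring the tag/field names used in src/analytics/influx.rs.
// Assumes second-precision timestamps and plain u64 values for simplicity.
fn address_balance_point(
    milestone_timestamp: u64,
    milestone_index: u64,
    address_bech32: String,
    balance: u64,
) -> WriteQuery {
    Timestamp::Seconds(milestone_timestamp as u128)
        .into_query("stardust_address_balance")
        .add_field("milestone_index", milestone_index)
        .add_tag("address", address_bech32)
        .add_field("balance", balance)
}

Writing the address as a tag means each tracked address becomes its own series, which keeps per-address queries cheap but grows series cardinality with the number of reported addresses (bounded here by the top-100 cutoff).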