Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat(analytics): add richest addresses measurements to analytics #1226

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 65 additions & 65 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,40 @@ services:
command: ["--quiet", "--logpath", "/dev/null"]
volumes:
- ./data/chronicle/mongodb:/data/db
environment:
- MONGO_INITDB_ROOT_USERNAME=${MONGODB_USERNAME}
- MONGO_INITDB_ROOT_PASSWORD=${MONGODB_PASSWORD}
# environment:
# - MONGO_INITDB_ROOT_USERNAME=${MONGODB_USERNAME}
# - MONGO_INITDB_ROOT_PASSWORD=${MONGODB_PASSWORD}
ports:
- 27017:27017

inx-chronicle:
container_name: inx-chronicle
depends_on:
influx:
condition: service_started
hornet:
condition: service_healthy
build:
context: ..
dockerfile: docker/Dockerfile.debug
image: inx-chronicle:dev
ports:
- "8042:8042/tcp" # REST API
- "9100:9100/tcp" # Metrics
tty: true
deploy:
restart_policy:
condition: on-failure
delay: 5s
max_attempts: 3
command:
- "--mongodb-conn-str=${MONGODB_CONN_STR}"
- "--influxdb-url=http://influx:8086"
- "--influxdb-username=${INFLUXDB_USERNAME}"
- "--influxdb-password=${INFLUXDB_PASSWORD}"
- "--inx-url=http://hornet:9029"
- "--jwt-password=${JWT_PASSWORD}"
- "--jwt-salt=${JWT_SALT}"
# inx-chronicle:
Copy link

@shufps shufps May 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can create a copy of docker-compose.yml and use it with docker-compose -f docker-compose-copy.yml. This way you don't need to revert the commented lines before push.

# container_name: inx-chronicle
# depends_on:
# influx:
# condition: service_started
# hornet:
# condition: service_healthy
# build:
# context: ..
# dockerfile: docker/Dockerfile.debug
# image: inx-chronicle:dev
# ports:
# - "8042:8042/tcp" # REST API
# - "9100:9100/tcp" # Metrics
# tty: true
# deploy:
# restart_policy:
# condition: on-failure
# delay: 5s
# max_attempts: 3
# command:
# - "--mongodb-conn-str=${MONGODB_CONN_STR}"
# - "--influxdb-url=http://influx:8086"
# - "--influxdb-username=${INFLUXDB_USERNAME}"
# - "--influxdb-password=${INFLUXDB_PASSWORD}"
# - "--inx-url=http://hornet:9029"
# - "--jwt-password=${JWT_PASSWORD}"
# - "--jwt-salt=${JWT_SALT}"

influx:
image: influxdb:1.8
Expand All @@ -55,39 +55,39 @@ services:
ports:
- 8086:8086

hornet:
image: iotaledger/hornet:2.0-rc
container_name: hornet
ulimits:
nofile:
soft: 8192
hard: 8192
stop_grace_period: 5m
ports:
- "15600:15600/tcp" # Gossip
- "14626:14626/udp" # Autopeering
- "14265:14265/tcp" # REST API
- "8081:8081/tcp" # Dashboard
- "8091:8091/tcp" # Faucet
- "9311:9311/tcp" # Prometheus
- "9029:9029/tcp" # INX
cap_drop:
- ALL
volumes:
- ./data/hornet/alphanet/:/app/alphanet
- ./data/hornet/testnet/:/app/testnet
- ./data/hornet/shimmer/:/app/shimmer
- ./config.testnet.hornet.json:/app/config_testnet.json:ro
- ./config.alphanet.hornet.json:/app/config_alphanet.json:ro
command:
# We can connect to the non-default networks by choosing a different Hornet configuration file.
# - "-c"
# - "config_testnet.json"
# - "config_alphanet.json"
- "--inx.enabled=true"
- "--inx.bindAddress=hornet:9029"
- "--prometheus.enabled=true"
- "--prometheus.bindAddress=0.0.0.0:9311"
# hornet:
# image: iotaledger/hornet:2.0-rc
# container_name: hornet
# ulimits:
# nofile:
# soft: 8192
# hard: 8192
# stop_grace_period: 5m
# ports:
# - "15600:15600/tcp" # Gossip
# - "14626:14626/udp" # Autopeering
# - "14265:14265/tcp" # REST API
# - "8081:8081/tcp" # Dashboard
# - "8091:8091/tcp" # Faucet
# - "9311:9311/tcp" # Prometheus
# - "9029:9029/tcp" # INX
# cap_drop:
# - ALL
# volumes:
# - ./data/hornet/alphanet/:/app/alphanet
# - ./data/hornet/testnet/:/app/testnet
# - ./data/hornet/shimmer/:/app/shimmer
# - ./config.testnet.hornet.json:/app/config_testnet.json:ro
# - ./config.alphanet.hornet.json:/app/config_alphanet.json:ro
# command:
# # We can connect to the non-default networks by choosing a different Hornet configuration file.
# # - "-c"
# # - "config_testnet.json"
# # - "config_alphanet.json"
# - "--inx.enabled=true"
# - "--inx.bindAddress=hornet:9029"
# - "--prometheus.enabled=true"
# - "--prometheus.bindAddress=0.0.0.0:9311"

################################################################################
# The following services can be enabled by setting the `debug` profile.
Expand Down Expand Up @@ -139,7 +139,7 @@ services:
ports:
- 9216:9261
command:
- "--mongodb.uri=mongodb://${MONGODB_USERNAME}:${MONGODB_PASSWORD}@mongo:27017"
- "--mongodb.uri=mongodb://mongo:27017"
- "--mongodb.direct-connect=true"
- "--web.listen-address=:9216"
- "--log.level=info"
Expand Down
95 changes: 66 additions & 29 deletions src/analytics/influx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ use influxdb::{InfluxDbWriteable, WriteQuery};

use super::{
ledger::{
AddressActivityMeasurement, AddressBalanceMeasurement, BaseTokenActivityMeasurement, LedgerOutputMeasurement,
LedgerSizeMeasurement, OutputActivityMeasurement, TransactionSizeMeasurement, UnclaimedTokenMeasurement,
UnlockConditionMeasurement,
AddressActivityMeasurement, AddressBalanceMeasurement, AddressesWithBalanceMeasurement,
BaseTokenActivityMeasurement, LedgerOutputMeasurement, LedgerSizeMeasurement, OutputActivityMeasurement,
TransactionSizeMeasurement, UnclaimedTokenMeasurement, UnlockConditionMeasurement,
},
tangle::{BlockActivityMeasurement, MilestoneSizeMeasurement},
AnalyticsInterval, PerInterval, PerMilestone,
};
use crate::{db::influxdb::InfluxDb, model::ProtocolParameters};
use crate::{
db::influxdb::InfluxDb,
model::{payload::milestone::MilestoneIndexTimestamp, ProtocolParameters},
};

/// A trait that defines an InfluxDb measurement.
trait Measurement {
Expand Down Expand Up @@ -46,45 +49,69 @@ impl<M: Measurement> AddFields<M> for WriteQuery {
}
}

pub trait PrepareQuery: Send + Sync {
fn prepare_query(&self) -> Vec<WriteQuery>;
/// A trait for types that can produce InfluxDB write queries anchored at a
/// specific milestone (index + timestamp).
pub trait PerMilestoneQuery: Send + Sync {
    // Returns the write queries for this value, timestamped at `at`.
    fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec<WriteQuery>;
}

impl<T: PrepareQuery + ?Sized> PrepareQuery for Box<T> {
fn prepare_query(&self) -> Vec<WriteQuery> {
(**self).prepare_query()
/// Boxed trait objects simply delegate to the value they wrap.
impl PerMilestoneQuery for Box<dyn PerMilestoneQuery> {
    fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec<WriteQuery> {
        // Forward the call to the inner trait object.
        self.as_ref().per_milestone_query(at)
    }
}

impl<M: Send + Sync> PrepareQuery for PerMilestone<M>
where
M: Measurement,
{
fn prepare_query(&self) -> Vec<WriteQuery> {
impl<M: Measurement + Send + Sync> PerMilestoneQuery for M {
fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec<WriteQuery> {
vec![
influxdb::Timestamp::from(self.at.milestone_timestamp)
influxdb::Timestamp::from(at.milestone_timestamp)
.into_query(M::NAME)
.add_field("milestone_index", self.at.milestone_index)
.add_fields(&self.inner),
.add_field("milestone_index", at.milestone_index)
.add_fields(self),
]
}
}

impl<T: PrepareQuery> PrepareQuery for PerMilestone<Vec<T>> {
/// A vector of measurements yields the concatenation of every element's queries.
impl<T: PerMilestoneQuery + Send + Sync> PerMilestoneQuery for Vec<T> {
    fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec<WriteQuery> {
        let mut queries = Vec::new();
        for element in self {
            queries.extend(element.per_milestone_query(at));
        }
        queries
    }
}

/// An absent measurement contributes no queries; a present one delegates.
impl<T: PerMilestoneQuery + Send + Sync> PerMilestoneQuery for Option<T> {
    fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec<WriteQuery> {
        match self {
            Some(inner) => inner.per_milestone_query(at),
            None => Vec::new(),
        }
    }
}

// Implements `PerMilestoneQuery` for tuples by concatenating the queries of
// each tuple element. Each `$idx $t` pair supplies a tuple index token and the
// matching generic type parameter (e.g. `0 T0, 1 T1`).
macro_rules! impl_per_milestone_query_tuple {
    ($($idx:tt $t:tt),+) => {
        // Trailing `,` in `($($t),*,)` keeps the single-element case a tuple.
        impl<$($t: PerMilestoneQuery + Send + Sync),*> PerMilestoneQuery for ($($t),*,)
        {
            fn per_milestone_query(&self, at: MilestoneIndexTimestamp) -> Vec<WriteQuery> {
                let mut queries = Vec::new();
                $(
                    // Append the queries of element `$idx`.
                    queries.extend(self.$idx.per_milestone_query(at));
                )*
                queries
            }
        }
    };
}
// Right now we only need this one
impl_per_milestone_query_tuple!(0 T0, 1 T1);

/// A trait for values that carry their own time context and can therefore
/// produce their InfluxDB write queries without extra arguments.
pub(crate) trait PrepareQuery: Send + Sync {
    // Returns the write queries for this value.
    fn prepare_query(&self) -> Vec<WriteQuery>;
}

impl<T: PrepareQuery + ?Sized> PrepareQuery for Box<T> {
fn prepare_query(&self) -> Vec<WriteQuery> {
self.inner.iter().flat_map(|inner| inner.prepare_query()).collect()
(**self).prepare_query()
}
}

impl<M: Send + Sync> PrepareQuery for PerMilestone<Option<M>>
where
M: Measurement,
{
impl<T: PerMilestoneQuery + Send + Sync> PrepareQuery for PerMilestone<T> {
fn prepare_query(&self) -> Vec<WriteQuery> {
self.inner
.iter()
.flat_map(|inner| PerMilestone { at: self.at, inner }.prepare_query())
.collect()
self.inner.per_milestone_query(self.at)
}
}

Expand All @@ -101,8 +128,8 @@ where
}
}

impl Measurement for AddressBalanceMeasurement {
const NAME: &'static str = "stardust_addresses";
impl Measurement for AddressesWithBalanceMeasurement {
const NAME: &'static str = "stardust_addresses_with_balance";

fn add_fields(&self, query: WriteQuery) -> WriteQuery {
let mut query = query.add_field("address_with_balance_count", self.address_with_balance_count as u64);
Expand All @@ -115,6 +142,16 @@ impl Measurement for AddressBalanceMeasurement {
}
}

/// Writes one `stardust_address_balance` point carrying a single address and
/// its balance.
impl Measurement for AddressBalanceMeasurement {
    const NAME: &'static str = "stardust_address_balance";

    fn add_fields(&self, query: WriteQuery) -> WriteQuery {
        query
            // NOTE(review): using the address as an Influx *tag* creates one
            // series per distinct address; confirm the producer bounds this set
            // (it appears to emit only a top-100 list) so series cardinality
            // stays manageable.
            .add_tag("address", self.address.clone())
            // `.0` extracts the raw inner amount from `TokenAmount`.
            .add_field("balance", self.balance.0)
    }
}

impl Measurement for BaseTokenActivityMeasurement {
const NAME: &'static str = "stardust_base_token_activity";

Expand Down
34 changes: 28 additions & 6 deletions src/analytics/ledger/address_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::*;
use crate::model::utxo::{Address, TokenAmount};

#[derive(Debug)]
pub(crate) struct AddressBalanceMeasurement {
pub(crate) struct AddressesWithBalanceMeasurement {
pub(crate) address_with_balance_count: usize,
pub(crate) token_distribution: Vec<DistributionStat>,
}
Expand All @@ -21,6 +21,13 @@ pub(crate) struct DistributionStat {
pub(crate) total_amount: TokenAmount,
}

/// Statistics for an address's balance.
#[derive(Clone, Debug)]
pub(crate) struct AddressBalanceMeasurement {
    // The address, already rendered as a bech32 string for reporting.
    pub(crate) address: String,
    // The total token amount held by this address.
    pub(crate) balance: TokenAmount,
}

/// Computes the number of addresses that currently hold a balance.
#[derive(Serialize, Deserialize)]
pub(crate) struct AddressBalancesAnalytics {
Expand All @@ -41,7 +48,7 @@ impl AddressBalancesAnalytics {
}

impl Analytics for AddressBalancesAnalytics {
type Measurement = AddressBalanceMeasurement;
type Measurement = (AddressesWithBalanceMeasurement, Vec<AddressBalanceMeasurement>);

fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], _ctx: &dyn AnalyticsContext) {
for output in consumed {
Expand Down Expand Up @@ -74,9 +81,24 @@ impl Analytics for AddressBalancesAnalytics {
token_distribution[index].address_count += 1;
token_distribution[index].total_amount += *amount;
}
AddressBalanceMeasurement {
address_with_balance_count: self.balances.len(),
token_distribution,
}

let mut sorted_addresses = self.balances.iter().collect::<Vec<_>>();
sorted_addresses.sort_unstable_by_key(|(_, v)| v.0);
let bech32_hrp = &ctx.protocol_params().bech32_hrp;
(
AddressesWithBalanceMeasurement {
address_with_balance_count: self.balances.len(),
token_distribution,
},
sorted_addresses
.iter()
.rev()
.take(100)
.map(|(a, v)| AddressBalanceMeasurement {
address: a.to_bech32(bech32_hrp),
balance: **v,
})
.collect(),
)
}
}
2 changes: 1 addition & 1 deletion src/analytics/ledger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};

pub(super) use self::{
active_addresses::{AddressActivityAnalytics, AddressActivityMeasurement},
address_balance::{AddressBalanceMeasurement, AddressBalancesAnalytics},
address_balance::{AddressBalanceMeasurement, AddressBalancesAnalytics, AddressesWithBalanceMeasurement},
base_token::BaseTokenActivityMeasurement,
ledger_outputs::LedgerOutputMeasurement,
ledger_size::{LedgerSizeAnalytics, LedgerSizeMeasurement},
Expand Down
Loading