Skip to content

Issue 392 get all spenders info #424

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion primitives/src/sentry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub mod message {

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(try_from = "MessageTypes", into = "MessageTypes")]
pub struct Message<T: Type>(T);
pub struct Message<T: Type>(pub T);

impl<T: Type> Message<T> {
pub fn new(message: T) -> Self {
Expand Down Expand Up @@ -221,6 +221,14 @@ pub struct SpenderResponse {
pub spender: Spender,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct AllSpendersResponse {
pub spenders: HashMap<Address, Spender>,
#[serde(flatten)]
pub pagination: Pagination,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ValidatorMessage {
pub from: ValidatorId,
Expand Down
16 changes: 16 additions & 0 deletions sentry/src/db/spendable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ pub async fn fetch_spendable(
Ok(row.map(Spendable::from))
}

static GET_ALL_SPENDERS_STATEMENT: &str = "SELECT spender, channel_id, channel, total, still_on_create2 FROM spendable WHERE channel_id = $1";

// TODO: Include pagination
pub async fn get_all_spendables_for_channel(
pool: DbPool,
channel_id: &ChannelId,
) -> Result<Vec<Spendable>, PoolError> {
let client = pool.get().await?;
let statement = client.prepare(GET_ALL_SPENDERS_STATEMENT).await?;

let rows = client.query(&statement, &[channel_id]).await?;
let spendables: Vec<Spendable> = rows.into_iter().map(Spendable::from).collect();

Ok(spendables)
}

static UPDATE_SPENDABLE_STATEMENT: &str = "INSERT INTO spendable(spender, channel_id, channel, total, still_on_create2) VALUES($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT spendable_pkey DO UPDATE SET total = $4, still_on_create2 = $5 WHERE spendable.spender = $1 AND spendable.channel_id = $2 RETURNING spender, channel_id, channel, total, still_on_create2";

// Updates spendable entry deposit or inserts a new spendable entry if it doesn't exist
Expand Down
22 changes: 20 additions & 2 deletions sentry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use routes::analytics::{advanced_analytics, advertiser_analytics, analytics, pub
use routes::campaign::{create_campaign, update_campaign};
use routes::cfg::config;
use routes::channel::{
channel_list, channel_validate, create_channel, create_validator_messages, get_spender_limits,
last_approved,
channel_list, channel_validate, create_channel, create_validator_messages,
get_all_spender_limits, get_spender_limits, last_approved,
};
use slog::Logger;
use std::collections::HashMap;
Expand Down Expand Up @@ -76,6 +76,10 @@ static CLOSE_CAMPAIGN_BY_CAMPAIGN_ID: Lazy<Regex> = Lazy::new(|| {
static CAMPAIGN_UPDATE_BY_ID: Lazy<Regex> = Lazy::new(|| {
Regex::new(r"^/v5/campaign/0x([a-zA-Z0-9]{32})/?$").expect("The regex should be valid")
});
static CHANNEL_ALL_SPENDER_LIMITS: Lazy<Regex> = Lazy::new(|| {
Regex::new(r"^/v5/channel/0x([a-zA-Z0-9]{64})/spender/all/?$")
.expect("The regex should be valid")
});

#[derive(Debug, Clone)]
pub struct RouteParams(pub Vec<String>);
Expand Down Expand Up @@ -393,6 +397,20 @@ async fn channels_router<A: Adapter + 'static>(
.await?;

get_spender_limits(req, app).await
} else if let (Some(caps), &Method::GET) = (CHANNEL_ALL_SPENDER_LIMITS.captures(&path), method)
{
let param = RouteParams(vec![caps
.get(1)
.map_or("".to_string(), |m| m.as_str().to_string())]);
req.extensions_mut().insert(param);

req = Chain::new()
.chain(AuthRequired)
.chain(ChannelLoad)
.apply(req, app)
.await?;

get_all_spender_limits(req, app).await
} else {
Err(ResponseError::NotFound)
}
Expand Down
113 changes: 95 additions & 18 deletions sentry/src/routes/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::db::{
latest_new_state_v5,
},
get_channel_by_id, insert_channel, insert_validator_messages, list_channels,
spendable::{fetch_spendable, update_spendable},
spendable::{fetch_spendable, get_all_spendables_for_channel, update_spendable},
DbPool, PoolError,
};
use crate::{success_response, Application, Auth, ResponseError, RouteParams};
Expand All @@ -13,18 +13,19 @@ use hex::FromHex;
use hyper::{Body, Request, Response};
use primitives::{
adapter::Adapter,
balances::UncheckedState,
balances::{CheckedState, UncheckedState},
channel_v5::Channel as ChannelV5,
config::TokenInfo,
sentry::{
channel_list::{ChannelListQuery, LastApprovedQuery},
LastApproved, LastApprovedResponse, SpenderResponse, SuccessResponse,
AllSpendersResponse, LastApproved, LastApprovedResponse, Pagination, SpenderResponse,
SuccessResponse,
},
spender::{Deposit, Spendable, Spender, SpenderLeaf},
validator::MessageTypes,
validator::{MessageTypes, NewState},
Address, Channel, ChannelId, UnifiedNum,
};
use slog::error;
use slog::{error, Logger};
use std::{collections::HashMap, str::FromStr};
use tokio_postgres::error::SqlState;

Expand Down Expand Up @@ -287,10 +288,10 @@ pub async fn get_spender_limits<A: Adapter + 'static>(
.expect("Request should have Channel")
.to_owned();

let channel_id = channel.id();
let spender = Address::from_str(&route_params.index(1))?;

let latest_spendable = fetch_spendable(app.pool.clone(), &spender, &channel_id).await?;
let latest_spendable = fetch_spendable(app.pool.clone(), &spender, &channel.id()).await?;

let token_info = app
.config
.token_address_whitelist
Expand All @@ -311,21 +312,12 @@ pub async fn get_spender_limits<A: Adapter + 'static>(
}
};

let approve_state = match latest_approve_state_v5(&app.pool, &channel).await? {
Some(approve_state) => approve_state,
None => return spender_response_without_leaf(latest_spendable.deposit.total),
};

let state_root = approve_state.msg.state_root.clone();

let new_state = match latest_new_state_v5(&app.pool, &channel, &state_root).await? {
let new_state = match get_corresponding_new_state(&app.pool, &app.logger, &channel).await? {
Some(new_state) => new_state,
None => return spender_response_without_leaf(latest_spendable.deposit.total),
};

let new_state_checked = new_state.msg.into_inner().try_checked()?;

let total_spent = new_state_checked.balances.spenders.get(&spender);
let total_spent = new_state.balances.spenders.get(&spender);

let spender_leaf = total_spent.map(|total_spent| SpenderLeaf {
total_spent: *total_spent,
Expand All @@ -342,6 +334,91 @@ pub async fn get_spender_limits<A: Adapter + 'static>(
Ok(success_response(serde_json::to_string(&res)?))
}

pub async fn get_all_spender_limits<A: Adapter + 'static>(
req: Request<Body>,
app: &Application<A>,
) -> Result<Response<Body>, ResponseError> {
let channel = req
.extensions()
.get::<ChannelV5>()
.expect("Request should have Channel")
.to_owned();

let new_state = get_corresponding_new_state(&app.pool, &app.logger, &channel).await?;

let mut all_spender_limits: HashMap<Address, Spender> = HashMap::new();

let all_spendables = get_all_spendables_for_channel(app.pool.clone(), &channel.id()).await?;

// Using for loop to avoid async closures
for spendable in all_spendables {
let spender = spendable.spender;
let spender_leaf = match new_state {
Some(ref new_state) => new_state.balances.spenders.get(&spender).map(|balance| {
SpenderLeaf {
total_spent: spendable
.deposit
.total
.checked_sub(balance)
.unwrap_or_default(),
// merkle_proof: [u8; 32], // TODO
}
}),
None => None,
};

let spender_info = Spender {
total_deposited: spendable.deposit.total,
spender_leaf,
};

all_spender_limits.insert(spender, spender_info);
}

let res = AllSpendersResponse {
spenders: all_spender_limits,
pagination: Pagination {
// TODO
page: 1,
total: 1,
total_pages: 1,
},
};

Ok(success_response(serde_json::to_string(&res)?))
}

async fn get_corresponding_new_state(
pool: &DbPool,
logger: &Logger,
channel: &ChannelV5,
) -> Result<Option<NewState<CheckedState>>, ResponseError> {
let approve_state = match latest_approve_state_v5(pool, channel).await? {
Some(approve_state) => approve_state,
None => return Ok(None),
};

let state_root = approve_state.msg.state_root.clone();

let new_state = match latest_new_state_v5(pool, channel, &state_root).await? {
Some(new_state) => {
let new_state = new_state.msg.into_inner().try_checked().map_err(|err| {
error!(&logger, "Balances are not aligned in an approved NewState: {}", &err; "module" => "get_spender_limits");
ResponseError::BadRequest("Balances are not aligned in an approved NewState".to_string())
})?;
Ok(Some(new_state))
}
None => {
error!(&logger, "{}", "Fatal error! The NewState for the last ApproveState was not found"; "module" => "get_spender_limits");
return Err(ResponseError::BadRequest(
"Fatal error! The NewState for the last ApproveState was not found".to_string(),
));
}
};

new_state
}

#[cfg(test)]
mod test {
use super::*;
Expand Down