From 115b7b0bef98e4398ed0e17b2e34021e536b557d Mon Sep 17 00:00:00 2001 From: Lachezar Lechev Date: Wed, 23 Jun 2021 12:09:51 +0300 Subject: [PATCH 01/12] sentry - clean up Events code in Channel & access --- primitives/src/sentry.rs | 3 +-- sentry/src/access.rs | 22 +++++----------------- sentry/src/payout.rs | 2 +- sentry/src/routes/channel.rs | 4 ++-- 4 files changed, 9 insertions(+), 22 deletions(-) diff --git a/primitives/src/sentry.rs b/primitives/src/sentry.rs index afd8a6214..dbfc64740 100644 --- a/primitives/src/sentry.rs +++ b/primitives/src/sentry.rs @@ -1,7 +1,6 @@ use crate::{ - targeting::Rules, validator::{ApproveState, Heartbeat, MessageTypes, NewState, Type as MessageType}, - Address, BalancesMap, BigNum, Channel, ChannelId, ValidatorId, IPFS, + Address, BigNum, Channel, ChannelId, ValidatorId, IPFS, }; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; diff --git a/sentry/src/access.rs b/sentry/src/access.rs index cc49caf7c..a3fca1578 100644 --- a/sentry/src/access.rs +++ b/sentry/src/access.rs @@ -13,7 +13,7 @@ use thiserror::Error; #[derive(Debug, PartialEq, Eq, Error)] pub enum Error { - #[error("channel is expired")] + #[error("Campaign is expired")] CampaignIsExpired, #[error("event submission restricted")] ForbiddenReferrer, @@ -37,14 +37,7 @@ pub async fn check_access( if current_time > campaign.active.to { return Err(Error::CampaignIsExpired); } - - let (is_creator, auth_uid) = match auth { - Some(auth) => ( - auth.uid.to_address() == campaign.creator, - auth.uid.to_string(), - ), - None => (false, Default::default()), - }; + let auth_uid = auth.map(|auth| auth.uid.to_string()).unwrap_or_default(); // Rules for events if forbidden_country(&session) || forbidden_referrer(&session) { @@ -94,11 +87,7 @@ pub async fn check_access( ) })); - if let Err(rule_error) = apply_all_rules.await { - Err(Error::RulesError(rule_error)) - } else { - Ok(()) - } + apply_all_rules.await.map_err(Error::RulesError).map(|_| ()) } async fn apply_rule( @@ -188,9 +177,8 @@ mod test { config::configuration, event_submission::{RateLimit, Rule}, sentry::Event, - targeting::Rules, - util::tests::prep_db::{ADDRESSES, DUMMY_CAMPAIGN, DUMMY_CHANNEL, IDS}, - Channel, Config, EventSubmission, + util::tests::prep_db::{ADDRESSES, DUMMY_CAMPAIGN, IDS}, + Config, EventSubmission, }; use deadpool::managed::Object; diff --git a/sentry/src/payout.rs b/sentry/src/payout.rs index e0ad94930..29373aa28 100644 --- a/sentry/src/payout.rs +++ b/sentry/src/payout.rs @@ -11,6 +11,7 @@ use std::cmp::{max, min}; pub type Result = std::result::Result, Error>; +/// If None is returned this means that the targeting rules evaluation has set `show = false` pub fn get_payout( logger: &Logger, campaign: &Campaign, @@ -88,7 +89,6 @@ pub fn get_payout( } } } - _ => Ok(None), } } diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index 304bd3432..ca337b6f7 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -3,7 +3,7 @@ use crate::db::{ get_channel_by_id, insert_channel, insert_validator_messages, list_channels, update_exhausted_channel, PoolError, }; -use crate::{success_response, Application, Auth, ResponseError, RouteParams, Session}; +use crate::{success_response, Application, Auth, ResponseError, RouteParams}; use futures::future::try_join_all; use hex::FromHex; use hyper::{Body, Request, Response}; @@ -11,7 +11,7 @@ use primitives::{ adapter::Adapter, sentry::{ channel_list::{ChannelListQuery, LastApprovedQuery}, - Event, LastApproved, LastApprovedResponse, SuccessResponse, + LastApproved, LastApprovedResponse, SuccessResponse, }, validator::MessageTypes, Channel, ChannelId, From ccb1eac52f289a2bca604daee013a0120a5da9bf Mon Sep 17 00:00:00 2001 From: Lachezar Lechev Date: Wed, 30 Jun 2021 15:08:50 +0300 Subject: [PATCH 02/12] sentry - Cargo - add `postgres-types` `derive` feature --- Cargo.lock | 13 +++++++++++++ sentry/Cargo.toml | 1 + 2 files changed, 14 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 57dbdc1fe..f27f5d2a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2729,6 +2729,17 @@ dependencies = [ "tokio-postgres", ] +[[package]] +name = "postgres-derive" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c857dd221cb0e7d8414b894a0ce29eae44d453dda0baa132447878e75e701477" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "postgres-native-tls" version = "0.5.0" @@ -2769,6 +2780,7 @@ dependencies = [ "bytes 1.0.1", "chrono", "fallible-iterator", + "postgres-derive", "postgres-protocol", "serde", "serde_json", @@ -3496,6 +3508,7 @@ dependencies = [ "lazy_static", "migrant_lib", "once_cell", + "postgres-types", "primitives", "redis", "regex", diff --git a/sentry/Cargo.toml b/sentry/Cargo.toml index 7e25ca25c..746ee77da 100644 --- a/sentry/Cargo.toml +++ b/sentry/Cargo.toml @@ -29,6 +29,7 @@ redis = { version = "0.19", features = ["aio", "tokio-comp"] } deadpool = "0.8.0" deadpool-postgres = "0.8.0" tokio-postgres = { version = "0.7.0", features = ["with-chrono-0_4", "with-serde_json-1"] } +postgres-types = { version = "0.2.1", features = ["derive", "with-chrono-0_4", "with-serde_json-1"] } # Migrations migrant_lib = { version = "^0.32", features = ["d-postgres"] } From 99807db8fbe6b036355f9a417612761aa133fb16 Mon Sep 17 00:00:00 2001 From: Lachezar Lechev Date: Wed, 30 Jun 2021 15:09:06 +0300 Subject: [PATCH 03/12] sentry - migrations - accounting - flat structure --- .../migrations/20190806011140_initial-tables/up.sql | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sentry/migrations/20190806011140_initial-tables/up.sql b/sentry/migrations/20190806011140_initial-tables/up.sql index 710e44898..f611fafd0 100644 --- a/sentry/migrations/20190806011140_initial-tables/up.sql +++ b/sentry/migrations/20190806011140_initial-tables/up.sql @@ -68,13 +68,16 @@ CREATE AGGREGATE jsonb_object_agg (jsonb) ( INITCOND = '{}' ); +CREATE TYPE AccountingSide AS ENUM ('Earner', 'Spender'); + CREATE TABLE accounting ( channel_id varchar(66) NOT NULL, - channel jsonb NOT NULL, - earners jsonb DEFAULT '{}' NULL, - spenders jsonb DEFAULT '{}' NULL, + side AccountingSide NOT NULL, + "address" varchar(42) NOT NULL, + amount bigint NOT NULL, updated timestamp(2) with time zone DEFAULT NULL NULL, created timestamp(2) with time zone NOT NULL, - PRIMARY KEY (channel_id) -) \ No newline at end of file + -- Do not rename the Primary key constraint (`accounting_pkey`)! + PRIMARY KEY (channel_id, side, "address") +); \ No newline at end of file From a3eb1017a7e10505e98aa25d7f050840a13aa74e Mon Sep 17 00:00:00 2001 From: Lachezar Lechev Date: Wed, 30 Jun 2021 15:15:30 +0300 Subject: [PATCH 04/12] sentry - db - accounting follows flat struture --- primitives/src/sentry/accounting.rs | 45 +--- sentry/src/db/accounting.rs | 319 +++++++++++++++++++++++----- 2 files changed, 280 insertions(+), 84 deletions(-) diff --git a/primitives/src/sentry/accounting.rs b/primitives/src/sentry/accounting.rs index 6c5b1b88d..64589ce23 100644 --- a/primitives/src/sentry/accounting.rs +++ b/primitives/src/sentry/accounting.rs @@ -1,9 +1,6 @@ -use std::{ - convert::TryFrom, - marker::PhantomData, -}; +use std::{convert::TryFrom, marker::PhantomData}; -use crate::{balances_map::UnifiedMap, Address, channel_v5::Channel, UnifiedNum}; +use crate::{balances_map::UnifiedMap, channel_v5::Channel, Address, UnifiedNum}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Deserializer, Serialize}; use thiserror::Error; @@ -73,9 +70,11 @@ impl Balances { } } -#[derive(Debug)] +#[derive(Debug, Error)] pub enum OverflowError { + #[error("Spender {0} amount overflowed")] Spender(Address), + #[error("Earner {0} amount overflowed")] Earner(Address), } @@ -130,7 +129,8 @@ mod de { Ok(Self { channel: de_acc.channel, - balances: Balances::::try_from(de_acc.balances).map_err(serde::de::Error::custom)?, + balances: Balances::::try_from(de_acc.balances) + .map_err(serde::de::Error::custom)?, created: de_acc.created, updated: de_acc.updated, }) @@ -146,7 +146,10 @@ mod de { Ok(Self { channel: unchecked_acc.channel, - balances: unchecked_acc.balances.check().map_err(serde::de::Error::custom)?, + balances: unchecked_acc + .balances + .check() + .map_err(serde::de::Error::custom)?, created: unchecked_acc.created, updated: unchecked_acc.updated, }) @@ -195,29 +198,3 @@ mod de { } } } - -#[cfg(feature = "postgres")] -mod postgres { - use super::*; - use postgres_types::Json; - use tokio_postgres::Row; - - impl TryFrom<&Row> for Accounting { - type Error = Error; - - fn try_from(row: &Row) -> Result { - let balances = Balances:: { - earners: row.get::<_, Json<_>>("earners").0, - spenders: row.get::<_, Json<_>>("spenders").0, - state: PhantomData::default(), - }.check()?; - - Ok(Self { - channel: row.get("channel"), - balances, - updated: row.get("updated"), - created: row.get("created"), - }) - } - } -} \ No newline at end of file diff --git a/sentry/src/db/accounting.rs b/sentry/src/db/accounting.rs index ffc8e3db4..fe1f0b0d4 100644 --- a/sentry/src/db/accounting.rs +++ b/sentry/src/db/accounting.rs @@ -1,12 +1,8 @@ -use std::convert::TryFrom; - use chrono::{DateTime, Utc}; use primitives::{ - channel_v5::Channel, - sentry::accounting::{Accounting, Balances, CheckedState}, Address, ChannelId, UnifiedNum, }; -use tokio_postgres::types::Json; +use tokio_postgres::{IsolationLevel, Row, types::{FromSql, ToSql}}; use super::{DbPool, PoolError}; use thiserror::Error; @@ -19,49 +15,117 @@ pub enum Error { Postgres(#[from] PoolError), } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Accounting { + pub channel_id: ChannelId, + pub side: Side, + pub address: Address, + pub amount: UnifiedNum, + pub updated: Option>, + pub created: DateTime, +} + +impl From<&Row> for Accounting { + fn from(row: &Row) -> Self { + Self { + channel_id: row.get("channel_id"), + side: row.get("side"), + address: row.get("address"), + amount: row.get("amount"), + updated: row.get("updated"), + created: row.get("created"), + } + } +} + +#[derive(Debug, Clone, Copy, ToSql, FromSql, PartialEq, Eq)] +#[postgres(name = "accountingside")] +pub enum Side { + Earner, + Spender, +} + +pub enum SpendError { + Pool(PoolError), + NoRecordsUpdated, +} + /// ```text -/// SELECT (spenders ->> $1)::bigint as spent FROM accounting WHERE channel_id = $2 +/// SELECT channel_id, side, address, amount, updated, created FROM accounting WHERE channel_id = $1 AND address = $2 AND side = $3 /// ``` -/// This function returns the spent amount in a `Channel` of a given spender -pub async fn get_accounting_spent( +pub async fn get_accounting( pool: DbPool, - spender: &Address, - channel_id: &ChannelId, -) -> Result { + channel_id: ChannelId, + address: Address, + side: Side, +) -> Result, PoolError> { let client = pool.get().await?; let statement = client - .prepare("SELECT (spenders ->> $1)::bigint as spent FROM accounting WHERE channel_id = $2") + .prepare("SELECT channel_id, side, address, amount, updated, created FROM accounting WHERE channel_id = $1 AND address = $2 AND side = $3") .await?; - let row = client.query_one(&statement, &[spender, channel_id]).await?; + let row = client + .query_opt(&statement, &[&channel_id, &address, &side]) + .await?; - Ok(row.get("spent")) + Ok(row.as_ref().map(Accounting::from)) } -// TODO This is still WIP -#[allow(dead_code)] -async fn insert_accounting( +/// Will update current Spender/Earner amount or insert a new Accounting record +/// +/// See `UPDATE_ACCOUNTING_STATEMENT` static for full query. +static UPDATE_ACCOUNTING_STATEMENT: &str = "INSERT INTO accounting(channel_id, side, address, amount, updated, created) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT accounting_pkey DO UPDATE SET amount = accounting.amount + $4, updated = $6 WHERE accounting.channel_id = $1 AND accounting.side = $2 AND accounting.address = $3 RETURNING channel_id, side, address, amount, updated, created"; +pub async fn update_accounting( pool: DbPool, - channel: Channel, - balances: Balances, -) -> Result, Error> { + channel_id: ChannelId, + address: Address, + side: Side, + amount: UnifiedNum, +) -> Result { let client = pool.get().await?; + let statement = client + .prepare(UPDATE_ACCOUNTING_STATEMENT) + .await?; - let statement = client.prepare("INSERT INTO accounting (channel_id, channel, earners, spenders, updated, created) VALUES ($1, $2, $3, $4, $5, NOW()) RETURNING channel, earners, spenders, updated, created").await.map_err(PoolError::Backend)?; - - let earners = Json(&balances.earners); - let spenders = Json(&balances.spenders); + let now = Utc::now(); let updated: Option> = None; let row = client .query_one( &statement, - &[&channel.id(), &channel, &earners, &spenders, &updated], + &[&channel_id, &side, &address, &amount, &updated, &now], ) - .await - .map_err(PoolError::Backend)?; + .await?; + + Ok(Accounting::from(&row)) +} + +/// Will use `UPDATE_ACCOUNTING_STATEMENT` to create and run the query twice - once for Earner and once for Spender accounting. +/// +/// It runs both queries in a transaction in order to rollback if one of the queries fails. +pub async fn spend_accountings( + pool: DbPool, + channel_id: ChannelId, + earner: Address, + spender: Address, + amount: UnifiedNum, +) -> Result<(Accounting, Accounting), PoolError> { + let mut client = pool.get().await?; + + // The reads and writes in this transaction must be able to be committed as an atomic “unit” with respect to reads and writes of all other concurrent serializable transactions without interleaving. + let transaction = client.build_transaction().isolation_level(IsolationLevel::Serializable).start().await?; + + let statement = transaction.prepare(UPDATE_ACCOUNTING_STATEMENT).await?; + + let now = Utc::now(); + let updated: Option> = None; + + let earner_row = transaction.query_one(&statement, &[&channel_id, &Side::Earner, &earner, &amount, &updated, &now]).await?; + let spender_row = transaction.query_one(&statement, &[&channel_id, &Side::Spender, &spender, &amount, &updated, &now]).await?; + + transaction.commit().await?; - Accounting::try_from(&row).map_err(Error::Balances) + Ok((Accounting::from(&earner_row), Accounting::from(&spender_row))) } #[cfg(test)] @@ -73,40 +137,195 @@ mod test { use super::*; #[tokio::test] - async fn get_spent() { + async fn insert_update_and_get_accounting() { let database = DATABASE_POOL.get().await.expect("Should get a DB pool"); setup_test_migrations(database.pool.clone()) .await .expect("Migrations should succeed"); - let channel = DUMMY_CAMPAIGN.channel.clone(); - - let spender = ADDRESSES["creator"]; + let channel_id = DUMMY_CAMPAIGN.channel.id(); let earner = ADDRESSES["publisher"]; + let spender = ADDRESSES["creator"]; + + let amount = UnifiedNum::from(100_000_000); + let update_amount = UnifiedNum::from(200_000_000); - let mut balances = Balances::default(); - let spend_amount = UnifiedNum::from(100); - balances - .spend(spender, earner, spend_amount) - .expect("Should be ok"); + // Spender insert/update + { + let inserted = update_accounting( + database.pool.clone(), + channel_id, + spender, + Side::Spender, + amount, + ) + .await + .expect("Should insert"); + assert_eq!(spender, inserted.address); + assert_eq!(Side::Spender, inserted.side); + assert_eq!(UnifiedNum::from(100_000_000), inserted.amount); - let accounting = insert_accounting(database.pool.clone(), channel.clone(), balances) + let updated = update_accounting( + database.pool.clone(), + channel_id, + spender, + Side::Spender, + update_amount, + ) .await .expect("Should insert"); + assert_eq!(spender, updated.address); + assert_eq!(Side::Spender, updated.side); + assert_eq!( + UnifiedNum::from(300_000_000), + updated.amount, + "Should add the newly spent amount to the existing one" + ); + + let spent = get_accounting(database.pool.clone(), channel_id, spender, Side::Spender).await.expect("Should query for the updated accounting"); + assert_eq!(Some(updated), spent); + + let earned = get_accounting(database.pool.clone(), channel_id, spender, Side::Earner).await.expect("Should query for accounting"); + assert!(earned.is_none(), "Spender shouldn't have an earned amount"); + } - let spent = get_accounting_spent(database.pool.clone(), &spender, &channel.id()) + // Earner insert/update + { + let inserted = update_accounting( + database.pool.clone(), + channel_id, + earner, + Side::Earner, + amount, + ) .await - .expect("Should get spent"); - - assert_eq!(spend_amount, spent); - assert_eq!( - accounting - .balances - .spenders - .get(&spender) - .expect("Should contain value"), - &spent - ); + .expect("Should insert"); + assert_eq!(earner, inserted.address); + assert_eq!(Side::Earner, inserted.side); + assert_eq!(UnifiedNum::from(100_000_000), inserted.amount); + + let updated = update_accounting( + database.pool.clone(), + channel_id, + earner, + Side::Earner, + update_amount, + ) + .await + .expect("Should insert"); + assert_eq!(earner, updated.address); + assert_eq!(Side::Earner, updated.side); + assert_eq!( + UnifiedNum::from(300_000_000), + updated.amount, + "Should add the newly earned amount to the existing one" + ); + + let earned = get_accounting(database.pool.clone(), channel_id, earner, Side::Earner).await.expect("Should query for the updated accounting"); + assert_eq!(Some(updated), earned); + + let spent = get_accounting(database.pool.clone(), channel_id, earner, Side::Spender).await.expect("Should query for accounting"); + assert!(spent.is_none(), "Earner shouldn't have a spent amount"); + } + + + // Spender as Earner & another Spender + // Will test the previously spent amount as well! + { + let spender_as_earner = spender; + + let inserted = update_accounting( + database.pool.clone(), + channel_id, + spender_as_earner, + Side::Earner, + amount, + ) + .await + .expect("Should insert"); + assert_eq!(spender_as_earner, inserted.address); + assert_eq!(Side::Earner, inserted.side); + assert_eq!(UnifiedNum::from(100_000_000), inserted.amount); + + let updated = update_accounting( + database.pool.clone(), + channel_id, + spender_as_earner, + Side::Earner, + UnifiedNum::from(999), + ) + .await + .expect("Should insert"); + assert_eq!(spender, updated.address); + assert_eq!(Side::Earner, updated.side); + assert_eq!( + UnifiedNum::from(100_000_999), + updated.amount, + "Should add the newly spent amount to the existing one" + ); + + let earned_acc = get_accounting(database.pool.clone(), channel_id, spender_as_earner, Side::Earner).await.expect("Should query for earned accounting").expect("Should have Earned accounting for Spender as Earner"); + assert_eq!(UnifiedNum::from(100_000_999), earned_acc.amount); + + let spent_acc = get_accounting(database.pool.clone(), channel_id, spender_as_earner, Side::Spender).await.expect("Should query for spent accounting").expect("Should have Spent accounting for Spender as Earner"); + assert_eq!(UnifiedNum::from(300_000_000), spent_acc.amount); + + } + } + + #[tokio::test] + async fn test_spending_accountings() { + let database = DATABASE_POOL.get().await.expect("Should get a DB pool"); + + setup_test_migrations(database.pool.clone()) + .await + .expect("Migrations should succeed"); + + let channel_id = DUMMY_CAMPAIGN.channel.id(); + let earner = ADDRESSES["publisher"]; + let spender = ADDRESSES["creator"]; + let other_spender = ADDRESSES["tester"]; + + let amount = UnifiedNum::from(100_000_000); + let update_amount = UnifiedNum::from(200_000_000); + + // Spender & Earner insert + let (inserted_earner, inserted_spender) = spend_accountings(database.pool.clone(), channel_id, earner, spender, amount).await.expect("Should insert Earner and Spender"); + assert_eq!(earner, inserted_earner.address); + assert_eq!(Side::Earner, inserted_earner.side); + assert_eq!(UnifiedNum::from(100_000_000), inserted_earner.amount); + + assert_eq!(spender, inserted_spender.address); + assert_eq!(Side::Spender, inserted_spender.side); + assert_eq!(UnifiedNum::from(100_000_000), inserted_spender.amount); + + // Spender & Earner update + let (updated_earner, updated_spender) = spend_accountings(database.pool.clone(), channel_id, earner, spender, update_amount).await.expect("Should update Earner and Spender"); + + assert_eq!(earner, updated_earner.address); + assert_eq!(Side::Earner, updated_earner.side); + assert_eq!(UnifiedNum::from(300_000_000), updated_earner.amount, "Should add the newly earned amount to the existing one"); + + assert_eq!(spender, updated_spender.address); + assert_eq!(Side::Spender, updated_spender.side); + assert_eq!(UnifiedNum::from(300_000_000), updated_spender.amount, "Should add the newly spend amount to the existing one"); + + // Spender as an Earner & another spender + let (spender_as_earner, inserted_other_spender) = spend_accountings(database.pool.clone(), channel_id, spender, other_spender, UnifiedNum::from(999)).await.expect("Should update Spender as Earner and the Other Spender"); + + assert_eq!(spender, spender_as_earner.address); + assert_eq!(Side::Earner, spender_as_earner.side); + assert_eq!(UnifiedNum::from(999), spender_as_earner.amount, "Should add earner accounting for the previous Spender"); + + assert_eq!(other_spender, inserted_other_spender.address); + assert_eq!(Side::Spender, inserted_other_spender.side); + assert_eq!(UnifiedNum::from(999), inserted_other_spender.amount); + + let earned = get_accounting(database.pool.clone(), channel_id, spender, Side::Earner).await.expect("Should query for accounting").expect("Should have Earned accounting for Spender as Earner"); + assert_eq!(UnifiedNum::from(999), earned.amount); + + let spent = get_accounting(database.pool.clone(), channel_id, spender, Side::Spender).await.expect("Should query for accounting").expect("Should have Spent accounting for Spender as Earner"); + assert_eq!(UnifiedNum::from(300_000_000), spent.amount); } } From 5cb26efc6b5b71c612312d668f8b817e765ae376 Mon Sep 17 00:00:00 2001 From: Lachezar Lechev Date: Fri, 9 Jul 2021 09:04:38 +0300 Subject: [PATCH 05/12] sentry - prefix campaign routes with `/v5/` --- sentry/src/lib.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs index cb789481c..9289adad8 100644 --- a/sentry/src/lib.rs +++ b/sentry/src/lib.rs @@ -64,10 +64,10 @@ lazy_static! { } static INSERT_EVENTS_BY_CAMPAIGN_ID: Lazy = Lazy::new(|| { - Regex::new(r"^/campaign/0x([a-zA-Z0-9]{32})/events/?$").expect("The regex should be valid") + Regex::new(r"^/v5/campaign/0x([a-zA-Z0-9]{32})/events/?$").expect("The regex should be valid") }); static CLOSE_CAMPAIGN_BY_CAMPAIGN_ID: Lazy = Lazy::new(|| { - Regex::new(r"^/campaign/0x([a-zA-Z0-9]{32})/close/?$").expect("The regex should be valid") + Regex::new(r"^/v5/campaign/0x([a-zA-Z0-9]{32})/close/?$").expect("The regex should be valid") }); #[derive(Debug, Clone)] @@ -162,7 +162,7 @@ impl Application { // This is important because it prevents us from doing // expensive regex matching for routes without /channel (path, _) if path.starts_with("/channel") => channels_router(req, &self).await, - (path, _) if path.starts_with("/campaign") => campaigns_router(req, &self).await, + (path, _) if path.starts_with("/v5/campaign") => campaigns_router(req, &self).await, _ => Err(ResponseError::NotFound), } .unwrap_or_else(map_response_error); @@ -217,8 +217,6 @@ async fn analytics_router( ) -> Result, ResponseError> { let (route, method) = (req.uri().path(), req.method()); - - // TODO AIP#61: Add routes for: // - POST /channel/:id/pay // #[serde(rename_all = "camelCase")] @@ -295,9 +293,8 @@ async fn channels_router( req.extensions_mut().insert(param); insert_events(req, app).await - } else */ - if let (Some(caps), &Method::GET) = (LAST_APPROVED_BY_CHANNEL_ID.captures(&path), method) - { + } else */ + if let (Some(caps), &Method::GET) = (LAST_APPROVED_BY_CHANNEL_ID.captures(&path), method) { let param = RouteParams(vec![caps .get(1) .map_or("".to_string(), |m| m.as_str().to_string())]); From be363277ad05f85c03d60badbc1845efb06ee5af Mon Sep 17 00:00:00 2001 From: Lachezar Lechev Date: Mon, 12 Jul 2021 09:54:39 +0300 Subject: [PATCH 06/12] sentry - db - accounting - spend_amount with delta Balances --- primitives/src/campaign.rs | 2 +- sentry/src/db/accounting.rs | 364 +++++++++++++++++++++++++++++------- 2 files changed, 301 insertions(+), 65 deletions(-) diff --git a/primitives/src/campaign.rs b/primitives/src/campaign.rs index bbbb5b3c4..ad3ca0167 100644 --- a/primitives/src/campaign.rs +++ b/primitives/src/campaign.rs @@ -195,7 +195,7 @@ impl Campaign { } } - /// Matches the Channel.leader to the Campaign.spec.leader + /// Matches the Channel.leader to the Campaign.validators.leader /// If they match it returns `Some`, otherwise, it returns `None` pub fn leader(&self) -> Option<&'_ ValidatorDesc> { self.validators.find(&self.channel.leader) diff --git a/sentry/src/db/accounting.rs b/sentry/src/db/accounting.rs index fe1f0b0d4..db1588994 100644 --- a/sentry/src/db/accounting.rs +++ b/sentry/src/db/accounting.rs @@ -1,8 +1,12 @@ use chrono::{DateTime, Utc}; use primitives::{ + sentry::accounting::{Balances, CheckedState}, Address, ChannelId, UnifiedNum, }; -use tokio_postgres::{IsolationLevel, Row, types::{FromSql, ToSql}}; +use tokio_postgres::{ + types::{FromSql, ToSql}, + IsolationLevel, Row, +}; use super::{DbPool, PoolError}; use thiserror::Error; @@ -15,6 +19,12 @@ pub enum Error { Postgres(#[from] PoolError), } +impl From for Error { + fn from(error: tokio_postgres::Error) -> Self { + Self::Postgres(PoolError::Backend(error)) + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct Accounting { pub channel_id: ChannelId, @@ -83,9 +93,7 @@ pub async fn update_accounting( amount: UnifiedNum, ) -> Result { let client = pool.get().await?; - let statement = client - .prepare(UPDATE_ACCOUNTING_STATEMENT) - .await?; + let statement = client.prepare(UPDATE_ACCOUNTING_STATEMENT).await?; let now = Utc::now(); let updated: Option> = None; @@ -100,32 +108,64 @@ pub async fn update_accounting( Ok(Accounting::from(&row)) } -/// Will use `UPDATE_ACCOUNTING_STATEMENT` to create and run the query twice - once for Earner and once for Spender accounting. -/// -/// It runs both queries in a transaction in order to rollback if one of the queries fails. -pub async fn spend_accountings( +/// `delta_balances` defines the Balances that need to be added to the spending or earnings of the `Accounting`s. +/// It will **not** override the whole `Accounting` value +/// Returns a tuple of `(Vec, Vec)` +pub async fn spend_amount( pool: DbPool, channel_id: ChannelId, - earner: Address, - spender: Address, - amount: UnifiedNum, -) -> Result<(Accounting, Accounting), PoolError> { + delta_balances: Balances, +) -> Result<(Vec, Vec), PoolError> { let mut client = pool.get().await?; // The reads and writes in this transaction must be able to be committed as an atomic “unit” with respect to reads and writes of all other concurrent serializable transactions without interleaving. - let transaction = client.build_transaction().isolation_level(IsolationLevel::Serializable).start().await?; + let transaction = client + .build_transaction() + .isolation_level(IsolationLevel::Serializable) + .start() + .await?; let statement = transaction.prepare(UPDATE_ACCOUNTING_STATEMENT).await?; let now = Utc::now(); let updated: Option> = None; - let earner_row = transaction.query_one(&statement, &[&channel_id, &Side::Earner, &earner, &amount, &updated, &now]).await?; - let spender_row = transaction.query_one(&statement, &[&channel_id, &Side::Spender, &spender, &amount, &updated, &now]).await?; + let (mut earners, mut spenders) = (vec![], vec![]); + + // Earners + for (earner, amount) in delta_balances.earners { + let row = transaction + .query_one( + &statement, + &[&channel_id, &Side::Earner, &earner, &amount, &updated, &now], + ) + .await?; + + earners.push(Accounting::from(&row)) + } + + // Spenders + for (spender, amount) in delta_balances.spenders { + let row = transaction + .query_one( + &statement, + &[ + &channel_id, + &Side::Spender, + &spender, + &amount, + &updated, + &now, + ], + ) + .await?; + + spenders.push(Accounting::from(&row)) + } transaction.commit().await?; - Ok((Accounting::from(&earner_row), Accounting::from(&spender_row))) + Ok((earners, spenders)) } #[cfg(test)] @@ -183,10 +223,14 @@ mod test { "Should add the newly spent amount to the existing one" ); - let spent = get_accounting(database.pool.clone(), channel_id, spender, Side::Spender).await.expect("Should query for the updated accounting"); + let spent = get_accounting(database.pool.clone(), channel_id, spender, Side::Spender) + .await + .expect("Should query for the updated accounting"); assert_eq!(Some(updated), spent); - let earned = get_accounting(database.pool.clone(), channel_id, spender, Side::Earner).await.expect("Should query for accounting"); + let earned = get_accounting(database.pool.clone(), channel_id, spender, Side::Earner) + .await + .expect("Should query for accounting"); assert!(earned.is_none(), "Spender shouldn't have an earned amount"); } @@ -222,14 +266,17 @@ mod test { "Should add the newly earned amount to the existing one" ); - let earned = get_accounting(database.pool.clone(), channel_id, earner, Side::Earner).await.expect("Should query for the updated accounting"); + let earned = get_accounting(database.pool.clone(), channel_id, earner, Side::Earner) + .await + .expect("Should query for the updated accounting"); assert_eq!(Some(updated), earned); - let spent = get_accounting(database.pool.clone(), channel_id, earner, Side::Spender).await.expect("Should query for accounting"); + let spent = get_accounting(database.pool.clone(), channel_id, earner, Side::Spender) + .await + .expect("Should query for accounting"); assert!(spent.is_none(), "Earner shouldn't have a spent amount"); } - // Spender as Earner & another Spender // Will test the previously spent amount as well! { @@ -265,17 +312,64 @@ mod test { "Should add the newly spent amount to the existing one" ); - let earned_acc = get_accounting(database.pool.clone(), channel_id, spender_as_earner, Side::Earner).await.expect("Should query for earned accounting").expect("Should have Earned accounting for Spender as Earner"); + let earned_acc = get_accounting( + database.pool.clone(), + channel_id, + spender_as_earner, + Side::Earner, + ) + .await + .expect("Should query for earned accounting") + .expect("Should have Earned accounting for Spender as Earner"); assert_eq!(UnifiedNum::from(100_000_999), earned_acc.amount); - - let spent_acc = get_accounting(database.pool.clone(), channel_id, spender_as_earner, Side::Spender).await.expect("Should query for spent accounting").expect("Should have Spent accounting for Spender as Earner"); + + let spent_acc = get_accounting( + database.pool.clone(), + channel_id, + spender_as_earner, + Side::Spender, + ) + .await + .expect("Should query for spent accounting") + .expect("Should have Spent accounting for Spender as Earner"); assert_eq!(UnifiedNum::from(300_000_000), spent_acc.amount); - } } - + + fn assert_accounting( + expected: (Address, Side, UnifiedNum), + accounting: Accounting, + with_set_updated: bool, + // ) -> anyhow::Result<()> { + ) { + assert_eq!( + expected.0, accounting.address, + "Accounting address is not the same" + ); + assert_eq!( + expected.1, accounting.side, + "Accounting side is not the same" + ); + assert_eq!( + expected.2, accounting.amount, + "Accounting amount is not the same" + ); + + if with_set_updated { + assert!( + accounting.updated.is_some(), + "Accounting should have been updated" + ) + } else { + assert!( + accounting.updated.is_none(), + "Accounting should not have been updated" + ) + } + } + #[tokio::test] - async fn test_spending_accountings() { + async fn test_spend_amount() { let database = DATABASE_POOL.get().await.expect("Should get a DB pool"); setup_test_migrations(database.pool.clone()) @@ -285,47 +379,189 @@ mod test { let channel_id = DUMMY_CAMPAIGN.channel.id(); let earner = ADDRESSES["publisher"]; let spender = ADDRESSES["creator"]; + let spender_as_earner = spender; let other_spender = ADDRESSES["tester"]; - let amount = UnifiedNum::from(100_000_000); - let update_amount = UnifiedNum::from(200_000_000); + let cases = [ + // Spender & Earner insert + ( + UnifiedNum::from(100_000_000), + earner, + spender, + [ + vec![(earner, Side::Earner, UnifiedNum::from(100_000_000), false)], + vec![(spender, Side::Spender, UnifiedNum::from(100_000_000), false)], + ], + ), + // Spender & Earner update + ( + UnifiedNum::from(200_000_000), + earner, + spender, + [ + vec![(earner, Side::Earner, UnifiedNum::from(300_000_000), true)], + vec![(spender, Side::Spender, UnifiedNum::from(300_000_000), true)], + ], + ), + // Spender as an Earner & another spender + ( + UnifiedNum::from(999), + spender_as_earner, + other_spender, + [ + vec![(spender, Side::Earner, UnifiedNum::from(999), false)], + vec![(other_spender, Side::Spender, UnifiedNum::from(999), false)], + ], + ), + ]; + + for (amount_to_spend, earner, spender, [earners, spenders]) in cases { + // Spender & Earner insert + let mut balances = Balances::::default(); + balances + .spend(spender, earner, amount_to_spend) + .expect("Should spend"); + + let (actual_earners, actual_spenders) = + spend_amount(database.pool.clone(), channel_id, balances) + .await + .expect("Should insert Earner and Spender"); + + for (actual, expected) in actual_earners.into_iter().zip(earners) { + assert_accounting((expected.0, expected.1, expected.2), actual, expected.3) + } + + for (actual, expected) in actual_spenders.into_iter().zip(spenders) { + assert_accounting((expected.0, expected.1, expected.2), actual, expected.3) + } + } - // Spender & Earner insert - let (inserted_earner, inserted_spender) = spend_accountings(database.pool.clone(), channel_id, earner, spender, amount).await.expect("Should insert Earner and Spender"); - assert_eq!(earner, inserted_earner.address); - assert_eq!(Side::Earner, inserted_earner.side); - assert_eq!(UnifiedNum::from(100_000_000), inserted_earner.amount); - - assert_eq!(spender, inserted_spender.address); - assert_eq!(Side::Spender, inserted_spender.side); - assert_eq!(UnifiedNum::from(100_000_000), inserted_spender.amount); - - // Spender & Earner update - let (updated_earner, updated_spender) = spend_accountings(database.pool.clone(), channel_id, earner, spender, update_amount).await.expect("Should update Earner and Spender"); - - assert_eq!(earner, updated_earner.address); - assert_eq!(Side::Earner, updated_earner.side); - assert_eq!(UnifiedNum::from(300_000_000), updated_earner.amount, "Should add the newly earned amount to the existing one"); - - assert_eq!(spender, updated_spender.address); - assert_eq!(Side::Spender, updated_spender.side); - assert_eq!(UnifiedNum::from(300_000_000), updated_spender.amount, "Should add the newly spend amount to the existing one"); - - // Spender as an Earner & another spender - let (spender_as_earner, inserted_other_spender) = spend_accountings(database.pool.clone(), channel_id, spender, other_spender, UnifiedNum::from(999)).await.expect("Should update Spender as Earner and the Other Spender"); - - assert_eq!(spender, spender_as_earner.address); - assert_eq!(Side::Earner, spender_as_earner.side); - assert_eq!(UnifiedNum::from(999), spender_as_earner.amount, "Should add earner accounting for the previous Spender"); - - assert_eq!(other_spender, inserted_other_spender.address); - assert_eq!(Side::Spender, inserted_other_spender.side); - assert_eq!(UnifiedNum::from(999), inserted_other_spender.amount); - - let earned = get_accounting(database.pool.clone(), channel_id, spender, Side::Earner).await.expect("Should query for accounting").expect("Should have Earned accounting for Spender as Earner"); + // Check the final amounts of Spent/Earned for the Spender + let earned = get_accounting(database.pool.clone(), channel_id, spender, Side::Earner) + .await + .expect("Should query for accounting") + .expect("Should have Earned accounting for Spender as Earner"); assert_eq!(UnifiedNum::from(999), earned.amount); - - let spent = get_accounting(database.pool.clone(), channel_id, spender, Side::Spender).await.expect("Should query for accounting").expect("Should have Spent accounting for Spender as Earner"); + + let spent = get_accounting(database.pool.clone(), channel_id, spender, Side::Spender) + .await + .expect("Should query for accounting") + .expect("Should have Spent accounting for Spender as Earner"); assert_eq!(UnifiedNum::from(300_000_000), spent.amount); } + + #[tokio::test] + async fn test_spend_amount_with_multiple_spends() { + let database = DATABASE_POOL.get().await.expect("Should get a DB pool"); + + setup_test_migrations(database.pool.clone()) + .await + .expect("Migrations should succeed"); + + let channel_id = DUMMY_CAMPAIGN.channel.id(); + let earner = ADDRESSES["publisher"]; + let other_earner = ADDRESSES["publisher2"]; + let spender = ADDRESSES["creator"]; + let spender_as_earner = spender; + let other_spender = ADDRESSES["tester"]; + let third_spender = ADDRESSES["user"]; + + // Spenders & Earners insert + { + let mut balances = Balances::::default(); + balances + .spend(spender, earner, UnifiedNum::from(400_000)) + .expect("Should spend"); + balances + .spend(other_spender, other_earner, UnifiedNum::from(500_000)) + .expect("Should spend"); + + let (earners_acc, spenders_acc) = + spend_amount(database.pool.clone(), channel_id, balances) + .await + .expect("Should insert Earners and Spenders"); + + assert_eq!(2, earners_acc.len()); + assert_eq!(2, spenders_acc.len()); + + // Earners assertions + assert_accounting( + (earner, Side::Earner, UnifiedNum::from(400_000)), + earners_acc.iter().find(|a| a.address == earner).unwrap().clone(), + false, + ); + assert_accounting( + (other_earner, Side::Earner, UnifiedNum::from(500_000)), + earners_acc.iter().find(|a| a.address == other_earner).unwrap().clone(), + false, + ); + + // Spenders assertions + assert_accounting( + (spender, Side::Spender, UnifiedNum::from(400_000)), + spenders_acc.iter().find(|a| a.address == spender).unwrap().clone(), + false, + ); + assert_accounting( + (other_spender, Side::Spender, UnifiedNum::from(500_000)), + spenders_acc.iter().find(|a| a.address == other_spender).unwrap().clone(), + false, + ); + } + // Spenders & Earners update with 1 insert (third_spender & spender_as_earner) + { + let mut balances = Balances::::default(); + balances + .spend(spender, earner, UnifiedNum::from(1_400_000)) + .expect("Should spend"); + balances + .spend(other_spender, other_earner, UnifiedNum::from(1_500_000)) + .expect("Should spend"); + balances + .spend(third_spender, spender_as_earner, UnifiedNum::from(600_000)) + .expect("Should spend"); + + let (earners_acc, spenders_acc) = + spend_amount(database.pool.clone(), channel_id, balances) + .await + .expect("Should update & insert new Earners and Spenders"); + + assert_eq!(3, earners_acc.len()); + assert_eq!(3, spenders_acc.len()); + + // Earners assertions + assert_accounting( + (earner, Side::Earner, UnifiedNum::from(1_800_000)), + earners_acc.iter().find(|a| a.address == earner).unwrap().clone(), + true, + ); + assert_accounting( + (other_earner, Side::Earner, UnifiedNum::from(2_000_000)), + earners_acc.iter().find(|a| a.address == other_earner).unwrap().clone(), + true, + ); + assert_accounting( + (spender_as_earner, Side::Earner, UnifiedNum::from(600_000)), + earners_acc.iter().find(|a| a.address == spender_as_earner).unwrap().clone(), + false, + ); + + // Spenders assertions + assert_accounting( + (spender, Side::Spender, UnifiedNum::from(1_800_000)), + spenders_acc.iter().find(|a| a.address == spender).unwrap().clone(), + true, + ); + assert_accounting( + (other_spender, Side::Spender, UnifiedNum::from(2_000_000)), + spenders_acc.iter().find(|a| a.address == other_spender).unwrap().clone(), + true, + ); + assert_accounting( + (third_spender, Side::Spender, UnifiedNum::from(600_000)), + spenders_acc.iter().find(|a| a.address == third_spender).unwrap().clone(), + false, + ); + } + } } From 6f51c7098f0978c546c0c2d45dbe51ed5da06c77 Mon Sep 17 00:00:00 2001 From: Lachezar Lechev Date: Mon, 12 Jul 2021 10:08:39 +0300 Subject: [PATCH 07/12] sentry - routes - insert_events route - run rustfmt & fix clippy warnings --- primitives/src/sentry/accounting.rs | 4 +- sentry/src/db/accounting.rs | 61 +++- sentry/src/lib.rs | 4 +- sentry/src/payout.rs | 4 +- sentry/src/routes/campaign.rs | 473 +++++++++++++++++++++++----- sentry/src/spender.rs | 30 +- 6 files changed, 467 insertions(+), 109 deletions(-) diff --git a/primitives/src/sentry/accounting.rs b/primitives/src/sentry/accounting.rs index 64589ce23..867397659 100644 --- a/primitives/src/sentry/accounting.rs +++ b/primitives/src/sentry/accounting.rs @@ -59,12 +59,12 @@ impl Balances { let spent = self.spenders.entry(spender).or_default(); *spent = spent .checked_add(&amount) - .ok_or_else(|| OverflowError::Spender(spender))?; + .ok_or(OverflowError::Spender(spender))?; let earned = self.earners.entry(earner).or_default(); *earned = earned .checked_add(&amount) - .ok_or_else(|| OverflowError::Earner(earner))?; + .ok_or(OverflowError::Earner(earner))?; Ok(()) } diff --git a/sentry/src/db/accounting.rs b/sentry/src/db/accounting.rs index db1588994..ae79ac7bd 100644 --- a/sentry/src/db/accounting.rs +++ b/sentry/src/db/accounting.rs @@ -340,7 +340,6 @@ mod test { expected: (Address, Side, UnifiedNum), accounting: Accounting, with_set_updated: bool, - // ) -> anyhow::Result<()> { ) { assert_eq!( expected.0, accounting.address, @@ -487,24 +486,40 @@ mod test { // Earners assertions assert_accounting( (earner, Side::Earner, UnifiedNum::from(400_000)), - earners_acc.iter().find(|a| a.address == earner).unwrap().clone(), + earners_acc + .iter() + .find(|a| a.address == earner) + .unwrap() + .clone(), false, ); assert_accounting( (other_earner, Side::Earner, UnifiedNum::from(500_000)), - earners_acc.iter().find(|a| a.address == other_earner).unwrap().clone(), + earners_acc + .iter() + .find(|a| a.address == other_earner) + .unwrap() + .clone(), false, ); // Spenders assertions assert_accounting( (spender, Side::Spender, UnifiedNum::from(400_000)), - spenders_acc.iter().find(|a| a.address == spender).unwrap().clone(), + spenders_acc + .iter() + .find(|a| a.address == spender) + .unwrap() + .clone(), false, ); assert_accounting( (other_spender, Side::Spender, UnifiedNum::from(500_000)), - spenders_acc.iter().find(|a| a.address == other_spender).unwrap().clone(), + spenders_acc + .iter() + .find(|a| a.address == other_spender) + .unwrap() + .clone(), false, ); } @@ -532,34 +547,58 @@ mod test { // Earners assertions assert_accounting( (earner, Side::Earner, UnifiedNum::from(1_800_000)), - earners_acc.iter().find(|a| a.address == earner).unwrap().clone(), + earners_acc + .iter() + .find(|a| a.address == earner) + .unwrap() + .clone(), true, ); assert_accounting( (other_earner, Side::Earner, UnifiedNum::from(2_000_000)), - earners_acc.iter().find(|a| a.address == other_earner).unwrap().clone(), + earners_acc + .iter() + .find(|a| a.address == other_earner) + .unwrap() + .clone(), true, ); assert_accounting( (spender_as_earner, Side::Earner, UnifiedNum::from(600_000)), - earners_acc.iter().find(|a| a.address == spender_as_earner).unwrap().clone(), + earners_acc + .iter() + .find(|a| a.address == spender_as_earner) + .unwrap() + .clone(), false, ); // Spenders assertions assert_accounting( (spender, Side::Spender, UnifiedNum::from(1_800_000)), - spenders_acc.iter().find(|a| a.address == spender).unwrap().clone(), + spenders_acc + .iter() + .find(|a| a.address == spender) + .unwrap() + .clone(), true, ); assert_accounting( (other_spender, Side::Spender, UnifiedNum::from(2_000_000)), - spenders_acc.iter().find(|a| a.address == other_spender).unwrap().clone(), + spenders_acc + .iter() + .find(|a| a.address == other_spender) + .unwrap() + .clone(), true, ); assert_accounting( (third_spender, Side::Spender, UnifiedNum::from(600_000)), - spenders_acc.iter().find(|a| a.address == third_spender).unwrap().clone(), + spenders_acc + .iter() + .find(|a| a.address == third_spender) + .unwrap() + .clone(), false, ); } diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs index 9289adad8..71c56bba4 100644 --- a/sentry/src/lib.rs +++ b/sentry/src/lib.rs @@ -102,10 +102,10 @@ impl Application { ) -> Self { Self { adapter, - config, logger, redis, pool, + config, } } @@ -188,7 +188,7 @@ async fn campaigns_router( let req = CampaignLoad.call(req, app).await?; - campaign::insert_events(req, app).await + campaign::insert_events::handle_route(req, app).await } else if let (Some(_caps), &Method::POST) = (CLOSE_CAMPAIGN_BY_CAMPAIGN_ID.captures(&path), method) { diff --git a/sentry/src/payout.rs b/sentry/src/payout.rs index 69aaefa4e..3a805e119 100644 --- a/sentry/src/payout.rs +++ b/sentry/src/payout.rs @@ -66,7 +66,7 @@ pub fn get_payout( let mut output = Output { show: true, boost: 1.0, - price: vec![(event_type.clone(), pricing.min.clone())] + price: vec![(event_type.clone(), pricing.min)] .into_iter() .collect(), }; @@ -78,7 +78,7 @@ pub fn get_payout( if output.show { let price = match output.price.get(&event_type) { Some(output_price) => { - max(pricing.min, min(pricing.max, output_price.clone())) + max(pricing.min, min(pricing.max, *output_price)) } None => max(pricing.min, pricing.max), }; diff --git a/sentry/src/routes/campaign.rs b/sentry/src/routes/campaign.rs index 5936b8974..e9574f62f 100644 --- a/sentry/src/routes/campaign.rs +++ b/sentry/src/routes/campaign.rs @@ -1,15 +1,8 @@ -use std::collections::HashMap; - -use crate::{ - access::{self, check_access}, - success_response, Application, Auth, ResponseError, Session, -}; -use chrono::Utc; +use crate::{success_response, Application, ResponseError}; use hyper::{Body, Request, Response}; use primitives::{ adapter::Adapter, - sentry::{campaign_create::CreateCampaign, Event, SuccessResponse}, - Campaign, + sentry::{campaign_create::CreateCampaign, SuccessResponse}, }; pub async fn create_campaign( @@ -51,75 +44,413 @@ pub async fn create_campaign( Ok(success_response(serde_json::to_string(&campaign)?)) } -pub async fn insert_events( - req: Request, - app: &Application, -) -> Result, ResponseError> { - let (req_head, req_body) = req.into_parts(); +pub mod insert_events { - let auth = req_head.extensions.get::(); - let session = req_head - .extensions - .get::() - .expect("request should have session"); + use std::collections::HashMap; - let campaign = req_head - .extensions - .get::() - .expect("request should have a Campaign loaded"); + use crate::{ + access::{self, check_access}, + db::{accounting::spend_amount, DbPool, PoolError, RedisError}, + payout::get_payout, + spender::fee::calculate_fee, + Application, Auth, ResponseError, Session, + }; + use hyper::{Body, Request, Response}; + use primitives::{ + adapter::Adapter, + sentry::{ + accounting::{Balances, CheckedState, OverflowError}, + Event, SuccessResponse, + }, + Address, Campaign, CampaignId, DomainError, UnifiedNum, ValidatorDesc, + }; + use redis::aio::MultiplexedConnection; + use thiserror::Error; - let body_bytes = hyper::body::to_bytes(req_body).await?; - let mut request_body = serde_json::from_slice::>>(&body_bytes)?; + // TODO AIP#61: Use the Campaign Modify const here + pub const CAMPAIGN_REMAINING_KEY: &str = "campaignRemaining"; - let events = request_body - .remove("events") - .ok_or_else(|| ResponseError::BadRequest("invalid request".to_string()))?; + #[derive(Debug, Error)] + pub enum Error { + #[error(transparent)] + Event(#[from] EventError), + #[error(transparent)] + Redis(#[from] RedisError), + #[error(transparent)] + Postgres(#[from] PoolError), + #[error(transparent)] + Overflow(#[from] OverflowError), + } - let processed = process_events(app, auth, session, campaign, events).await?; + #[derive(Debug, Error, PartialEq)] + pub enum EventError { + #[error("Overflow when calculating Event payout for Event")] + EventPayoutOverflow, + #[error("Validator Fee calculation: {0}")] + FeeCalculation(#[from] DomainError), + #[error( + "The Campaign's remaining budget left to spend is not enough to cover the Event payout" + )] + CampaignRemainingNotEnoughForPayout, + #[error("Campaign ran out of remaining budget to spend")] + CampaignOutOfBudget, + } - Ok(Response::builder() - .header("Content-type", "application/json") - .body(serde_json::to_string(&SuccessResponse { success: processed })?.into()) - .unwrap()) -} + pub async fn handle_route( + req: Request, + app: &Application, + ) -> Result, ResponseError> { + let (req_head, req_body) = req.into_parts(); -async fn process_events( - app: &Application, - auth: Option<&Auth>, - session: &Session, - campaign: &Campaign, - events: Vec, -) -> Result { - if &Utc::now() > &campaign.active.to { - return Err(ResponseError::BadRequest("Campaign is expired".into())); + let auth = req_head.extensions.get::(); + let session = req_head + .extensions + .get::() + .expect("request should have session"); + + let campaign = req_head + .extensions + .get::() + .expect("request should have a Campaign loaded"); + + let body_bytes = hyper::body::to_bytes(req_body).await?; + let mut request_body = serde_json::from_slice::>>(&body_bytes)?; + + let events = request_body + .remove("events") + .ok_or_else(|| ResponseError::BadRequest("invalid request".to_string()))?; + + let processed = process_events(app, auth, session, campaign, events).await?; + + Ok(Response::builder() + .header("Content-type", "application/json") + .body(serde_json::to_string(&SuccessResponse { success: processed })?.into()) + .unwrap()) + } + + async fn process_events( + app: &Application, + auth: Option<&Auth>, + session: &Session, + campaign: &Campaign, + events: Vec, + ) -> Result { + // handle events - check access + check_access( + &app.redis, + session, + auth, + &app.config.ip_rate_limit, + &campaign, + &events, + ) + .await + .map_err(|e| match e { + access::Error::ForbiddenReferrer => ResponseError::Forbidden(e.to_string()), + access::Error::RulesError(error) => ResponseError::TooManyRequests(error), + access::Error::UnAuthenticated => ResponseError::Unauthorized, + _ => ResponseError::BadRequest(e.to_string()), + })?; + + let (leader, follower) = match (campaign.leader(), campaign.follower()) { + // ERROR! + (None, None) | (None, _) | (_, None) => { + return Err(ResponseError::BadRequest( + "Channel leader, follower or both were not found in Campaign validators." + .to_string(), + )) + } + (Some(leader), Some(follower)) => (leader, follower), + }; + + let mut events_success = vec![]; + for event in events.into_iter() { + let result: Result, Error> = { + // calculate earners payouts + let payout = get_payout(&app.logger, campaign, &event, session)?; + + match payout { + Some((earner, payout)) => spend_for_event( + &app.pool, + app.redis.clone(), + &campaign, + earner, + leader, + follower, + payout, + ) + .await + .map(Some), + None => Ok(None), + } + }; + + events_success.push((event, result)); + } + + // TODO AIP#61 - aggregate Events and put into analytics + + Ok(true) + } + + pub async fn spend_for_event( + pool: &DbPool, + mut redis: MultiplexedConnection, + campaign: &Campaign, + earner: Address, + leader: &ValidatorDesc, + follower: &ValidatorDesc, + amount: UnifiedNum, + ) -> Result<(), Error> { + // distribute fees + let leader_fee = + calculate_fee((earner, amount), &leader).map_err(EventError::FeeCalculation)?; + let follower_fee = + calculate_fee((earner, amount), &follower).map_err(EventError::FeeCalculation)?; + + // First update redis `campaignRemaining:{CampaignId}` key + let spending = [amount, leader_fee, follower_fee] + .iter() + .sum::>() + .ok_or(EventError::EventPayoutOverflow)?; + + if !has_enough_remaining_budget(&mut redis, campaign.id, spending).await? { + return Err(Error::Event( + EventError::CampaignRemainingNotEnoughForPayout, + )); + } + + // The event payout decreases the remaining budget for the Campaign + let remaining = decrease_remaining_budget(&mut redis, campaign.id, spending).await?; + + // Update the Accounting records accordingly + let channel_id = campaign.channel.id(); + let spender = campaign.creator; + + let mut delta_balances = Balances::::default(); + delta_balances.spend(spender, earner, amount)?; + delta_balances.spend(spender, leader.id.to_address(), leader_fee)?; + delta_balances.spend(spender, follower.id.to_address(), follower_fee)?; + + let (_earners, _spenders) = spend_amount(pool.clone(), channel_id, delta_balances).await?; + + // check if we still have budget to spend, after we've updated both Redis and Postgres + if remaining.is_negative() { + Err(Error::Event(EventError::CampaignOutOfBudget)) + } else { + Ok(()) + } + } + + async fn has_enough_remaining_budget( + redis: &mut MultiplexedConnection, + campaign: CampaignId, + amount: UnifiedNum, + ) -> Result { + let key = format!("{}:{}", CAMPAIGN_REMAINING_KEY, campaign); + + let remaining = redis::cmd("GET") + .arg(&key) + .query_async::<_, Option>(redis) + .await? + .unwrap_or_default(); + + Ok(remaining > 0 && remaining.unsigned_abs() > amount.to_u64()) + } + + async fn decrease_remaining_budget( + redis: &mut MultiplexedConnection, + campaign: CampaignId, + amount: UnifiedNum, + ) -> Result { + let key = format!("{}:{}", CAMPAIGN_REMAINING_KEY, campaign); + + let remaining = redis::cmd("DECRBY") + .arg(&key) + .arg(amount.to_u64()) + .query_async::<_, i64>(redis) + .await?; + + Ok(remaining) } - // - // TODO #381: AIP#61 Spender Aggregator should be called - // - - // handle events - check access - // handle events - Update targeting rules - // calculate payout - // distribute fees - // handle spending - Spender Aggregate - // handle events - aggregate Events and put into analytics - - check_access( - &app.redis, - session, - auth, - &app.config.ip_rate_limit, - &campaign, - &events, - ) - .await - .map_err(|e| match e { - access::Error::ForbiddenReferrer => ResponseError::Forbidden(e.to_string()), - access::Error::RulesError(error) => ResponseError::TooManyRequests(error), - access::Error::UnAuthenticated => ResponseError::Unauthorized, - _ => ResponseError::BadRequest(e.to_string()), - })?; - - Ok(true) + #[cfg(test)] + mod test { + use primitives::util::tests::prep_db::{ADDRESSES, DUMMY_CAMPAIGN}; + + use crate::db::{ + redis_pool::TESTS_POOL, + tests_postgres::{setup_test_migrations, DATABASE_POOL}, + }; + + use super::*; + + /// Helper function to get the Campaign Remaining budget in Redis for the tests + async fn get_campaign_remaining( + redis: &mut MultiplexedConnection, + campaign: CampaignId, + ) -> Option { + let key = format!("{}:{}", CAMPAIGN_REMAINING_KEY, campaign); + + redis::cmd("GET") + .arg(&key) + .query_async(redis) + .await + .expect("Should set Campaign remaining key") + } + + /// Helper function to set the Campaign Remaining budget in Redis for the tests + async fn set_campaign_remaining( + redis: &mut MultiplexedConnection, + campaign: CampaignId, + remaining: i64, + ) { + let key = format!("{}:{}", CAMPAIGN_REMAINING_KEY, campaign); + + redis::cmd("SET") + .arg(&key) + .arg(remaining) + .query_async::<_, ()>(redis) + .await + .expect("Should set Campaign remaining key"); + } + + #[tokio::test] + async fn test_has_enough_remaining_budget() { + let mut redis = TESTS_POOL.get().await.expect("Should get redis connection"); + let campaign = DUMMY_CAMPAIGN.id; + let amount = UnifiedNum::from(10_000); + + let no_remaining_budget_set = has_enough_remaining_budget(&mut redis, campaign, amount) + .await + .expect("Should check campaign remaining"); + assert!( + !no_remaining_budget_set, + "No remaining budget set, should return false" + ); + + set_campaign_remaining(&mut redis, campaign, 9_000).await; + + let not_enough_remaining_budget = + has_enough_remaining_budget(&mut redis, campaign, amount) + .await + .expect("Should check campaign remaining"); + assert!( + !not_enough_remaining_budget, + "Not enough remaining budget, should return false" + ); + + set_campaign_remaining(&mut redis, campaign, 11_000).await; + + let has_enough_remaining_budget = + has_enough_remaining_budget(&mut redis, campaign, amount) + .await + .expect("Should check campaign remaining"); + + assert!( + has_enough_remaining_budget, + "Should have enough budget for this amount" + ); + } + + #[tokio::test] + async fn test_decreasing_remaining_budget() { + let mut redis = TESTS_POOL.get().await.expect("Should get redis connection"); + let campaign = DUMMY_CAMPAIGN.id; + let amount = UnifiedNum::from(5_000); + + set_campaign_remaining(&mut redis, campaign, 9_000).await; + + let remaining = decrease_remaining_budget(&mut redis, campaign, amount) + .await + .expect("Should decrease campaign remaining"); + assert_eq!( + 4_000, remaining, + "Should decrease remaining budget with amount and be positive" + ); + + let remaining = decrease_remaining_budget(&mut redis, campaign, amount) + .await + .expect("Should decrease campaign remaining"); + assert_eq!( + -1_000, remaining, + "Should decrease remaining budget with amount and be negative" + ); + } + + #[tokio::test] + async fn test_spending_for_events_with_enough_remaining_budget() { + let mut redis = TESTS_POOL.get().await.expect("Should get redis connection"); + let database = DATABASE_POOL.get().await.expect("Should get a DB pool"); + + setup_test_migrations(database.pool.clone()) + .await + .expect("Migrations should succeed"); + + let campaign = DUMMY_CAMPAIGN.clone(); + + let publisher = ADDRESSES["publisher"]; + + let leader = campaign.leader().unwrap(); + let follower = campaign.follower().unwrap(); + let payout = UnifiedNum::from(300); + + // No Campaign Remaining set, should error + { + let spend_event = spend_for_event( + &database.pool, + redis.connection.clone(), + &campaign, + publisher, + leader, + follower, + payout, + ) + .await; + + assert!( + matches!( + spend_event, + Err(Error::Event( + EventError::CampaignRemainingNotEnoughForPayout + )) + ), + "Campaign budget has no remaining funds to spend" + ); + } + + // Repeat the same call, but set the Campaign remaining budget in Redis + { + set_campaign_remaining(&mut redis, campaign.id, 11_000).await; + + let spend_event = spend_for_event( + &database.pool, + redis.connection.clone(), + &campaign, + publisher, + leader, + follower, + payout, + ) + .await; + + assert!( + dbg!(spend_event).is_ok(), + "Campaign budget has no remaining funds to spend" + ); + + // Payout: 300 + // Leader fee: 100 + // Leader payout: 300 * 100 / 1000 = 30 + // Follower fee: 100 + // Follower payout: 300 * 100 / 1000 = 30 + assert_eq!( + 10_640_i64, + get_campaign_remaining(&mut redis.connection, campaign.id) + .await + .expect("Should have key") + ) + } + } + } } diff --git a/sentry/src/spender.rs b/sentry/src/spender.rs index da6b9cacd..46f91bc7e 100644 --- a/sentry/src/spender.rs +++ b/sentry/src/spender.rs @@ -26,31 +26,19 @@ impl Aggregator { pub mod fee { pub const PRO_MILLE: UnifiedNum = UnifiedNum::from_u64(1_000); - use primitives::{Address, Campaign, DomainError, UnifiedNum, ValidatorId}; + use primitives::{Address, DomainError, UnifiedNum, ValidatorDesc}; /// Calculates the fee for a specified validator /// This function will return None if the provided validator is not part of the Campaign / Channel /// In the case of overflow when calculating the payout, an error will be returned - pub fn calculate_fees( + pub fn calculate_fee( (_earner, payout): (Address, UnifiedNum), - campaign: &Campaign, - for_validator: ValidatorId, - ) -> Result, DomainError> { - let payout = match campaign.find_validator(&for_validator) { - Some(validator_role) => { - // should never overflow - let fee_payout = payout - .checked_mul(&validator_role.validator().fee) - .ok_or_else(|| { - DomainError::InvalidArgument("payout calculation overflow".to_string()) - })? - .div_floor(&PRO_MILLE); - - Some(fee_payout) - } - None => None, - }; - - Ok(payout) + validator: &ValidatorDesc, + ) -> Result { + // should never overflow + payout + .checked_mul(&validator.fee) + .map(|pro_mille_fee| pro_mille_fee.div_floor(&PRO_MILLE)) + .ok_or_else(|| DomainError::InvalidArgument("payout calculation overflow".to_string())) } } From 2752dea1b263a18e5bc98fbc1b74581662077387 Mon Sep 17 00:00:00 2001 From: Lachezar Lechev Date: Mon, 9 Aug 2021 12:02:26 +0300 Subject: [PATCH 08/12] fix PR comments --- sentry/src/db/accounting.rs | 13 +++++++------ sentry/src/routes/campaign.rs | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sentry/src/db/accounting.rs b/sentry/src/db/accounting.rs index ae79ac7bd..f09741a4b 100644 --- a/sentry/src/db/accounting.rs +++ b/sentry/src/db/accounting.rs @@ -11,6 +11,8 @@ use tokio_postgres::{ use super::{DbPool, PoolError}; use thiserror::Error; +static UPDATE_ACCOUNTING_STATEMENT: &str = "INSERT INTO accounting(channel_id, side, address, amount, updated, created) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT accounting_pkey DO UPDATE SET amount = accounting.amount + $4, updated = $6 WHERE accounting.channel_id = $1 AND accounting.side = $2 AND accounting.address = $3 RETURNING channel_id, side, address, amount, updated, created"; + #[derive(Debug, Error)] pub enum Error { #[error("Accounting Balances error: {0}")] @@ -84,7 +86,6 @@ pub async fn get_accounting( /// Will update current Spender/Earner amount or insert a new Accounting record /// /// See `UPDATE_ACCOUNTING_STATEMENT` static for full query. -static UPDATE_ACCOUNTING_STATEMENT: &str = "INSERT INTO accounting(channel_id, side, address, amount, updated, created) VALUES($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT accounting_pkey DO UPDATE SET amount = accounting.amount + $4, updated = $6 WHERE accounting.channel_id = $1 AND accounting.side = $2 AND accounting.address = $3 RETURNING channel_id, side, address, amount, updated, created"; pub async fn update_accounting( pool: DbPool, channel_id: ChannelId, @@ -204,7 +205,7 @@ mod test { .expect("Should insert"); assert_eq!(spender, inserted.address); assert_eq!(Side::Spender, inserted.side); - assert_eq!(UnifiedNum::from(100_000_000), inserted.amount); + assert_eq!(amount, inserted.amount); let updated = update_accounting( database.pool.clone(), @@ -218,7 +219,7 @@ mod test { assert_eq!(spender, updated.address); assert_eq!(Side::Spender, updated.side); assert_eq!( - UnifiedNum::from(300_000_000), + amount + update_amount, updated.amount, "Should add the newly spent amount to the existing one" ); @@ -247,7 +248,7 @@ mod test { .expect("Should insert"); assert_eq!(earner, inserted.address); assert_eq!(Side::Earner, inserted.side); - assert_eq!(UnifiedNum::from(100_000_000), inserted.amount); + assert_eq!(amount, inserted.amount); let updated = update_accounting( database.pool.clone(), @@ -261,7 +262,7 @@ mod test { assert_eq!(earner, updated.address); assert_eq!(Side::Earner, updated.side); assert_eq!( - UnifiedNum::from(300_000_000), + amount + update_amount, updated.amount, "Should add the newly earned amount to the existing one" ); @@ -293,7 +294,7 @@ mod test { .expect("Should insert"); assert_eq!(spender_as_earner, inserted.address); assert_eq!(Side::Earner, inserted.side); - assert_eq!(UnifiedNum::from(100_000_000), inserted.amount); + assert_eq!(amount, inserted.amount); let updated = update_accounting( database.pool.clone(), diff --git a/sentry/src/routes/campaign.rs b/sentry/src/routes/campaign.rs index e9574f62f..e036a4db8 100644 --- a/sentry/src/routes/campaign.rs +++ b/sentry/src/routes/campaign.rs @@ -435,7 +435,7 @@ pub mod insert_events { .await; assert!( - dbg!(spend_event).is_ok(), + spend_event.is_ok(), "Campaign budget has no remaining funds to spend" ); From 70376b30313db79643bc1c69aacefb0b112b1212 Mon Sep 17 00:00:00 2001 From: Lachezar Lechev Date: Mon, 9 Aug 2021 17:42:35 +0300 Subject: [PATCH 09/12] Fix merge issues & run rustfmt --- sentry/src/lib.rs | 18 ++++--- sentry/src/main.rs | 20 ++++++-- sentry/src/payout.rs | 4 +- sentry/src/routes/campaign.rs | 89 ++++++++++++++++++----------------- 4 files changed, 75 insertions(+), 56 deletions(-) diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs index 27c4be023..713aa80e7 100644 --- a/sentry/src/lib.rs +++ b/sentry/src/lib.rs @@ -105,7 +105,7 @@ impl Application { logger: Logger, redis: MultiplexedConnection, pool: DbPool, - campaign_remaining: CampaignRemaining + campaign_remaining: CampaignRemaining, ) -> Self { Self { adapter, @@ -511,14 +511,18 @@ pub mod test_util { use primitives::{ adapter::DummyAdapterOptions, config::configuration, - util::tests::{ - discard_logger, - prep_db::{IDS}, + util::tests::{discard_logger, prep_db::IDS}, + }; + + use crate::{ + db::{ + redis_pool::TESTS_POOL, + tests_postgres::{setup_test_migrations, DATABASE_POOL}, + CampaignRemaining, }, + Application, }; - use crate::{Application, db::{CampaignRemaining, redis_pool::TESTS_POOL, tests_postgres::{setup_test_migrations, DATABASE_POOL}}}; - /// Uses production configuration to setup the correct Contract addresses for tokens. pub async fn setup_dummy_app() -> Application { let config = configuration("production", None).expect("Should get Config"); @@ -546,7 +550,7 @@ pub mod test_util { discard_logger(), redis.connection.clone(), database.pool.clone(), - campaign_remaining + campaign_remaining, ); app diff --git a/sentry/src/main.rs b/sentry/src/main.rs index 634831e5d..0ab25891c 100644 --- a/sentry/src/main.rs +++ b/sentry/src/main.rs @@ -10,7 +10,7 @@ use primitives::adapter::{Adapter, DummyAdapterOptions, KeystoreOptions}; use primitives::config::configuration; use primitives::util::tests::prep_db::{AUTH, IDS}; use primitives::ValidatorId; -use sentry::db::{CampaignRemaining, postgres_connection, redis_connection, setup_migrations}; +use sentry::db::{postgres_connection, redis_connection, setup_migrations, CampaignRemaining}; use sentry::Application; use slog::{error, info, Logger}; use std::{ @@ -120,14 +120,28 @@ async fn main() -> Result<(), Box> { match adapter { AdapterTypes::EthereumAdapter(adapter) => { run( - Application::new(*adapter, config, logger, redis, postgres, campaign_remaining), + Application::new( + *adapter, + config, + logger, + redis, + postgres, + campaign_remaining, + ), socket_addr, ) .await } AdapterTypes::DummyAdapter(adapter) => { run( - Application::new(*adapter, config, logger, redis, postgres, campaign_remaining), + Application::new( + *adapter, + config, + logger, + redis, + postgres, + campaign_remaining, + ), socket_addr, ) .await diff --git a/sentry/src/payout.rs b/sentry/src/payout.rs index 3a805e119..9c95751a5 100644 --- a/sentry/src/payout.rs +++ b/sentry/src/payout.rs @@ -77,9 +77,7 @@ pub fn get_payout( if output.show { let price = match output.price.get(&event_type) { - Some(output_price) => { - max(pricing.min, min(pricing.max, *output_price)) - } + Some(output_price) => max(pricing.min, min(pricing.max, *output_price)), None => max(pricing.min, pricing.max), }; diff --git a/sentry/src/routes/campaign.rs b/sentry/src/routes/campaign.rs index bf68cfdd7..955e6ab6c 100644 --- a/sentry/src/routes/campaign.rs +++ b/sentry/src/routes/campaign.rs @@ -1,38 +1,22 @@ -<<<<<<< HEAD -use crate::{success_response, Application, ResponseError}; -use hyper::{Body, Request, Response}; -use primitives::{ - adapter::Adapter, - sentry::{campaign_create::CreateCampaign, SuccessResponse}, -======= use crate::{ - access::{self, check_access}, db::{ - accounting::get_accounting_spent, + accounting::{get_accounting, Side}, campaign::{get_campaigns_by_channel, insert_campaign, update_campaign}, spendable::fetch_spendable, CampaignRemaining, DbPool, RedisError, }, - success_response, Application, Auth, ResponseError, Session, + success_response, Application, Auth, ResponseError, }; -use chrono::Utc; use deadpool_postgres::PoolError; use hyper::{Body, Request, Response}; use primitives::{ adapter::Adapter, campaign_validator::Validator, - sentry::{ - campaign_create::{CreateCampaign, ModifyCampaign}, - Event, SuccessResponse, - }, + sentry::campaign_create::{CreateCampaign, ModifyCampaign}, Address, Campaign, UnifiedNum, }; use slog::error; -use std::{ - cmp::{max, Ordering}, - collections::HashMap, ->>>>>>> issue-382-campaign-routes -}; +use std::cmp::{max, Ordering}; use thiserror::Error; use tokio_postgres::error::SqlState; @@ -88,9 +72,15 @@ pub async fn create_campaign( let total_remaining = { - let accounting_spent = - get_accounting_spent(app.pool.clone(), &campaign.creator, &campaign.channel.id()) - .await?; + let accounting_spent = get_accounting( + app.pool.clone(), + campaign.channel.id(), + campaign.creator, + Side::Spender, + ) + .await? + .map(|accounting| accounting.amount) + .unwrap_or_default(); let latest_spendable = fetch_spendable(app.pool.clone(), &campaign.creator, &campaign.channel.id()) @@ -168,7 +158,7 @@ pub async fn create_campaign( } pub mod update_campaign { - use crate::db::CampaignRemaining; + use crate::db::{accounting::Side, CampaignRemaining}; use super::*; @@ -221,9 +211,15 @@ pub mod update_campaign { // sum(AllChannelCampaigns.map(getRemaining)) + DeltaBudgetForMutatedCampaign <= totalDeposited - totalSpent // sum(AllChannelCampaigns.map(getRemaining)) - DeltaBudgetForMutatedCampaign <= totalDeposited - totalSpent if let Some(delta_budget) = delta_budget { - let accounting_spent = - get_accounting_spent(pool.clone(), &campaign.creator, &campaign.channel.id()) - .await?; + let accounting_spent = get_accounting( + pool.clone(), + campaign.channel.id(), + campaign.creator, + Side::Spender, + ) + .await? + .map(|accounting| accounting.amount) + .unwrap_or_default(); let latest_spendable = fetch_spendable(pool.clone(), &campaign.creator, &campaign.channel.id()) @@ -779,12 +775,11 @@ pub mod insert_events { mod test { use super::{update_campaign::modify_campaign, *}; use crate::{ - db::{accounting::insert_accounting, spendable::insert_spendable}, + db::{accounting::update_accounting, spendable::insert_spendable}, test_util::setup_dummy_app, }; use hyper::StatusCode; use primitives::{ - sentry::accounting::{Balances, CheckedState}, spender::{Deposit, Spendable}, util::tests::prep_db::DUMMY_CAMPAIGN, ValidatorId, @@ -831,13 +826,15 @@ mod test { .await .expect("Should insert Spendable for Campaign creator")); - let mut balances = Balances::::default(); - balances.add_spender(create.creator); - - // TODO: Replace this once https://github.com/AdExNetwork/adex-validator-stack-rust/pull/413 is merged - let _accounting = insert_accounting(app.pool.clone(), create.channel.clone(), balances) - .await - .expect("Should create Accounting"); + let _accounting = update_accounting( + app.pool.clone(), + create.channel.id(), + create.creator, + Side::Spender, + UnifiedNum::default(), + ) + .await + .expect("Should create Accounting"); let create_response = create_campaign(build_request(create), &app) .await @@ -928,7 +925,12 @@ mod test { .await .expect_err("Should return Error response"); - assert_eq!(ResponseError::BadRequest("Not enough deposit left for the new campaign's budget".to_string()), create_err); + assert_eq!( + ResponseError::BadRequest( + "Not enough deposit left for the new campaign's budget".to_string() + ), + create_err + ); } // modify first campaign, by lowering the budget from 1000 to 900 @@ -967,7 +969,7 @@ mod test { .await .expect("Should return create campaign"); - let json = hyper::body::to_bytes(create_response.into_body()) + let json = hyper::body::to_bytes(create_response.into_body()) .await .expect("Should get json"); @@ -991,12 +993,13 @@ mod test { targeting_rules: None, }; - let modify_err = - modify_campaign(&app.pool, &app.campaign_remaining, modified, modify) - .await - .expect_err("Should return Error response"); + let modify_err = modify_campaign(&app.pool, &app.campaign_remaining, modified, modify) + .await + .expect_err("Should return Error response"); - assert!(matches!(modify_err, Error::NewBudget(string) if string == "Not enough deposit left for the campaign's new budget")); + assert!( + matches!(modify_err, Error::NewBudget(string) if string == "Not enough deposit left for the campaign's new budget") + ); } } } From 1fca9ec2e3eadbb18e44c80dca31ebfb8c64c355 Mon Sep 17 00:00:00 2001 From: Lachezar Lechev Date: Tue, 10 Aug 2021 09:05:16 +0300 Subject: [PATCH 10/12] fix clippy warnings --- adapter/src/dummy.rs | 4 +- adapter/src/ethereum.rs | 14 +++--- adview-manager/src/lib.rs | 2 +- primitives/src/adapter.rs | 2 +- primitives/src/big_num.rs | 2 +- primitives/src/campaign.rs | 2 +- primitives/src/campaign_validator.rs | 2 +- primitives/src/channel.rs | 6 +-- primitives/src/sentry.rs | 2 +- primitives/src/sentry/accounting.rs | 12 ++++-- primitives/src/util/tests/time.rs | 2 +- primitives/src/validator.rs | 4 +- sentry/src/access.rs | 10 ++--- sentry/src/analytics_recorder.rs | 1 - sentry/src/db.rs | 2 +- sentry/src/lib.rs | 54 ++++++++++-------------- sentry/src/payout.rs | 2 +- sentry/src/routes/analytics.rs | 4 +- sentry/src/routes/campaign.rs | 27 ++++++------ sentry/src/routes/channel.rs | 6 +-- sentry/src/routes/validator_message.rs | 2 +- validator_worker/src/core/events.rs | 4 +- validator_worker/src/follower.rs | 16 +++---- validator_worker/src/heartbeat.rs | 2 +- validator_worker/src/leader.rs | 8 ++-- validator_worker/src/main.rs | 14 +++--- validator_worker/src/sentry_interface.rs | 16 +++---- 27 files changed, 109 insertions(+), 113 deletions(-) diff --git a/adapter/src/dummy.rs b/adapter/src/dummy.rs index cc9c98b3a..ef5152332 100644 --- a/adapter/src/dummy.rs +++ b/adapter/src/dummy.rs @@ -92,8 +92,8 @@ impl Adapter for DummyAdapter { Ok(()) } - fn whoami(&self) -> &ValidatorId { - &self.identity + fn whoami(&self) -> ValidatorId { + self.identity } fn sign(&self, state_root: &str) -> AdapterResult { diff --git a/adapter/src/ethereum.rs b/adapter/src/ethereum.rs index 681f58041..cab922829 100644 --- a/adapter/src/ethereum.rs +++ b/adapter/src/ethereum.rs @@ -149,8 +149,8 @@ impl Adapter for EthereumAdapter { Ok(()) } - fn whoami(&self) -> &ValidatorId { - &self.address + fn whoami(&self) -> ValidatorId { + self.address } fn sign(&self, state_root: &str) -> AdapterResult { @@ -263,7 +263,7 @@ impl Adapter for EthereumAdapter { address: self.whoami().to_checksum(), }; - ewt_sign(&wallet, &self.keystore_pwd, &payload) + ewt_sign(wallet, &self.keystore_pwd, &payload) .map_err(|err| AdapterError::Adapter(Error::SignMessage(err).into())) } @@ -401,8 +401,8 @@ fn hash_message(message: &[u8]) -> [u8; 32] { let message_length = message.len(); let mut result = Keccak::new_keccak256(); - result.update(&format!("{}{}", eth, message_length).as_bytes()); - result.update(&message); + result.update(format!("{}{}", eth, message_length).as_bytes()); + result.update(message); let mut res: [u8; 32] = [0; 32]; result.finalize(&mut res); @@ -453,7 +453,7 @@ pub fn ewt_sign( base64::URL_SAFE_NO_PAD, ); let message = Message::from(hash_message( - &format!("{}.{}", header_encoded, payload_encoded).as_bytes(), + format!("{}.{}", header_encoded, payload_encoded).as_bytes(), )); let signature: Signature = signer .sign(password, &message) @@ -475,7 +475,7 @@ pub fn ewt_verify( token: &str, ) -> Result { let message = Message::from(hash_message( - &format!("{}.{}", header_encoded, payload_encoded).as_bytes(), + format!("{}.{}", header_encoded, payload_encoded).as_bytes(), )); let decoded_signature = base64::decode_config(&token, base64::URL_SAFE_NO_PAD) diff --git a/adview-manager/src/lib.rs b/adview-manager/src/lib.rs index cae144279..bc845304d 100644 --- a/adview-manager/src/lib.rs +++ b/adview-manager/src/lib.rs @@ -172,7 +172,7 @@ fn get_unit_html( ) -> String { let image_url = normalize_url(&ad_unit.media_url); - let element_html = if is_video(&ad_unit) { + let element_html = if is_video(ad_unit) { video_html(on_load, size, &image_url, &ad_unit.media_mime) } else { image_html(on_load, size, &image_url) diff --git a/primitives/src/adapter.rs b/primitives/src/adapter.rs index af336daff..8eb0ed347 100644 --- a/primitives/src/adapter.rs +++ b/primitives/src/adapter.rs @@ -81,7 +81,7 @@ pub trait Adapter: Send + Sync + fmt::Debug + Clone { fn unlock(&mut self) -> AdapterResult<(), Self::AdapterError>; /// Get Adapter whoami - fn whoami(&self) -> &ValidatorId; + fn whoami(&self) -> ValidatorId; /// Signs the provided state_root fn sign(&self, state_root: &str) -> AdapterResult; diff --git a/primitives/src/big_num.rs b/primitives/src/big_num.rs index 2fd30bbb6..0633baea9 100644 --- a/primitives/src/big_num.rs +++ b/primitives/src/big_num.rs @@ -243,7 +243,7 @@ impl TryFrom<&str> for BigNum { type Error = super::DomainError; fn try_from(num: &str) -> Result { - let big_uint = BigUint::from_str(&num) + let big_uint = BigUint::from_str(num) .map_err(|err| super::DomainError::InvalidArgument(err.to_string()))?; Ok(Self(big_uint)) diff --git a/primitives/src/campaign.rs b/primitives/src/campaign.rs index ad3ca0167..7a427689d 100644 --- a/primitives/src/campaign.rs +++ b/primitives/src/campaign.rs @@ -318,7 +318,7 @@ pub mod validators { } pub fn iter(&self) -> Iter<'_> { - Iter::new(&self) + Iter::new(self) } } diff --git a/primitives/src/campaign_validator.rs b/primitives/src/campaign_validator.rs index 657b12040..d807aca3c 100644 --- a/primitives/src/campaign_validator.rs +++ b/primitives/src/campaign_validator.rs @@ -57,7 +57,7 @@ impl Validator for Campaign { return Err(Validation::UnlistedValidator.into()); } - if !creator_listed(&self, &config.creators_whitelist) { + if !creator_listed(self, &config.creators_whitelist) { return Err(Validation::UnlistedCreator.into()); } diff --git a/primitives/src/channel.rs b/primitives/src/channel.rs index 507d4060b..84ee2b0ad 100644 --- a/primitives/src/channel.rs +++ b/primitives/src/channel.rs @@ -238,9 +238,9 @@ impl SpecValidators { pub fn find(&self, validator_id: &ValidatorId) -> Option> { if &self.leader().id == validator_id { - Some(SpecValidator::Leader(&self.leader())) + Some(SpecValidator::Leader(self.leader())) } else if &self.follower().id == validator_id { - Some(SpecValidator::Follower(&self.follower())) + Some(SpecValidator::Follower(self.follower())) } else { None } @@ -257,7 +257,7 @@ impl SpecValidators { } pub fn iter(&self) -> Iter<'_> { - Iter::new(&self) + Iter::new(self) } } diff --git a/primitives/src/sentry.rs b/primitives/src/sentry.rs index ba07b4035..cda0813b9 100644 --- a/primitives/src/sentry.rs +++ b/primitives/src/sentry.rs @@ -380,7 +380,7 @@ pub mod campaign_create { } } } - + // All editable fields stored in one place, used for checking when a budget is changed #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct ModifyCampaign { diff --git a/primitives/src/sentry/accounting.rs b/primitives/src/sentry/accounting.rs index 74a3aa677..53a4efb44 100644 --- a/primitives/src/sentry/accounting.rs +++ b/primitives/src/sentry/accounting.rs @@ -69,14 +69,18 @@ impl Balances { Ok(()) } - /// Adds the spender to the Balances with `UnifiedNum::from(0)` if he does not exist + /// Adds the spender to the Balances with `0` if he does not exist pub fn add_spender(&mut self, spender: Address) { - self.spenders.entry(spender).or_insert(UnifiedNum::from(0)); + self.spenders + .entry(spender) + .or_insert_with(UnifiedNum::default); } - /// Adds the earner to the Balances with `UnifiedNum::from(0)` if he does not exist + /// Adds the earner to the Balances with `0` if he does not exist pub fn add_earner(&mut self, earner: Address) { - self.earners.entry(earner).or_insert(UnifiedNum::from(0)); + self.earners + .entry(earner) + .or_insert_with(UnifiedNum::default); } } diff --git a/primitives/src/util/tests/time.rs b/primitives/src/util/tests/time.rs index 349165fae..fc16e5422 100644 --- a/primitives/src/util/tests/time.rs +++ b/primitives/src/util/tests/time.rs @@ -25,5 +25,5 @@ pub fn past_datetime(from: Option<&DateTime>) -> DateTime { let from = from.unwrap_or(&default_from); - datetime_between(&from, Some(&to)) + datetime_between(from, Some(&to)) } diff --git a/primitives/src/validator.rs b/primitives/src/validator.rs index 72b89f7a1..9403c45a7 100644 --- a/primitives/src/validator.rs +++ b/primitives/src/validator.rs @@ -27,7 +27,7 @@ impl ValidatorId { } pub fn inner(&self) -> &[u8; 20] { - &self.0.as_bytes() + self.0.as_bytes() } } @@ -53,7 +53,7 @@ impl From<&[u8; 20]> for ValidatorId { impl AsRef<[u8]> for ValidatorId { fn as_ref(&self) -> &[u8] { - &self.0.as_ref() + self.0.as_ref() } } diff --git a/sentry/src/access.rs b/sentry/src/access.rs index a3fca1578..518fce44c 100644 --- a/sentry/src/access.rs +++ b/sentry/src/access.rs @@ -40,7 +40,7 @@ pub async fn check_access( let auth_uid = auth.map(|auth| auth.uid.to_string()).unwrap_or_default(); // Rules for events - if forbidden_country(&session) || forbidden_referrer(&session) { + if forbidden_country(session) || forbidden_referrer(session) { return Err(Error::ForbiddenReferrer); } @@ -79,11 +79,11 @@ pub async fn check_access( let apply_all_rules = try_join_all(rules.iter().map(|rule| { apply_rule( redis.clone(), - &rule, - &events, - &campaign, + rule, + events, + campaign, &auth_uid, - &session, + session, ) })); diff --git a/sentry/src/analytics_recorder.rs b/sentry/src/analytics_recorder.rs index a7e3b2cfc..3d2d07d3c 100644 --- a/sentry/src/analytics_recorder.rs +++ b/sentry/src/analytics_recorder.rs @@ -114,7 +114,6 @@ pub async fn record( .ignore(); } } - _ => {} }); if let Err(err) = db.query_async::<_, Option>(&mut conn).await { diff --git a/sentry/src/db.rs b/sentry/src/db.rs index 63578cac3..336a986f5 100644 --- a/sentry/src/db.rs +++ b/sentry/src/db.rs @@ -77,7 +77,7 @@ pub async fn setup_migrations(environment: &str) { .database_password(POSTGRES_PASSWORD.as_str()) .database_host(POSTGRES_HOST.as_str()) .database_port(*POSTGRES_PORT) - .database_name(&POSTGRES_DB.as_ref().unwrap_or(&POSTGRES_USER)) + .database_name(POSTGRES_DB.as_ref().unwrap_or(&POSTGRES_USER)) .build() .expect("Should build migration settings"); diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs index 713aa80e7..b8313ed5d 100644 --- a/sentry/src/lib.rs +++ b/sentry/src/lib.rs @@ -125,63 +125,63 @@ impl Application { None => Default::default(), }; - let req = match Authenticate.call(req, &self).await { + let req = match Authenticate.call(req, self).await { Ok(req) => req, Err(error) => return map_response_error(error), }; let mut response = match (req.uri().path(), req.method()) { - ("/cfg", &Method::GET) => config(req, &self).await, - ("/channel", &Method::POST) => create_channel(req, &self).await, - ("/channel/list", &Method::GET) => channel_list(req, &self).await, - ("/channel/validate", &Method::POST) => channel_validate(req, &self).await, + ("/cfg", &Method::GET) => config(req, self).await, + ("/channel", &Method::POST) => create_channel(req, self).await, + ("/channel/list", &Method::GET) => channel_list(req, self).await, + ("/channel/validate", &Method::POST) => channel_validate(req, self).await, - ("/analytics", &Method::GET) => analytics(req, &self).await, + ("/analytics", &Method::GET) => analytics(req, self).await, ("/analytics/advanced", &Method::GET) => { - let req = match AuthRequired.call(req, &self).await { + let req = match AuthRequired.call(req, self).await { Ok(req) => req, Err(error) => { return map_response_error(error); } }; - advanced_analytics(req, &self).await + advanced_analytics(req, self).await } ("/analytics/for-advertiser", &Method::GET) => { - let req = match AuthRequired.call(req, &self).await { + let req = match AuthRequired.call(req, self).await { Ok(req) => req, Err(error) => { return map_response_error(error); } }; - advertiser_analytics(req, &self).await + advertiser_analytics(req, self).await } ("/analytics/for-publisher", &Method::GET) => { - let req = match AuthRequired.call(req, &self).await { + let req = match AuthRequired.call(req, self).await { Ok(req) => req, Err(error) => { return map_response_error(error); } }; - publisher_analytics(req, &self).await + publisher_analytics(req, self).await } // For creating campaigns ("/v5/campaign", &Method::POST) => { - let req = match AuthRequired.call(req, &self).await { + let req = match AuthRequired.call(req, self).await { Ok(req) => req, Err(error) => { return map_response_error(error); } }; - create_campaign(req, &self).await + create_campaign(req, self).await } - (route, _) if route.starts_with("/analytics") => analytics_router(req, &self).await, + (route, _) if route.starts_with("/analytics") => analytics_router(req, self).await, // This is important because it prevents us from doing // expensive regex matching for routes without /channel - (path, _) if path.starts_with("/channel") => channels_router(req, &self).await, - (path, _) if path.starts_with("/v5/campaign") => campaigns_router(req, &self).await, + (path, _) if path.starts_with("/channel") => channels_router(req, self).await, + (path, _) if path.starts_with("/v5/campaign") => campaigns_router(req, self).await, _ => Err(ResponseError::NotFound), } .unwrap_or_else(map_response_error); @@ -198,12 +198,12 @@ async fn campaigns_router( ) -> Result, ResponseError> { let (path, method) = (req.uri().path(), req.method()); - if let (Some(_caps), &Method::POST) = (CAMPAIGN_UPDATE_BY_ID.captures(&path), method) { + if let (Some(_caps), &Method::POST) = (CAMPAIGN_UPDATE_BY_ID.captures(path), method) { let req = CampaignLoad.call(req, app).await?; update_campaign::handle_route(req, app).await } else if let (Some(caps), &Method::POST) = - (INSERT_EVENTS_BY_CAMPAIGN_ID.captures(&path), method) + (INSERT_EVENTS_BY_CAMPAIGN_ID.captures(path), method) { let param = RouteParams(vec![caps .get(1) @@ -214,7 +214,7 @@ async fn campaigns_router( campaign::insert_events::handle_route(req, app).await } else if let (Some(_caps), &Method::POST) = - (CLOSE_CAMPAIGN_BY_CAMPAIGN_ID.captures(&path), method) + (CLOSE_CAMPAIGN_BY_CAMPAIGN_ID.captures(path), method) { // TODO AIP#61: Close campaign: // - only by creator @@ -308,16 +308,6 @@ async fn channels_router( ) -> Result, ResponseError> { let (path, method) = (req.uri().path().to_owned(), req.method()); - // regex matching for routes with params - /* if let (Some(caps), &Method::POST) = (CREATE_EVENTS_BY_CHANNEL_ID.captures(&path), method) { - let param = RouteParams(vec![caps - .get(1) - .map_or("".to_string(), |m| m.as_str().to_string())]); - - req.extensions_mut().insert(param); - - insert_events(req, app).await - } else */ if let (Some(caps), &Method::GET) = (LAST_APPROVED_BY_CHANNEL_ID.captures(&path), method) { let param = RouteParams(vec![caps .get(1) @@ -353,7 +343,7 @@ async fn channels_router( } }; - list_validator_messages(req, &app, &extract_params.0, &extract_params.1).await + list_validator_messages(req, app, &extract_params.0, &extract_params.1).await } else if let (Some(caps), &Method::POST) = (CHANNEL_VALIDATOR_MESSAGES.captures(&path), method) { let param = RouteParams(vec![caps @@ -368,7 +358,7 @@ async fn channels_router( .apply(req, app) .await?; - create_validator_messages(req, &app).await + create_validator_messages(req, app).await } else if let (Some(caps), &Method::GET) = (CHANNEL_EVENTS_AGGREGATES.captures(&path), method) { req = AuthRequired.call(req, app).await?; diff --git a/sentry/src/payout.rs b/sentry/src/payout.rs index 9c95751a5..7226b77b9 100644 --- a/sentry/src/payout.rs +++ b/sentry/src/payout.rs @@ -35,7 +35,7 @@ pub fn get_payout( } => { let targeting_rules = campaign.targeting_rules.clone(); - let pricing = get_pricing_bounds(&campaign, &event_type); + let pricing = get_pricing_bounds(campaign, &event_type); if targeting_rules.is_empty() { Ok(Some((*publisher, pricing.min))) diff --git a/sentry/src/routes/analytics.rs b/sentry/src/routes/analytics.rs index c0c3de4ed..cfb7a72a6 100644 --- a/sentry/src/routes/analytics.rs +++ b/sentry/src/routes/analytics.rs @@ -80,7 +80,7 @@ pub async fn process_analytics( app: &Application, analytics_type: AnalyticsType, ) -> Result { - let query = serde_urlencoded::from_str::(&req.uri().query().unwrap_or(""))?; + let query = serde_urlencoded::from_str::(req.uri().query().unwrap_or(""))?; query .is_valid() .map_err(|e| ResponseError::BadRequest(e.to_string()))?; @@ -113,7 +113,7 @@ pub async fn advanced_analytics( let auth = req.extensions().get::().expect("auth is required"); let advertiser_channels = advertiser_channel_ids(&app.pool, &auth.uid).await?; - let query = serde_urlencoded::from_str::(&req.uri().query().unwrap_or(""))?; + let query = serde_urlencoded::from_str::(req.uri().query().unwrap_or(""))?; let response = get_advanced_reports( &app.redis, diff --git a/sentry/src/routes/campaign.rs b/sentry/src/routes/campaign.rs index 955e6ab6c..33a307f15 100644 --- a/sentry/src/routes/campaign.rs +++ b/sentry/src/routes/campaign.rs @@ -85,13 +85,13 @@ pub async fn create_campaign( let latest_spendable = fetch_spendable(app.pool.clone(), &campaign.creator, &campaign.channel.id()) .await? - .ok_or(ResponseError::BadRequest( + .ok_or_else(|| ResponseError::BadRequest( "No spendable amount found for the Campaign creator".to_string(), ))?; // Gets the latest Spendable for this (spender, channelId) pair let total_deposited = latest_spendable.deposit.total; - total_deposited.checked_sub(&accounting_spent).ok_or( + total_deposited.checked_sub(&accounting_spent).ok_or_else(|| ResponseError::FailedValidation("No more budget remaining".to_string()), )? }; @@ -113,7 +113,9 @@ pub async fn create_campaign( .checked_add(&campaign.budget) .ok_or(Error::Calculation)?; - if !(campaigns_remaining_sum <= total_remaining) || campaign.budget > total_remaining { + // `new_campaigns_remaining <= total_remaining` should be upheld + // `campaign.budget < total_remaining` should also be upheld! + if campaigns_remaining_sum > total_remaining || campaign.budget > total_remaining { return Err(ResponseError::BadRequest( "Not enough deposit left for the new campaign's budget".to_string(), )); @@ -232,7 +234,7 @@ pub mod update_campaign { let total_remaining = total_deposited .checked_sub(&accounting_spent) .ok_or(Error::Calculation)?; - let channel_campaigns = get_campaigns_by_channel(&pool, &campaign.channel.id()) + let channel_campaigns = get_campaigns_by_channel(pool, &campaign.channel.id()) .await? .iter() .map(|c| c.id) @@ -257,7 +259,8 @@ pub mod update_campaign { } .ok_or(Error::Calculation)?; - if !(new_campaigns_remaining <= total_remaining) { + // `new_campaigns_remaining <= total_remaining` should be upheld + if new_campaigns_remaining > total_remaining { return Err(Error::NewBudget( "Not enough deposit left for the campaign's new budget".to_string(), )); @@ -280,7 +283,7 @@ pub mod update_campaign { } let modified_campaign = modify_campaign.apply(campaign); - update_campaign(&pool, &modified_campaign).await?; + update_campaign(pool, &modified_campaign).await?; Ok(modified_campaign) } @@ -312,7 +315,7 @@ pub mod update_campaign { .get_remaining_opt(campaign.id) .await? .map(|remaining| UnifiedNum::from(max(0, remaining).unsigned_abs())) - .ok_or(Error::FailedUpdate( + .ok_or_else(|| Error::FailedUpdate( "No remaining entry for campaign".to_string(), ))?; @@ -349,7 +352,7 @@ pub mod update_campaign { .ok_or(Error::Calculation)?; // old remaining > new remaining let decrease_by = old_remaining - .checked_sub(&new_remaining) + .checked_sub(new_remaining) .ok_or(Error::Calculation)?; DeltaBudget::Decrease(decrease_by) @@ -457,7 +460,7 @@ pub mod insert_events { session, auth, &app.config.ip_rate_limit, - &campaign, + campaign, &events, ) .await @@ -489,7 +492,7 @@ pub mod insert_events { Some((earner, payout)) => spend_for_event( &app.pool, app.redis.clone(), - &campaign, + campaign, earner, leader, follower, @@ -520,9 +523,9 @@ pub mod insert_events { ) -> Result<(), Error> { // distribute fees let leader_fee = - calculate_fee((earner, amount), &leader).map_err(EventError::FeeCalculation)?; + calculate_fee((earner, amount), leader).map_err(EventError::FeeCalculation)?; let follower_fee = - calculate_fee((earner, amount), &follower).map_err(EventError::FeeCalculation)?; + calculate_fee((earner, amount), follower).map_err(EventError::FeeCalculation)?; // First update redis `campaignRemaining:{CampaignId}` key let spending = [amount, leader_fee, follower_fee] diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index ca337b6f7..f15c717b7 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -82,7 +82,7 @@ pub async fn channel_list( req: Request, app: &Application, ) -> Result, ResponseError> { - let query = serde_urlencoded::from_str::(&req.uri().query().unwrap_or(""))?; + let query = serde_urlencoded::from_str::(req.uri().query().unwrap_or(""))?; let skip = query .page .checked_mul(app.config.channels_find_limit.into()) @@ -148,7 +148,7 @@ pub async fn last_approved( return Ok(default_response); } - let query = serde_urlencoded::from_str::(&req.uri().query().unwrap_or(""))?; + let query = serde_urlencoded::from_str::(req.uri().query().unwrap_or(""))?; let validators = channel.spec.validators; let channel_id = channel.id; let heartbeats = if query.with_heartbeat.is_some() { @@ -212,7 +212,7 @@ pub async fn create_validator_messages( None => Err(ResponseError::Unauthorized), _ => { try_join_all(messages.iter().map(|message| { - insert_validator_messages(&app.pool, &channel, &session.uid, &message) + insert_validator_messages(&app.pool, &channel, &session.uid, message) })) .await?; diff --git a/sentry/src/routes/validator_message.rs b/sentry/src/routes/validator_message.rs index 3c2fc375d..e4c98d71b 100644 --- a/sentry/src/routes/validator_message.rs +++ b/sentry/src/routes/validator_message.rs @@ -46,7 +46,7 @@ pub async fn list_validator_messages( message_types: &[String], ) -> Result, ResponseError> { let query = - serde_urlencoded::from_str::(&req.uri().query().unwrap_or(""))?; + serde_urlencoded::from_str::(req.uri().query().unwrap_or(""))?; let channel = req .extensions() diff --git a/validator_worker/src/core/events.rs b/validator_worker/src/core/events.rs index 211a99527..6912f1e17 100644 --- a/validator_worker/src/core/events.rs +++ b/validator_worker/src/core/events.rs @@ -59,9 +59,9 @@ fn _merge_payouts_into_balances<'a, T: Iterator>( let new_balance = new_balances.entry(*acc).or_insert_with(|| 0.into()); - *new_balance += &to_add; + *new_balance += to_add; - remaining = remaining.checked_sub(&to_add).ok_or_else(|| { + remaining = remaining.checked_sub(to_add).ok_or_else(|| { DomainError::RuleViolation("remaining must never be negative".to_string()) })?; } diff --git a/validator_worker/src/follower.rs b/validator_worker/src/follower.rs index 7d200deeb..2e480a732 100644 --- a/validator_worker/src/follower.rs +++ b/validator_worker/src/follower.rs @@ -77,7 +77,7 @@ pub async fn tick( _ => false, }; - let producer_tick = producer::tick(&iface).await?; + let producer_tick = producer::tick(iface).await?; let empty_balances = BalancesMap::default(); let balances = match &producer_tick { producer::TickStatus::Sent { new_accounting, .. } => &new_accounting.balances, @@ -85,13 +85,13 @@ pub async fn tick( producer::TickStatus::EmptyBalances => &empty_balances, }; let approve_state_result = if let (Some(new_state), false) = (new_msg, latest_is_responded_to) { - on_new_state(&iface, &balances, &new_state).await? + on_new_state(iface, balances, &new_state).await? } else { ApproveStateResult::Sent(None) }; Ok(TickStatus { - heartbeat: heartbeat(&iface, &balances).await?, + heartbeat: heartbeat(iface, balances).await?, approve_state: approve_state_result, producer_tick, }) @@ -104,8 +104,8 @@ async fn on_new_state<'a, A: Adapter + 'static>( ) -> Result, Box> { let proposed_balances = new_state.balances.clone(); let proposed_state_root = new_state.state_root.clone(); - if proposed_state_root != hex::encode(get_state_root_hash(&iface, &proposed_balances)?) { - return Ok(on_error(&iface, &new_state, InvalidNewState::RootHash).await); + if proposed_state_root != hex::encode(get_state_root_hash(iface, &proposed_balances)?) { + return Ok(on_error(iface, new_state, InvalidNewState::RootHash).await); } if !iface.adapter.verify( @@ -113,7 +113,7 @@ async fn on_new_state<'a, A: Adapter + 'static>( &proposed_state_root, &new_state.signature, )? { - return Ok(on_error(&iface, &new_state, InvalidNewState::Signature).await); + return Ok(on_error(iface, new_state, InvalidNewState::Signature).await); } let last_approve_response = iface.get_last_approved().await?; @@ -126,12 +126,12 @@ async fn on_new_state<'a, A: Adapter + 'static>( }; if !is_valid_transition(&iface.channel, &prev_balances, &proposed_balances) { - return Ok(on_error(&iface, &new_state, InvalidNewState::Transition).await); + return Ok(on_error(iface, new_state, InvalidNewState::Transition).await); } let health = get_health(&iface.channel, balances, &proposed_balances); if health < u64::from(iface.config.health_unsignable_promilles) { - return Ok(on_error(&iface, &new_state, InvalidNewState::Health).await); + return Ok(on_error(iface, new_state, InvalidNewState::Health).await); } let signature = iface.adapter.sign(&new_state.state_root)?; diff --git a/validator_worker/src/heartbeat.rs b/validator_worker/src/heartbeat.rs index 35af963dc..4e2016558 100644 --- a/validator_worker/src/heartbeat.rs +++ b/validator_worker/src/heartbeat.rs @@ -55,7 +55,7 @@ pub async fn heartbeat( }); if should_send { - Ok(Some(send_heartbeat(&iface).await?)) + Ok(Some(send_heartbeat(iface).await?)) } else { Ok(None) } diff --git a/validator_worker/src/leader.rs b/validator_worker/src/leader.rs index 792764b8c..7b582b7d4 100644 --- a/validator_worker/src/leader.rs +++ b/validator_worker/src/leader.rs @@ -21,11 +21,11 @@ pub struct TickStatus { pub async fn tick( iface: &SentryApi, ) -> Result, Box> { - let producer_tick = producer::tick(&iface).await?; + let producer_tick = producer::tick(iface).await?; let empty_balances = BalancesMap::default(); let (balances, new_state) = match &producer_tick { producer::TickStatus::Sent { new_accounting, .. } => { - let new_state = on_new_accounting(&iface, new_accounting).await?; + let new_state = on_new_accounting(iface, new_accounting).await?; (&new_accounting.balances, Some(new_state)) } producer::TickStatus::NoNewEventAggr(balances) => (balances, None), @@ -33,7 +33,7 @@ pub async fn tick( }; Ok(TickStatus { - heartbeat: heartbeat(&iface, &balances).await?, + heartbeat: heartbeat(iface, balances).await?, new_state, producer_tick, }) @@ -43,7 +43,7 @@ async fn on_new_accounting( iface: &SentryApi, new_accounting: &Accounting, ) -> Result>, Box> { - let state_root_raw = get_state_root_hash(&iface, &new_accounting.balances)?; + let state_root_raw = get_state_root_hash(iface, &new_accounting.balances)?; let state_root = hex::encode(state_root_raw); let signature = iface.adapter.sign(&state_root)?; diff --git a/validator_worker/src/main.rs b/validator_worker/src/main.rs index 3cdd6b928..acf42fa37 100644 --- a/validator_worker/src/main.rs +++ b/validator_worker/src/main.rs @@ -116,10 +116,10 @@ fn main() -> Result<(), Box> { match adapter { AdapterTypes::EthereumAdapter(ethadapter) => { - run(is_single_tick, &sentry_url, &config, *ethadapter, &logger) + run(is_single_tick, sentry_url, &config, *ethadapter, &logger) } AdapterTypes::DummyAdapter(dummyadapter) => { - run(is_single_tick, &sentry_url, &config, *dummyadapter, &logger) + run(is_single_tick, sentry_url, &config, *dummyadapter, &logger) } } } @@ -144,9 +144,9 @@ fn run( let rt = Runtime::new()?; if is_single_tick { - rt.block_on(iterate_channels(args, &logger)); + rt.block_on(iterate_channels(args, logger)); } else { - rt.block_on(infinite(args, &logger)); + rt.block_on(infinite(args, logger)); } Ok(()) @@ -161,7 +161,7 @@ async fn infinite(args: Args, logger: &Logger) { } async fn iterate_channels(args: Args, logger: &Logger) { - let result = all_channels(&args.sentry_url, args.adapter.whoami()).await; + let result = all_channels(&args.sentry_url, &args.adapter.whoami()).await; let channels = match result { Ok(channels) => channels, @@ -197,10 +197,10 @@ async fn validator_tick( config: &Config, logger: &Logger, ) -> Result<(ChannelId, Box), ValidatorWorkerError> { - let whoami = *adapter.whoami(); + let whoami = adapter.whoami(); // Cloning the `Logger` is cheap, see documentation for more info - let sentry = SentryApi::init(adapter, channel.clone(), &config, logger.clone()) + let sentry = SentryApi::init(adapter, channel.clone(), config, logger.clone()) .map_err(ValidatorWorkerError::SentryApi)?; let duration = Duration::from_millis(config.validator_tick_timeout as u64); diff --git a/validator_worker/src/sentry_interface.rs b/validator_worker/src/sentry_interface.rs index bf6be3fc1..ca8e8b61f 100644 --- a/validator_worker/src/sentry_interface.rs +++ b/validator_worker/src/sentry_interface.rs @@ -87,7 +87,7 @@ impl SentryApi { .map_err(Error::BuildingClient)?; // validate that we are to validate the channel - match channel.spec.validators.find(adapter.whoami()) { + match channel.spec.validators.find(&adapter.whoami()) { Some(ref spec_validator) => { let validator = spec_validator.validator(); let validator_url = format!("{}/channel/{}", validator.url, channel.id); @@ -117,7 +117,7 @@ impl SentryApi { None => Err(Error::MissingWhoamiInChannelValidators { channel: channel.id, validators: channel.spec.validators.iter().map(|v| v.id).collect(), - whoami: *adapter.whoami(), + whoami: adapter.whoami(), }), } } @@ -129,9 +129,9 @@ impl SentryApi { join_all(self.propagate_to.iter().map(|(validator, auth_token)| { propagate_to::( &self.channel.id, - &auth_token, + auth_token, &self.client, - &validator, + validator, messages, ) })) @@ -165,7 +165,7 @@ impl SentryApi { &self, message_types: &[&str], ) -> Result, Error> { - self.get_latest_msg(self.adapter.whoami(), message_types) + self.get_latest_msg(&self.adapter.whoami(), message_types) .await } @@ -196,7 +196,7 @@ impl SentryApi { ) -> Result> { let auth_token = self .adapter - .get_auth(self.adapter.whoami()) + .get_auth(&self.adapter.whoami()) .map_err(Error::ValidatorAuthentication)?; let url = format!( @@ -250,13 +250,13 @@ pub async fn all_channels( whoami: &ValidatorId, ) -> Result, reqwest::Error> { let url = sentry_url.to_owned(); - let first_page = fetch_page(url.clone(), 0, &whoami).await?; + let first_page = fetch_page(url.clone(), 0, whoami).await?; if first_page.total_pages < 2 { Ok(first_page.channels) } else { let all: Vec = - try_join_all((1..first_page.total_pages).map(|i| fetch_page(url.clone(), i, &whoami))) + try_join_all((1..first_page.total_pages).map(|i| fetch_page(url.clone(), i, whoami))) .await?; let result_all: Vec = std::iter::once(first_page) From 05ac2b0fbd50ed22a6599c4ca84a008a62212eea Mon Sep 17 00:00:00 2001 From: Lachezar Lechev Date: Tue, 10 Aug 2021 09:10:35 +0300 Subject: [PATCH 11/12] rustfmt --- sentry/src/access.rs | 15 +++------ sentry/src/middleware/campaign.rs | 5 +-- sentry/src/routes/campaign.rs | 53 ++++++++++++++++--------------- 3 files changed, 33 insertions(+), 40 deletions(-) diff --git a/sentry/src/access.rs b/sentry/src/access.rs index 518fce44c..d3435e88d 100644 --- a/sentry/src/access.rs +++ b/sentry/src/access.rs @@ -76,16 +76,11 @@ pub async fn check_access( return Ok(()); } - let apply_all_rules = try_join_all(rules.iter().map(|rule| { - apply_rule( - redis.clone(), - rule, - events, - campaign, - &auth_uid, - session, - ) - })); + let apply_all_rules = try_join_all( + rules + .iter() + .map(|rule| apply_rule(redis.clone(), rule, events, campaign, &auth_uid, session)), + ); apply_all_rules.await.map_err(Error::RulesError).map(|_| ()) } diff --git a/sentry/src/middleware/campaign.rs b/sentry/src/middleware/campaign.rs index 024dc1400..2da58c1b8 100644 --- a/sentry/src/middleware/campaign.rs +++ b/sentry/src/middleware/campaign.rs @@ -37,10 +37,7 @@ impl Middleware for CampaignLoad { #[cfg(test)] mod test { - use primitives::{ - util::tests::prep_db::{DUMMY_CAMPAIGN, IDS}, - Campaign, - }; + use primitives::{util::tests::prep_db::DUMMY_CAMPAIGN, Campaign}; use crate::{db::insert_campaign, test_util::setup_dummy_app}; diff --git a/sentry/src/routes/campaign.rs b/sentry/src/routes/campaign.rs index 33a307f15..313c991ef 100644 --- a/sentry/src/routes/campaign.rs +++ b/sentry/src/routes/campaign.rs @@ -70,31 +70,34 @@ pub async fn create_campaign( let error_response = ResponseError::BadRequest("err occurred; please try again later".to_string()); - let total_remaining = - { - let accounting_spent = get_accounting( - app.pool.clone(), - campaign.channel.id(), - campaign.creator, - Side::Spender, - ) - .await? - .map(|accounting| accounting.amount) - .unwrap_or_default(); + let total_remaining = { + let accounting_spent = get_accounting( + app.pool.clone(), + campaign.channel.id(), + campaign.creator, + Side::Spender, + ) + .await? + .map(|accounting| accounting.amount) + .unwrap_or_default(); - let latest_spendable = - fetch_spendable(app.pool.clone(), &campaign.creator, &campaign.channel.id()) - .await? - .ok_or_else(|| ResponseError::BadRequest( + let latest_spendable = + fetch_spendable(app.pool.clone(), &campaign.creator, &campaign.channel.id()) + .await? + .ok_or_else(|| { + ResponseError::BadRequest( "No spendable amount found for the Campaign creator".to_string(), - ))?; - // Gets the latest Spendable for this (spender, channelId) pair - let total_deposited = latest_spendable.deposit.total; - - total_deposited.checked_sub(&accounting_spent).ok_or_else(|| - ResponseError::FailedValidation("No more budget remaining".to_string()), - )? - }; + ) + })?; + // Gets the latest Spendable for this (spender, channelId) pair + let total_deposited = latest_spendable.deposit.total; + + total_deposited + .checked_sub(&accounting_spent) + .ok_or_else(|| { + ResponseError::FailedValidation("No more budget remaining".to_string()) + })? + }; let channel_campaigns = get_campaigns_by_channel(&app.pool, &campaign.channel.id()) .await? @@ -315,9 +318,7 @@ pub mod update_campaign { .get_remaining_opt(campaign.id) .await? .map(|remaining| UnifiedNum::from(max(0, remaining).unsigned_abs())) - .ok_or_else(|| Error::FailedUpdate( - "No remaining entry for campaign".to_string(), - ))?; + .ok_or_else(|| Error::FailedUpdate("No remaining entry for campaign".to_string()))?; let campaign_spent = campaign .budget From 598ab52c65be4dd2110ef88f430d731d4babfd9c Mon Sep 17 00:00:00 2001 From: Lachezar Lechev Date: Tue, 10 Aug 2021 09:27:59 +0300 Subject: [PATCH 12/12] sentry - routes - camiang insert events - use CampaignRemaining --- sentry/src/routes/campaign.rs | 87 ++++++++++++----------------------- 1 file changed, 30 insertions(+), 57 deletions(-) diff --git a/sentry/src/routes/campaign.rs b/sentry/src/routes/campaign.rs index 313c991ef..4e4b3f653 100644 --- a/sentry/src/routes/campaign.rs +++ b/sentry/src/routes/campaign.rs @@ -370,7 +370,7 @@ pub mod insert_events { use crate::{ access::{self, check_access}, - db::{accounting::spend_amount, DbPool, PoolError, RedisError}, + db::{accounting::spend_amount, CampaignRemaining, DbPool, PoolError, RedisError}, payout::get_payout, spender::fee::calculate_fee, Application, Auth, ResponseError, Session, @@ -384,12 +384,8 @@ pub mod insert_events { }, Address, Campaign, CampaignId, DomainError, UnifiedNum, ValidatorDesc, }; - use redis::aio::MultiplexedConnection; use thiserror::Error; - // TODO AIP#61: Use the Campaign Modify const here - pub const CAMPAIGN_REMAINING_KEY: &str = "campaignRemaining"; - #[derive(Debug, Error)] pub enum Error { #[error(transparent)] @@ -492,7 +488,7 @@ pub mod insert_events { match payout { Some((earner, payout)) => spend_for_event( &app.pool, - app.redis.clone(), + &app.campaign_remaining, campaign, earner, leader, @@ -515,7 +511,7 @@ pub mod insert_events { pub async fn spend_for_event( pool: &DbPool, - mut redis: MultiplexedConnection, + campaign_remaining: &CampaignRemaining, campaign: &Campaign, earner: Address, leader: &ValidatorDesc, @@ -534,14 +530,16 @@ pub mod insert_events { .sum::>() .ok_or(EventError::EventPayoutOverflow)?; - if !has_enough_remaining_budget(&mut redis, campaign.id, spending).await? { + if !has_enough_remaining_budget(campaign_remaining, campaign.id, spending).await? { return Err(Error::Event( EventError::CampaignRemainingNotEnoughForPayout, )); } // The event payout decreases the remaining budget for the Campaign - let remaining = decrease_remaining_budget(&mut redis, campaign.id, spending).await?; + let remaining = campaign_remaining + .decrease_by(campaign.id, spending) + .await?; // Update the Accounting records accordingly let channel_id = campaign.channel.id(); @@ -563,40 +561,22 @@ pub mod insert_events { } async fn has_enough_remaining_budget( - redis: &mut MultiplexedConnection, + campaign_remaining: &CampaignRemaining, campaign: CampaignId, amount: UnifiedNum, ) -> Result { - let key = format!("{}:{}", CAMPAIGN_REMAINING_KEY, campaign); - - let remaining = redis::cmd("GET") - .arg(&key) - .query_async::<_, Option>(redis) + let remaining = campaign_remaining + .get_remaining_opt(campaign) .await? .unwrap_or_default(); Ok(remaining > 0 && remaining.unsigned_abs() > amount.to_u64()) } - async fn decrease_remaining_budget( - redis: &mut MultiplexedConnection, - campaign: CampaignId, - amount: UnifiedNum, - ) -> Result { - let key = format!("{}:{}", CAMPAIGN_REMAINING_KEY, campaign); - - let remaining = redis::cmd("DECRBY") - .arg(&key) - .arg(amount.to_u64()) - .query_async::<_, i64>(redis) - .await?; - - Ok(remaining) - } - #[cfg(test)] mod test { use primitives::util::tests::prep_db::{ADDRESSES, DUMMY_CAMPAIGN}; + use redis::aio::MultiplexedConnection; use crate::db::{ redis_pool::TESTS_POOL, @@ -605,27 +585,13 @@ pub mod insert_events { use super::*; - /// Helper function to get the Campaign Remaining budget in Redis for the tests - async fn get_campaign_remaining( - redis: &mut MultiplexedConnection, - campaign: CampaignId, - ) -> Option { - let key = format!("{}:{}", CAMPAIGN_REMAINING_KEY, campaign); - - redis::cmd("GET") - .arg(&key) - .query_async(redis) - .await - .expect("Should set Campaign remaining key") - } - /// Helper function to set the Campaign Remaining budget in Redis for the tests async fn set_campaign_remaining( redis: &mut MultiplexedConnection, campaign: CampaignId, remaining: i64, ) { - let key = format!("{}:{}", CAMPAIGN_REMAINING_KEY, campaign); + let key = CampaignRemaining::get_key(campaign); redis::cmd("SET") .arg(&key) @@ -638,12 +604,14 @@ pub mod insert_events { #[tokio::test] async fn test_has_enough_remaining_budget() { let mut redis = TESTS_POOL.get().await.expect("Should get redis connection"); + let campaign_remaining = CampaignRemaining::new(redis.connection.clone()); let campaign = DUMMY_CAMPAIGN.id; let amount = UnifiedNum::from(10_000); - let no_remaining_budget_set = has_enough_remaining_budget(&mut redis, campaign, amount) - .await - .expect("Should check campaign remaining"); + let no_remaining_budget_set = + has_enough_remaining_budget(&campaign_remaining, campaign, amount) + .await + .expect("Should check campaign remaining"); assert!( !no_remaining_budget_set, "No remaining budget set, should return false" @@ -652,7 +620,7 @@ pub mod insert_events { set_campaign_remaining(&mut redis, campaign, 9_000).await; let not_enough_remaining_budget = - has_enough_remaining_budget(&mut redis, campaign, amount) + has_enough_remaining_budget(&campaign_remaining, campaign, amount) .await .expect("Should check campaign remaining"); assert!( @@ -663,7 +631,7 @@ pub mod insert_events { set_campaign_remaining(&mut redis, campaign, 11_000).await; let has_enough_remaining_budget = - has_enough_remaining_budget(&mut redis, campaign, amount) + has_enough_remaining_budget(&campaign_remaining, campaign, amount) .await .expect("Should check campaign remaining"); @@ -677,11 +645,13 @@ pub mod insert_events { async fn test_decreasing_remaining_budget() { let mut redis = TESTS_POOL.get().await.expect("Should get redis connection"); let campaign = DUMMY_CAMPAIGN.id; + let campaign_remaining = CampaignRemaining::new(redis.connection.clone()); let amount = UnifiedNum::from(5_000); set_campaign_remaining(&mut redis, campaign, 9_000).await; - let remaining = decrease_remaining_budget(&mut redis, campaign, amount) + let remaining = campaign_remaining + .decrease_by(campaign, amount) .await .expect("Should decrease campaign remaining"); assert_eq!( @@ -689,7 +659,8 @@ pub mod insert_events { "Should decrease remaining budget with amount and be positive" ); - let remaining = decrease_remaining_budget(&mut redis, campaign, amount) + let remaining = campaign_remaining + .decrease_by(campaign, amount) .await .expect("Should decrease campaign remaining"); assert_eq!( @@ -702,6 +673,7 @@ pub mod insert_events { async fn test_spending_for_events_with_enough_remaining_budget() { let mut redis = TESTS_POOL.get().await.expect("Should get redis connection"); let database = DATABASE_POOL.get().await.expect("Should get a DB pool"); + let campaign_remaining = CampaignRemaining::new(redis.connection.clone()); setup_test_migrations(database.pool.clone()) .await @@ -719,7 +691,7 @@ pub mod insert_events { { let spend_event = spend_for_event( &database.pool, - redis.connection.clone(), + &campaign_remaining, &campaign, publisher, leader, @@ -745,7 +717,7 @@ pub mod insert_events { let spend_event = spend_for_event( &database.pool, - redis.connection.clone(), + &campaign_remaining, &campaign, publisher, leader, @@ -765,8 +737,9 @@ pub mod insert_events { // Follower fee: 100 // Follower payout: 300 * 100 / 1000 = 30 assert_eq!( - 10_640_i64, - get_campaign_remaining(&mut redis.connection, campaign.id) + Some(10_640_i64), + campaign_remaining + .get_remaining_opt(campaign.id) .await .expect("Should have key") )