Skip to content

Issue #415 Create/Modify Campaigns - Redis and tests #418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 9, 2021
22 changes: 21 additions & 1 deletion primitives/src/sentry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,26 @@ pub mod campaign_create {
}
}

/// This implementation helps with test setup
/// **NOTE:** It erases the CampaignId, since the creation of the campaign gives it's CampaignId
impl From<Campaign> for CreateCampaign {
fn from(campaign: Campaign) -> Self {
Self {
channel: campaign.channel,
creator: campaign.creator,
budget: campaign.budget,
validators: campaign.validators,
title: campaign.title,
pricing_bounds: campaign.pricing_bounds,
event_submission: campaign.event_submission,
ad_units: campaign.ad_units,
targeting_rules: campaign.targeting_rules,
created: campaign.created,
active: campaign.active,
}
}
}

// All editable fields stored in one place, used for checking when a budget is changed
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ModifyCampaign {
Expand Down Expand Up @@ -403,7 +423,7 @@ pub mod campaign_create {
if let Some(new_pricing_bounds) = self.pricing_bounds {
campaign.pricing_bounds = Some(new_pricing_bounds);
}

if let Some(new_event_submission) = self.event_submission {
campaign.event_submission = Some(new_event_submission);
}
Expand Down
10 changes: 10 additions & 0 deletions primitives/src/sentry/accounting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ impl<S: BalancesState> Balances<S> {

Ok(())
}

/// Adds the spender to the Balances with `UnifiedNum::from(0)` if he does not exist
pub fn add_spender(&mut self, spender: Address) {
self.spenders.entry(spender).or_insert(UnifiedNum::from(0));
}

/// Adds the earner to the Balances with `UnifiedNum::from(0)` if he does not exist
pub fn add_earner(&mut self, earner: Address) {
self.earners.entry(earner).or_insert(UnifiedNum::from(0));
}
}

#[derive(Debug)]
Expand Down
6 changes: 6 additions & 0 deletions primitives/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ impl From<&Address> for ValidatorId {
}
}

impl From<Address> for ValidatorId {
fn from(address: Address) -> Self {
Self(address)
}
}

impl From<&[u8; 20]> for ValidatorId {
fn from(bytes: &[u8; 20]) -> Self {
Self(Address::from(bytes))
Expand Down
4 changes: 1 addition & 3 deletions sentry/src/db/accounting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ pub async fn get_accounting_spent(
Ok(row.get("spent"))
}

// TODO This is still WIP
#[allow(dead_code)]
async fn insert_accounting(
pub async fn insert_accounting(
pool: DbPool,
channel: Channel,
balances: Balances<CheckedState>,
Expand Down
283 changes: 282 additions & 1 deletion sentry/src/db/campaign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ use crate::db::{DbPool, PoolError};
use primitives::{Campaign, CampaignId, ChannelId};
use tokio_postgres::types::Json;

pub use campaign_remaining::CampaignRemaining;

/// ```text
/// INSERT INTO campaigns (id, channel_id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to)
/// VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
/// ```
pub async fn insert_campaign(pool: &DbPool, campaign: &Campaign) -> Result<bool, PoolError> {
let client = pool.get().await?;
let ad_units = Json(campaign.ad_units.clone());
let stmt = client.prepare("INSERT INTO campaigns (id, channel_id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)").await?;
let stmt = client.prepare("INSERT INTO campaigns (id, channel_id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)").await?;
let inserted = client
.execute(
&stmt,
Expand Down Expand Up @@ -49,6 +55,10 @@ pub async fn fetch_campaign(
}

// TODO: We might need to use LIMIT to implement pagination
/// ```text
/// SELECT id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to
/// FROM campaigns WHERE channel_id = $1
/// ```
pub async fn get_campaigns_by_channel(
pool: &DbPool,
channel_id: &ChannelId,
Expand Down Expand Up @@ -95,6 +105,277 @@ pub async fn update_campaign(pool: &DbPool, campaign: &Campaign) -> Result<Campa
Ok(Campaign::from(&updated_row))
}

/// struct that handles redis calls for the Campaign Remaining Budget
mod campaign_remaining {
use crate::db::RedisError;
use primitives::{CampaignId, UnifiedNum};
use redis::aio::MultiplexedConnection;

#[derive(Clone)]
pub struct CampaignRemaining {
redis: MultiplexedConnection,
}

impl CampaignRemaining {
pub const CAMPAIGN_REMAINING_KEY: &'static str = "campaignRemaining";

pub fn get_key(campaign: CampaignId) -> String {
format!("{}:{}", Self::CAMPAIGN_REMAINING_KEY, campaign)
}

pub fn new(redis: MultiplexedConnection) -> Self {
Self { redis }
}

pub async fn set_initial(
&self,
campaign: CampaignId,
amount: UnifiedNum,
) -> Result<bool, RedisError> {
redis::cmd("SETNX")
.arg(&Self::get_key(campaign))
.arg(amount.to_u64())
.query_async(&mut self.redis.clone())
.await
}

pub async fn get_remaining_opt(
&self,
campaign: CampaignId,
) -> Result<Option<i64>, RedisError> {
redis::cmd("GET")
.arg(&Self::get_key(campaign))
.query_async::<_, Option<i64>>(&mut self.redis.clone())
.await
}

/// This method uses `max(0, value)` to clamp the value of a campaign, which can be negative and uses `i64`.
/// In addition, it defaults the campaign keys that were not found to `0`.
pub async fn get_multiple(
&self,
campaigns: &[CampaignId],
) -> Result<Vec<UnifiedNum>, RedisError> {
// `MGET` fails on empty keys
if campaigns.is_empty() {
return Ok(vec![]);
}

let keys: Vec<String> = campaigns
.iter()
.map(|campaign| Self::get_key(*campaign))
.collect();

let campaigns_remaining = redis::cmd("MGET")
.arg(keys)
.query_async::<_, Vec<Option<i64>>>(&mut self.redis.clone())
.await?
.into_iter()
.map(|remaining| match remaining {
Some(remaining) => UnifiedNum::from_u64(remaining.max(0).unsigned_abs()),
None => UnifiedNum::from_u64(0),
})
.collect();

Ok(campaigns_remaining)
}

pub async fn increase_by(
&self,
campaign: CampaignId,
amount: UnifiedNum,
) -> Result<i64, RedisError> {
let key = Self::get_key(campaign);
redis::cmd("INCRBY")
.arg(&key)
.arg(amount.to_u64())
.query_async(&mut self.redis.clone())
.await
}

pub async fn decrease_by(
&self,
campaign: CampaignId,
amount: UnifiedNum,
) -> Result<i64, RedisError> {
let key = Self::get_key(campaign);
redis::cmd("DECRBY")
.arg(&key)
.arg(amount.to_u64())
.query_async(&mut self.redis.clone())
.await
}
}

#[cfg(test)]
mod test {
use primitives::util::tests::prep_db::DUMMY_CAMPAIGN;

use crate::db::redis_pool::TESTS_POOL;

use super::*;

#[tokio::test]
async fn it_sets_initial_increases_and_decreases_remaining_for_campaign() {
let redis = TESTS_POOL.get().await.expect("Should return Object");

let campaign = DUMMY_CAMPAIGN.id;
let campaign_remaining = CampaignRemaining::new(redis.connection.clone());

// Get remaining on a key which was not set
{
let get_remaining = campaign_remaining
.get_remaining_opt(campaign)
.await
.expect("Should get None");

assert_eq!(None, get_remaining);
}

// Set Initial amount on that key
{
let initial_amount = UnifiedNum::from(1_000_u64);
let set_initial = campaign_remaining
.set_initial(campaign, initial_amount)
.await
.expect("Should set value in redis");
assert!(set_initial);

// get the remaining of that key, should be the initial value
let get_remaining = campaign_remaining
.get_remaining_opt(campaign)
.await
.expect("Should get None");

assert_eq!(
Some(1_000_i64),
get_remaining,
"should return the initial value that was set"
);
}

// Set initial on already existing key, should return `false`
{
let set_failing_initial = campaign_remaining
.set_initial(campaign, UnifiedNum::from(999_u64))
.await
.expect("Should set value in redis");
assert!(!set_failing_initial);
}

// Decrease by amount
{
let decrease_amount = UnifiedNum::from(64);
let decrease_by = campaign_remaining
.decrease_by(campaign, decrease_amount)
.await
.expect("Should decrease remaining amount");

assert_eq!(936_i64, decrease_by);
}

// Increase by amount
{
let increase_amount = UnifiedNum::from(1064);
let increase_by = campaign_remaining
.increase_by(campaign, increase_amount)
.await
.expect("Should increase remaining amount");

assert_eq!(2_000_i64, increase_by);
}

let get_remaining = campaign_remaining
.get_remaining_opt(campaign)
.await
.expect("Should get remaining");

assert_eq!(Some(2_000_i64), get_remaining);

// Decrease by amount > than currently set
{
let decrease_amount = UnifiedNum::from(5_000);
let decrease_by = campaign_remaining
.decrease_by(campaign, decrease_amount)
.await
.expect("Should decrease remaining amount");

assert_eq!(-3_000_i64, decrease_by);
}

// Increase the negative value without going > 0
{
let increase_amount = UnifiedNum::from(1000);
let increase_by = campaign_remaining
.increase_by(campaign, increase_amount)
.await
.expect("Should increase remaining amount");

assert_eq!(-2_000_i64, increase_by);
}
}

#[tokio::test]
async fn it_gets_multiple_campaigns_remaining() {
let redis = TESTS_POOL.get().await.expect("Should return Object");
let campaign_remaining = CampaignRemaining::new(redis.connection.clone());

// get multiple with empty campaigns slice
// `MGET` throws error on an empty keys argument
assert!(
campaign_remaining
.get_multiple(&[])
.await
.expect("Should get multiple")
.is_empty(),
"Should return an empty result"
);

let campaigns = (CampaignId::new(), CampaignId::new(), CampaignId::new());

// set initial amounts
{
assert!(campaign_remaining
.set_initial(campaigns.0, UnifiedNum::from(100))
.await
.expect("Should set value in redis"));

assert!(campaign_remaining
.set_initial(campaigns.1, UnifiedNum::from(200))
.await
.expect("Should set value in redis"));

assert!(campaign_remaining
.set_initial(campaigns.2, UnifiedNum::from(300))
.await
.expect("Should set value in redis"));
}

// set campaigns.1 to negative value, should return `0` because of `max(value, 0)`
assert_eq!(
-300_i64,
campaign_remaining
.decrease_by(campaigns.1, UnifiedNum::from(500))
.await
.expect("Should decrease remaining")
);

let multiple = campaign_remaining
.get_multiple(&[campaigns.0, campaigns.1, campaigns.2])
.await
.expect("Should get multiple");

assert_eq!(
vec![
UnifiedNum::from(100),
UnifiedNum::from(0),
UnifiedNum::from(300)
],
multiple
);
}
}
}

#[cfg(test)]
mod test {
use primitives::{
Expand Down
Loading