diff --git a/adapter/src/ethereum/client.rs b/adapter/src/ethereum/client.rs index edb6dac48..2505a722c 100644 --- a/adapter/src/ethereum/client.rs +++ b/adapter/src/ethereum/client.rs @@ -796,7 +796,7 @@ mod test { // make sure 1 TOKEN is the minimum set in Config let config_token = eth_adapter .config - .find_chain_token(channel.token) + .find_chain_of(channel.token) .expect("Channel token should be present in Config") .token; diff --git a/adapter/src/lib.rs b/adapter/src/lib.rs index 68466dbde..2a4e8baf9 100644 --- a/adapter/src/lib.rs +++ b/adapter/src/lib.rs @@ -1,3 +1,7 @@ +#![deny(rust_2018_idioms)] +#![deny(clippy::all)] +#![cfg_attr(docsrs, feature(doc_cfg))] + pub use { self::adapter::{ state::{LockedState, UnlockedState}, diff --git a/adview-manager/Cargo.toml b/adview-manager/Cargo.toml index 978ea5ab3..7004079de 100644 --- a/adview-manager/Cargo.toml +++ b/adview-manager/Cargo.toml @@ -23,3 +23,7 @@ async-std = "^1.8" once_cell = "^1.8" thiserror = "^1.0" rand = "^0.8" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] diff --git a/adview-manager/src/lib.rs b/adview-manager/src/lib.rs index 4f816e465..709936445 100644 --- a/adview-manager/src/lib.rs +++ b/adview-manager/src/lib.rs @@ -1,5 +1,6 @@ #![deny(rust_2018_idioms)] #![deny(clippy::all)] +#![cfg_attr(docsrs, feature(doc_cfg))] use adex_primitives::{ campaign::Validators, diff --git a/docs/config/dev.toml b/docs/config/dev.toml index 40a0cbbe3..749abbaf7 100644 --- a/docs/config/dev.toml +++ b/docs/config/dev.toml @@ -7,8 +7,9 @@ spendable_find_limit = 200 wait_time = 500 msgs_find_limit = 10 -analytics_find_limit_v5 = 5000 -analytics_maxtime_v5 = 20000 +analytics_find_limit = 5000 +# in milliseconds +analytics_maxtime = 20000 heartbeat_time = 30000 health_threshold_promilles = 950 diff --git a/docs/config/ganache.toml b/docs/config/ganache.toml index 8190d1f3b..92fc43f60 100644 --- a/docs/config/ganache.toml +++ b/docs/config/ganache.toml @@ -1,37 +1,32 @@ -# based on: prod.toml # Maximum number of channels to return per request max_channels = 512 -channels_find_limit = 512 -campaigns_find_limit = 512 -spendable_find_limit = 512 - -wait_time = 40000 - -# V4 Deprecated -aggr_throttle = 0 - -events_find_limit = 100 +channels_find_limit = 200 +campaigns_find_limit = 200 +spendable_find_limit = 200 +wait_time = 500 msgs_find_limit = 10 -analytics_find_limit_v5 = 5000 -analytics_maxtime_v5 = 15000 +analytics_find_limit = 5000 +# in milliseconds +analytics_maxtime = 20000 -heartbeat_time = 60000 -health_threshold_promilles = 970 -health_unsignable_promilles = 770 -propagation_timeout = 3000 +heartbeat_time = 30000 +health_threshold_promilles = 950 +health_unsignable_promilles = 750 +propagation_timeout = 1000 -fetch_timeout = 10000 -all_campaigns_timeout = 10000 -channel_tick_timeout = 10000 +fetch_timeout = 5000 +all_campaigns_timeout = 5000 +channel_tick_timeout = 5000 ip_rate_limit = { type = 'ip', timeframe = 1200000 } sid_rate_limit = { type = 'sid', timeframe = 0 } creators_whitelist = [] validators_whitelist = [] -admins = ['0x80690751969B234697e9059e04ed72195c3507fa'] +# Leader - 0xce07CbB7e054514D590a0262C93070D838bFBA2e +admins = ['0xce07CbB7e054514D590a0262C93070D838bFBA2e'] # Ethereum mainnet tokens # [chain."Ganache #1"] diff --git a/docs/config/prod.toml b/docs/config/prod.toml index e6b0eb111..1828c20df 100644 --- a/docs/config/prod.toml +++ b/docs/config/prod.toml @@ -4,12 +4,12 @@ max_channels = 512 channels_find_limit = 512 campaigns_find_limit = 512 spendable_find_limit = 
512 - wait_time = 40000 msgs_find_limit = 10 -analytics_find_limit_v5 = 5000 -analytics_maxtime_v5 = 15000 +analytics_find_limit = 5000 +# in milliseconds +analytics_maxtime = 15000 heartbeat_time = 60000 health_threshold_promilles = 970 @@ -23,13 +23,6 @@ channel_tick_timeout = 10000 ip_rate_limit = { type = 'ip', timeframe = 1200000 } sid_rate_limit = { type = 'sid', timeframe = 0 } -# TODO: Replace with real contract address -outpace_address = '0x333420fc6a897356e69b62417cd17ff012177d2b' -# TODO: Replace with real contract address -sweeper_address = '0x333420fc6a897356e69b62417cd17ff012177d2b' - -ethereum_network = 'http://localhost:8545' - creators_whitelist = [] validators_whitelist = [] admins = ['0x5d6A3F1AD7b124ecDFDf4841D9bB246eD5fBF04c'] diff --git a/primitives/Cargo.toml b/primitives/Cargo.toml index 24b404ff4..4b086473a 100644 --- a/primitives/Cargo.toml +++ b/primitives/Cargo.toml @@ -73,3 +73,7 @@ once_cell = "^1.8" pretty_assertions = "1" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } serde_urlencoded = "^0.7" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] diff --git a/primitives/src/analytics.rs b/primitives/src/analytics.rs index 17ccd3e29..18c11a27b 100644 --- a/primitives/src/analytics.rs +++ b/primitives/src/analytics.rs @@ -7,8 +7,6 @@ use serde::{Deserialize, Serialize}; use self::query::{AllowedKey, Time}; -pub const ANALYTICS_QUERY_LIMIT: u32 = 200; - #[cfg(feature = "postgres")] pub mod postgres { use super::{query::AllowedKey, AnalyticsQuery, OperatingSystem}; diff --git a/primitives/src/campaign.rs b/primitives/src/campaign.rs index 19bfcb50a..79502fc46 100644 --- a/primitives/src/campaign.rs +++ b/primitives/src/campaign.rs @@ -31,6 +31,7 @@ mod campaign_id { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] /// an Id of 16 bytes, (de)serialized as a `0x` prefixed hex + /// /// In this implementation of the `CampaignId` the value is generated from a `Uuid::new_v4().to_simple()` pub struct CampaignId([u8; 16]); diff --git a/primitives/src/campaign_validator.rs b/primitives/src/campaign_validator.rs index 56908d2bc..47cf5c1bc 100644 --- a/primitives/src/campaign_validator.rs +++ b/primitives/src/campaign_validator.rs @@ -69,7 +69,7 @@ impl Validator for Campaign { // Check if Channel token is listed in the configuration token Chain ID & Address let chain_context = config - .find_chain_token(self.channel.token) + .find_chain_of(self.channel.token) .ok_or(Validation::UnlistedAsset)?; // Check if the campaign budget is above the minimum deposit configured @@ -345,7 +345,7 @@ mod test { { let mut campaign = DUMMY_CAMPAIGN.clone(); let campaign_token = config - .find_chain_token(campaign.channel.token) + .find_chain_of(campaign.channel.token) .unwrap() .token; @@ -391,7 +391,7 @@ mod test { let mut campaign = DUMMY_CAMPAIGN.clone(); let campaign_token = config - .find_chain_token(campaign.channel.token) + .find_chain_of(campaign.channel.token) .unwrap() .token; diff --git a/primitives/src/chain.rs b/primitives/src/chain.rs index b949b3aab..d9d5b236f 100644 --- a/primitives/src/chain.rs +++ b/primitives/src/chain.rs @@ -7,7 +7,7 @@ use crate::{config::TokenInfo, util::ApiUrl, Address, Campaign, Channel}; /// /// # Ethereum Virtual Machine /// -/// For all the EVM-compatible Chain IDs visit https://chainid.network +/// For all the EVM-compatible Chain IDs visit <https://chainid.network> #[derive(Serialize, Deserialize, Hash, Clone, Copy, Eq, PartialEq)] #[serde(transparent)] pub struct ChainId(u32); @@ -42,8 +42,8 @@ pub struct Chain { 
/// RPC url of the chain which will be used for Blockchain interactions. /// /// # Ethereum Virtual Machine - /// Check out the metadata for all EVM-chains: https://github.com/ethereum-lists/chains - /// Or in json: https://chainid.network/chains.json + /// Check out the metadata for all EVM-chains: <https://github.com/ethereum-lists/chains> + /// Or in json: <https://chainid.network/chains.json> pub rpc: ApiUrl, /// The OUTPACE contract address on this Chain pub outpace: Address, @@ -52,8 +52,9 @@ pub struct Chain { } /// Context of [`TokenInfo`] & [`Chain`] information for given [`Channel`] or [`Campaign`]. -/// The additional context of [`Channel`] is known after checking if the Channel token's -/// Chain & Address are whitelisted in the configuration. +/// +/// The additional context of [`Channel`] is known after checking if the `Channel` token's +/// `Chain` & `Address` are whitelisted in the configuration. #[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Hash, Clone)] pub struct ChainOf<T> { pub context: T, diff --git a/primitives/src/config.rs b/primitives/src/config.rs index 89976f0f7..c4e1d0447 100644 --- a/primitives/src/config.rs +++ b/primitives/src/config.rs @@ -46,9 +46,12 @@ pub struct Config { pub spendable_find_limit: u32, pub wait_time: u32, pub msgs_find_limit: u32, - pub analytics_find_limit_v5: u32, + /// The maximum analytic results you can receive per request. + pub analytics_find_limit: u32, + /// A timeout to be used when collecting the Analytics for a request. /// In milliseconds - pub analytics_maxtime_v5: u32, + pub analytics_maxtime: u32, + /// The amount of time between heartbeats. /// In milliseconds pub heartbeat_time: u32, pub health_threshold_promilles: u32, @@ -73,12 +76,12 @@ pub struct Config { /// The key of this map is a human-readable text of the Chain name /// for readability in the configuration file. /// - /// - To get the chain of a token address use [`Config::find_token_chain`]. + /// - To get the chain of a token address use [`Config::find_chain_of()`]. /// - /// - To get a chain RPC use [`Config::find_chain_rpc`]. + /// - To get a [`ChainInfo`] only by a [`ChainId`] use [`Config::find_chain()`]. /// /// **NOTE:** Make sure that a Token [`Address`] is unique across all Chains, - /// otherwise `Config::find_chain_token` will fetch only one of them and cause unexpected problems. + /// otherwise [`Config::find_chain_of()`] will fetch only one of them and cause unexpected problems. #[serde(rename = "chain")] pub chains: HashMap<String, ChainInfo>, } @@ -99,7 +102,7 @@ impl Config { } /// Finds the pair of Chain & Token, given only a token [`Address`]. - pub fn find_chain_token(&self, token: Address) -> Option<ChainOf<TokenInfo>> { + pub fn find_chain_of(&self, token: Address) -> Option<ChainOf<TokenInfo>> { self.chains.values().find_map(|chain_info| { chain_info .tokens diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index 886d0056f..c87e0f171 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -1,6 +1,7 @@ #![deny(rust_2018_idioms)] #![deny(clippy::all)] -#![allow(deprecated)] +#![cfg_attr(docsrs, feature(doc_cfg))] + use std::{error, fmt}; pub use self::{ @@ -43,14 +44,17 @@ pub mod spender; pub mod supermarket; pub mod targeting; #[cfg(feature = "test-util")] +#[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] pub mod test_util; mod unified_num; pub mod validator; -/// This module is available with the `postgres` feature +/// This module is available with the `postgres` feature. 
+/// /// Other places where you'd find `mod postgres` implementations is for many of the structs in the crate /// all of which implement [`tokio_postgres::types::FromSql`], [`tokio_postgres::types::ToSql`] or [`From<&tokio_postgres::Row>`] #[cfg(feature = "postgres")] +#[cfg_attr(docsrs, doc(cfg(feature = "postgres")))] pub mod postgres { use std::env::{self, VarError}; diff --git a/primitives/src/sentry.rs b/primitives/src/sentry.rs index f799c6dca..d34a5f292 100644 --- a/primitives/src/sentry.rs +++ b/primitives/src/sentry.rs @@ -3,7 +3,7 @@ use crate::{ balances::BalancesState, spender::Spender, validator::{ApproveState, Heartbeat, MessageTypes, NewState, Type as MessageType}, - Address, Balances, BigNum, CampaignId, Channel, ChannelId, UnifiedNum, ValidatorId, IPFS, + Address, Balances, CampaignId, UnifiedNum, ValidatorId, IPFS, }; use chrono::{ serde::ts_milliseconds, Date, DateTime, Duration, NaiveDate, TimeZone, Timelike, Utc, @@ -348,6 +348,7 @@ pub struct DateHourError { #[derive(Clone, Hash)] /// [`DateHour`] holds the date and hour (only). +/// /// It uses [`chrono::DateTime`] when serializing and deserializing. /// When serializing it always sets minutes and seconds to `0` (zero). /// When deserializing the minutes and seconds should always be set to `0` (zero), @@ -525,22 +526,6 @@ impl<'de> Deserialize<'de> for DateHour { } } -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct EventAggregate { - pub channel_id: ChannelId, - pub created: DateTime, - pub events: HashMap, -} - -#[derive(Clone, Debug, Serialize, Deserialize, Default)] -#[serde(rename_all = "camelCase")] -pub struct AggregateEvents { - #[serde(default, skip_serializing_if = "Option::is_none")] - pub event_counts: Option>, - pub event_payouts: HashMap, -} - #[derive(Debug, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "camelCase")] pub struct Pagination { @@ -600,14 +585,21 @@ pub struct ValidatorMessage { #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] -pub struct ValidatorMessageResponse { - pub validator_messages: Vec, +pub struct ValidatorMessagesListResponse { + pub messages: Vec, } #[derive(Serialize, Deserialize, Debug)] -pub struct EventAggregateResponse { - pub channel: Channel, - pub events: Vec, +#[serde(rename_all = "camelCase")] +pub struct ValidatorMessagesCreateRequest { + pub messages: Vec, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ValidatorMessagesListQuery { + /// Will apply the lower limit of: `query.limit` and `Config::msgs_find_limit` + pub limit: Option, } #[derive(Serialize, Deserialize, Debug)] @@ -618,53 +610,6 @@ pub struct ValidationErrorResponse { pub validation: Vec, } -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct AdvancedAnalyticsResponse { - pub by_channel_stats: HashMap>>, - pub publisher_stats: HashMap>, -} - -#[derive(Serialize, Deserialize, Debug, Hash, PartialEq, Eq, Clone)] -#[serde(rename_all = "camelCase")] -pub enum PublisherReport { - AdUnit, - AdSlot, - AdSlotPay, - Country, - Hostname, -} - -impl fmt::Display for PublisherReport { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - PublisherReport::AdUnit => write!(f, "reportPublisherToAdUnit"), - PublisherReport::AdSlot => write!(f, "reportPublisherToAdSlot"), - PublisherReport::AdSlotPay => write!(f, "reportPublisherToAdSlotPay"), - PublisherReport::Country => write!(f, "reportPublisherToCountry"), - PublisherReport::Hostname => write!(f, 
"reportPublisherToHostname"), - } - } -} - -#[derive(Serialize, Deserialize, Debug, Hash, PartialEq, Eq, Clone)] -#[serde(rename_all = "camelCase")] -pub enum ChannelReport { - AdUnit, - Hostname, - HostnamePay, -} - -impl fmt::Display for ChannelReport { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - ChannelReport::AdUnit => write!(f, "reportPublisherToAdUnit"), - ChannelReport::Hostname => write!(f, "reportChannelToHostname"), - ChannelReport::HostnamePay => write!(f, "reportChannelToHostnamePay"), - } - } -} - pub mod channel_list { use crate::{Channel, ValidatorId}; use serde::{Deserialize, Serialize}; @@ -684,13 +629,12 @@ pub mod channel_list { #[serde(default)] // default is `u64::default()` = `0` pub page: u64, - pub creator: Option, /// filters the channels containing a specific validator if provided pub validator: Option, } } -pub mod campaign { +pub mod campaign_list { use crate::{Address, Campaign, ValidatorId}; use chrono::{serde::ts_seconds, DateTime, Utc}; use serde::{Deserialize, Serialize}; @@ -878,6 +822,16 @@ pub mod campaign_create { Self::from_campaign_erased(campaign, id) } } +} + +pub mod campaign_modify { + use serde::{Deserialize, Serialize}; + + use crate::{ + campaign::{PricingBounds, Validators}, + targeting::Rules, + AdUnit, Campaign, EventSubmission, UnifiedNum, + }; // All editable fields stored in one place, used for checking when a budget is changed #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -947,7 +901,6 @@ mod postgres { }; use crate::{ analytics::{AnalyticsQuery, Metric}, - sentry::EventAggregate, validator::{messages::Type as MessageType, MessageTypes}, }; use bytes::BytesMut; @@ -958,16 +911,6 @@ mod postgres { Error, Row, }; - impl From<&Row> for EventAggregate { - fn from(row: &Row) -> Self { - Self { - channel_id: row.get("channel_id"), - created: row.get("created"), - events: row.get::<_, Json<_>>("events").0, - } - } - } - impl From<&Row> for ValidatorMessage { fn from(row: &Row) -> Self { Self { diff --git a/sentry/Cargo.toml b/sentry/Cargo.toml index f1f7ba799..05ca7c769 100644 --- a/sentry/Cargo.toml +++ b/sentry/Cargo.toml @@ -57,3 +57,7 @@ once_cell = "1.5.2" [dev-dependencies] pretty_assertions = "1" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] diff --git a/sentry/migrations/20190806011140_initial-tables/down.sql b/sentry/migrations/20190806011140_initial-tables/down.sql index 6042892f0..b2fffae7b 100644 --- a/sentry/migrations/20190806011140_initial-tables/down.sql +++ b/sentry/migrations/20190806011140_initial-tables/down.sql @@ -3,7 +3,6 @@ ALTER TABLE campaigns DROP CONSTRAINT fk_campaigns_channel_id; ALTER TABLE spendable DROP CONSTRAINT fk_spendable_channel_id; ALTER TABLE validator_messages DROP CONSTRAINT fk_validator_messages_channel_id; ALTER TABLE accounting DROP CONSTRAINT fk_accounting_channel_id; -DROP AGGREGATE jsonb_object_agg(jsonb); DROP TABLE validator_messages, channels, analytics, campaigns, spendable, accounting; -- Types should be dropped last DROP TYPE AccountingSide; diff --git a/sentry/migrations/20190806011140_initial-tables/up.sql b/sentry/migrations/20190806011140_initial-tables/up.sql index 64fe1b1c0..40b9872fe 100644 --- a/sentry/migrations/20190806011140_initial-tables/up.sql +++ b/sentry/migrations/20190806011140_initial-tables/up.sql @@ -62,25 +62,6 @@ CREATE INDEX idx_validator_messages_msg_type ON validator_messages ((msg ->> 'ty CREATE INDEX idx_validator_messages_msg_state_root ON validator_messages ((msg ->> 
'stateRoot')); --- TODO: AIP#61 Alter Event Aggregates --- CREATE TABLE event_aggregates ( --- channel_id varchar(66) NOT NULL, -- REFERENCES channels (id) ON DELETE RESTRICT, --- created timestamp(2) with time zone NOT NULL DEFAULT NOW(), --- event_type varchar(255) NOT NULL, --- earner varchar(42), --- -- todo: AIP#61 check the count and payout --- count varchar NOT NULL, --- payout varchar NOT NULL --- ); --- CREATE INDEX idx_event_aggregates_created ON event_aggregates (created); --- CREATE INDEX idx_event_aggregates_channel ON event_aggregates (channel_id); --- CREATE INDEX idx_event_aggregates_event_type ON event_aggregates (event_type); -CREATE AGGREGATE jsonb_object_agg (jsonb) ( - SFUNC = 'jsonb_concat', - STYPE = jsonb, - INITCOND = '{}' -); - CREATE TYPE AccountingSide AS ENUM ( 'Earner', 'Spender' diff --git a/sentry/src/access.rs b/sentry/src/access.rs index 09be44810..9b015338a 100644 --- a/sentry/src/access.rs +++ b/sentry/src/access.rs @@ -227,7 +227,7 @@ mod test { let campaign = get_campaign(rule); let chain_context = config - .find_chain_token(campaign.channel.token) + .find_chain_of(campaign.channel.token) .expect("Campaign's Channel.token should be set in config"); let auth = Auth { @@ -288,7 +288,7 @@ mod test { let campaign = get_campaign(rule); let chain_context = config - .find_chain_token(campaign.channel.token) + .find_chain_of(campaign.channel.token) .expect("Campaign's Channel.token should be set in config"); let auth = Auth { @@ -348,7 +348,7 @@ mod test { campaign.active.to = Utc.ymd(1970, 1, 1).and_hms(12, 00, 9); let chain_context = config - .find_chain_token(campaign.channel.token) + .find_chain_of(campaign.channel.token) .expect("Campaign's Channel.token should be set in config"); let auth = Auth { @@ -391,7 +391,7 @@ mod test { let campaign = get_campaign(rule); let chain_context = config - .find_chain_token(campaign.channel.token) + .find_chain_of(campaign.channel.token) .expect("Campaign's Channel.token should be set in config"); let auth = Auth { @@ -434,7 +434,7 @@ mod test { let campaign = get_campaign(rule); let chain_context = config - .find_chain_token(campaign.channel.token) + .find_chain_of(campaign.channel.token) .expect("Campaign's Channel.token should be set in config"); let auth = Auth { @@ -474,7 +474,7 @@ mod test { let campaign = get_campaign(rule); let chain_context = config - .find_chain_token(campaign.channel.token) + .find_chain_of(campaign.channel.token) .expect("Campaign's Channel.token should be set in config"); let auth = Auth { @@ -517,7 +517,7 @@ mod test { let campaign = get_campaign(rule); let chain_context = config - .find_chain_token(campaign.channel.token) + .find_chain_of(campaign.channel.token) .expect("Campaign's Channel.token should be set in config"); let auth = Auth { diff --git a/sentry/src/db.rs b/sentry/src/db.rs index 7126d81a4..ced86876c 100644 --- a/sentry/src/db.rs +++ b/sentry/src/db.rs @@ -14,14 +14,11 @@ pub mod accounting; pub mod analytics; pub mod campaign; mod channel; -pub mod event_aggregate; pub mod spendable; -mod validator_message; +pub mod validator_message; pub use self::campaign::*; pub use self::channel::*; -pub use self::event_aggregate::*; -pub use self::validator_message::*; // Re-export the Postgres Config pub use tokio_postgres::Config as PostgresConfig; @@ -139,6 +136,7 @@ pub async fn setup_migrations(environment: Environment) { } #[cfg(feature = "test-util")] +#[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] pub mod tests_postgres { use std::{ ops::{Deref, DerefMut}, @@ -435,6 
+433,7 @@ pub mod tests_postgres { } #[cfg(feature = "test-util")] +#[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] pub mod redis_pool { use dashmap::DashMap; diff --git a/sentry/src/db/campaign.rs b/sentry/src/db/campaign.rs index eae0856f7..ccc295c1e 100644 --- a/sentry/src/db/campaign.rs +++ b/sentry/src/db/campaign.rs @@ -2,7 +2,7 @@ use crate::db::{DbPool, PoolError, TotalCount}; use chrono::{DateTime, Utc}; use primitives::{ sentry::{ - campaign::{CampaignListResponse, ValidatorParam}, + campaign_list::{CampaignListResponse, ValidatorParam}, Pagination, }, Address, Campaign, CampaignId, ChannelId, @@ -505,7 +505,7 @@ mod test { campaign, campaign::Validators, event_submission::{RateLimit, Rule}, - sentry::campaign_create::ModifyCampaign, + sentry::campaign_modify::ModifyCampaign, targeting::Rules, util::tests::prep_db::{ ADDRESSES, DUMMY_AD_UNITS, DUMMY_CAMPAIGN, DUMMY_VALIDATOR_FOLLOWER, IDS, diff --git a/sentry/src/db/channel.rs b/sentry/src/db/channel.rs index 625d5edb5..782eec857 100644 --- a/sentry/src/db/channel.rs +++ b/sentry/src/db/channel.rs @@ -1,5 +1,4 @@ -use chrono::Utc; -use primitives::{validator::MessageTypes, Channel, ChannelId, ValidatorId}; +use primitives::{Channel, ChannelId, ValidatorId}; pub use list_channels::list_channels; @@ -72,27 +71,6 @@ pub async fn insert_channel(pool: &DbPool, channel: Channel) -> Result Result { - let client = pool.get().await?; - - let stmt = client.prepare("INSERT INTO validator_messages (channel_id, \"from\", msg, received) values ($1, $2, $3, $4)").await?; - - let row = client - .execute( - &stmt, - &[&channel.id(), &from, &validator_message, &Utc::now()], - ) - .await?; - - let inserted = row == 1; - Ok(inserted) -} - mod list_channels { use primitives::{ sentry::{channel_list::ChannelListResponse, Pagination}, diff --git a/sentry/src/db/event_aggregate.rs b/sentry/src/db/event_aggregate.rs deleted file mode 100644 index a60a9b72a..000000000 --- a/sentry/src/db/event_aggregate.rs +++ /dev/null @@ -1,226 +0,0 @@ -use chrono::{DateTime, Utc}; -use futures::pin_mut; -use primitives::{ - balances::UncheckedState, - sentry::{EventAggregate, MessageResponse}, - validator::{ApproveState, Heartbeat, NewState}, - Address, BigNum, Channel, ChannelId, ValidatorId, -}; -use std::ops::Add; -use tokio_postgres::{ - binary_copy::BinaryCopyInWriter, - types::{ToSql, Type}, -}; - -use super::{DbPool, PoolError}; - -pub async fn latest_approve_state_v5( - pool: &DbPool, - channel: &Channel, -) -> Result>, PoolError> { - let client = pool.get().await?; - - let select = client.prepare("SELECT \"from\", msg, received FROM validator_messages WHERE channel_id = $1 AND \"from\" = $2 AND msg ->> 'type' = 'ApproveState' ORDER BY received DESC LIMIT 1").await?; - let rows = client - .query(&select, &[&channel.id(), &channel.follower]) - .await?; - - rows.get(0) - .map(MessageResponse::::try_from) - .transpose() - .map_err(PoolError::Backend) -} - -pub async fn latest_new_state_v5( - pool: &DbPool, - channel: &Channel, - state_root: &str, -) -> Result>>, PoolError> { - let client = pool.get().await?; - - let select = client.prepare("SELECT \"from\", msg, received FROM validator_messages WHERE channel_id = $1 AND \"from\" = $2 AND msg ->> 'type' = 'NewState' AND msg->> 'stateRoot' = $3 ORDER BY received DESC LIMIT 1").await?; - let rows = client - .query(&select, &[&channel.id(), &channel.leader, &state_root]) - .await?; - - rows.get(0) - .map(MessageResponse::>::try_from) - .transpose() - .map_err(PoolError::Backend) -} - -pub async fn 
latest_heartbeats( - pool: &DbPool, - channel_id: &ChannelId, - validator_id: &ValidatorId, -) -> Result>, PoolError> { - let client = pool.get().await?; - - let select = client.prepare("SELECT \"from\", msg, received FROM validator_messages WHERE channel_id = $1 AND \"from\" = $2 AND msg ->> 'type' = 'Heartbeat' ORDER BY received DESC LIMIT 2").await?; - let rows = client.query(&select, &[&channel_id, &validator_id]).await?; - - rows.iter() - .map(MessageResponse::::try_from) - .collect::>() - .map_err(PoolError::Backend) -} - -#[deprecated = "V4 Double check what we will need for analytics and remove this function"] -pub async fn list_event_aggregates( - pool: &DbPool, - channel_id: &ChannelId, - limit: u32, - from: &Option, - after: &Option>, -) -> Result, PoolError> { - let client = pool.get().await?; - - let (mut where_clauses, mut params) = (vec![], Vec::<&(dyn ToSql + Sync)>::new()); - let id = channel_id.to_string(); - params.push(&id); - where_clauses.push(format!("channel_id = ${}", params.len())); - - if let Some(from) = from { - where_clauses.push(format!("earner = '{}'", from.to_string())); - params.push(&"IMPRESSION"); - where_clauses.push(format!("event_type = ${}", params.len())); - } else { - where_clauses.push("earner is NOT NULL".to_string()); - } - - if let Some(after) = after { - params.push(after); - where_clauses.push(format!("created > ${}", params.len())); - } - - let where_clause = if !where_clauses.is_empty() { - where_clauses.join(" AND ").to_string() - } else { - "".to_string() - }; - let statement = format!( - " - WITH aggregates AS ( - SELECT - channel_id, - created, - event_type, - jsonb_build_object( - 'eventCounts', - jsonb_object_agg( - jsonb_build_object( - earner, count - ) - ), - 'eventPayouts', - jsonb_object_agg( - jsonb_build_object( - earner, payout - ) - ) - ) - as data - FROM event_aggregates WHERE {} GROUP BY channel_id, event_type, created ORDER BY created DESC LIMIT {} - ) SELECT channel_id, created, jsonb_object_agg(event_type , data) as events FROM aggregates GROUP BY channel_id, created - ", where_clause, limit); - - let stmt = client.prepare(&statement).await?; - let rows = client.query(&stmt, params.as_slice()).await?; - - let event_aggregates = rows.iter().map(EventAggregate::from).collect(); - - Ok(event_aggregates) -} - -#[derive(Debug)] -struct EventData { - id: ChannelId, - event_type: String, - earner: Option
, - event_count: BigNum, - event_payout: BigNum, -} - -#[deprecated = "V4 No longer needed for V5, use analytics instead"] -pub async fn insert_event_aggregate( - pool: &DbPool, - channel_id: &ChannelId, - event: &EventAggregate, -) -> Result { - let mut data: Vec = Vec::new(); - - for (event_type, aggr) in &event.events { - if let Some(event_counts) = &aggr.event_counts { - let mut total_event_counts: BigNum = 0.into(); - let mut total_event_payouts: BigNum = 0.into(); - for (earner, event_count) in event_counts { - let event_payout = aggr.event_payouts[earner].clone(); - - data.push(EventData { - id: channel_id.to_owned(), - event_type: event_type.clone(), - earner: Some(*earner), - event_count: event_count.to_owned(), - event_payout: event_payout.clone(), - }); - - // total sum - total_event_counts = event_count.add(&total_event_counts); - total_event_payouts = event_payout.add(total_event_payouts); - } - - data.push(EventData { - id: channel_id.to_owned(), - event_type: event_type.clone(), - earner: None, - event_count: total_event_counts, - event_payout: total_event_payouts, - }); - } - } - - let client = pool.get().await?; - - let mut err: Option = None; - let sink = client.copy_in("COPY event_aggregates(channel_id, created, event_type, count, payout, earner) FROM STDIN BINARY").await?; - - let created = Utc::now(); // time discrepancy - - let writer = BinaryCopyInWriter::new( - sink, - &[ - Type::VARCHAR, - Type::TIMESTAMPTZ, - Type::VARCHAR, - Type::VARCHAR, - Type::VARCHAR, - Type::VARCHAR, - ], - ); - pin_mut!(writer); - for item in data { - if let Err(e) = writer - .as_mut() - .write(&[ - &item.id, - &created, - &item.event_type, - &item.event_count, - &item.event_payout, - &item.earner, - ]) - .await - { - err = Some(e); - break; - } - } - - match err { - Some(e) => Err(PoolError::Backend(e)), - None => { - writer.finish().await?; - Ok(true) - } - } -} diff --git a/sentry/src/db/validator_message.rs b/sentry/src/db/validator_message.rs index af2742091..d124f18c3 100644 --- a/sentry/src/db/validator_message.rs +++ b/sentry/src/db/validator_message.rs @@ -1,8 +1,40 @@ -use primitives::{sentry::ValidatorMessage, ChannelId, ValidatorId}; +use chrono::Utc; use tokio_postgres::types::ToSql; +use primitives::{ + balances::UncheckedState, + sentry::{MessageResponse, ValidatorMessage}, + validator::{ApproveState, Heartbeat, MessageTypes, NewState}, + Channel, ChannelId, ValidatorId, +}; + use super::{DbPool, PoolError}; +/// Inserts a new validator [`MessageTypes`] using the `from` [`ValidatorId`] and `received` at: [`Utc::now()`][Utc] +pub async fn insert_validator_messages( + pool: &DbPool, + channel: &Channel, + from: &ValidatorId, + validator_message: &MessageTypes, +) -> Result { + let client = pool.get().await?; + + let stmt = client.prepare("INSERT INTO validator_messages (channel_id, \"from\", msg, received) values ($1, $2, $3, $4)").await?; + + let row = client + .execute( + &stmt, + &[&channel.id(), &from, &validator_message, &Utc::now()], + ) + .await?; + + let inserted = row == 1; + Ok(inserted) +} + +/// Retrieves [`ValidatorMessage`]s for a given [`Channel`], +/// filters them by the `message_types` and optionally, +/// filters them by the provided `from` [`ValidatorId`]. 
pub async fn get_validator_messages( pool: &DbPool, channel_id: &ChannelId, @@ -49,3 +81,60 @@ fn add_message_types_params<'a>( where_clauses.push(format!("msg->>'type' IN ({})", msg_prep.join(","))); } } + +pub async fn latest_approve_state( + pool: &DbPool, + channel: &Channel, +) -> Result<Option<MessageResponse<ApproveState>>, PoolError> { + let client = pool.get().await?; + + let select = client.prepare("SELECT \"from\", msg, received FROM validator_messages WHERE channel_id = $1 AND \"from\" = $2 AND msg ->> 'type' = 'ApproveState' ORDER BY received DESC LIMIT 1").await?; + let rows = client + .query(&select, &[&channel.id(), &channel.follower]) + .await?; + + rows.get(0) + .map(MessageResponse::<ApproveState>::try_from) + .transpose() + .map_err(PoolError::Backend) } + +/// Returns the latest [`NewState`] message for this [`Channel`] and the provided `state_root`. +/// +/// Ordered by: `received DESC` +pub async fn latest_new_state( + pool: &DbPool, + channel: &Channel, + state_root: &str, +) -> Result<Option<MessageResponse<NewState<UncheckedState>>>, PoolError> { + let client = pool.get().await?; + + let select = client.prepare("SELECT \"from\", msg, received FROM validator_messages WHERE channel_id = $1 AND \"from\" = $2 AND msg ->> 'type' = 'NewState' AND msg->> 'stateRoot' = $3 ORDER BY received DESC LIMIT 1").await?; + let rows = client + .query(&select, &[&channel.id(), &channel.leader, &state_root]) + .await?; + + rows.get(0) + .map(MessageResponse::<NewState<UncheckedState>>::try_from) + .transpose() + .map_err(PoolError::Backend) } + +/// Returns the latest 2 [`Heartbeat`] messages for this [`Channel`] received `from` the [`ValidatorId`]. +/// +/// Ordered by: `received DESC` +pub async fn latest_heartbeats( + pool: &DbPool, + channel_id: &ChannelId, + validator_id: &ValidatorId, +) -> Result<Vec<MessageResponse<Heartbeat>>, PoolError> { + let client = pool.get().await?; + + let select = client.prepare("SELECT \"from\", msg, received FROM validator_messages WHERE channel_id = $1 AND \"from\" = $2 AND msg ->> 'type' = 'Heartbeat' ORDER BY received DESC LIMIT 2").await?; + let rows = client.query(&select, &[&channel_id, &validator_id]).await?; + + rows.iter() + .map(MessageResponse::<Heartbeat>::try_from) + .collect::<Result<_, _>>() + .map_err(PoolError::Backend) } diff --git a/sentry/src/event_reducer.rs b/sentry/src/event_reducer.rs deleted file mode 100644 index 4a1ba5c68..000000000 --- a/sentry/src/event_reducer.rs +++ /dev/null @@ -1,139 +0,0 @@ -use primitives::{ - sentry::{AggregateEvents, Event, EventAggregate}, - Address, BigNum, Channel, -}; - -// -// TODO: AIP#61 remove `allow(dead_code)` and see what should be changed for Spender Aggregate -// -#[allow(dead_code, clippy::unnecessary_wraps)] -pub(crate) fn reduce( - channel: &Channel, - initial_aggr: &mut EventAggregate, - ev: &Event, - payout: &Option<(Address, BigNum)>, -) -> Result<(), Box> { - let event_type = ev.to_string(); - - match ev { - Event::Impression { publisher, .. } => { - let impression = initial_aggr.events.get(&event_type); - let merge = merge_payable_event( - impression, - payout - .to_owned() - .unwrap_or_else(|| (*publisher, Default::default())), - ); - - initial_aggr.events.insert(event_type, merge); - } - Event::Click { publisher, .. 
} => { - let clicks = initial_aggr.events.get(&event_type); - let merge = merge_payable_event( - clicks, - payout - .to_owned() - .unwrap_or_else(|| (*publisher, Default::default())), - ); - - initial_aggr.events.insert(event_type, merge); - } - Event::Close => { - let close_event = AggregateEvents { - event_counts: Some( - vec![(channel.creator.to_address(), 1.into())] - .into_iter() - .collect(), - ), - event_payouts: vec![(channel.creator.to_address(), channel.deposit_amount.clone())] - .into_iter() - .collect(), - }; - initial_aggr.events.insert(event_type, close_event); - } - _ => {} - }; - - Ok(()) -} - -/// payable_event is either an IMPRESSION or a CLICK -fn merge_payable_event( - payable_event: Option<&AggregateEvents>, - payout: (Address, BigNum), -) -> AggregateEvents { - let mut payable_event = payable_event.cloned().unwrap_or_default(); - - let event_count = payable_event - .event_counts - .get_or_insert_with(Default::default) - .entry(payout.0) - .or_insert_with(|| 0.into()); - - *event_count += &1.into(); - - let event_payouts = payable_event - .event_payouts - .entry(payout.0) - .or_insert_with(|| 0.into()); - *event_payouts += &payout.1; - - payable_event -} - -#[cfg(test)] -mod test { - use super::*; - use chrono::Utc; - use primitives::{ - util::tests::prep_db::{ADDRESSES, DUMMY_CHANNEL}, - BigNum, - }; - - #[test] - fn test_reduce() { - let mut channel: Channel = DUMMY_CHANNEL.clone(); - channel.deposit_amount = 100.into(); - // make immutable again - let channel = channel; - - let mut event_aggr = EventAggregate { - channel_id: channel.id, - created: Utc::now(), - events: Default::default(), - }; - - let event = Event::Impression { - publisher: ADDRESSES["publisher"], - ad_unit: None, - ad_slot: None, - referrer: None, - }; - let payout = Some((ADDRESSES["publisher"], BigNum::from(1))); - for i in 0..101 { - reduce(&channel, &mut event_aggr, &event, &payout) - .expect(&format!("Should be able to reduce event #{}", i)); - } - - assert_eq!(event_aggr.channel_id, channel.id); - - let impression_event = event_aggr - .events - .get("IMPRESSION") - .expect("Should have an Impression event"); - - let event_counts = impression_event - .event_counts - .as_ref() - .expect("there should be event_counts set") - .get(&ADDRESSES["publisher"]) - .expect("There should be myAwesomePublisher event_counts key"); - assert_eq!(event_counts, &BigNum::from(101)); - - let event_payouts = impression_event - .event_payouts - .get(&ADDRESSES["publisher"]) - .expect("There should be myAwesomePublisher event_payouts key"); - assert_eq!(event_payouts, &BigNum::from(101)); - } -} diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs index 239854eb9..9f4da126a 100644 --- a/sentry/src/lib.rs +++ b/sentry/src/lib.rs @@ -1,6 +1,7 @@ #![deny(clippy::all)] #![deny(rust_2018_idioms)] #![allow(deprecated)] +#![cfg_attr(docsrs, feature(doc_cfg))] use adapter::{prelude::*, Adapter}; use chrono::Utc; @@ -143,8 +144,7 @@ where let mut response = match (req.uri().path(), req.method()) { ("/cfg", &Method::GET) => get_cfg(req, self).await, - ("/channel/list", &Method::GET) => channel_list(req, self).await, - (route, _) if route.starts_with("/analytics") => analytics_router(req, self).await, + (route, _) if route.starts_with("/v5/analytics") => analytics_router(req, self).await, // This is important because it prevents us from doing // expensive regex matching for routes without /channel (path, _) if path.starts_with("/v5/channel") => channels_router(req, self).await, @@ -219,13 +219,13 @@ async fn analytics_router( 
let (route, method) = (req.uri().path(), req.method()); match (route, method) { - ("/analytics", &Method::GET) => { + ("/v5/analytics", &Method::GET) => { let allowed_keys_for_request = vec![AllowedKey::Country, AllowedKey::AdSlotType] .into_iter() .collect(); get_analytics(req, app, Some(allowed_keys_for_request), None).await } - ("/analytics/for-advertiser", &Method::GET) => { + ("/v5/analytics/for-advertiser", &Method::GET) => { let req = AuthRequired.call(req, app).await?; let authenticate_as = req @@ -236,7 +236,7 @@ async fn analytics_router( get_analytics(req, app, None, Some(authenticate_as)).await } - ("/analytics/for-publisher", &Method::GET) => { + ("/v5/analytics/for-publisher", &Method::GET) => { let authenticate_as = req .extensions() .get::() @@ -246,7 +246,7 @@ async fn analytics_router( let req = AuthRequired.call(req, app).await?; get_analytics(req, app, None, Some(authenticate_as)).await } - ("/analytics/for-admin", &Method::GET) => { + ("/v5/analytics/for-admin", &Method::GET) => { req = Chain::new() .chain(AuthRequired) .chain(IsAdmin) @@ -258,22 +258,25 @@ async fn analytics_router( } } +// TODO AIP#61: Add routes for: +// - POST /channel/:id/pay +// #[serde(rename_all = "camelCase")] +// Pay { payout: BalancesMap }, +// +// - GET /channel/:id/get-leaf async fn channels_router( mut req: Request, app: &Application, ) -> Result, ResponseError> { let (path, method) = (req.uri().path().to_owned(), req.method()); - // TODO AIP#61: Add routes for: - // - POST /channel/:id/pay - // #[serde(rename_all = "camelCase")] - // Pay { payout: BalancesMap }, - // - // - GET /channel/:id/spender/:addr - // - GET /channel/:id/spender/all - // - POST /channel/:id/spender/:addr - // - GET /channel/:id/get-leaf - if let (Some(caps), &Method::GET) = (LAST_APPROVED_BY_CHANNEL_ID.captures(&path), method) { + // `GET /v5/channel/list` + if let ("/v5/channel/list", &Method::GET) = (path.as_str(), method) { + channel_list(req, app).await + } + // `GET /v5/channel/:id/last-approved` + else if let (Some(caps), &Method::GET) = (LAST_APPROVED_BY_CHANNEL_ID.captures(&path), method) + { let param = RouteParams(vec![caps .get(1) .map_or("".to_string(), |m| m.as_str().to_string())]); @@ -282,7 +285,9 @@ async fn channels_router( req = ChannelLoad.call(req, app).await?; last_approved(req, app).await - } else if let (Some(caps), &Method::GET) = (CHANNEL_VALIDATOR_MESSAGES.captures(&path), method) + } + // `GET /v5/channel/:id/validator-messages` + else if let (Some(caps), &Method::GET) = (CHANNEL_VALIDATOR_MESSAGES.captures(&path), method) { let param = RouteParams(vec![caps .get(1) @@ -301,7 +306,9 @@ async fn channels_router( }; list_validator_messages(req, app, &extract_params.0, &extract_params.1).await - } else if let (Some(caps), &Method::POST) = (CHANNEL_VALIDATOR_MESSAGES.captures(&path), method) + } + // `POST /v5/channel/:id/validator-messages` + else if let (Some(caps), &Method::POST) = (CHANNEL_VALIDATOR_MESSAGES.captures(&path), method) { let param = RouteParams(vec![caps .get(1) @@ -316,7 +323,9 @@ async fn channels_router( .await?; create_validator_messages(req, app).await - } else if let (Some(caps), &Method::GET) = ( + } + // `GET /v5/channel/:id/spender/:addr` + else if let (Some(caps), &Method::GET) = ( CHANNEL_SPENDER_LEAF_AND_TOTAL_DEPOSITED.captures(&path), method, ) { @@ -334,7 +343,9 @@ async fn channels_router( .await?; get_spender_limits(req, app).await - } else if let (Some(caps), &Method::POST) = ( + } + // `POST /v5/channel/:id/spender/:addr` + else if let (Some(caps), 
&Method::POST) = ( CHANNEL_SPENDER_LEAF_AND_TOTAL_DEPOSITED.captures(&path), method, ) { @@ -352,7 +363,9 @@ async fn channels_router( .await?; add_spender_leaf(req, app).await - } else if let (Some(caps), &Method::GET) = (CHANNEL_ALL_SPENDER_LIMITS.captures(&path), method) + } + // `GET /v5/channel/:id/spender/all` + else if let (Some(caps), &Method::GET) = (CHANNEL_ALL_SPENDER_LIMITS.captures(&path), method) { let param = RouteParams(vec![caps .get(1) @@ -366,7 +379,9 @@ async fn channels_router( .await?; get_all_spender_limits(req, app).await - } else if let (Some(caps), &Method::GET) = (CHANNEL_ACCOUNTING.captures(&path), method) { + } + // `GET /v5/channel/:id/accounting` + else if let (Some(caps), &Method::GET) = (CHANNEL_ACCOUNTING.captures(&path), method) { let param = RouteParams(vec![caps .get(1) .map_or("".to_string(), |m| m.as_str().to_string())]); diff --git a/sentry/src/middleware/campaign.rs b/sentry/src/middleware/campaign.rs index aacdfee6a..746f0033a 100644 --- a/sentry/src/middleware/campaign.rs +++ b/sentry/src/middleware/campaign.rs @@ -33,7 +33,7 @@ impl Middleware for CampaignLoad { let campaign_context = application .config - .find_chain_token(campaign.channel.token) + .find_chain_of(campaign.channel.token) .ok_or(ResponseError::BadRequest( "Channel token not whitelisted".to_string(), ))? diff --git a/sentry/src/middleware/channel.rs b/sentry/src/middleware/channel.rs index f60f7aab6..de0207a31 100644 --- a/sentry/src/middleware/channel.rs +++ b/sentry/src/middleware/channel.rs @@ -45,7 +45,7 @@ fn channel_load( .await? .ok_or(ResponseError::NotFound)?; - let channel_context = app.config.find_chain_token(channel.token).ok_or(ResponseError::FailedValidation("Channel token is not whitelisted in this validator".into()))?.with_channel(channel); + let channel_context = app.config.find_chain_of(channel.token).ok_or(ResponseError::FailedValidation("Channel token is not whitelisted in this validator".into()))?.with_channel(channel); // If this is an authenticated call // Check if the Channel context (Chain Id) aligns with the Authentication token Chain id diff --git a/sentry/src/routes.rs b/sentry/src/routes.rs index 58feed512..b218a253c 100644 --- a/sentry/src/routes.rs +++ b/sentry/src/routes.rs @@ -2,44 +2,79 @@ //! //! ## Channel //! -//! All routes are implemented under module [channel]. +//! All routes are implemented under the module [channel]. +//! +//! ### Route parameters +//! +//! Paths which include these parameters are validated as follows: +//! +//! - `:id` - [`ChannelId`] +//! - `:addr` - a valid [`Address`] or [`ValidatorId`]. +//! +//! ### Routes //! //! - [`GET /v5/channel/list`](crate::routes::channel::channel_list) //! -//! todo +//! Request query parameters: [`ChannelListQuery`](primitives::sentry::channel_list::ChannelListQuery) +//! +//! Response: [`ChannelListResponse`](primitives::sentry::channel_list::ChannelListResponse) //! //! - [`GET /v5/channel/:id/accounting`](channel::get_accounting_for_channel) //! -//! todo +//! Response: [`AccountingResponse::`](primitives::sentry::AccountingResponse) //! //! - [`GET /v5/channel/:id/spender/:addr`](channel::get_spender_limits) (auth required) //! -//! todo +//! Response: [`SpenderResponse`](primitives::sentry::SpenderResponse) //! //! - [`POST /v5/channel/:id/spender/:addr`](channel::add_spender_leaf) (auth required) //! -//! todo +//! This route forces the addition of a spender [`Accounting`] +//! (if one does not exist) to the given [`Channel`] with `spent = 0`. +//! 
This will also ensure that the spender is added to the [`NewState`]. +//! +//! Response: [`SuccessResponse`] //! //! - [`GET /v5/channel/:id/spender/all`](channel::get_all_spender_limits) (auth required) //! -//! todo +//! Response: [`AllSpendersResponse`](primitives::sentry::AllSpendersResponse) +//! +//! - [`GET /v5/channel/:id/validator-messages`][list_validator_messages] +//! +//! - [`GET /v5/channel/:id/validator-messages/:addr`][list_validator_messages] - filter by the given [`ValidatorId`] +//! - [`GET /v5/channel/:id/validator-messages/:addr/:validator_messages`][list_validator_messages] - filters by the given [`ValidatorId`] and +//! [`Validator message types`][`MessageTypes`]. +//! - `:validator_messages` - url encoded list of [`Validator message types`][`MessageTypes`] separated by a `+`. +//! +//! E.g. `NewState+ApproveState` becomes `NewState%2BApproveState` //! -//! - [`GET /v5/channel/:id/validator-messages`](channel::validator_message::list_validator_messages) +//! Request query parameters: [ValidatorMessagesListQuery](primitives::sentry::ValidatorMessagesListQuery) //! -//! - `GET /v5/channel/:id/validator-messages/:ValidatorId` - filter by ValidatorId -//! - `GET /v5/channel/:id/validator-messages/:ValidatorId/NewState+ApproveState` - filters by a given [`primitives::ValidatorId`] and a -//! [`Validator message types`](primitives::validator::MessageTypes). +//! Response: [ValidatorMessagesListResponse](primitives::sentry::ValidatorMessagesListResponse) //! -//! Request query parameters: [channel::validator_message::ValidatorMessagesListQuery] -//! Response: [primitives::sentry::ValidatorMessageResponse] +//! [list_validator_messages]: channel::validator_message::list_validator_messages //! //! - [`POST /v5/channel/:id/validator-messages`](channel::validator_message::create_validator_messages) (auth required) //! -//! todo +//! Request body (json): [`ValidatorMessagesCreateRequest`](primitives::sentry::ValidatorMessagesCreateRequest) +//! +//! Example: +//! ```json +//! { +//! "messages": [ +//! /// validator messages +//! ... +//! ] +//! } +//! ``` +//! +//! Validator messages: [`MessageTypes`] //! //! - [`POST /v5/channel/:id/last-approved`](channel::last_approved) //! -//! todo +//! Request query parameters: [`LastApprovedQuery`][primitives::sentry::LastApprovedQuery] +//! +//! Response: [`LastApprovedResponse`][primitives::sentry::LastApprovedResponse] //! //! - `POST /v5/channel/:id/pay` (auth required) //! @@ -53,11 +88,11 @@ //! //! TODO: implement and document as part of issue #382 //! -//! This route gets the latest approved state (`NewState`/`ApproveState` pair), +//! This route gets the latest approved state ([`NewState`]/[`ApproveState`] pair), //! and finds the given `spender`/`earner` in the balances tree, and produce a merkle proof for it. //! This is useful for the Platform to verify if a spender leaf really exists. //! -//! Query parameters: +//! Request query parameters: //! //! - `spender=[0x...]` or `earner=[0x...]` (required) //! @@ -72,53 +107,113 @@ //! //! ## Campaign //! -//! All routes are implemented under module [campaign]. +//! All routes are implemented under the module [campaign]. +//! +//! ### Route parameters +//! +//! Paths which include these parameters are validated as follows: +//! +//! - `:id` - [`CampaignId`] +//! +//! ### Routes +//! +//! - [`GET /v5/campaign/list`](campaign::campaign_list) //! -//! Lists all campaigns with pagination and orders them in +//! 
descending order (`DESC`) by `Campaign.created`. +//! This ensures that the order in the pages will not change if a new +//! `Campaign` is created while still retrieving a page. //! -//! Lists all campaigns with pagination and orders them in descending order (`DESC`) by `Campaign.created`. This ensures that the order in the pages will not change if a new `Campaign` is created while still retrieving a page. +//! Request query parameters: [`CampaignListQuery`][primitives::sentry::campaign_list::CampaignListQuery] //! -//! Query parameters: -//! - `page=[integer]` (optional) default: `0` -//! - `creator=[0x....]` (optional) - address of the creator to be filtered by -//! - `activeTo=[integer]` (optional) in seconds - filters campaigns by `Campaign.active.to > query.activeTo` -//! - `validator=[0x...]` or `leader=[0x...]` (optional) - address of the validator to be filtered by. You can either -//! - `validator=[0x...]` - it will return all `Campaign`s where this address is **either** `Channel.leader` or `Channel.follower` -//! - `leader=[0x...]` - it will return all `Campaign`s where this address is `Channel.leader` +//! - `page=[integer]` (optional) default: `0` +//! - `creator=[0x....]` (optional) - address of the creator to be filtered by +//! - `activeTo=[integer]` (optional) in seconds - filters campaigns by `Campaign.active.to > query.activeTo` +//! - `validator=[0x...]` or `leader=[0x...]` (optional) - address of the validator to be filtered by. You can either +//! - `validator=[0x...]` - it will return all `Campaign`s where this address is **either** `Channel.leader` or `Channel.follower` +//! - `leader=[0x...]` - it will return all `Campaign`s where this address is `Channel.leader` //! +//! Response: [`CampaignListResponse`][primitives::sentry::campaign_list::CampaignListResponse] //! -//! - `POST /v5/campaign` (auth required) +//! - [`POST /v5/campaign`](campaign::create_campaign) (auth required) //! -//! Create a new Campaign. +//! Create a new Campaign. Request must be sent by the [`Campaign.creator`](primitives::Campaign::creator). //! -//! It will make sure the `Channel` is created if new and it will update the spendable amount using the `Adapter::get_deposit()`. +//! **Authentication is required** to validate [`Campaign.creator`](primitives::Campaign::creator) == [`Auth.uid`](crate::Auth::uid) //! -//! Authentication: **required** to validate `Campaign.creator == Auth.uid` +//! It will make sure the `Channel` is created if new and it will update +//! the spendable amount using the [`Adapter`]`::get_deposit()`. //! -//! Request Body: [`primitives::sentry::campaign_create::CreateCampaign`] (json) //! -//! `POST /v5/campaign/:id/close` (auth required) +//! Request body (json): [`CreateCampaign`][primitives::sentry::campaign_create::CreateCampaign] //! -//! todo +//! Response: [`Campaign`] +//! +//! - [`POST /v5/campaign/:id`](campaign::update_campaign::handle_route) (auth required) +//! +//! Modify the [`Campaign`]. Request must be sent by the [`Campaign.creator`](primitives::Campaign::creator). +//! +//! **Authentication is required** to validate [`Campaign.creator`](primitives::Campaign::creator) == [`Auth.uid`](crate::Auth::uid) +//! +//! Request body (json): [`ModifyCampaign`][primitives::sentry::campaign_modify::ModifyCampaign] +//! +//! Response: [`Campaign`] +//! +//! - [`POST /v5/campaign/:id/close`](campaign::close_campaign) (auth required) +//! +//! Close the campaign. +//! +//! Request must be sent by the [`Campaign.creator`](primitives::Campaign::creator). +//! +//! 
**Authentication is required** to validate [`Campaign.creator`](primitives::Campaign::creator) == [`Auth.uid`](crate::Auth::uid) +//! +//! Closes the campaign by setting [`Campaign.budget`](primitives::Campaign::budget) so that `remaining budget = 0`. +//! +//! Response: [`SuccessResponse`] //! //! ## Analytics //! -//! - `GET /v5/analytics` +//! - [`GET /v5/analytics`](get_analytics) +//! +//! Allowed keys: [`AllowedKey::Country`][primitives::analytics::query::AllowedKey::Country] & [`AllowedKey::AdSlotType`][primitives::analytics::query::AllowedKey::AdSlotType] +//! +//! - [`GET /v5/analytics/for-publisher`](get_analytics) (auth required) //! -//! todo +//! Returns all analytics where the currently authenticated address [`Auth.uid`](crate::Auth::uid) is a **publisher**. //! -//! - `GET /v5/analytics/for-publisher` (auth required) +//! All [`ALLOWED_KEYS`] are allowed for this route. //! -//! todo //! -//! - `GET /v5/analytics/for-advertiser` (auth required) +//! - [`GET /v5/analytics/for-advertiser`](get_analytics) (auth required) //! -//! todo +//! Returns all analytics where the currently authenticated address [`Auth.uid`](crate::Auth::uid) is an **advertiser**. //! -//! - `GET /v5/analytics/for-admin` (auth required) +//! All [`ALLOWED_KEYS`] are allowed for this route. //! -//! todo +//! - [`GET /v5/analytics/for-admin`](get_analytics) (auth required) //! +//! Admin access to the analytics with no restrictions on the keys for filtering. +//! +//! All [`ALLOWED_KEYS`] are allowed for admins. +//! +//! Admin addresses are configured in the [`Config.admins`](primitives::Config::admins) +//! +//! [`Adapter`]: adapter::Adapter +//! [`Address`]: primitives::Address +//! [`AllowedKey`]: primitives::analytics::query::AllowedKey +//! [`ALLOWED_KEYS`]: primitives::analytics::query::ALLOWED_KEYS +//! [`ApproveState`]: primitives::validator::ApproveState +//! [`Accounting`]: crate::db::accounting::Accounting +//! [`AccountingResponse`]: primitives::sentry::AccountingResponse +//! [`Campaign`]: primitives::Campaign +//! [`CampaignId`]: primitives::CampaignId +//! [`ChannelId`]: primitives::ChannelId +//! [`Channel`]: primitives::Channel +//! [`MessageTypes`]: primitives::validator::MessageTypes +//! [`NewState`]: primitives::validator::NewState +//! [`SuccessResponse`]: primitives::sentry::SuccessResponse +//! [`ValidatorId`]: primitives::ValidatorId + pub use analytics::analytics as get_analytics; pub use cfg::config as get_cfg; diff --git a/sentry/src/routes/analytics.rs b/sentry/src/routes/analytics.rs index 953362ecd..99dcccbef 100644 --- a/sentry/src/routes/analytics.rs +++ b/sentry/src/routes/analytics.rs @@ -8,10 +8,10 @@ use adapter::client::Locked; use hyper::{Body, Request, Response}; use primitives::analytics::{ query::{AllowedKey, ALLOWED_KEYS}, - AnalyticsQuery, AuthenticateAs, ANALYTICS_QUERY_LIMIT, + AnalyticsQuery, AuthenticateAs, }; -/// `GET /analytics` request +/// `GET /v5/analytics` request /// with query parameters: [`primitives::analytics::AnalyticsQuery`]. 
pub async fn analytics( req: Request, @@ -21,7 +21,7 @@ pub async fn analytics( ) -> Result, ResponseError> { let query = serde_urlencoded::from_str::(req.uri().query().unwrap_or(""))?; - let applied_limit = query.limit.min(ANALYTICS_QUERY_LIMIT); + let applied_limit = query.limit.min(app.config.analytics_find_limit); let route_allowed_keys: HashSet = request_allowed.unwrap_or_else(|| ALLOWED_KEYS.clone()); @@ -54,14 +54,30 @@ pub async fn analytics( ))); } - let analytics = get_analytics( - &app.pool, - query.clone(), - route_allowed_keys, - authenticate_as, - applied_limit, + let analytics_maxtime = std::time::Duration::from_millis(app.config.analytics_maxtime.into()); + + let analytics = match tokio::time::timeout( + analytics_maxtime, + get_analytics( + &app.pool, + query.clone(), + route_allowed_keys, + authenticate_as, + applied_limit, + ), ) - .await?; + .await + { + Ok(Ok(analytics)) => analytics, + // Error getting the analytics + Ok(Err(err)) => return Err(err.into()), + // Timeout error + Err(_elapsed) => { + return Err(ResponseError::BadRequest( + "Timeout when fetching analytics data".into(), + )) + } + }; Ok(success_response(serde_json::to_string(&analytics)?)) } @@ -250,7 +266,7 @@ mod test { // Test with no optional values { let req = Request::builder() - .uri("http://127.0.0.1/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day") + .uri("http://127.0.0.1/v5/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day") .body(Body::empty()) .expect("Should build Request"); @@ -311,7 +327,7 @@ mod test { }; let query = serde_urlencoded::to_string(query).expect("should parse query"); let req = Request::builder() - .uri(format!("http://127.0.0.1/analytics?{}", query)) + .uri(format!("http://127.0.0.1/v5/analytics?{}", query)) .body(Body::empty()) .expect("Should build Request"); @@ -366,7 +382,7 @@ mod test { }; let query = serde_urlencoded::to_string(query).expect("should parse query"); let req = Request::builder() - .uri(format!("http://127.0.0.1/analytics?{}", query)) + .uri(format!("http://127.0.0.1/v5/analytics?{}", query)) .body(Body::empty()) .expect("Should build Request"); @@ -425,7 +441,7 @@ mod test { }; let query = serde_urlencoded::to_string(query).expect("should serialize query"); let req = Request::builder() - .uri(format!("http://127.0.0.1/analytics?{}", query)) + .uri(format!("http://127.0.0.1/v5/analytics?{}", query)) .body(Body::empty()) .expect("Should build Request"); let analytics_response = analytics( @@ -485,7 +501,7 @@ mod test { }; let query = serde_urlencoded::to_string(query).expect("should parse query"); let req = Request::builder() - .uri(format!("http://127.0.0.1/analytics?{}", query)) + .uri(format!("http://127.0.0.1/v5/analytics?{}", query)) .body(Body::empty()) .expect("Should build Request"); @@ -531,7 +547,7 @@ mod test { // Test with not allowed segment by { let req = Request::builder() - .uri("http://127.0.0.1/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day&segmentBy=campaignId&campaignId=0x936da01f9abd4d9d80c702af85c822a8") + .uri("http://127.0.0.1/v5/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day&segmentBy=campaignId&campaignId=0x936da01f9abd4d9d80c702af85c822a8") .body(Body::empty()) .expect("Should build Request"); @@ -557,7 +573,7 @@ mod test { // test with not allowed key { let req = Request::builder() - .uri("http://127.0.0.1/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day&campaignId=0x936da01f9abd4d9d80c702af85c822a8") + 
.uri("http://127.0.0.1/v5/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day&campaignId=0x936da01f9abd4d9d80c702af85c822a8") .body(Body::empty()) .expect("Should build Request"); @@ -583,7 +599,7 @@ mod test { // test with different metric { let req = Request::builder() - .uri("http://127.0.0.1/analytics?limit=100&eventType=CLICK&metric=paid&timeframe=day") + .uri("http://127.0.0.1/v5/analytics?limit=100&eventType=CLICK&metric=paid&timeframe=day") .body(Body::empty()) .expect("Should build Request"); @@ -622,7 +638,7 @@ mod test { // Test with different timeframe { let req = Request::builder() - .uri("http://127.0.0.1/analytics?limit=100&eventType=CLICK&metric=count&timeframe=week") + .uri("http://127.0.0.1/v5/analytics?limit=100&eventType=CLICK&metric=count&timeframe=week") .body(Body::empty()) .expect("Should build Request"); @@ -663,7 +679,7 @@ mod test { { let req = Request::builder() .uri( - "http://127.0.0.1/analytics?limit=2&eventType=CLICK&metric=count&timeframe=day", + "http://127.0.0.1/v5/analytics?limit=2&eventType=CLICK&metric=count&timeframe=day", ) .body(Body::empty()) .expect("Should build Request"); @@ -700,7 +716,7 @@ mod test { { let req = Request::builder() .uri( - "http://127.0.0.1/analytics?limit=100&eventType=CLICK&metric=count&timeframe=month", + "http://127.0.0.1/v5/analytics?limit=100&eventType=CLICK&metric=count&timeframe=month", ) .body(Body::empty()) .expect("Should build Request"); @@ -740,7 +756,7 @@ mod test { // Test with a year timeframe { let req = Request::builder() - .uri("http://127.0.0.1/analytics?limit=100&eventType=CLICK&metric=count&timeframe=year") + .uri("http://127.0.0.1/v5/analytics?limit=100&eventType=CLICK&metric=count&timeframe=year") .body(Body::empty()) .expect("Should build Request"); @@ -799,7 +815,7 @@ mod test { }; let query = serde_urlencoded::to_string(query).expect("should parse query"); let req = Request::builder() - .uri(format!("http://127.0.0.1/analytics?{}", query)) + .uri(format!("http://127.0.0.1/v5/analytics?{}", query)) .body(Body::empty()) .expect("Should build Request"); let analytics_response = analytics( @@ -998,7 +1014,7 @@ mod test { { let req = Request::builder() .extension(publisher_auth.clone()) - .uri("http://127.0.0.1/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day") + .uri("http://127.0.0.1/v5/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day") .body(Body::empty()) .expect("Should build Request"); @@ -1029,7 +1045,7 @@ mod test { { let req = Request::builder() .extension(advertiser_auth.clone()) - .uri("http://127.0.0.1/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day") + .uri("http://127.0.0.1/v5/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day") .body(Body::empty()) .expect("Should build Request"); @@ -1060,7 +1076,7 @@ mod test { { let req = Request::builder() .extension(admin_auth.clone()) - .uri("http://127.0.0.1/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day") + .uri("http://127.0.0.1/v5/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day") .body(Body::empty()) .expect("Should build Request"); @@ -1109,7 +1125,7 @@ mod test { }; let query = serde_urlencoded::to_string(query).expect("should parse query"); let req = Request::builder() - .uri(format!("http://127.0.0.1/analytics?{}", query)) + .uri(format!("http://127.0.0.1/v5/analytics?{}", query)) .body(Body::empty()) .expect("Should build Request"); @@ -1132,7 +1148,7 @@ mod test { // TODO: Move test to a analytics_router test // test with no authUid // let req = 
         // let req = Request::builder()
-        //     .uri("http://127.0.0.1/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day")
+        //     .uri("http://127.0.0.1/v5/analytics?limit=100&eventType=CLICK&metric=count&timeframe=day")
        //     .body(Body::empty())
        //     .expect("Should build Request");
 
diff --git a/sentry/src/routes/campaign.rs b/sentry/src/routes/campaign.rs
index c2f2bbe11..6f049c922 100644
--- a/sentry/src/routes/campaign.rs
+++ b/sentry/src/routes/campaign.rs
@@ -19,9 +19,8 @@ use hyper::{Body, Request, Response};
 use primitives::{
     campaign_validator::Validator,
     sentry::{
-        campaign::CampaignListQuery,
-        campaign_create::{CreateCampaign, ModifyCampaign},
-        SuccessResponse,
+        campaign_create::CreateCampaign, campaign_list::CampaignListQuery,
+        campaign_modify::ModifyCampaign, SuccessResponse,
     },
     spender::Spendable,
     Address, Campaign, CampaignId, ChainOf, Channel, ChannelId, Deposit, UnifiedNum,
@@ -1000,7 +999,7 @@ mod test {
     #[tokio::test]
     /// Test single campaign creation and modification
     // &
-    /// Test with multiple campaigns (because of Budget) a modification of campaign
+    /// Test with multiple campaigns (because of Budget) and modifications
     async fn create_and_modify_with_multiple_campaigns() {
         let app = setup_dummy_app().await;
 
@@ -1008,7 +1007,7 @@ mod test {
         let dummy_channel = DUMMY_CAMPAIGN.channel;
         let channel_chain = app
             .config
-            .find_chain_token(dummy_channel.token)
+            .find_chain_of(dummy_channel.token)
            .expect("Channel token should be whitelisted in config!");
         let channel_context = channel_chain.with_channel(dummy_channel);
 
@@ -1336,7 +1335,7 @@ mod test {
 
         let campaign_context = app
             .config
-            .find_chain_token(campaign.channel.token)
+            .find_chain_of(campaign.channel.token)
            .expect("Config should have the Dummy campaign.channel.token")
            .with(campaign.clone());
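The `find_chain_token` → `find_chain_of` rename recurs through the remaining files; the lookup pattern itself is unchanged. A minimal sketch of that pattern, assuming the test fixtures (`GANACHE_CONFIG`, `DUMMY_CAMPAIGN`) used elsewhere in this diff; the exact import paths are assumptions.

```rust
// Sketch of the Config lookup used by the tests: resolve the Chain & token context for a
// channel's token address, then attach the Channel itself. Import paths are assumptions.
use primitives::{config::GANACHE_CONFIG, util::tests::prep_db::DUMMY_CAMPAIGN};

fn main() {
    let channel = DUMMY_CAMPAIGN.channel;
    let channel_id = channel.id();

    // `find_chain_of` returns `None` when the token address is not whitelisted in the Config.
    let _channel_context = GANACHE_CONFIG
        .find_chain_of(channel.token)
        .expect("Channel token should be whitelisted in config!")
        .with_channel(channel);

    // The resulting `ChainOf<Channel>` is what the route handlers and the validator worker
    // pass around as `channel_context`.
    println!("resolved chain context for channel {:?}", channel_id);
}
```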
diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs
index b90ee1ecf..c316c251a 100644
--- a/sentry/src/routes/channel.rs
+++ b/sentry/src/routes/channel.rs
@@ -1,11 +1,11 @@
-//! Channel - `/v5/channel` routes
+//! `/v5/channel` routes
 //!
 use crate::db::{
     accounting::{get_all_accountings_for_channel, update_accounting, Side},
-    event_aggregate::{latest_approve_state_v5, latest_heartbeats, latest_new_state_v5},
     insert_channel, list_channels,
     spendable::{fetch_spendable, get_all_spendables_for_channel, update_spendable},
+    validator_message::{latest_approve_state, latest_heartbeats, latest_new_state},
     DbPool,
 };
 use crate::{success_response, Application, ResponseError, RouteParams};
@@ -25,6 +25,11 @@ use primitives::{
 use slog::{error, Logger};
 use std::{collections::HashMap, str::FromStr};
 
+/// `GET /v5/channel/list` request
+///
+/// Query: [`ChannelListQuery`]
+///
+/// Response: [`ChannelListResponse`](primitives::sentry::channel_list::ChannelListResponse)
 pub async fn channel_list(
     req: Request,
     app: &Application,
@@ -46,6 +51,11 @@
     Ok(success_response(serde_json::to_string(&list_response)?))
 }
 
+/// `GET /v5/channel/0xXXX.../last-approved` request
+///
+/// Query: [`LastApprovedQuery`]
+///
+/// Response: [`LastApprovedResponse`]
 pub async fn last_approved(
     req: Request,
     app: &Application,
@@ -68,14 +78,14 @@
     )
     .expect("should build response");
 
-    let approve_state = match latest_approve_state_v5(&app.pool, &channel).await? {
+    let approve_state = match latest_approve_state(&app.pool, &channel).await? {
         Some(approve_state) => approve_state,
         None => return Ok(default_response),
     };
 
     let state_root = approve_state.msg.state_root.clone();
 
-    let new_state = latest_new_state_v5(&app.pool, &channel, &state_root).await?;
+    let new_state = latest_new_state(&app.pool, &channel, &state_root).await?;
     if new_state.is_none() {
         return Ok(default_response);
     }
@@ -161,6 +171,9 @@ fn spender_response_without_leaf(
     Ok(success_response(serde_json::to_string(&res)?))
 }
 
+/// `GET /v5/channel/0xXXX.../spender/0xXXX...` request
+///
+/// Response: [`SpenderResponse`]
 pub async fn get_spender_limits(
     req: Request,
     app: &Application,
@@ -215,6 +228,9 @@
     Ok(success_response(serde_json::to_string(&res)?))
 }
 
+/// `GET /v5/channel/0xXXX.../spender/all` request
+///
+/// Response: [`AllSpendersResponse`]
 pub async fn get_all_spender_limits(
     req: Request,
     app: &Application,
@@ -269,6 +285,8 @@
     Ok(success_response(serde_json::to_string(&res)?))
 }
 
+/// `POST /v5/channel/0xXXX.../spender/0xXXX...` request
+///
 /// internally, to make the validator worker to add a spender leaf in NewState we'll just update Accounting
 pub async fn add_spender_leaf(
     req: Request,
@@ -306,14 +324,14 @@ async fn get_corresponding_new_state(
     logger: &Logger,
     channel: &Channel,
 ) -> Result>, ResponseError> {
-    let approve_state = match latest_approve_state_v5(pool, channel).await? {
+    let approve_state = match latest_approve_state(pool, channel).await? {
         Some(approve_state) => approve_state,
         None => return Ok(None),
     };
 
     let state_root = approve_state.msg.state_root.clone();
 
-    let new_state = match latest_new_state_v5(pool, channel, &state_root).await? {
+    let new_state = match latest_new_state(pool, channel, &state_root).await? {
         Some(new_state) => {
             let new_state = new_state.msg.into_inner().try_checked().map_err(|err| {
                 error!(&logger, "Balances are not aligned in an approved NewState: {}", &err; "module" => "get_spender_limits");
@@ -378,10 +396,8 @@ pub async fn get_accounting_for_channel(
 /// starting with `/v5/channel/0xXXX.../validator-messages`
 ///
 pub mod validator_message {
-    use std::collections::HashMap;
-
     use crate::{
-        db::{get_validator_messages, insert_validator_messages},
+        db::validator_message::{get_validator_messages, insert_validator_messages},
         Auth,
     };
     use crate::{success_response, Application, ResponseError};
@@ -389,17 +405,12 @@
     use futures::future::try_join_all;
     use hyper::{Body, Request, Response};
     use primitives::{
-        sentry::{SuccessResponse, ValidatorMessageResponse},
-        validator::MessageTypes,
-        ChainOf,
+        sentry::{
+            SuccessResponse, ValidatorMessagesCreateRequest, ValidatorMessagesListQuery,
+            ValidatorMessagesListResponse,
+        },
+        ChainOf, Channel, DomainError, ValidatorId,
     };
-    use primitives::{Channel, DomainError, ValidatorId};
-    use serde::Deserialize;
-
-    #[derive(Deserialize)]
-    pub struct ValidatorMessagesListQuery {
-        limit: Option,
-    }
 
     pub fn extract_params(
         from_path: &str,
@@ -460,12 +471,18 @@
         get_validator_messages(&app.pool, &channel.id(), validator_id, message_types, limit)
            .await?;
 
-        let response = ValidatorMessageResponse { validator_messages };
+        let response = ValidatorMessagesListResponse {
+            messages: validator_messages,
+        };
 
         Ok(success_response(serde_json::to_string(&response)?))
     }
 
-    /// `POST /v5/channel/0xXXX.../validator-messages` with Request body (json):
+    /// `POST /v5/channel/0xXXX.../validator-messages`
+    /// with Request body (json): [ValidatorMessagesCreateRequest]
+    ///
+    /// # Example
+    ///
     /// ```json
     /// {
     ///     "messages": [
@@ -475,7 +492,9 @@
     ///     ]
     /// }
     /// ```
     ///
-    /// Validator messages: [`MessageTypes`]
+    /// Validator messages: [`MessageTypes`][primitives::validator::MessageTypes]
+    ///
+    /// Response: [`SuccessResponse`]
     pub async fn create_validator_messages(
         req: Request,
         app: &Application,
@@ -483,7 +502,7 @@
         let session = req
            .extensions()
            .get::()
-            .expect("auth request session")
+            .ok_or(ResponseError::Unauthorized)?
            .to_owned();
 
         let channel = req
@@ -495,15 +514,13 @@
         let into_body = req.into_body();
         let body = hyper::body::to_bytes(into_body).await?;
 
-        let request_body = serde_json::from_slice::>>(&body)?;
-        let messages = request_body
-            .get("messages")
-            .ok_or_else(|| ResponseError::BadRequest("missing messages body".to_string()))?;
+        let create_request = serde_json::from_slice::(&body)
+            .map_err(|_err| ResponseError::BadRequest("Bad Request body json".to_string()))?;
 
         match channel.find_validator(session.uid) {
             None => Err(ResponseError::Unauthorized),
             _ => {
-                try_join_all(messages.iter().map(|message| {
+                try_join_all(create_request.messages.iter().map(|message| {
                     insert_validator_messages(&app.pool, &channel, &session.uid, message)
                 }))
                .await?;
@@ -537,7 +554,7 @@ mod test {
         let channel = DUMMY_CAMPAIGN.channel;
 
         let channel_context = app
             .config
-            .find_chain_token(DUMMY_CAMPAIGN.channel.token)
+            .find_chain_of(DUMMY_CAMPAIGN.channel.token)
            .expect("should retrieve Chain & token");
 
         (channel_context.with_channel(channel), channel)
@@ -623,7 +640,7 @@
            .expect("Should get json");
 
         let accounting_response: AccountingResponse =
-            serde_json::from_slice(&json).expect("Should get AccouuntingResponse");
+            serde_json::from_slice(&json).expect("Should get AccountingResponse");
 
         accounting_response
     }
@@ -632,7 +649,7 @@
         let app = setup_dummy_app().await;
 
         let channel_context = app
             .config
-            .find_chain_token(DUMMY_CAMPAIGN.channel.token)
+            .find_chain_of(DUMMY_CAMPAIGN.channel.token)
            .expect("Dummy channel Token should be present in config!")
            .with(DUMMY_CAMPAIGN.channel);
 
@@ -752,7 +769,7 @@
         let app = setup_dummy_app().await;
 
         let channel_context = app
             .config
-            .find_chain_token(DUMMY_CAMPAIGN.channel.token)
+            .find_chain_of(DUMMY_CAMPAIGN.channel.token)
            .expect("Dummy channel Token should be present in config!")
            .with(DUMMY_CAMPAIGN.channel);
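For reference, a hedged client-side sketch of the `POST /v5/channel/0xXXX.../validator-messages` request documented above, using the typed `ValidatorMessagesCreateRequest` body shape that `propagate_to` in `sentry_interface.rs` (further down) now sends. The URL, channel id, and bearer token are placeholders; `reqwest` and `serde_json` are assumptions for illustration.

```rust
// Hypothetical client-side sketch of posting validator messages to a sentry.
// URL, channel id and auth token are placeholders.
use reqwest::Client;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let channel_id = "0xXXX..."; // placeholder channel id

    // Body shape of `ValidatorMessagesCreateRequest { messages: Vec<MessageTypes> }`.
    let body = json!({
        "messages": [
            // One or more `MessageTypes` entries, e.g. a Heartbeat or ApproveState payload.
        ]
    });

    let response = Client::new()
        .post(format!(
            "http://127.0.0.1:8005/v5/channel/{}/validator-messages",
            channel_id
        ))
        .bearer_auth("AUTH_TOKEN") // placeholder validator token
        .json(&body)
        .send()
        .await?;

    // A `SuccessResponse` (`{"success": true}`) is expected on success.
    println!("status: {}", response.status());
    Ok(())
}
```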
diff --git a/test_harness/src/lib.rs b/test_harness/src/lib.rs
index bff5fd9ea..e1500217b 100644
--- a/test_harness/src/lib.rs
+++ b/test_harness/src/lib.rs
@@ -413,7 +413,7 @@ mod tests {
         assert_eq!(CAMPAIGN_1.channel.token, CAMPAIGN_2.channel.token);
 
         let token_chain = GANACHE_CONFIG
-            .find_chain_token(CAMPAIGN_1.channel.token)
+            .find_chain_of(CAMPAIGN_1.channel.token)
            .expect("Should find CAMPAIGN_1 channel token address in Config!");
         assert_eq!(&token_chain.chain, &chain, "CAMPAIGN_1 & CAMPAIGN_2 should be both using the same Chain which is setup in the Ganache Config");
diff --git a/validator_worker/Cargo.toml b/validator_worker/Cargo.toml
index fce6cd5f2..434f90f22 100644
--- a/validator_worker/Cargo.toml
+++ b/validator_worker/Cargo.toml
@@ -44,3 +44,7 @@ clap = "^2.33"
 
 [dev-dependencies]
 wiremock = "0.5"
+
+[package.metadata.docs.rs]
+all-features = true
+rustdoc-args = ["--cfg", "docsrs"]
diff --git a/validator_worker/src/follower.rs b/validator_worker/src/follower.rs
index 10d328434..51869b353 100644
--- a/validator_worker/src/follower.rs
+++ b/validator_worker/src/follower.rs
@@ -296,7 +296,7 @@ async fn on_new_state<'a, C: Unlocked + 'static>(
     let propagation_result = sentry
        .propagate(
             channel_context,
-            &[&MessageTypes::ApproveState(ApproveState {
+            &[MessageTypes::ApproveState(ApproveState {
                 state_root: proposed_state_root,
                 signature,
                 is_healthy,
@@ -316,7 +316,7 @@ async fn on_error<'a, C: Unlocked + 'static>(
     let propagation = sentry
        .propagate(
             channel_context,
-            &[&MessageTypes::RejectState(RejectState {
+            &[MessageTypes::RejectState(RejectState {
                 reason: status.to_string(),
                 state_root: new_state.state_root.clone(),
                 signature: new_state.signature.clone(),
diff --git a/validator_worker/src/heartbeat.rs b/validator_worker/src/heartbeat.rs
index 87cb5578b..7b9032af9 100644
--- a/validator_worker/src/heartbeat.rs
+++ b/validator_worker/src/heartbeat.rs
@@ -70,5 +70,5 @@ async fn send_heartbeat(
         timestamp: Utc::now(),
     });
 
-    Ok(iface.propagate(channel_context, &[&message_types]).await?)
+    Ok(iface.propagate(channel_context, &[message_types]).await?)
 }
diff --git a/validator_worker/src/leader.rs b/validator_worker/src/leader.rs
index 6ae0382aa..613e09dae 100644
--- a/validator_worker/src/leader.rs
+++ b/validator_worker/src/leader.rs
@@ -117,7 +117,7 @@ async fn on_new_accounting(
     let propagation_results = sentry
        .propagate(
             channel_context,
-            &[&MessageTypes::NewState(NewState {
+            &[MessageTypes::NewState(NewState {
                 state_root,
                 signature,
                 balances: accounting_balances.into_unchecked(),
diff --git a/validator_worker/src/lib.rs b/validator_worker/src/lib.rs
index 3276ac072..3d2eda6f6 100644
--- a/validator_worker/src/lib.rs
+++ b/validator_worker/src/lib.rs
@@ -1,6 +1,6 @@
 #![deny(rust_2018_idioms)]
 #![deny(clippy::all)]
-#![allow(deprecated)]
+#![cfg_attr(docsrs, feature(doc_cfg))]
 
 use adapter::util::{get_balance_leaf, get_signable_state_root, BalanceLeafError};
 use primitives::{
diff --git a/validator_worker/src/sentry_interface.rs b/validator_worker/src/sentry_interface.rs
index c558c4f40..d60ae5b7c 100644
--- a/validator_worker/src/sentry_interface.rs
+++ b/validator_worker/src/sentry_interface.rs
@@ -12,7 +12,7 @@ use primitives::{
     balances::{CheckedState, UncheckedState},
     sentry::{
         AccountingResponse, AllSpendersResponse, LastApprovedResponse, SuccessResponse,
-        ValidatorMessageResponse,
+        ValidatorMessagesCreateRequest, ValidatorMessagesListResponse,
     },
     spender::Spender,
     util::ApiUrl,
@@ -208,15 +208,15 @@ impl SentryApi {
         ))
        .expect("Should not error when creating endpoint url");
 
-        let result = self
+        let response = self
            .client
            .get(endpoint)
            .send()
            .await?
-            .json::()
+            .json::()
            .await?;
 
-        Ok(result.validator_messages.into_iter().next().map(|m| m.msg))
+        Ok(response.messages.into_iter().next().map(|m| m.msg))
     }
 
     pub async fn get_our_latest_msg(
@@ -352,7 +352,7 @@ impl SentryApi {
         let (validators, channels) = campaigns.into_iter().fold(
            (ChainsValidators::new(), HashSet::>::new()),
            |(mut validators, mut channels), campaign| {
-                let channel_context = match self.config.find_chain_token(campaign.channel.token) {
+                let channel_context = match self.config.find_chain_of(campaign.channel.token) {
                     Some(chain_of) => chain_of.with_channel(campaign.channel),
                     // Skip the current Channel as the Chain/Token is not configured
                     None => return (validators, channels),
@@ -416,7 +416,7 @@ impl SentryApi {
     pub async fn propagate(
         &self,
         channel_context: &ChainOf,
-        messages: &[&MessageTypes],
+        messages: &[MessageTypes],
     ) -> Result, Error> {
         let chain_validators = self
            .propagate_to
@@ -470,21 +470,22 @@ async fn propagate_to(
     timeout: u32,
     channel_id: ChannelId,
     (validator_id, validator): (ValidatorId, &Validator),
-    messages: &[&MessageTypes],
+    messages: &[MessageTypes],
 ) -> PropagationResult {
     let endpoint = validator
        .url
        .join(&format!("v5/channel/{}/validator-messages", channel_id))
        .expect("Should not error when creating endpoint url");
 
-    let mut body = HashMap::new();
-    body.insert("messages", messages);
+    let request_body = ValidatorMessagesCreateRequest {
+        messages: messages.to_vec(),
+    };
 
     let _response: SuccessResponse = client
        .request(Method::POST, endpoint)
        .timeout(Duration::from_millis(timeout.into()))
        .bearer_auth(&validator.token)
-        .json(&body)
+        .json(&request_body)
        .send()
        .await
        .map_err(|e| (validator_id, Error::Request(e)))?
@@ -536,7 +537,6 @@ pub mod channels {
     ) -> Result {
         let query = ChannelListQuery {
             page,
-            creator: None,
             validator: Some(validator),
         };
 
@@ -559,7 +559,7 @@ pub mod campaigns {
     use chrono::Utc;
     use futures::future::try_join_all;
     use primitives::{
-        sentry::campaign::{CampaignListQuery, CampaignListResponse, ValidatorParam},
+        sentry::campaign_list::{CampaignListQuery, CampaignListResponse, ValidatorParam},
         util::ApiUrl,
         Campaign, ValidatorId,
     };
@@ -758,7 +758,7 @@ mod test {
         let logger = discard_logger();
 
         let channel_context = config
-            .find_chain_token(DUMMY_CAMPAIGN.channel.token)
+            .find_chain_of(DUMMY_CAMPAIGN.channel.token)
            .expect("Should find Dummy campaign token in config")
            .with_channel(DUMMY_CAMPAIGN.channel);
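The `propagate` signature change above (from `&[&MessageTypes]` to `&[MessageTypes]`) means callers now hand over owned messages, which `propagate_to` clones into the typed request body. A self-contained sketch of that shape with simplified stand-in types, not the real `primitives` definitions:

```rust
// Stand-in types only: this mirrors the owned-slice -> `ValidatorMessagesCreateRequest`
// pattern from `propagate_to`, using simplified definitions for illustration.
use serde::Serialize;

#[derive(Clone, Debug, Serialize)]
enum MessageTypes {
    Heartbeat { timestamp: String },
}

#[derive(Serialize)]
struct ValidatorMessagesCreateRequest {
    messages: Vec<MessageTypes>,
}

fn propagate(messages: &[MessageTypes]) -> String {
    // Clone the borrowed slice into the request body and serialize it,
    // as the JSON payload of the POST to `/v5/channel/{id}/validator-messages` would be.
    let request_body = ValidatorMessagesCreateRequest {
        messages: messages.to_vec(),
    };
    serde_json::to_string(&request_body).expect("request body should serialize")
}

fn main() {
    // Callers now build the message by value, as in `heartbeat.rs` / `leader.rs`.
    let message = MessageTypes::Heartbeat {
        timestamp: "2021-01-01T00:00:00Z".into(),
    };
    println!("{}", propagate(&[message]));
}
```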