diff --git a/tap_core/src/error.rs b/tap_core/src/error.rs index 9a4a31f8..052fd06b 100644 --- a/tap_core/src/error.rs +++ b/tap_core/src/error.rs @@ -9,7 +9,7 @@ use std::result::Result as StdResult; use alloy::primitives::{Address, SignatureError}; use thiserror::Error as ThisError; -use crate::{rav::ReceiptAggregateVoucher, receipt::ReceiptError}; +use crate::receipt::ReceiptError; /// Error type for the TAP protocol #[derive(ThisError, Debug)] @@ -38,8 +38,8 @@ pub enum Error { /// Error when the received RAV does not match the expected RAV #[error("Received RAV does not match expexted RAV")] InvalidReceivedRAV { - received_rav: ReceiptAggregateVoucher, - expected_rav: ReceiptAggregateVoucher, + received_rav: String, + expected_rav: String, }, /// Generic error from the adapter #[error("Error from adapter.\n Caused by: {source_error}")] diff --git a/tap_core/src/manager/adapters/escrow.rs b/tap_core/src/manager/adapters/escrow.rs index 6570c513..c4a0250c 100644 --- a/tap_core/src/manager/adapters/escrow.rs +++ b/tap_core/src/manager/adapters/escrow.rs @@ -1,12 +1,13 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use alloy::{dyn_abi::Eip712Domain, primitives::Address}; +use alloy::{dyn_abi::Eip712Domain, primitives::Address, sol_types::SolStruct}; use async_trait::async_trait; use crate::{ - rav::SignedRAV, + manager::WithValueAndTimestamp, receipt::{state::AwaitingReserve, ReceiptError, ReceiptResult, ReceiptWithState}, + signed_message::EIP712SignedMessage, Error, }; @@ -41,9 +42,9 @@ pub trait EscrowHandler: Send + Sync { async fn verify_signer(&self, signer_address: Address) -> Result; /// Checks and reserves escrow for the received receipt - async fn check_and_reserve_escrow( + async fn check_and_reserve_escrow( &self, - received_receipt: &ReceiptWithState, + received_receipt: &ReceiptWithState, domain_separator: &Eip712Domain, ) -> ReceiptResult<()> { let signed_receipt = &received_receipt.signed_receipt; @@ -55,7 +56,7 @@ pub trait EscrowHandler: Send + Sync { })?; if self - .subtract_escrow(receipt_signer_address, signed_receipt.message.value) + .subtract_escrow(receipt_signer_address, signed_receipt.message.value()) .await .is_err() { @@ -66,9 +67,9 @@ pub trait EscrowHandler: Send + Sync { } /// Checks the signature of the RAV - async fn check_rav_signature( + async fn check_rav_signature( &self, - signed_rav: &SignedRAV, + signed_rav: &EIP712SignedMessage, domain_separator: &Eip712Domain, ) -> Result<(), Error> { let recovered_address = signed_rav.recover_signer(domain_separator)?; diff --git a/tap_core/src/manager/adapters/rav.rs b/tap_core/src/manager/adapters/rav.rs index 3e719b98..96b48f5f 100644 --- a/tap_core/src/manager/adapters/rav.rs +++ b/tap_core/src/manager/adapters/rav.rs @@ -1,9 +1,10 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 +use alloy::sol_types::SolStruct; use async_trait::async_trait; -use crate::rav::SignedRAV; +use crate::signed_message::EIP712SignedMessage; /// Stores the latest RAV in the storage. /// @@ -12,7 +13,10 @@ use crate::rav::SignedRAV; /// For example code see [crate::manager::context::memory::RAVStorage] #[async_trait] -pub trait RAVStore { +pub trait RAVStore +where + T: SolStruct, +{ /// Defines the user-specified error type. /// /// This error type should implement the `Error` and `Debug` traits from @@ -25,7 +29,7 @@ pub trait RAVStore { /// This method should be implemented to store the most recent validated /// `SignedRAV` into your chosen storage system. Any errors that occur /// during this process should be captured and returned as an `AdapterError`. - async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError>; + async fn update_last_rav(&self, rav: EIP712SignedMessage) -> Result<(), Self::AdapterError>; } /// Reads the RAV from storage @@ -35,7 +39,10 @@ pub trait RAVStore { /// For example code see [crate::manager::context::memory::RAVStorage] #[async_trait] -pub trait RAVRead { +pub trait RAVRead +where + T: SolStruct, +{ /// Defines the user-specified error type. /// /// This error type should implement the `Error` and `Debug` traits from @@ -46,5 +53,5 @@ pub trait RAVRead { /// Retrieves the latest `SignedRAV` from the storage. /// /// If no `SignedRAV` is available, this method should return `None`. - async fn last_rav(&self) -> Result, Self::AdapterError>; + async fn last_rav(&self) -> Result>, Self::AdapterError>; } diff --git a/tap_core/src/manager/adapters/receipt.rs b/tap_core/src/manager/adapters/receipt.rs index 92fe2972..74d2792a 100644 --- a/tap_core/src/manager/adapters/receipt.rs +++ b/tap_core/src/manager/adapters/receipt.rs @@ -3,11 +3,15 @@ use std::ops::RangeBounds; +use alloy::sol_types::SolStruct; use async_trait::async_trait; -use crate::receipt::{ - state::{Checking, ReceiptState}, - ReceiptWithState, +use crate::{ + manager::WithValueAndTimestamp, + receipt::{ + state::{Checking, ReceiptState}, + ReceiptWithState, + }, }; /// Stores receipts in the storage. @@ -16,7 +20,10 @@ use crate::receipt::{ /// /// For example code see [crate::manager::context::memory::ReceiptStorage] #[async_trait] -pub trait ReceiptStore { +pub trait ReceiptStore +where + T: SolStruct, +{ /// Defines the user-specified error type. /// /// This error type should implement the `Error` and `Debug` traits from the standard library. @@ -29,7 +36,7 @@ pub trait ReceiptStore { /// this process should be captured and returned as an `AdapterError`. async fn store_receipt( &self, - receipt: ReceiptWithState, + receipt: ReceiptWithState, ) -> Result; } @@ -62,7 +69,10 @@ pub trait ReceiptDelete { /// /// For example code see [crate::manager::context::memory::ReceiptStorage] #[async_trait] -pub trait ReceiptRead { +pub trait ReceiptRead +where + T: SolStruct, +{ /// Defines the user-specified error type. /// /// This error type should implement the `Error` and `Debug` traits from @@ -92,15 +102,15 @@ pub trait ReceiptRead { &self, timestamp_range_ns: R, limit: Option, - ) -> Result>, Self::AdapterError>; + ) -> Result>, Self::AdapterError>; } /// See [`ReceiptRead::retrieve_receipts_in_timestamp_range()`] for details. /// /// WARNING: Will sort the receipts by timestamp using /// [vec::sort_unstable](https://doc.rust-lang.org/std/vec/struct.Vec.html#method.sort_unstable). -pub fn safe_truncate_receipts( - receipts: &mut Vec>, +pub fn safe_truncate_receipts( + receipts: &mut Vec>, limit: u64, ) { if receipts.len() <= limit as usize { @@ -110,18 +120,18 @@ pub fn safe_truncate_receipts( return; } - receipts.sort_unstable_by_key(|rx_receipt| rx_receipt.signed_receipt().message.timestamp_ns); + receipts.sort_unstable_by_key(|rx_receipt| rx_receipt.signed_receipt().message.timestamp()); // This one will be the last timestamp in `receipts` after naive truncation let last_timestamp = receipts[limit as usize - 1] .signed_receipt() .message - .timestamp_ns; + .timestamp(); // This one is the timestamp that comes just after the one above let after_last_timestamp = receipts[limit as usize] .signed_receipt() .message - .timestamp_ns; + .timestamp(); receipts.truncate(limit as usize); @@ -129,8 +139,7 @@ pub fn safe_truncate_receipts( // If the last timestamp is the same as the one that came after it, we need to // remove all the receipts with the same timestamp as the last one, because // otherwise we would leave behind part of the receipts for that timestamp. - receipts.retain(|rx_receipt| { - rx_receipt.signed_receipt().message.timestamp_ns != last_timestamp - }); + receipts + .retain(|rx_receipt| rx_receipt.signed_receipt().message.timestamp() != last_timestamp); } } diff --git a/tap_core/src/manager/context/memory.rs b/tap_core/src/manager/context/memory.rs index 455b123d..757a43e8 100644 --- a/tap_core/src/manager/context/memory.rs +++ b/tap_core/src/manager/context/memory.rs @@ -17,14 +17,14 @@ use async_trait::async_trait; use crate::{ manager::adapters::*, - rav::SignedRAV, - receipt::{checks::StatefulTimestampCheck, state::Checking, ReceiptWithState}, + rav::{ReceiptAggregateVoucher, SignedRAV}, + receipt::{checks::StatefulTimestampCheck, state::Checking, Receipt, ReceiptWithState}, signed_message::MessageId, }; pub type EscrowStorage = Arc>>; pub type QueryAppraisals = Arc>>; -pub type ReceiptStorage = Arc>>>; +pub type ReceiptStorage = Arc>>>; pub type RAVStorage = Arc>>; use thiserror::Error; @@ -71,7 +71,7 @@ impl InMemoryContext { pub async fn retrieve_receipt_by_id( &self, receipt_id: u64, - ) -> Result, InMemoryError> { + ) -> Result, InMemoryError> { let receipt_storage = self.receipt_storage.read().unwrap(); receipt_storage @@ -85,7 +85,7 @@ impl InMemoryContext { pub async fn retrieve_receipts_by_timestamp( &self, timestamp_ns: u64, - ) -> Result)>, InMemoryError> { + ) -> Result)>, InMemoryError> { let receipt_storage = self.receipt_storage.read().unwrap(); Ok(receipt_storage .iter() @@ -99,7 +99,7 @@ impl InMemoryContext { pub async fn retrieve_receipts_upto_timestamp( &self, timestamp_ns: u64, - ) -> Result>, InMemoryError> { + ) -> Result>, InMemoryError> { self.retrieve_receipts_in_timestamp_range(..=timestamp_ns, None) .await } @@ -125,7 +125,7 @@ impl InMemoryContext { } #[async_trait] -impl RAVStore for InMemoryContext { +impl RAVStore for InMemoryContext { type AdapterError = InMemoryError; async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError> { @@ -138,7 +138,7 @@ impl RAVStore for InMemoryContext { } #[async_trait] -impl RAVRead for InMemoryContext { +impl RAVRead for InMemoryContext { type AdapterError = InMemoryError; async fn last_rav(&self) -> Result, Self::AdapterError> { @@ -147,12 +147,12 @@ impl RAVRead for InMemoryContext { } #[async_trait] -impl ReceiptStore for InMemoryContext { +impl ReceiptStore for InMemoryContext { type AdapterError = InMemoryError; async fn store_receipt( &self, - receipt: ReceiptWithState, + receipt: ReceiptWithState, ) -> Result { let mut id_pointer = self.unique_id.write().unwrap(); let id_previous = *id_pointer; @@ -179,15 +179,15 @@ impl ReceiptDelete for InMemoryContext { } } #[async_trait] -impl ReceiptRead for InMemoryContext { +impl ReceiptRead for InMemoryContext { type AdapterError = InMemoryError; async fn retrieve_receipts_in_timestamp_range + std::marker::Send>( &self, timestamp_range_ns: R, limit: Option, - ) -> Result>, Self::AdapterError> { + ) -> Result>, Self::AdapterError> { let receipt_storage = self.receipt_storage.read().unwrap(); - let mut receipts_in_range: Vec> = receipt_storage + let mut receipts_in_range: Vec> = receipt_storage .iter() .filter(|(_, rx_receipt)| { timestamp_range_ns.contains(&rx_receipt.signed_receipt().message.timestamp_ns) @@ -274,7 +274,7 @@ pub mod checks { receipt::{ checks::{Check, CheckError, CheckResult, ReceiptCheck}, state::Checking, - Context, ReceiptError, ReceiptWithState, + Context, Receipt, ReceiptError, ReceiptWithState, }, signed_message::MessageId, }; @@ -284,7 +284,7 @@ pub mod checks { valid_signers: HashSet
, allocation_ids: Arc>>, _query_appraisals: Arc>>, - ) -> Vec { + ) -> Vec> { vec![ // Arc::new(UniqueCheck ), // Arc::new(ValueCheck { query_appraisals }), @@ -301,8 +301,12 @@ pub mod checks { } #[async_trait::async_trait] - impl Check for AllocationIdCheck { - async fn check(&self, _: &Context, receipt: &ReceiptWithState) -> CheckResult { + impl Check for AllocationIdCheck { + async fn check( + &self, + _: &Context, + receipt: &ReceiptWithState, + ) -> CheckResult { let received_allocation_id = receipt.signed_receipt().message.allocation_id; if self .allocation_ids @@ -328,8 +332,12 @@ pub mod checks { } #[async_trait::async_trait] - impl Check for SignatureCheck { - async fn check(&self, _: &Context, receipt: &ReceiptWithState) -> CheckResult { + impl Check for SignatureCheck { + async fn check( + &self, + _: &Context, + receipt: &ReceiptWithState, + ) -> CheckResult { let recovered_address = receipt .signed_receipt() .recover_signer(&self.domain_separator) diff --git a/tap_core/src/manager/mod.rs b/tap_core/src/manager/mod.rs index 7696796e..55fbab3d 100644 --- a/tap_core/src/manager/mod.rs +++ b/tap_core/src/manager/mod.rs @@ -34,6 +34,7 @@ //! //! ```rust //! use async_trait::async_trait; +//! use alloy::sol_types::SolStruct; //! use tap_core::{ //! receipt::{ //! ReceiptWithState, @@ -42,6 +43,7 @@ //! ReceiptError, //! Context //! }, +//!# rav::ReceiptAggregateVoucher, //! manager::{ //! Manager, //! adapters::ReceiptStore @@ -51,14 +53,19 @@ //! struct MyContext; //! //! #[async_trait] -//! impl ReceiptStore for MyContext { +//! impl ReceiptStore for MyContext +//! where +//! T: SolStruct + Send + 'static +//! { //! type AdapterError = ReceiptError; //! -//! async fn store_receipt(&self, receipt: ReceiptWithState) -> Result { +//! async fn store_receipt(&self, receipt: ReceiptWithState) -> Result { //! // ... //! # Ok(0) //! } //! } +//! +//! # type Rav = ReceiptAggregateVoucher; //! # #[tokio::main] //! # async fn main() { //! # use alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner}; @@ -70,7 +77,7 @@ //! //! let receipt = EIP712SignedMessage::new(&domain_separator, message, &wallet).unwrap(); //! -//! let manager = Manager::new(domain_separator, MyContext, CheckList::empty()); +//! let manager = Manager::<_, _, Rav>::new(domain_separator, MyContext, CheckList::empty()); //! manager.verify_and_store_receipt(&Context::new(), receipt).await.unwrap() //! # } //! ``` @@ -82,3 +89,8 @@ pub mod context; mod tap_manager; pub use tap_manager::Manager; + +pub trait WithValueAndTimestamp { + fn value(&self) -> u128; + fn timestamp(&self) -> u64; +} diff --git a/tap_core/src/manager/tap_manager.rs b/tap_core/src/manager/tap_manager.rs index 304e4d17..0e114871 100644 --- a/tap_core/src/manager/tap_manager.rs +++ b/tap_core/src/manager/tap_manager.rs @@ -1,48 +1,62 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 -use alloy::dyn_abi::Eip712Domain; +use std::marker::PhantomData; -use super::adapters::{EscrowHandler, RAVRead, RAVStore, ReceiptDelete, ReceiptRead, ReceiptStore}; +use alloy::{dyn_abi::Eip712Domain, sol_types::SolStruct}; + +use super::{ + adapters::{EscrowHandler, RAVRead, RAVStore, ReceiptDelete, ReceiptRead, ReceiptStore}, + WithValueAndTimestamp, +}; use crate::{ - rav::{RAVRequest, ReceiptAggregateVoucher, SignedRAV}, + rav::RAVRequest, receipt::{ checks::{CheckBatch, CheckList, TimestampCheck, UniqueCheck}, state::{Failed, Reserved}, - Context, ReceiptError, ReceiptWithState, SignedReceipt, + Context, ReceiptError, ReceiptWithState, }, + signed_message::EIP712SignedMessage, Error, }; -pub struct Manager { +pub struct Manager { /// Context that implements adapters context: E, /// Checks that must be completed for each receipt before being confirmed or denied for rav request - checks: CheckList, + checks: CheckList, /// Struct responsible for doing checks for receipt. Ownership stays with manager allowing manager /// to update configuration ( like minimum timestamp ). domain_separator: Eip712Domain, + + _receipt: PhantomData<(T, R)>, } -impl Manager { +impl Manager { /// Creates new manager with provided `adapters`, any receipts received by this manager /// will complete all `required_checks` before being accepted or declined from RAV. /// `starting_min_timestamp` will be used as min timestamp until the first RAV request is created. /// - pub fn new(domain_separator: Eip712Domain, context: E, checks: impl Into) -> Self { + pub fn new( + domain_separator: Eip712Domain, + context: E, + checks: impl Into>, + ) -> Self { Self { context, domain_separator, checks: checks.into(), + _receipt: PhantomData, } } } -impl Manager +impl Manager where - E: RAVStore + EscrowHandler, + E: RAVStore + EscrowHandler, + R: SolStruct + PartialEq + Sync + std::fmt::Debug, { /// Verify `signed_rav` matches all values on `expected_rav`, and that `signed_rav` has a valid signer. /// @@ -52,8 +66,8 @@ where /// pub async fn verify_and_store_rav( &self, - expected_rav: ReceiptAggregateVoucher, - signed_rav: SignedRAV, + expected_rav: R, + signed_rav: EIP712SignedMessage, ) -> std::result::Result<(), Error> { self.context .check_rav_signature(&signed_rav, &self.domain_separator) @@ -61,8 +75,8 @@ where if signed_rav.message != expected_rav { return Err(Error::InvalidReceivedRAV { - received_rav: signed_rav.message, - expected_rav, + received_rav: format!("{:?}", signed_rav.message), + expected_rav: format!("{:?}", expected_rav), }); } @@ -77,11 +91,12 @@ where } } -impl Manager +impl Manager where - E: RAVRead, + E: RAVRead, + R: SolStruct, { - async fn get_previous_rav(&self) -> Result, Error> { + async fn get_previous_rav(&self) -> Result>, Error> { let previous_rav = self .context .last_rav() @@ -93,9 +108,10 @@ where } } -impl Manager +impl Manager where - E: ReceiptRead + EscrowHandler, + E: ReceiptRead + EscrowHandler, + T: SolStruct + WithValueAndTimestamp + Sync, { async fn collect_receipts( &self, @@ -105,8 +121,8 @@ where limit: Option, ) -> Result< ( - Vec>, - Vec>, + Vec>, + Vec>, ), Error, > { @@ -164,9 +180,11 @@ where } } -impl Manager +impl Manager where - E: ReceiptRead + RAVRead + EscrowHandler, + E: ReceiptRead + RAVRead + EscrowHandler, + T: SolStruct + WithValueAndTimestamp + Sync, + R: SolStruct + WithValueAndTimestamp + Clone, { /// Completes remaining checks on all receipts up to /// (current time - `timestamp_buffer_ns`). Returns them in two lists @@ -188,18 +206,22 @@ where ctx: &Context, timestamp_buffer_ns: u64, receipts_limit: Option, - ) -> Result { + generate_rav: impl FnOnce( + &[ReceiptWithState], + Option>, + ) -> Result, + ) -> Result, Error> { let previous_rav = self.get_previous_rav().await?; let min_timestamp_ns = previous_rav .as_ref() - .map(|rav| rav.message.timestampNs + 1) + .map(|rav| rav.message.timestamp() + 1) .unwrap_or(0); let (valid_receipts, invalid_receipts) = self .collect_receipts(ctx, timestamp_buffer_ns, min_timestamp_ns, receipts_limit) .await?; - let expected_rav = Self::generate_expected_rav(&valid_receipts, previous_rav.clone()); + let expected_rav = generate_rav(&valid_receipts, previous_rav.clone()); Ok(RAVRequest { valid_receipts, @@ -208,30 +230,12 @@ where expected_rav, }) } - - fn generate_expected_rav( - receipts: &[ReceiptWithState], - previous_rav: Option, - ) -> Result { - if receipts.is_empty() { - return Err(Error::NoValidReceiptsForRAVRequest); - } - let allocation_id = receipts[0].signed_receipt().message.allocation_id; - let receipts = receipts - .iter() - .map(|rx_receipt| rx_receipt.signed_receipt().clone()) - .collect::>(); - ReceiptAggregateVoucher::aggregate_receipts( - allocation_id, - receipts.as_slice(), - previous_rav, - ) - } } -impl Manager +impl Manager where - E: ReceiptDelete + RAVRead, + E: ReceiptDelete + RAVRead, + R: SolStruct + WithValueAndTimestamp, { /// Removes obsolete receipts from storage. Obsolete receipts are receipts /// that are older than the last RAV, and therefore already aggregated into the RAV. @@ -247,7 +251,7 @@ where match self.get_previous_rav().await? { Some(last_rav) => { self.context - .remove_receipts_in_timestamp_range(..=last_rav.message.timestampNs) + .remove_receipts_in_timestamp_range(..=last_rav.message.timestamp()) .await .map_err(|err| Error::AdapterError { source_error: anyhow::Error::new(err), @@ -259,9 +263,10 @@ where } } -impl Manager +impl Manager where - E: ReceiptStore, + E: ReceiptStore, + T: SolStruct, { /// Runs `initial_checks` on `signed_receipt` for initial verification, /// then stores received receipt. @@ -274,7 +279,7 @@ where pub async fn verify_and_store_receipt( &self, ctx: &Context, - signed_receipt: SignedReceipt, + signed_receipt: EIP712SignedMessage, ) -> std::result::Result<(), Error> { let mut received_receipt = ReceiptWithState::new(signed_receipt); diff --git a/tap_core/src/rav.rs b/tap_core/src/rav.rs index bf55f862..bd72c95c 100644 --- a/tap_core/src/rav.rs +++ b/tap_core/src/rav.rs @@ -44,7 +44,12 @@ use std::cmp; use alloy::{primitives::Address, sol}; use serde::{Deserialize, Serialize}; -use crate::{receipt::Receipt, signed_message::EIP712SignedMessage, Error}; +use crate::{ + manager::WithValueAndTimestamp, + receipt::{state::Reserved, Receipt, ReceiptWithState}, + signed_message::EIP712SignedMessage, + Error, +}; /// EIP712 signed message for ReceiptAggregateVoucher pub type SignedRAV = EIP712SignedMessage; @@ -67,6 +72,16 @@ sol! { } } +impl WithValueAndTimestamp for ReceiptAggregateVoucher { + fn value(&self) -> u128 { + self.valueAggregate + } + + fn timestamp(&self) -> u64 { + self.timestampNs + } +} + impl ReceiptAggregateVoucher { /// Aggregates a batch of validated receipts with optional validated previous RAV, /// returning a new RAV if all provided items are valid or an error if not. @@ -106,3 +121,18 @@ impl ReceiptAggregateVoucher { }) } } + +pub fn generate_expected_rav( + receipts: &[ReceiptWithState], + previous_rav: Option, +) -> Result { + if receipts.is_empty() { + return Err(Error::NoValidReceiptsForRAVRequest); + } + let allocation_id = receipts[0].signed_receipt().message.allocation_id; + let receipts = receipts + .iter() + .map(|rx_receipt| rx_receipt.signed_receipt().clone()) + .collect::>(); + ReceiptAggregateVoucher::aggregate_receipts(allocation_id, receipts.as_slice(), previous_rav) +} diff --git a/tap_core/src/rav/request.rs b/tap_core/src/rav/request.rs index 3854d7f2..a22b65d2 100644 --- a/tap_core/src/rav/request.rs +++ b/tap_core/src/rav/request.rs @@ -1,24 +1,30 @@ // Copyright 2023-, Semiotic AI, Inc. // SPDX-License-Identifier: Apache-2.0 +use alloy::sol_types::SolStruct; + use crate::{ - rav::{ReceiptAggregateVoucher, SignedRAV}, receipt::{ state::{Failed, Reserved}, ReceiptWithState, }, + signed_message::EIP712SignedMessage, Error, }; /// Request to `tap_aggregator` to aggregate receipts into a Signed RAV. #[derive(Debug)] -pub struct RAVRequest { +pub struct RAVRequest +where + T: SolStruct, + Rav: SolStruct, +{ /// List of checked and reserved receipts to aggregate - pub valid_receipts: Vec>, + pub valid_receipts: Vec>, /// Optional previous RAV to aggregate with - pub previous_rav: Option, + pub previous_rav: Option>, /// List of failed receipt used to log invalid receipts - pub invalid_receipts: Vec>, + pub invalid_receipts: Vec>, /// Expected RAV to be created - pub expected_rav: Result, + pub expected_rav: Result, } diff --git a/tap_core/src/receipt/checks.rs b/tap_core/src/receipt/checks.rs index 86889f0d..2d5a28d3 100644 --- a/tap_core/src/receipt/checks.rs +++ b/tap_core/src/receipt/checks.rs @@ -12,21 +12,25 @@ //! # use std::sync::Arc; //! use tap_core::{ //! receipt::checks::{Check, CheckResult, ReceiptCheck}, -//! receipt::{Context, ReceiptWithState, state::Checking} +//! receipt::{Context, ReceiptWithState, state::Checking, Receipt} //! }; +//! use alloy::sol_types::SolStruct; //! # use async_trait::async_trait; //! //! struct MyCheck; //! //! #[async_trait] -//! impl Check for MyCheck { -//! async fn check(&self, ctx: &Context, receipt: &ReceiptWithState) -> CheckResult { +//! impl Check for MyCheck +//! where +//! T: SolStruct +//! { +//! async fn check(&self, ctx: &Context, receipt: &ReceiptWithState) -> CheckResult { //! // Implement your check here //! Ok(()) //! } //! } //! -//! let my_check: ReceiptCheck = Arc::new(MyCheck); +//! let my_check: ReceiptCheck = Arc::new(MyCheck); //! ``` use std::{ @@ -35,14 +39,19 @@ use std::{ sync::{Arc, RwLock}, }; +use alloy::sol_types::SolStruct; + use super::{ state::{Checking, Failed}, - Context, ReceiptError, ReceiptWithState, + Context, Receipt, ReceiptError, ReceiptWithState, +}; +use crate::{ + manager::WithValueAndTimestamp, + signed_message::{SignatureBytes, SignatureBytesExt}, }; -use crate::signed_message::{SignatureBytes, SignatureBytesExt}; /// ReceiptCheck is a type alias for an Arc of a struct that implements the `Check` trait. -pub type ReceiptCheck = Arc; +pub type ReceiptCheck = Arc + Sync + Send>; /// Result of a check operation. It uses the `anyhow` crate to handle errors. pub type CheckResult = Result<(), CheckError>; @@ -57,10 +66,10 @@ pub enum CheckError { /// CheckList is a NewType pattern to store a list of checks. /// It is a wrapper around an Arc of ReceiptCheck[]. -pub struct CheckList(Arc<[ReceiptCheck]>); +pub struct CheckList(Arc<[ReceiptCheck]>); -impl CheckList { - pub fn new(checks: Vec) -> Self { +impl CheckList { + pub fn new(checks: Vec>) -> Self { Self(checks.into()) } @@ -69,8 +78,8 @@ impl CheckList { } } -impl Deref for CheckList { - type Target = [ReceiptCheck]; +impl Deref for CheckList { + type Target = [ReceiptCheck]; fn deref(&self) -> &Self::Target { self.0.as_ref() @@ -79,20 +88,25 @@ impl Deref for CheckList { /// Check trait is implemented by the lib user to validate receipts before they are stored. #[async_trait::async_trait] -pub trait Check { - async fn check(&self, ctx: &Context, receipt: &ReceiptWithState) -> CheckResult; +pub trait Check +where + T: SolStruct, +{ + async fn check(&self, ctx: &Context, receipt: &ReceiptWithState) -> CheckResult; } +type CheckBatchResponse = ( + Vec>, + Vec>, +); + /// CheckBatch is mostly used by the lib to implement checks /// that transition from one state to another. -pub trait CheckBatch { - fn check_batch( - &self, - receipts: Vec>, - ) -> ( - Vec>, - Vec>, - ); +pub trait CheckBatch +where + R: SolStruct, +{ + fn check_batch(&self, receipts: Vec>) -> CheckBatchResponse; } /// Provides a built-in check to verify that the timestamp of a receipt @@ -118,8 +132,12 @@ impl StatefulTimestampCheck { } #[async_trait::async_trait] -impl Check for StatefulTimestampCheck { - async fn check(&self, _: &Context, receipt: &ReceiptWithState) -> CheckResult { +impl Check for StatefulTimestampCheck { + async fn check( + &self, + _: &Context, + receipt: &ReceiptWithState, + ) -> CheckResult { let min_timestamp_ns = *self.min_timestamp_ns.read().unwrap(); let signed_receipt = receipt.signed_receipt(); if signed_receipt.message.timestamp_ns <= min_timestamp_ns { @@ -141,17 +159,20 @@ impl Check for StatefulTimestampCheck { /// Used by the [`crate::manager::Manager`]. pub struct TimestampCheck(pub u64); -impl CheckBatch for TimestampCheck { +impl CheckBatch for TimestampCheck +where + T: SolStruct + WithValueAndTimestamp, +{ fn check_batch( &self, - receipts: Vec>, + receipts: Vec>, ) -> ( - Vec>, - Vec>, + Vec>, + Vec>, ) { let (mut checking, mut failed) = (vec![], vec![]); for receipt in receipts.into_iter() { - let receipt_timestamp_ns = receipt.signed_receipt().message.timestamp_ns; + let receipt_timestamp_ns = receipt.signed_receipt().message.timestamp(); let min_timestamp_ns = self.0; if receipt_timestamp_ns >= min_timestamp_ns { checking.push(receipt); @@ -172,13 +193,16 @@ impl CheckBatch for TimestampCheck { /// Used by the [`crate::manager::Manager`]. pub struct UniqueCheck; -impl CheckBatch for UniqueCheck { +impl CheckBatch for UniqueCheck +where + T: SolStruct, +{ fn check_batch( &self, - receipts: Vec>, + receipts: Vec>, ) -> ( - Vec>, - Vec>, + Vec>, + Vec>, ) { let mut signatures: HashSet = HashSet::new(); let (mut checking, mut failed) = (vec![], vec![]); @@ -213,7 +237,7 @@ mod tests { use super::*; use crate::{receipt::Receipt, signed_message::EIP712SignedMessage}; - fn create_signed_receipt_with_custom_value(value: u128) -> ReceiptWithState { + fn create_signed_receipt_with_custom_value(value: u128) -> ReceiptWithState { let wallet: PrivateKeySigner = PrivateKeySigner::random(); let eip712_domain_separator: Eip712Domain = eip712_domain! { name: "TAP", @@ -243,7 +267,7 @@ mod tests { &wallet, ) .unwrap(); - ReceiptWithState::::new(receipt) + ReceiptWithState::::new(receipt) } #[tokio::test] diff --git a/tap_core/src/receipt/receipt_sol.rs b/tap_core/src/receipt/receipt_sol.rs index f855fb2b..3e770515 100644 --- a/tap_core/src/receipt/receipt_sol.rs +++ b/tap_core/src/receipt/receipt_sol.rs @@ -12,6 +12,8 @@ use alloy::{primitives::Address, sol}; use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; +use crate::manager::WithValueAndTimestamp; + sol! { /// Holds information needed for promise of payment signed with ECDSA #[derive(Debug, Serialize, Deserialize, Eq, PartialEq)] @@ -27,6 +29,16 @@ sol! { } } +impl WithValueAndTimestamp for Receipt { + fn value(&self) -> u128 { + self.value + } + + fn timestamp(&self) -> u64 { + self.timestamp_ns + } +} + impl Receipt { /// Returns a receipt with provided values pub fn new(allocation_id: Address, value: u128) -> crate::Result { diff --git a/tap_core/src/receipt/received_receipt.rs b/tap_core/src/receipt/received_receipt.rs index 708d6dab..b1bc7d6e 100644 --- a/tap_core/src/receipt/received_receipt.rs +++ b/tap_core/src/receipt/received_receipt.rs @@ -13,11 +13,11 @@ //! This module is useful for managing and tracking the state of received receipts, as well as //! their progress through various checks and stages of inclusion in RAV requests and received RAVs. -use alloy::dyn_abi::Eip712Domain; +use alloy::{dyn_abi::Eip712Domain, sol_types::SolStruct}; -use super::{checks::CheckError, Context, Receipt, ReceiptError, ReceiptResult, SignedReceipt}; +use super::{checks::CheckError, Context, ReceiptError, ReceiptResult}; use crate::{ - manager::adapters::EscrowHandler, + manager::{adapters::EscrowHandler, WithValueAndTimestamp}, receipt::{ checks::ReceiptCheck, state::{AwaitingReserve, Checking, Failed, ReceiptState, Reserved}, @@ -25,7 +25,8 @@ use crate::{ signed_message::EIP712SignedMessage, }; -pub type ResultReceipt = std::result::Result, ReceiptWithState>; +pub type ResultReceipt = + std::result::Result, ReceiptWithState>; /// Typestate pattern for tracking the state of a receipt /// @@ -40,17 +41,21 @@ pub type ResultReceipt = std::result::Result, ReceiptWith /// - The [ `Reserved` ] state is used to represent a receipt that has /// successfully reserved escrow. #[derive(Debug, Clone)] -pub struct ReceiptWithState +pub struct ReceiptWithState where S: ReceiptState, + R: SolStruct, { /// An EIP712 signed receipt message - pub(crate) signed_receipt: EIP712SignedMessage, + pub(crate) signed_receipt: EIP712SignedMessage, /// The current state of the receipt (e.g., received, checking, failed, accepted, etc.) pub(crate) _state: S, } -impl ReceiptWithState { +impl ReceiptWithState +where + R: SolStruct + Sync + WithValueAndTimestamp, +{ /// Perform the checks implemented by the context and reserve escrow if /// all checks pass /// @@ -60,7 +65,7 @@ impl ReceiptWithState { self, context: &E, domain_separator: &Eip712Domain, - ) -> ResultReceipt + ) -> ResultReceipt where E: EscrowHandler, { @@ -74,9 +79,12 @@ impl ReceiptWithState { } } -impl ReceiptWithState { +impl ReceiptWithState +where + R: SolStruct, +{ /// Creates a new `ReceiptWithState` in the `Checking` state - pub fn new(signed_receipt: SignedReceipt) -> ReceiptWithState { + pub fn new(signed_receipt: EIP712SignedMessage) -> ReceiptWithState { ReceiptWithState { signed_receipt, _state: Checking, @@ -94,7 +102,7 @@ impl ReceiptWithState { pub async fn perform_checks( &mut self, ctx: &Context, - checks: &[ReceiptCheck], + checks: &[ReceiptCheck], ) -> ReceiptResult<()> { for check in checks { // return early on an error @@ -114,8 +122,8 @@ impl ReceiptWithState { pub async fn finalize_receipt_checks( mut self, ctx: &Context, - checks: &[ReceiptCheck], - ) -> Result, String> { + checks: &[ReceiptCheck], + ) -> Result, String> { let all_checks_passed = self.perform_checks(ctx, checks).await; if let Err(ReceiptError::RetryableCheck(e)) = all_checks_passed { Err(e.to_string()) @@ -128,24 +136,28 @@ impl ReceiptWithState { } } -impl ReceiptWithState { +impl ReceiptWithState +where + R: SolStruct, +{ pub fn error(self) -> ReceiptError { self._state.error } } -impl ReceiptWithState +impl ReceiptWithState where S: ReceiptState, + R: SolStruct, { - pub(super) fn perform_state_error(self, error: ReceiptError) -> ReceiptWithState { + pub(super) fn perform_state_error(self, error: ReceiptError) -> ReceiptWithState { ReceiptWithState { signed_receipt: self.signed_receipt, _state: Failed { error }, } } - fn perform_state_changes(self, new_state: T) -> ReceiptWithState + fn perform_state_changes(self, new_state: T) -> ReceiptWithState where T: ReceiptState, { @@ -156,7 +168,7 @@ where } /// Returns the signed receipt - pub fn signed_receipt(&self) -> &EIP712SignedMessage { + pub fn signed_receipt(&self) -> &EIP712SignedMessage { &self.signed_receipt } } diff --git a/tap_core/tests/manager_test.rs b/tap_core/tests/manager_test.rs index e8a05082..f801a840 100644 --- a/tap_core/tests/manager_test.rs +++ b/tap_core/tests/manager_test.rs @@ -23,7 +23,7 @@ use tap_core::{ }, Manager, }, - rav::ReceiptAggregateVoucher, + rav::{self, ReceiptAggregateVoucher}, receipt::{ checks::{Check, CheckError, CheckList, StatefulTimestampCheck}, state::Checking, @@ -71,7 +71,7 @@ struct ContextFixture { context: InMemoryContext, escrow_storage: EscrowStorage, query_appraisals: QueryAppraisals, - checks: CheckList, + checks: CheckList, signer: PrivateKeySigner, } @@ -128,7 +128,8 @@ async fn manager_verify_and_store_varying_initial_checks( signer, .. } = context; - let manager = Manager::new(domain_separator.clone(), context, checks); + let manager = + Manager::<_, _, ReceiptAggregateVoucher>::new(domain_separator.clone(), context, checks); let value = 20u128; let signed_receipt = EIP712SignedMessage::new( @@ -188,7 +189,9 @@ async fn manager_create_rav_request_all_valid_receipts( .await .is_ok()); } - let rav_request_result = manager.create_rav_request(&Context::new(), 0, None).await; + let rav_request_result = manager + .create_rav_request(&Context::new(), 0, None, rav::generate_expected_rav) + .await; assert!(rav_request_result.is_ok()); let rav_request = rav_request_result.unwrap(); @@ -284,7 +287,9 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts( .is_ok()); expected_accumulated_value += value; } - let rav_request_result = manager.create_rav_request(&Context::new(), 0, None).await; + let rav_request_result = manager + .create_rav_request(&Context::new(), 0, None, rav::generate_expected_rav) + .await; assert!(rav_request_result.is_ok()); let rav_request = rav_request_result.unwrap(); @@ -328,7 +333,9 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts( .is_ok()); expected_accumulated_value += value; } - let rav_request_result = manager.create_rav_request(&Context::new(), 0, None).await; + let rav_request_result = manager + .create_rav_request(&Context::new(), 0, None, rav::generate_expected_rav) + .await; assert!(rav_request_result.is_ok()); let rav_request = rav_request_result.unwrap(); @@ -403,7 +410,9 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts_consecutive_tim manager.remove_obsolete_receipts().await.unwrap(); } - let rav_request_1_result = manager.create_rav_request(&Context::new(), 0, None).await; + let rav_request_1_result = manager + .create_rav_request(&Context::new(), 0, None, rav::generate_expected_rav) + .await; assert!(rav_request_1_result.is_ok()); let rav_request_1 = rav_request_1_result.unwrap(); @@ -458,7 +467,9 @@ async fn manager_create_multiple_rav_requests_all_valid_receipts_consecutive_tim ); } - let rav_request_2_result = manager.create_rav_request(&Context::new(), 0, None).await; + let rav_request_2_result = manager + .create_rav_request(&Context::new(), 0, None, rav::generate_expected_rav) + .await; assert!(rav_request_2_result.is_ok()); let rav_request_2 = rav_request_2_result.unwrap(); @@ -524,7 +535,7 @@ async fn manager_create_rav_and_ignore_invalid_receipts( } let rav_request = manager - .create_rav_request(&Context::new(), 0, None) + .create_rav_request(&Context::new(), 0, None, rav::generate_expected_rav) .await .unwrap(); let expected_rav = rav_request.expected_rav.unwrap(); @@ -546,11 +557,11 @@ async fn test_retryable_checks( struct RetryableCheck(Arc); #[async_trait::async_trait] - impl Check for RetryableCheck { + impl Check for RetryableCheck { async fn check( &self, _: &Context, - receipt: &ReceiptWithState, + receipt: &ReceiptWithState, ) -> Result<(), CheckError> { // we want to fail only if nonce is 5 and if is create rav step if self.0.load(std::sync::atomic::Ordering::SeqCst) @@ -573,7 +584,7 @@ async fn test_retryable_checks( let is_create_rav = Arc::new(AtomicBool::new(false)); - let mut checks: Vec> = checks.iter().cloned().collect(); + let mut checks: Vec + Send + Sync>> = checks.iter().cloned().collect(); checks.push(Arc::new(RetryableCheck(is_create_rav.clone()))); let manager = Manager::new( @@ -605,7 +616,9 @@ async fn test_retryable_checks( is_create_rav.store(true, std::sync::atomic::Ordering::SeqCst); - let rav_request = manager.create_rav_request(&Context::new(), 0, None).await; + let rav_request = manager + .create_rav_request(&Context::new(), 0, None, rav::generate_expected_rav) + .await; assert_eq!( rav_request.expect_err("Didn't fail").to_string(), diff --git a/tap_core/tests/receipt_test.rs b/tap_core/tests/receipt_test.rs index 6416d0d8..1a58f34a 100644 --- a/tap_core/tests/receipt_test.rs +++ b/tap_core/tests/receipt_test.rs @@ -159,7 +159,7 @@ fn safe_truncate_receipts_test( let wallet = PrivateKeySigner::random(); // Vec of (id, receipt) - let mut receipts_orig: Vec> = Vec::new(); + let mut receipts_orig: Vec> = Vec::new(); for timestamp in input.iter() { // The contents of the receipt only need to be unique for this test (so we can check) diff --git a/tap_core/tests/received_receipt_test.rs b/tap_core/tests/received_receipt_test.rs index 2b6c6e0b..38248bc5 100644 --- a/tap_core/tests/received_receipt_test.rs +++ b/tap_core/tests/received_receipt_test.rs @@ -59,7 +59,7 @@ struct ContextFixture { context: InMemoryContext, escrow_storage: EscrowStorage, query_appraisals: QueryAppraisals, - checks: Vec, + checks: Vec>, signer: PrivateKeySigner, } diff --git a/tap_integration_tests/tests/indexer_mock.rs b/tap_integration_tests/tests/indexer_mock.rs index 7877ffba..78d14425 100644 --- a/tap_integration_tests/tests/indexer_mock.rs +++ b/tap_integration_tests/tests/indexer_mock.rs @@ -20,8 +20,8 @@ use tap_core::{ adapters::{EscrowHandler, RAVRead, RAVStore, ReceiptRead, ReceiptStore}, Manager, }, - rav::SignedRAV, - receipt::{checks::CheckList, Context, SignedReceipt}, + rav::{self, ReceiptAggregateVoucher, SignedRAV}, + receipt::{checks::CheckList, Context, Receipt, SignedReceipt}, }; /// Rpc trait represents a JSON-RPC server that has a single async method `request`. /// This method is designed to handle incoming JSON-RPC requests. @@ -43,9 +43,9 @@ pub trait Rpc { /// threshold is a limit to which receipt_count can increment, after reaching which RAV request is triggered. /// aggregator_client is an HTTP client used for making JSON-RPC requests to another server. pub struct RpcManager { - manager: Arc>, // Manager object reference counted with an Arc + manager: Arc>, // Manager object reference counted with an Arc receipt_count: Arc, // Thread-safe atomic counter for receipts - threshold: u64, // The count at which a RAV request will be triggered + threshold: u64, // The count at which a RAV request will be triggered aggregator_client: (HttpClient, String), // HTTP client for sending requests to the aggregator server } @@ -59,17 +59,13 @@ where pub fn new( domain_separator: Eip712Domain, context: E, - required_checks: CheckList, + required_checks: CheckList, threshold: u64, aggregate_server_address: String, aggregate_server_api_version: String, ) -> Result { Ok(Self { - manager: Arc::new(Manager::::new( - domain_separator, - context, - required_checks, - )), + manager: Arc::new(Manager::new(domain_separator, context, required_checks)), receipt_count: Arc::new(AtomicU64::new(0)), threshold, aggregator_client: ( @@ -83,7 +79,14 @@ where #[async_trait] impl RpcServer for RpcManager where - E: ReceiptStore + ReceiptRead + RAVStore + RAVRead + EscrowHandler + Send + Sync + 'static, + E: ReceiptStore + + ReceiptRead + + RAVStore + + RAVRead + + EscrowHandler + + Send + + Sync + + 'static, { async fn request( &self, @@ -137,16 +140,16 @@ pub async fn run_server( port: u16, // Port on which the server will listen domain_separator: Eip712Domain, // EIP712 domain separator context: E, // context instance - required_checks: CheckList, // Vector of required checks to be performed on each request - threshold: u64, // The count at which a RAV request will be triggered - aggregate_server_address: String, // Address of the aggregator server + required_checks: CheckList, // Vector of required checks to be performed on each request + threshold: u64, // The count at which a RAV request will be triggered + aggregate_server_address: String, // Address of the aggregator server aggregate_server_api_version: String, // API version of the aggregator server ) -> Result<(ServerHandle, std::net::SocketAddr)> where - E: ReceiptStore - + ReceiptRead - + RAVStore - + RAVRead + E: ReceiptStore + + ReceiptRead + + RAVStore + + RAVRead + EscrowHandler + Clone + Send @@ -176,17 +179,25 @@ where // request_rav function creates a request for aggregate receipts (RAV), sends it to another server and verifies the result. async fn request_rav( - manager: &Arc>, + manager: &Arc>, time_stamp_buffer: u64, // Buffer for timestamping, see tap_core for details aggregator_client: &(HttpClient, String), // HttpClient for making requests to the tap_aggregator server threshold: usize, ) -> Result<()> where - E: ReceiptRead + RAVRead + RAVStore + EscrowHandler, + E: ReceiptRead + + RAVRead + + RAVStore + + EscrowHandler, { // Create the aggregate_receipts request params let rav_request = manager - .create_rav_request(&Context::new(), time_stamp_buffer, None) + .create_rav_request( + &Context::new(), + time_stamp_buffer, + None, + rav::generate_expected_rav, + ) .await?; // To-do: Need to add previous RAV, when tap_manager supports replacing receipts diff --git a/tap_integration_tests/tests/showcase.rs b/tap_integration_tests/tests/showcase.rs index 794b2e9d..f73d34ee 100644 --- a/tap_integration_tests/tests/showcase.rs +++ b/tap_integration_tests/tests/showcase.rs @@ -161,7 +161,7 @@ fn query_appraisals(query_price: &[u128]) -> QueryAppraisals { struct ContextFixture { context: InMemoryContext, - checks: CheckList, + checks: CheckList, } #[fixture] @@ -800,7 +800,7 @@ async fn start_indexer_server( mut context: InMemoryContext, sender_id: Address, available_escrow: u128, - required_checks: CheckList, + required_checks: CheckList, receipt_threshold: u64, agg_server_addr: SocketAddr, ) -> Result<(ServerHandle, SocketAddr)> {