diff --git a/Cargo.lock b/Cargo.lock index aa6df19d..f417bac7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,9 +68,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "alloy" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02b0561294ccedc6181e5528b850b4579e3fbde696507baa00109bfd9054c5bb" +checksum = "59febb24956a41c29bb5f450978fbe825bd6456b3f80586c8bd558dc882e7b6a" dependencies = [ "alloy-consensus", "alloy-contract", @@ -107,9 +107,9 @@ dependencies = [ [[package]] name = "alloy-consensus" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a101d4d016f47f13890a74290fdd17b05dd175191d9337bc600791fb96e4dea8" +checksum = "e88e1edea70787c33e11197d3f32ae380f3db19e6e061e539a5bcf8184a6b326" dependencies = [ "alloy-eips", "alloy-primitives", @@ -125,9 +125,9 @@ dependencies = [ [[package]] name = "alloy-consensus-any" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa60357dda9a3d0f738f18844bd6d0f4a5924cc5cf00bfad2ff1369897966123" +checksum = "57b1bb53f40c0273cd1975573cd457b39213e68584e36d1401d25fd0398a1d65" dependencies = [ "alloy-consensus", "alloy-eips", @@ -139,9 +139,9 @@ dependencies = [ [[package]] name = "alloy-contract" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2869e4fb31331d3b8c58c7db567d1e4e4e94ef64640beda3b6dd9b7045690941" +checksum = "1b668c78c4b1f12f474ede5a85e8ce550d0aa1ef7d49fd1d22855a43b960e725" dependencies = [ "alloy-dyn-abi", "alloy-json-abi", @@ -160,9 +160,9 @@ dependencies = [ [[package]] name = "alloy-core" -version = "0.8.13" +version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8d22df68fa7d9744be0b1a9be3260e9aa089fbf41903ab182328333061ed186" +checksum = "c618bd382f0bc2ac26a7e4bfae01c9b015ca8f21b37ca40059ae35a7e62b3dc6" dependencies = [ "alloy-dyn-abi", "alloy-json-abi", @@ -173,9 +173,9 @@ dependencies = [ [[package]] name = "alloy-dyn-abi" -version = "0.8.13" +version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cf633ae9a1f0c82fdb9e559ed2be1c8e415c3e48fc47e1feaf32c6078ec0cdd" +checksum = "41056bde53ae10ffbbf11618efbe1e0290859e5eab0fe9ef82ebdb62f12a866f" dependencies = [ "alloy-json-abi", "alloy-primitives", @@ -215,9 +215,9 @@ dependencies = [ [[package]] name = "alloy-eips" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6755b093afef5925f25079dd5a7c8d096398b804ba60cb5275397b06b31689" +checksum = "5f9fadfe089e9ccc0650473f2d4ef0a28bc015bbca5631d9f0f09e49b557fdb3" dependencies = [ "alloy-eip2930", "alloy-eip7702", @@ -233,9 +233,9 @@ dependencies = [ [[package]] name = "alloy-genesis" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aeec8e6eab6e52b7c9f918748c9b811e87dbef7312a2e3a2ca1729a92966a6af" +checksum = "2b2a4cf7b70f3495788e74ce1c765260ffe38820a2a774ff4aacb62e31ea73f9" dependencies = [ "alloy-primitives", "alloy-serde", @@ -257,9 +257,9 @@ dependencies = [ [[package]] name = "alloy-json-rpc" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fa077efe0b834bcd89ff4ba547f48fb081e4fdc3673dd7da1b295a2cf2bb7b7" +checksum = "e29040b9d5fe2fb70415531882685b64f8efd08dfbd6cc907120650504821105" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -271,9 +271,9 @@ dependencies = [ [[package]] name = "alloy-network" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "209a1882a08e21aca4aac6e2a674dc6fcf614058ef8cb02947d63782b1899552" +checksum = "510cc00b318db0dfccfdd2d032411cfae64fc144aef9679409e014145d3dacc4" dependencies = [ "alloy-consensus", "alloy-consensus-any", @@ -296,9 +296,9 @@ dependencies = [ [[package]] name = "alloy-network-primitives" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20219d1ad261da7a6331c16367214ee7ded41d001fabbbd656fbf71898b2773" +checksum = "9081c099e798b8a2bba2145eb82a9a146f01fc7a35e9ab6e7b43305051f97550" dependencies = [ "alloy-consensus", "alloy-eips", @@ -337,9 +337,9 @@ dependencies = [ [[package]] name = "alloy-provider" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9eefa6f4c798ad01f9b4202d02cea75f5ec11fa180502f4701e2b47965a8c0bb" +checksum = "dc2dfaddd9a30aa870a78a4e1316e3e115ec1e12e552cbc881310456b85c1f24" dependencies = [ "alloy-chains", "alloy-consensus", @@ -377,9 +377,9 @@ dependencies = [ [[package]] name = "alloy-pubsub" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aac9a7210e0812b1d814118f426f57eb7fc260a419224dd1c76d169879c06907" +checksum = "695809e743628d54510c294ad17a4645bd9f465aeb0d20ee9ce9877c9712dc9c" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -418,9 +418,9 @@ dependencies = [ [[package]] name = "alloy-rpc-client" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed30bf1041e84cabc5900f52978ca345dd9969f2194a945e6fdec25b0620705c" +checksum = "531137b283547d5b9a5cafc96b006c64ef76810c681d606f28be9781955293b6" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -444,9 +444,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ab686b0fa475d2a4f5916c5f07797734a691ec58e44f0f55d4746ea39cbcefb" +checksum = "3410a472ce26c457e9780f708ee6bd540b30f88f1f31fdab7a11d00bd6aa1aee" dependencies = [ "alloy-primitives", "alloy-rpc-types-engine", @@ -457,9 +457,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-any" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "200661999b6e235d9840be5d60a6e8ae2f0af9eb2a256dd378786744660e36ec" +checksum = "ed98e1af55a7d856bfa385f30f63d8d56be2513593655c904a8f4a7ec963aa3e" dependencies = [ "alloy-consensus-any", "alloy-rpc-types-eth", @@ -468,9 +468,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-engine" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d297268357e3eae834ddd6888b15f764cbc0f4b3be9265f5f6ec239013f3d68" +checksum = "03bd16fa4959255ebf4a7702df08f325e5631df5cdca07c8a8e58bdc10fe02e3" dependencies = [ "alloy-consensus", "alloy-eips", @@ -484,9 +484,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-eth" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0600b8b5e2dc0cab12cbf91b5a885c35871789fb7b3a57b434bd4fced5b7a8b" +checksum = "8737d7a6e37ca7bba9c23e9495c6534caec6760eb24abc9d5ffbaaba147818e1" dependencies = [ "alloy-consensus", "alloy-consensus-any", @@ -504,9 +504,9 @@ dependencies = [ [[package]] name = "alloy-serde" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9afa753a97002a33b2ccb707d9f15f31c81b8c1b786c95b73cc62bb1d1fd0c3f" +checksum = "5851bf8d5ad33014bd0c45153c603303e730acc8a209450a7ae6b4a12c2789e2" dependencies = [ "alloy-primitives", "serde", @@ -515,9 +515,9 @@ dependencies = [ [[package]] name = "alloy-signer" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2cbff01a673936c2efd7e00d4c0e9a4dbbd6d600e2ce298078d33efbb19cd7" +checksum = "7e10ca565da6500cca015ba35ee424d59798f2e1b85bc0dd8f81dafd401f029a" dependencies = [ "alloy-dyn-abi", "alloy-primitives", @@ -531,9 +531,9 @@ dependencies = [ [[package]] name = "alloy-signer-aws" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71ce77227fdb9059fd7a3b38a8679c0dae95d81886ee8c13ef8ad99d74866bbd" +checksum = "1e774d4203ad7dbeba06876c8528a169b7cb56770bd900bc061e6a2c2756a736" dependencies = [ "alloy-consensus", "alloy-network", @@ -549,9 +549,9 @@ dependencies = [ [[package]] name = "alloy-signer-gcp" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7622438a51e1fa6379cad6bff52e0cde88b0d4e5e3f2f15e5feebdee527ef5f2" +checksum = "9843facd50077d2010ac0ef9e9176f8a06f2e2c8e653d83d82859803c623c6fc" dependencies = [ "alloy-consensus", "alloy-network", @@ -567,9 +567,9 @@ dependencies = [ [[package]] name = "alloy-signer-ledger" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7b56789cbd13bace37acd7afd080aa7002ed65ab84f0220cd0c32e162b0afd6" +checksum = "08367716d2eee6f15f0f7ee2e855decbfedd12be12fe5f490a2d2717deda95bf" dependencies = [ "alloy-consensus", "alloy-dyn-abi", @@ -587,9 +587,9 @@ dependencies = [ [[package]] name = "alloy-signer-local" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd6d988cb6cd7d2f428a74476515b1a6e901e08c796767f9f93311ab74005c8b" +checksum = "47fababf5a745133490cde927d48e50267f97d3d1209b9fc9f1d1d666964d172" dependencies = [ "alloy-consensus", "alloy-network", @@ -678,9 +678,9 @@ dependencies = [ [[package]] name = "alloy-transport" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d69d36982b9e46075ae6b792b0f84208c6c2c15ad49f6c500304616ef67b70e0" +checksum = "538a04a37221469cac0ce231b737fd174de2fdfcdd843bdd068cb39ed3e066ad" dependencies = [ "alloy-json-rpc", "base64 0.22.1", @@ -698,9 +698,9 @@ dependencies = [ [[package]] name = "alloy-transport-http" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e02ffd5d93ffc51d72786e607c97de3b60736ca3e636ead0ec1f7dce68ea3fd" +checksum = "2ed40eb1e1265b2911512f6aa1dcece9702d078f5a646730c45e39e2be00ac1c" dependencies = [ "alloy-json-rpc", "alloy-transport", @@ -713,9 +713,9 @@ dependencies = [ [[package]] name = "alloy-transport-ipc" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b6f8b87cb84bae6d81ae6604b37741c8116f84f9784a0ecc6038c302e679d23" +checksum = "a7a172a59d24706b26a79a837f86d51745cb26ca6f8524712acd0208a14cff95" dependencies = [ "alloy-json-rpc", "alloy-pubsub", @@ -732,9 +732,9 @@ dependencies = [ [[package]] name = "alloy-transport-ws" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c085c4e1e7680b723ffc558f61a22c061ed3f70eb3436f93f3936779c59cec1" +checksum = "fba0e39d181d13c266dbb8ca54ed584a2c66d6e9279afca89c7a6b1825e98abb" dependencies = [ "alloy-pubsub", "alloy-transport", @@ -2650,6 +2650,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.0.35" @@ -3722,7 +3728,8 @@ dependencies = [ "serde", "serde_json", "sqlx", - "tap_core", + "tap_core 2.0.0 (git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=1dada3e)", + "tap_core 2.0.0 (git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=3c56018)", "test-assets", "thegraph-core", "thegraph-graphql-http", @@ -3770,7 +3777,7 @@ dependencies = [ "serde_json", "sqlx", "tap_aggregator", - "tap_core", + "tap_core 2.0.0 (git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=3c56018)", "tempfile", "test-assets", "test-log", @@ -4372,6 +4379,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" + [[package]] name = "native-tls" version = "0.2.12" @@ -4866,6 +4879,16 @@ dependencies = [ "ucd-trie", ] +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.7.0", +] + [[package]] name = "pharos" version = "0.5.3" @@ -4986,6 +5009,16 @@ dependencies = [ "yansi", ] +[[package]] +name = "prettyplease" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" +dependencies = [ + "proc-macro2", + "syn 2.0.90", +] + [[package]] name = "primitive-types" version = "0.12.2" @@ -5118,6 +5151,27 @@ dependencies = [ "prost-derive", ] +[[package]] +name = "prost-build" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" +dependencies = [ + "bytes", + "heck 0.4.1", + "itertools 0.10.5", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.90", + "tempfile", +] + [[package]] name = "prost-derive" version = "0.13.3" @@ -6790,23 +6844,27 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tap_aggregator" version = "0.3.2" -source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=1c6e29f#1c6e29f56fc1672087070c7e8e710bac0564e273" +source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=3c56018#3c56018a321736ff19103ea69015160c3647364b" dependencies = [ "alloy", "anyhow", "axum", "clap", "futures-util", + "hyper 1.5.1", "jsonrpsee", "lazy_static", "log", "prometheus", + "prost", "ruint", "serde", "serde_json", "strum", - "tap_core", + "tap_core 2.0.0 (git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=3c56018)", "tokio", + "tonic", + "tonic-build", "tower 0.4.13", "tracing-subscriber", ] @@ -6814,7 +6872,22 @@ dependencies = [ [[package]] name = "tap_core" version = "2.0.0" -source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=1c6e29f#1c6e29f56fc1672087070c7e8e710bac0564e273" +source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=1dada3e#1dada3e32a62ce632c0425151960cbe8c914cb3b" +dependencies = [ + "alloy", + "anyhow", + "anymap3", + "async-trait", + "rand", + "serde", + "thiserror 1.0.69", + "tokio", +] + +[[package]] +name = "tap_core" +version = "2.0.0" +source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=3c56018#3c56018a321736ff19103ea69015160c3647364b" dependencies = [ "alloy", "anyhow", @@ -6846,7 +6919,8 @@ dependencies = [ "bip39", "indexer-allocation", "lazy_static", - "tap_core", + "tap_core 2.0.0 (git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=1dada3e)", + "tap_core 2.0.0 (git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=3c56018)", "thegraph-core", "tokio", "typed-builder", @@ -6875,8 +6949,7 @@ dependencies = [ [[package]] name = "thegraph-core" version = "0.9.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e669ad507b7afcf8b2d303e98de2d8bcd7af56042a8626cd708838349cc4d928" +source = "git+https://github.com/edgeandnode/toolshed?rev=d710e05#d710e05faafc8aa241363fdb0751111cba3b8927" dependencies = [ "alloy", "bs58", @@ -7202,6 +7275,21 @@ dependencies = [ "tower-layer", "tower-service", "tracing", + "zstd", +] + +[[package]] +name = "tonic-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn 2.0.90", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index fd64e46c..ea807a2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,14 +52,15 @@ uuid = { version = "1.11.0", features = ["v7"] } tracing = { version = "0.1.40", default-features = false } bigdecimal = "0.4.3" build-info = "0.0.39" -tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1c6e29f", default-features = false } -tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1c6e29f", default-features = false } +tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "3c56018", default-features = false } +tap_core_v2 = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1dada3e", package = "tap_core" } +tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "3c56018", default-features = false } tracing-subscriber = { version = "0.3", features = [ "json", "env-filter", "ansi", ], default-features = false } -thegraph-core = { version = "0.9.6", features = [ +thegraph-core = { git = "https://github.com/edgeandnode/toolshed", rev= "d710e05", features = [ "attestation", "alloy-eip712", "alloy-sol-types", diff --git a/crates/service/Cargo.toml b/crates/service/Cargo.toml index 167e19c2..7d413eaa 100644 --- a/crates/service/Cargo.toml +++ b/crates/service/Cargo.toml @@ -37,6 +37,7 @@ async-graphql-axum = "7.0.11" base64.workspace = true graphql = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-v0.3.0" } tap_core.workspace = true +tap_core_v2.workspace = true uuid.workspace = true typed-builder.workspace = true tower_governor = { version = "0.5.0", features = ["axum"] } diff --git a/crates/service/src/error.rs b/crates/service/src/error.rs index 86a13622..af1778ef 100644 --- a/crates/service/src/error.rs +++ b/crates/service/src/error.rs @@ -28,7 +28,11 @@ pub enum IndexerServiceError { SerializationError(#[from] serde_json::Error), #[error("Issues with provided receipt: {0}")] - TapCoreError(#[from] tap_core::Error), + TapCoreErrorV1(#[from] tap_core::Error), + + #[error("Issues with provided receipt: {0}")] + TapCoreErrorV2(#[from] tap_core_v2::Error), + #[error("There was an error while accessing escrow account: {0}")] EscrowAccount(#[from] EscrowAccountsError), } @@ -37,11 +41,18 @@ impl StatusCodeExt for IndexerServiceError { fn status_code(&self) -> StatusCode { use IndexerServiceError as E; match &self { - E::TapCoreError(ref error) => match error { + E::TapCoreErrorV1(ref error) => match error { TapError::SignatureError(_) | TapError::ReceiptError(ReceiptError::CheckFailure(_)) => StatusCode::BAD_REQUEST, _ => StatusCode::INTERNAL_SERVER_ERROR, }, + E::TapCoreErrorV2(ref error) => match error { + tap_core_v2::Error::SignatureError(_) + | tap_core_v2::Error::ReceiptError( + tap_core_v2::receipt::ReceiptError::CheckFailure(_), + ) => StatusCode::BAD_REQUEST, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }, E::EscrowAccount(_) | E::ReceiptNotFound => StatusCode::PAYMENT_REQUIRED, E::DeploymentIdNotFound => StatusCode::INTERNAL_SERVER_ERROR, E::AxumError(_) | E::SerializationError(_) => StatusCode::BAD_GATEWAY, diff --git a/crates/service/src/middleware/auth.rs b/crates/service/src/middleware/auth.rs index f0a6c2cb..dbbdb181 100644 --- a/crates/service/src/middleware/auth.rs +++ b/crates/service/src/middleware/auth.rs @@ -1,13 +1,18 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +mod async_validate; mod bearer; mod or; -mod tap; +mod tap_v1; +mod tap_v2; +#[cfg(test)] +pub use async_validate::wrap; pub use bearer::Bearer; pub use or::OrExt; -pub use tap::tap_receipt_authorize; +pub use tap_v1::tap_receipt_authorize as tap_receipt_authorize_v1; +pub use tap_v2::tap_receipt_authorize as tap_receipt_authorize_v2; #[cfg(test)] mod tests { @@ -28,7 +33,7 @@ mod tests { use crate::{ middleware::auth::{self, Bearer, OrExt}, - tap::IndexerTapContext, + tap::IndexerTapContextV1, }; const BEARER_TOKEN: &str = "test"; @@ -36,7 +41,7 @@ mod tests { async fn service( pgpool: PgPool, ) -> impl Service, Response = Response, Error = impl std::fmt::Debug> { - let context = IndexerTapContext::new(pgpool.clone(), TAP_EIP712_DOMAIN.clone()).await; + let context = IndexerTapContextV1::new(pgpool.clone(), TAP_EIP712_DOMAIN.clone()).await; let tap_manager = Arc::new(Manager::new( TAP_EIP712_DOMAIN.clone(), context, @@ -54,7 +59,7 @@ mod tests { .unwrap(), )); let free_query = Bearer::new(BEARER_TOKEN); - let tap_auth = auth::tap_receipt_authorize(tap_manager, metric); + let tap_auth = auth::tap_receipt_authorize_v1(tap_manager, metric); let authorize_requests = free_query.or(tap_auth); let authorization_middleware = AsyncRequireAuthorizationLayer::new(authorize_requests); diff --git a/crates/service/src/middleware/auth/async_validate.rs b/crates/service/src/middleware/auth/async_validate.rs new file mode 100644 index 00000000..ef92baea --- /dev/null +++ b/crates/service/src/middleware/auth/async_validate.rs @@ -0,0 +1,66 @@ +use std::future::Future; + +use axum::{ + body::Body, + http::{request::Parts, Request, Response}, +}; +use tower_http::auth::AsyncAuthorizeRequest; + +pub trait AsyncAuthorizeRequestExt { + /// The body type used for responses to unauthorized requests. + type ResponseBody; + + /// Authorize the request. + /// + /// If the future resolves to `Ok(request)` then the request is allowed through, otherwise not. + fn authorize( + &self, + request: &mut Parts, + ) -> impl Future>> + Send; +} + +//) -> impl AsyncAuthorizeRequest< +// B, +// RequestBody = B, +// ResponseBody = Body, +// Future = impl Future, Response>> + Send, +//> + Clone +// + Send +//where +// T: ReceiptStore + Sync + Send + 'static, +// B: Send, + +pub fn wrap( + my_fn: impl AsyncAuthorizeRequestExt + Clone + Send, +) -> impl AsyncAuthorizeRequest< + B, + RequestBody = B, + ResponseBody = Body, + Future = impl Future, Response>> + Send, +> + Clone + + Send +where + B: Send, +{ + move |request: Request| { + let my_fn = my_fn.clone(); + async move { + let (mut parts, body) = request.into_parts(); + my_fn.authorize(&mut parts).await?; + let request = Request::from_parts(parts, body); + Ok(request) + } + } +} + +impl AsyncAuthorizeRequestExt for F +where + F: Fn(&mut Parts) -> Fut + Send + Sync, + Fut: Future>> + Send, +{ + type ResponseBody = ResBody; + + async fn authorize(&self, request: &mut Parts) -> Result<(), Response> { + self(request).await + } +} diff --git a/crates/service/src/middleware/auth/bearer.rs b/crates/service/src/middleware/auth/bearer.rs index cae0c51d..dc593edd 100644 --- a/crates/service/src/middleware/auth/bearer.rs +++ b/crates/service/src/middleware/auth/bearer.rs @@ -8,9 +8,10 @@ use std::{fmt, marker::PhantomData}; -use axum::http::{HeaderValue, Request, Response}; +use axum::http::{request::Parts, HeaderValue, Response}; use reqwest::{header, StatusCode}; -use tower_http::validate_request::ValidateRequest; + +use super::async_validate::AsyncAuthorizeRequestExt; pub struct Bearer { header_value: HeaderValue, @@ -48,14 +49,14 @@ impl fmt::Debug for Bearer { } } -impl ValidateRequest for Bearer +impl AsyncAuthorizeRequestExt for Bearer where ResBody: Default, { type ResponseBody = ResBody; - fn validate(&mut self, request: &mut Request) -> Result<(), Response> { - match request.headers().get(header::AUTHORIZATION) { + async fn authorize(&self, request: &mut Parts) -> Result<(), Response> { + match request.headers.get(header::AUTHORIZATION) { Some(actual) if actual == self.header_value => Ok(()), _ => { let mut res = Response::new(ResBody::default()); diff --git a/crates/service/src/middleware/auth/or.rs b/crates/service/src/middleware/auth/or.rs index 6f27ea24..039faf89 100644 --- a/crates/service/src/middleware/auth/or.rs +++ b/crates/service/src/middleware/auth/or.rs @@ -6,27 +6,25 @@ //! executes a ValidateRequest returning the request if it succeeds //! or else, executes the future and return it -use std::{future::Future, marker::PhantomData, pin::Pin, task::Poll}; +use std::marker::PhantomData; -use axum::http::{Request, Response}; -use pin_project::pin_project; -use tower_http::{auth::AsyncAuthorizeRequest, validate_request::ValidateRequest}; +use axum::http::{request::Parts, Request, Response}; +use thegraph_core::alloy::transports::BoxFuture; +use tower_http::auth::AsyncAuthorizeRequest; + +use super::async_validate::AsyncAuthorizeRequestExt; /// Extension that allows using a simple .or() function and return an Or struct pub trait OrExt: Sized { fn or(self, other: T) -> Or; } -impl OrExt for T +impl OrExt for T where B: 'static + Send, Resp: 'static + Send, - T: ValidateRequest, - A: AsyncAuthorizeRequest - + Clone - + 'static - + Send, - Fut: Future, Response>> + Send, + T: AsyncAuthorizeRequestExt + Clone + 'static + Send, + A: AsyncAuthorizeRequestExt + Clone + 'static + Send, { fn or(self, other: A) -> Or { Or(self, other, PhantomData) @@ -50,79 +48,42 @@ where } } -impl AsyncAuthorizeRequest for Or +impl AsyncAuthorizeRequestExt for Or where - Req: 'static + Send, - Resp: 'static + Send, - T: ValidateRequest, - E: AsyncAuthorizeRequest - + Clone - + 'static - + Send, - Fut: Future, Response>> + Send, + T: AsyncAuthorizeRequestExt + Clone + 'static + Send + Sync, + E: AsyncAuthorizeRequestExt + Clone + 'static + Send + Sync, { - type RequestBody = Req; type ResponseBody = Resp; - type Future = OrFuture; - - fn authorize(&mut self, mut request: axum::http::Request) -> Self::Future { - let mut this = self.1.clone(); - if self.0.validate(&mut request).is_ok() { - return OrFuture::with_result(Ok(request)); - } - OrFuture::with_future(this.authorize(request)) - } -} - -#[pin_project::pin_project(project = KindProj)] -pub enum Kind { - QueryResult { - #[pin] - fut: Fut, - }, - ReturnResult { - validation_result: Option, Response>>, - }, -} - -#[pin_project] -pub struct OrFuture { - #[pin] - kind: Kind, -} - -impl OrFuture { - fn with_result(validation_result: Result, Response>) -> Self { - let validation_result = Some(validation_result); - Self { - kind: Kind::ReturnResult { validation_result }, - } - } - - fn with_future(fut: Fut) -> Self { - Self { - kind: Kind::QueryResult { fut }, + async fn authorize(&self, parts: &mut Parts) -> Result<(), Response> { + if self.0.authorize(parts).await.is_err() { + self.1.authorize(parts).await?; } + Ok(()) } } -impl Future for OrFuture +impl AsyncAuthorizeRequest for Or where - Fut: Future, Response>>, + Req: 'static + Send, + Resp: 'static + Send, + T: AsyncAuthorizeRequestExt + Clone + 'static + Send, + E: AsyncAuthorizeRequestExt + Clone + 'static + Send, { - type Output = Result, Response>; + type RequestBody = Req; + type ResponseBody = Resp; - fn poll( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - let this = self.project(); - match this.kind.project() { - KindProj::QueryResult { fut } => fut.poll(cx), - KindProj::ReturnResult { validation_result } => { - Poll::Ready(validation_result.take().expect("cannot poll twice")) + type Future = BoxFuture<'static, Result, Response>>; + + fn authorize(&mut self, req: axum::http::Request) -> Self::Future { + let this = self.clone(); + Box::pin(async move { + let (mut parts, body) = req.into_parts(); + if this.0.authorize(&mut parts).await.is_err() { + this.1.authorize(&mut parts).await?; } - } + let req = Request::from_parts(parts, body); + Ok(req) + }) } } diff --git a/crates/service/src/middleware/auth/tap.rs b/crates/service/src/middleware/auth/tap_v1.rs similarity index 87% rename from crates/service/src/middleware/auth/tap.rs rename to crates/service/src/middleware/auth/tap_v1.rs index 68a58b7a..0c3e2e48 100644 --- a/crates/service/src/middleware/auth/tap.rs +++ b/crates/service/src/middleware/auth/tap_v1.rs @@ -9,46 +9,36 @@ //! This also uses MetricLabels injected in the receipts to provide //! metrics related to receipt check failure -use std::{future::Future, sync::Arc}; +use std::sync::Arc; -use axum::{ - body::Body, - http::{Request, Response}, - response::IntoResponse, -}; +use axum::{body::Body, http::request::Parts, response::IntoResponse}; use tap_core::{ manager::{adapters::ReceiptStore, Manager}, receipt::{Context, SignedReceipt}, }; -use tower_http::auth::AsyncAuthorizeRequest; use crate::{error::IndexerServiceError, middleware::prometheus_metrics::MetricLabels}; +use super::async_validate::AsyncAuthorizeRequestExt; + /// Middleware to verify and store TAP receipts /// /// It also optionally updates a failed receipt metric if Labels are provided /// /// Requires SignedReceipt, MetricLabels and Arc extensions -pub fn tap_receipt_authorize( +pub fn tap_receipt_authorize( tap_manager: Arc>, failed_receipt_metric: &'static prometheus::CounterVec, -) -> impl AsyncAuthorizeRequest< - B, - RequestBody = B, - ResponseBody = Body, - Future = impl Future, Response>> + Send, -> + Clone - + Send +) -> impl AsyncAuthorizeRequestExt + Clone + Send where T: ReceiptStore + Sync + Send + 'static, - B: Send, { - move |request: Request| { - let receipt = request.extensions().get::().cloned(); + move |request: &mut Parts| { + let receipt = request.extensions.remove::(); // load labels from previous middlewares - let labels = request.extensions().get::().cloned(); + let labels = request.extensions.get::().cloned(); // load context from previous middlewares - let ctx = request.extensions().get::>().cloned(); + let ctx = request.extensions.get::>().cloned(); let tap_manager = tap_manager.clone(); async move { @@ -65,7 +55,7 @@ where .inc() } })?; - Ok::<_, IndexerServiceError>(request) + Ok::<_, IndexerServiceError>(()) }; execute().await.map_err(|error| error.into_response()) } @@ -102,10 +92,10 @@ mod tests { use crate::{ middleware::{ - auth::tap_receipt_authorize, + auth::{self, tap_receipt_authorize_v1}, prometheus_metrics::{MetricLabelProvider, MetricLabels}, }, - tap::IndexerTapContext, + tap::IndexerTapContextV1, }; #[fixture] @@ -129,7 +119,7 @@ mod tests { metric: &'static prometheus::CounterVec, pgpool: PgPool, ) -> impl Service, Response = Response, Error = impl std::fmt::Debug> { - let context = IndexerTapContext::new(pgpool, TAP_EIP712_DOMAIN.clone()).await; + let context = IndexerTapContextV1::new(pgpool, TAP_EIP712_DOMAIN.clone()).await; struct MyCheck; #[async_trait::async_trait] @@ -152,8 +142,8 @@ mod tests { context, CheckList::new(vec![Arc::new(MyCheck)]), )); - let tap_auth = tap_receipt_authorize(manager, metric); - let authorization_middleware = AsyncRequireAuthorizationLayer::new(tap_auth); + let tap_auth = tap_receipt_authorize_v1(manager, metric); + let authorization_middleware = AsyncRequireAuthorizationLayer::new(auth::wrap(tap_auth)); let mut service = ServiceBuilder::new() .layer(authorization_middleware) diff --git a/crates/service/src/middleware/auth/tap_v2.rs b/crates/service/src/middleware/auth/tap_v2.rs new file mode 100644 index 00000000..ed8f120d --- /dev/null +++ b/crates/service/src/middleware/auth/tap_v2.rs @@ -0,0 +1,231 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +//! Validates Tap receipts +//! +//! This looks for a Context in the extensions of the request to inject +//! as part of the checks. +//! +//! This also uses MetricLabels injected in the receipts to provide +//! metrics related to receipt check failure + +use std::sync::Arc; + +use axum::{body::Body, http::request::Parts, response::IntoResponse}; +use tap_core_v2::{ + manager::{adapters::ReceiptStore, Manager}, + receipt::{Context, SignedReceipt}, +}; + +use crate::{error::IndexerServiceError, middleware::prometheus_metrics::MetricLabels}; + +use super::async_validate::AsyncAuthorizeRequestExt; + +/// Middleware to verify and store TAP receipts +/// +/// It also optionally updates a failed receipt metric if Labels are provided +/// +/// Requires SignedReceipt, MetricLabels and Arc extensions +pub fn tap_receipt_authorize( + tap_manager: Arc>, + failed_receipt_metric: &'static prometheus::CounterVec, +) -> impl AsyncAuthorizeRequestExt + Clone + Send +where + T: ReceiptStore + Sync + Send + 'static, +{ + move |request: &mut Parts| { + let receipt = request.extensions.remove::(); + // load labels from previous middlewares + let labels = request.extensions.get::().cloned(); + // load context from previous middlewares + let ctx = request.extensions.get::>().cloned(); + let tap_manager = tap_manager.clone(); + + async move { + let execute = || async { + let receipt = receipt.ok_or(IndexerServiceError::ReceiptNotFound)?; + // Verify the receipt and store it in the database + tap_manager + .verify_and_store_receipt(&ctx.unwrap_or_default(), receipt) + .await + .inspect_err(|_| { + if let Some(labels) = labels { + failed_receipt_metric + .with_label_values(&labels.get_labels()) + .inc() + } + })?; + Ok::<_, IndexerServiceError>(()) + }; + execute().await.map_err(|error| error.into_response()) + } + } +} + +#[cfg(test)] +mod tests { + + use core::panic; + use std::sync::Arc; + + use axum::{ + body::Body, + http::{Request, Response}, + }; + use prometheus::core::Collector; + use reqwest::StatusCode; + use rstest::*; + use sqlx::PgPool; + use tap_core_v2::{ + manager::Manager, + receipt::{ + checks::{Check, CheckError, CheckList, CheckResult}, + state::Checking, + ReceiptWithState, + }, + }; + use test_assets::{ + assert_while_retry, create_signed_receipt_v2, SignedReceiptV2Request, TAP_EIP712_DOMAIN, + }; + use tower::{Service, ServiceBuilder, ServiceExt}; + use tower_http::auth::AsyncRequireAuthorizationLayer; + + use crate::{ + middleware::{ + auth::{self, tap_receipt_authorize_v2}, + prometheus_metrics::{MetricLabelProvider, MetricLabels}, + }, + tap::IndexerTapContextV2, + }; + + #[fixture] + fn metric() -> &'static prometheus::CounterVec { + let registry = prometheus::Registry::new(); + let metric = Box::leak(Box::new( + prometheus::register_counter_vec_with_registry!( + "tap_middleware_test", + "Failed queries to handler", + &["deployment"], + registry, + ) + .unwrap(), + )); + metric + } + + const FAILED_NONCE: u64 = 99; + + async fn service( + metric: &'static prometheus::CounterVec, + pgpool: PgPool, + ) -> impl Service, Response = Response, Error = impl std::fmt::Debug> { + let context = IndexerTapContextV2::new(pgpool, TAP_EIP712_DOMAIN.clone()).await; + + struct MyCheck; + #[async_trait::async_trait] + impl Check for MyCheck { + async fn check( + &self, + _: &tap_core::receipt::Context, + receipt: &ReceiptWithState, + ) -> CheckResult { + if receipt.signed_receipt().message.nonce == FAILED_NONCE { + Err(CheckError::Failed(anyhow::anyhow!("Failed"))) + } else { + Ok(()) + } + } + } + + let manager = Arc::new(Manager::new( + TAP_EIP712_DOMAIN.clone(), + context, + CheckList::new(vec![Arc::new(MyCheck)]), + )); + let tap_auth = tap_receipt_authorize_v2(manager, metric); + let authorization_middleware = AsyncRequireAuthorizationLayer::new(auth::wrap(tap_auth)); + + let mut service = ServiceBuilder::new() + .layer(authorization_middleware) + .service_fn(|_: Request| async { + Ok::<_, anyhow::Error>(Response::new(Body::default())) + }); + + service.ready().await.unwrap(); + service + } + + #[rstest] + #[sqlx::test(migrations = "../../migrations")] + async fn test_tap_valid_receipt( + metric: &'static prometheus::CounterVec, + #[ignore] pgpool: PgPool, + ) { + let mut service = service(metric, pgpool.clone()).await; + + let receipt = create_signed_receipt_v2(SignedReceiptV2Request::builder().build()).await; + + // check with receipt + let mut req = Request::new(Body::default()); + req.extensions_mut().insert(receipt); + let res = service.call(req).await.unwrap(); + assert_eq!(res.status(), StatusCode::OK); + + // verify receipts + assert_while_retry!({ + sqlx::query!("SELECT * FROM tap_v2_receipts") + .fetch_all(&pgpool) + .await + .unwrap() + .is_empty() + }) + } + + #[rstest] + #[sqlx::test(migrations = "../../migrations")] + async fn test_invalid_receipt_with_failed_metric( + metric: &'static prometheus::CounterVec, + #[ignore] pgpool: PgPool, + ) { + let mut service = service(metric, pgpool.clone()).await; + // if it fails tap receipt, should return failed to process payment + tap message + + assert_eq!(metric.collect().first().unwrap().get_metric().len(), 0); + + struct TestLabel; + impl MetricLabelProvider for TestLabel { + fn get_labels(&self) -> Vec<&str> { + vec!["label1"] + } + } + + // default labels, all empty + let labels: MetricLabels = Arc::new(TestLabel); + + let mut receipt = create_signed_receipt_v2(SignedReceiptV2Request::builder().build()).await; + // change the nonce to make the receipt invalid + receipt.message.nonce = FAILED_NONCE; + let mut req = Request::new(Body::default()); + req.extensions_mut().insert(receipt); + req.extensions_mut().insert(labels); + let response = service.call(req); + + assert_eq!(response.await.unwrap().status(), StatusCode::BAD_REQUEST); + + assert_eq!(metric.collect().first().unwrap().get_metric().len(), 1); + } + + #[rstest] + #[sqlx::test(migrations = "../../migrations")] + async fn test_tap_missing_signed_receipt( + metric: &'static prometheus::CounterVec, + #[ignore] pgpool: PgPool, + ) { + let mut service = service(metric, pgpool.clone()).await; + // if it doesnt contain the signed receipt + // should return payment required + let req = Request::new(Body::default()); + let res = service.call(req).await.unwrap(); + assert_eq!(res.status(), StatusCode::PAYMENT_REQUIRED); + } +} diff --git a/crates/service/src/middleware/sender.rs b/crates/service/src/middleware/sender.rs index ecfb48d0..6a52f00c 100644 --- a/crates/service/src/middleware/sender.rs +++ b/crates/service/src/middleware/sender.rs @@ -53,6 +53,18 @@ pub async fn sender_middleware( request.extensions_mut().insert(Sender(sender)); } + if let Some(receipt) = request + .extensions() + .get::() + { + let signer = receipt.recover_signer(&state.domain_separator)?; + let sender = state + .escrow_accounts + .borrow() + .get_sender_for_signer(&signer)?; + request.extensions_mut().insert(Sender(sender)); + } + Ok(next.run(request).await) } diff --git a/crates/service/src/middleware/tap_receipt.rs b/crates/service/src/middleware/tap_receipt.rs index 3d068443..93c42d4a 100644 --- a/crates/service/src/middleware/tap_receipt.rs +++ b/crates/service/src/middleware/tap_receipt.rs @@ -14,10 +14,15 @@ use crate::service::TapReceipt; /// /// This is useful to not deserialize multiple times the same receipt pub async fn receipt_middleware(mut request: Request, next: Next) -> Response { - if let Ok(TypedHeader(TapReceipt(receipt))) = - request.extract_parts::>().await - { - request.extensions_mut().insert(receipt); + if let Ok(TypedHeader(receipt)) = request.extract_parts::>().await { + match receipt { + TapReceipt::V1(receipt) => { + request.extensions_mut().insert(receipt); + } + TapReceipt::V2(receipt) => { + request.extensions_mut().insert(receipt); + } + } } next.run(request).await } diff --git a/crates/service/src/service/router.rs b/crates/service/src/service/router.rs index 8768c7bb..93fa6770 100644 --- a/crates/service/src/service/router.rs +++ b/crates/service/src/service/router.rs @@ -52,7 +52,7 @@ use crate::{ dips::{self, Price}, health, request_handler, static_subgraph_request_handler, }, - tap::IndexerTapContext, + tap::{IndexerTapContextV1, IndexerTapContextV2}, wallet::public_key, }; @@ -267,18 +267,18 @@ impl ServiceRouter { let post_request_handler = { // Create tap manager to validate receipts - let tap_manager = { + let tap_manager_v1 = { // Create context let indexer_context = - IndexerTapContext::new(self.database.clone(), self.domain_separator.clone()) + IndexerTapContextV1::new(self.database.clone(), self.domain_separator.clone()) .await; let timestamp_error_tolerance = self.timestamp_buffer_secs; let receipt_max_value = max_receipt_value_grt.get_value(); // Create checks - let checks = IndexerTapContext::get_checks( - self.database, + let checks = IndexerTapContextV1::get_checks( + self.database.clone(), allocations.clone(), escrow_accounts.clone(), timestamp_error_tolerance, @@ -293,6 +293,31 @@ impl ServiceRouter { )) }; + let tap_manager_v2 = { + // Create context + let indexer_context = + IndexerTapContextV2::new(self.database.clone(), self.domain_separator.clone()) + .await; + + let timestamp_error_tolerance = self.timestamp_buffer_secs; + let receipt_max_value = max_receipt_value_grt.get_value(); + + // Create checks + let checks = IndexerTapContextV2::get_checks( + self.database, + escrow_accounts.clone(), + timestamp_error_tolerance, + receipt_max_value, + ) + .await; + // Returned static Manager + Arc::new(tap_core_v2::manager::Manager::new( + self.domain_separator.clone(), + indexer_context, + tap_core_v2::receipt::checks::CheckList::new(checks), + )) + }; + let attestation_state = AttestationState { attestation_signers, }; @@ -307,8 +332,9 @@ impl ServiceRouter { // inject auth let failed_receipt_metric = Box::leak(Box::new(FAILED_RECEIPT.clone())); - let tap_auth = auth::tap_receipt_authorize(tap_manager, failed_receipt_metric); - + let tap_auth_v1 = auth::tap_receipt_authorize_v1(tap_manager_v1, failed_receipt_metric); + let tap_auth_v2 = auth::tap_receipt_authorize_v2(tap_manager_v2, failed_receipt_metric); + let tap_auth = tap_auth_v1.or(tap_auth_v2); if let Some(free_auth_token) = &free_query_auth_token { let free_query = Bearer::new(free_auth_token); let result = free_query.or(tap_auth); diff --git a/crates/service/src/service/tap_receipt_header.rs b/crates/service/src/service/tap_receipt_header.rs index 814b2c93..a1dc4536 100644 --- a/crates/service/src/service/tap_receipt_header.rs +++ b/crates/service/src/service/tap_receipt_header.rs @@ -4,10 +4,81 @@ use axum_extra::headers::{self, Header, HeaderName, HeaderValue}; use lazy_static::lazy_static; use prometheus::{register_counter, Counter}; -use tap_core::receipt::SignedReceipt; +use serde::de; +use serde_json::Value; +use tap_core::receipt::SignedReceipt as SignedReceiptV1; +use tap_core_v2::receipt::SignedReceipt as SignedReceiptV2; -#[derive(Debug, PartialEq)] -pub struct TapReceipt(pub SignedReceipt); +#[derive(Debug, PartialEq, Clone, serde::Serialize)] +#[serde(untagged)] +pub enum TapReceipt { + V1(SignedReceiptV1), + V2(SignedReceiptV2), +} + +impl<'de> serde::Deserialize<'de> for TapReceipt { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let temp = Value::deserialize(deserializer)?; + + let is_v1 = temp + .as_object() + .ok_or(de::Error::custom("Didn't receive an object"))? + .get("message") + .ok_or(de::Error::custom("There's no message in the object"))? + .as_object() + .ok_or(de::Error::custom("Message is not an object"))? + .contains_key("allocation_id"); + + if is_v1 { + // Try V1 first + serde_json::from_value::(temp) + .map(TapReceipt::V1) + .map_err(de::Error::custom) + } else { + // Fall back to V2 + serde_json::from_value::(temp) + .map(TapReceipt::V2) + .map_err(de::Error::custom) + } + } +} + +impl From for TapReceipt { + fn from(value: SignedReceiptV1) -> Self { + Self::V1(value) + } +} + +impl From for TapReceipt { + fn from(value: SignedReceiptV2) -> Self { + Self::V2(value) + } +} + +impl TryFrom for SignedReceiptV1 { + type Error = anyhow::Error; + + fn try_from(value: TapReceipt) -> Result { + match value { + TapReceipt::V2(_) => Err(anyhow::anyhow!("TapReceipt is V2")), + TapReceipt::V1(receipt) => Ok(receipt), + } + } +} + +impl TryFrom for SignedReceiptV2 { + type Error = anyhow::Error; + + fn try_from(value: TapReceipt) -> Result { + match value { + TapReceipt::V1(_) => Err(anyhow::anyhow!("TapReceipt is V1")), + TapReceipt::V2(receipt) => Ok(receipt), + } + } +} lazy_static! { static ref TAP_RECEIPT: HeaderName = HeaderName::from_static("tap-receipt"); @@ -30,9 +101,9 @@ impl Header for TapReceipt { let raw_receipt = raw_receipt .to_str() .map_err(|_| headers::Error::invalid())?; - let parsed_receipt = + let parsed_receipt: TapReceipt = serde_json::from_str(raw_receipt).map_err(|_| headers::Error::invalid())?; - Ok(TapReceipt(parsed_receipt)) + Ok(parsed_receipt) }; execute().inspect_err(|_| TAP_RECEIPT_INVALID.inc()) } @@ -49,7 +120,10 @@ impl Header for TapReceipt { mod test { use axum::http::HeaderValue; use axum_extra::headers::Header; - use test_assets::{create_signed_receipt, SignedReceiptRequest}; + use test_assets::{ + create_signed_receipt, create_signed_receipt_v2, SignedReceiptRequest, + SignedReceiptV2Request, + }; use super::TapReceipt; @@ -57,12 +131,43 @@ mod test { async fn test_decode_valid_tap_receipt_header() { let original_receipt = create_signed_receipt(SignedReceiptRequest::builder().build()).await; let serialized_receipt = serde_json::to_string(&original_receipt).unwrap(); - let header_value = HeaderValue::from_str(&serialized_receipt).unwrap(); + + let original_receipt_v1: TapReceipt = original_receipt.clone().into(); + let serialized_receipt_v1 = serde_json::to_string(&original_receipt_v1).unwrap(); + + assert_eq!(serialized_receipt, serialized_receipt_v1); + + println!("Was able to serialize properly: {serialized_receipt_v1:?}"); + let deserialized: TapReceipt = serde_json::from_str(&serialized_receipt_v1).unwrap(); + println!("Was able to deserialize properly: {deserialized:?}"); + let header_value = HeaderValue::from_str(&serialized_receipt_v1).unwrap(); + let header_values = vec![&header_value]; + let decoded_receipt = TapReceipt::decode(&mut header_values.into_iter()) + .expect("tap receipt header value should be valid"); + + assert_eq!(decoded_receipt, original_receipt.into()); + } + + #[tokio::test] + async fn test_decode_valid_tap_v2_receipt_header() { + let original_receipt = + create_signed_receipt_v2(SignedReceiptV2Request::builder().build()).await; + let serialized_receipt = serde_json::to_string(&original_receipt).unwrap(); + + let original_receipt_v1: TapReceipt = original_receipt.clone().into(); + let serialized_receipt_v1 = serde_json::to_string(&original_receipt_v1).unwrap(); + + assert_eq!(serialized_receipt, serialized_receipt_v1); + + println!("Was able to serialize properly: {serialized_receipt_v1:?}"); + let deserialized: TapReceipt = serde_json::from_str(&serialized_receipt_v1).unwrap(); + println!("Was able to deserialize properly: {deserialized:?}"); + let header_value = HeaderValue::from_str(&serialized_receipt_v1).unwrap(); let header_values = vec![&header_value]; let decoded_receipt = TapReceipt::decode(&mut header_values.into_iter()) .expect("tap receipt header value should be valid"); - assert_eq!(decoded_receipt, TapReceipt(original_receipt)); + assert_eq!(decoded_receipt, original_receipt.into()); } #[test] diff --git a/crates/service/src/tap.rs b/crates/service/src/tap.rs index e6ca2b48..1a774f7d 100644 --- a/crates/service/src/tap.rs +++ b/crates/service/src/tap.rs @@ -1,81 +1,6 @@ -// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 +mod v1; +mod v2; -use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration}; - -use indexer_allocation::Allocation; -use indexer_monitor::EscrowAccounts; -use receipt_store::{DatabaseReceipt, InnerContext}; -use sqlx::PgPool; -use tap_core::receipt::checks::ReceiptCheck; -use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain}; -use tokio::sync::{ - mpsc::{self, Sender}, - watch::Receiver, -}; -use tokio_util::sync::CancellationToken; - -use crate::tap::checks::{ - allocation_eligible::AllocationEligible, deny_list_check::DenyListCheck, - receipt_max_val_check::ReceiptMaxValueCheck, sender_balance_check::SenderBalanceCheck, - timestamp_check::TimestampCheck, value_check::MinimumValue, -}; - -mod checks; -mod receipt_store; - -pub use checks::value_check::AgoraQuery; - -const GRACE_PERIOD: u64 = 60; - -#[derive(Clone)] -pub struct IndexerTapContext { - domain_separator: Arc, - receipt_producer: Sender, - cancelation_token: CancellationToken, -} - -#[derive(Debug, thiserror::Error)] -pub enum AdapterError { - #[error(transparent)] - AnyhowError(#[from] anyhow::Error), -} - -impl IndexerTapContext { - pub async fn get_checks( - pgpool: PgPool, - indexer_allocations: Receiver>, - escrow_accounts: Receiver, - timestamp_error_tolerance: Duration, - receipt_max_value: u128, - ) -> Vec { - vec![ - Arc::new(AllocationEligible::new(indexer_allocations)), - Arc::new(SenderBalanceCheck::new(escrow_accounts)), - Arc::new(TimestampCheck::new(timestamp_error_tolerance)), - Arc::new(DenyListCheck::new(pgpool.clone()).await), - Arc::new(ReceiptMaxValueCheck::new(receipt_max_value)), - Arc::new(MinimumValue::new(pgpool, Duration::from_secs(GRACE_PERIOD)).await), - ] - } - - pub async fn new(pgpool: PgPool, domain_separator: Eip712Domain) -> Self { - const MAX_RECEIPT_QUEUE_SIZE: usize = 1000; - let (tx, rx) = mpsc::channel(MAX_RECEIPT_QUEUE_SIZE); - let cancelation_token = CancellationToken::new(); - let inner = InnerContext { pgpool }; - Self::spawn_store_receipt_task(inner, rx, cancelation_token.clone()); - - Self { - cancelation_token, - receipt_producer: tx, - domain_separator: Arc::new(domain_separator), - } - } -} - -impl Drop for IndexerTapContext { - fn drop(&mut self) { - self.cancelation_token.cancel(); - } -} +pub use v1::AgoraQuery; +pub use v1::IndexerTapContext as IndexerTapContextV1; +pub use v2::IndexerTapContext as IndexerTapContextV2; diff --git a/crates/service/src/tap/v1.rs b/crates/service/src/tap/v1.rs new file mode 100644 index 00000000..25b3b546 --- /dev/null +++ b/crates/service/src/tap/v1.rs @@ -0,0 +1,81 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration}; + +use indexer_allocation::Allocation; +use indexer_monitor::EscrowAccounts; +use receipt_store::{DatabaseReceipt, InnerContext}; +use sqlx::PgPool; +use tap_core::receipt::checks::ReceiptCheck; +use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain}; +use tokio::sync::{ + mpsc::{self, Sender}, + watch::Receiver, +}; +use tokio_util::sync::CancellationToken; + +use crate::tap::v1::checks::{ + allocation_eligible::AllocationEligible, deny_list_check::DenyListCheck, + receipt_max_val_check::ReceiptMaxValueCheck, sender_balance_check::SenderBalanceCheck, + timestamp_check::TimestampCheck, value_check::MinimumValue, +}; + +mod checks; +mod receipt_store; + +pub use checks::value_check::AgoraQuery; + +const GRACE_PERIOD: u64 = 60; + +#[derive(Clone)] +pub struct IndexerTapContext { + domain_separator: Arc, + receipt_producer: Sender, + cancelation_token: CancellationToken, +} + +#[derive(Debug, thiserror::Error)] +pub enum AdapterError { + #[error(transparent)] + AnyhowError(#[from] anyhow::Error), +} + +impl IndexerTapContext { + pub async fn get_checks( + pgpool: PgPool, + indexer_allocations: Receiver>, + escrow_accounts: Receiver, + timestamp_error_tolerance: Duration, + receipt_max_value: u128, + ) -> Vec { + vec![ + Arc::new(AllocationEligible::new(indexer_allocations)), + Arc::new(SenderBalanceCheck::new(escrow_accounts)), + Arc::new(TimestampCheck::new(timestamp_error_tolerance)), + Arc::new(DenyListCheck::new(pgpool.clone()).await), + Arc::new(ReceiptMaxValueCheck::new(receipt_max_value)), + Arc::new(MinimumValue::new(pgpool, Duration::from_secs(GRACE_PERIOD)).await), + ] + } + + pub async fn new(pgpool: PgPool, domain_separator: Eip712Domain) -> Self { + const MAX_RECEIPT_QUEUE_SIZE: usize = 1000; + let (tx, rx) = mpsc::channel(MAX_RECEIPT_QUEUE_SIZE); + let cancelation_token = CancellationToken::new(); + let inner = InnerContext { pgpool }; + Self::spawn_store_receipt_task(inner, rx, cancelation_token.clone()); + + Self { + cancelation_token, + receipt_producer: tx, + domain_separator: Arc::new(domain_separator), + } + } +} + +impl Drop for IndexerTapContext { + fn drop(&mut self) { + self.cancelation_token.cancel(); + } +} diff --git a/crates/service/src/tap/checks.rs b/crates/service/src/tap/v1/checks.rs similarity index 100% rename from crates/service/src/tap/checks.rs rename to crates/service/src/tap/v1/checks.rs diff --git a/crates/service/src/tap/checks/allocation_eligible.rs b/crates/service/src/tap/v1/checks/allocation_eligible.rs similarity index 100% rename from crates/service/src/tap/checks/allocation_eligible.rs rename to crates/service/src/tap/v1/checks/allocation_eligible.rs diff --git a/crates/service/src/tap/checks/deny_list_check.rs b/crates/service/src/tap/v1/checks/deny_list_check.rs similarity index 100% rename from crates/service/src/tap/checks/deny_list_check.rs rename to crates/service/src/tap/v1/checks/deny_list_check.rs diff --git a/crates/service/src/tap/checks/receipt_max_val_check.rs b/crates/service/src/tap/v1/checks/receipt_max_val_check.rs similarity index 99% rename from crates/service/src/tap/checks/receipt_max_val_check.rs rename to crates/service/src/tap/v1/checks/receipt_max_val_check.rs index e4dbcbd4..428f76ba 100644 --- a/crates/service/src/tap/checks/receipt_max_val_check.rs +++ b/crates/service/src/tap/v1/checks/receipt_max_val_check.rs @@ -47,12 +47,12 @@ mod tests { tap_eip712_domain, }; use thegraph_core::alloy::{ + dyn_abi::Eip712Domain, primitives::{address, Address}, signers::local::{coins_bip39::English, MnemonicBuilder, PrivateKeySigner}, }; use super::*; - use crate::tap::Eip712Domain; fn create_signed_receipt_with_custom_value(value: u128) -> ReceiptWithState { let index: u32 = 0; diff --git a/crates/service/src/tap/checks/sender_balance_check.rs b/crates/service/src/tap/v1/checks/sender_balance_check.rs similarity index 100% rename from crates/service/src/tap/checks/sender_balance_check.rs rename to crates/service/src/tap/v1/checks/sender_balance_check.rs diff --git a/crates/service/src/tap/checks/timestamp_check.rs b/crates/service/src/tap/v1/checks/timestamp_check.rs similarity index 99% rename from crates/service/src/tap/checks/timestamp_check.rs rename to crates/service/src/tap/v1/checks/timestamp_check.rs index fde2fb82..a7cf2c73 100644 --- a/crates/service/src/tap/checks/timestamp_check.rs +++ b/crates/service/src/tap/v1/checks/timestamp_check.rs @@ -57,12 +57,12 @@ mod tests { tap_eip712_domain, }; use thegraph_core::alloy::{ + dyn_abi::Eip712Domain, primitives::{address, Address}, signers::local::{coins_bip39::English, MnemonicBuilder, PrivateKeySigner}, }; use super::TimestampCheck; - use crate::tap::Eip712Domain; fn create_signed_receipt_with_custom_timestamp( timestamp_ns: u64, diff --git a/crates/service/src/tap/checks/value_check.rs b/crates/service/src/tap/v1/checks/value_check.rs similarity index 100% rename from crates/service/src/tap/checks/value_check.rs rename to crates/service/src/tap/v1/checks/value_check.rs diff --git a/crates/service/src/tap/receipt_store.rs b/crates/service/src/tap/v1/receipt_store.rs similarity index 100% rename from crates/service/src/tap/receipt_store.rs rename to crates/service/src/tap/v1/receipt_store.rs diff --git a/crates/service/src/tap/v2.rs b/crates/service/src/tap/v2.rs new file mode 100644 index 00000000..c69a2b50 --- /dev/null +++ b/crates/service/src/tap/v2.rs @@ -0,0 +1,76 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::{fmt::Debug, sync::Arc, time::Duration}; + +use indexer_monitor::EscrowAccounts; +use receipt_store::{DatabaseReceipt, InnerContext}; +use sqlx::PgPool; +use tap_core_v2::receipt::checks::ReceiptCheck; +use thegraph_core::alloy::sol_types::Eip712Domain; +use tokio::sync::{ + mpsc::{self, Sender}, + watch::Receiver, +}; +use tokio_util::sync::CancellationToken; + +use checks::{ + deny_list_check::DenyListCheck, receipt_max_val_check::ReceiptMaxValueCheck, + sender_balance_check::SenderBalanceCheck, timestamp_check::TimestampCheck, + value_check::MinimumValue, +}; + +mod checks; +mod receipt_store; + +const GRACE_PERIOD: u64 = 60; + +#[derive(Clone)] +pub struct IndexerTapContext { + domain_separator: Arc, + receipt_producer: Sender, + cancelation_token: CancellationToken, +} + +#[derive(Debug, thiserror::Error)] +pub enum AdapterError { + #[error(transparent)] + AnyhowError(#[from] anyhow::Error), +} + +impl IndexerTapContext { + pub async fn get_checks( + pgpool: PgPool, + escrow_accounts: Receiver, + timestamp_error_tolerance: Duration, + receipt_max_value: u128, + ) -> Vec { + vec![ + Arc::new(SenderBalanceCheck::new(escrow_accounts)), + Arc::new(TimestampCheck::new(timestamp_error_tolerance)), + Arc::new(DenyListCheck::new(pgpool.clone()).await), + Arc::new(ReceiptMaxValueCheck::new(receipt_max_value)), + Arc::new(MinimumValue::new(pgpool, Duration::from_secs(GRACE_PERIOD)).await), + ] + } + + pub async fn new(pgpool: PgPool, domain_separator: Eip712Domain) -> Self { + const MAX_RECEIPT_QUEUE_SIZE: usize = 1000; + let (tx, rx) = mpsc::channel(MAX_RECEIPT_QUEUE_SIZE); + let cancelation_token = CancellationToken::new(); + let inner = InnerContext { pgpool }; + Self::spawn_store_receipt_task(inner, rx, cancelation_token.clone()); + + Self { + cancelation_token, + receipt_producer: tx, + domain_separator: Arc::new(domain_separator), + } + } +} + +impl Drop for IndexerTapContext { + fn drop(&mut self) { + self.cancelation_token.cancel(); + } +} diff --git a/crates/service/src/tap/v2/checks.rs b/crates/service/src/tap/v2/checks.rs new file mode 100644 index 00000000..085ed09f --- /dev/null +++ b/crates/service/src/tap/v2/checks.rs @@ -0,0 +1,8 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +pub mod deny_list_check; +pub mod receipt_max_val_check; +pub mod sender_balance_check; +pub mod timestamp_check; +pub mod value_check; diff --git a/crates/service/src/tap/v2/checks/deny_list_check.rs b/crates/service/src/tap/v2/checks/deny_list_check.rs new file mode 100644 index 00000000..cd5a1a64 --- /dev/null +++ b/crates/service/src/tap/v2/checks/deny_list_check.rs @@ -0,0 +1,290 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + collections::HashSet, + str::FromStr, + sync::{Arc, RwLock}, +}; + +use sqlx::{postgres::PgListener, PgPool}; +use tap_core_v2::receipt::{ + checks::{Check, CheckError, CheckResult}, + state::Checking, + ReceiptWithState, +}; +use thegraph_core::alloy::primitives::Address; + +use crate::middleware::Sender; + +pub struct DenyListCheck { + sender_denylist: Arc>>, + _sender_denylist_watcher_handle: Arc>, + sender_denylist_watcher_cancel_token: tokio_util::sync::CancellationToken, + + #[cfg(test)] + notify: std::sync::Arc, +} + +impl DenyListCheck { + pub async fn new(pgpool: PgPool) -> Self { + // Listen to pg_notify events. We start it before updating the sender_denylist so that we + // don't miss any updates. PG will buffer the notifications until we start consuming them. + let mut pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap(); + pglistener + .listen("scalar_tap_deny_notification") + .await + .expect( + "should be able to subscribe to Postgres Notify events on the channel \ + 'scalar_tap_deny_notification'", + ); + + // Fetch the denylist from the DB + let sender_denylist = Arc::new(RwLock::new(HashSet::new())); + Self::sender_denylist_reload(pgpool.clone(), sender_denylist.clone()) + .await + .expect("should be able to fetch the sender_denylist from the DB on startup"); + + #[cfg(test)] + let notify = std::sync::Arc::new(tokio::sync::Notify::new()); + + let sender_denylist_watcher_cancel_token = tokio_util::sync::CancellationToken::new(); + let sender_denylist_watcher_handle = Arc::new(tokio::spawn(Self::sender_denylist_watcher( + pgpool.clone(), + pglistener, + sender_denylist.clone(), + sender_denylist_watcher_cancel_token.clone(), + #[cfg(test)] + notify.clone(), + ))); + Self { + sender_denylist, + _sender_denylist_watcher_handle: sender_denylist_watcher_handle, + sender_denylist_watcher_cancel_token, + #[cfg(test)] + notify, + } + } + + async fn sender_denylist_reload( + pgpool: PgPool, + denylist_rwlock: Arc>>, + ) -> anyhow::Result<()> { + // Fetch the denylist from the DB + let sender_denylist = sqlx::query!( + r#" + SELECT sender_address FROM scalar_tap_denylist + "# + ) + .fetch_all(&pgpool) + .await? + .iter() + .map(|row| Address::from_str(&row.sender_address)) + .collect::, _>>()?; + + *(denylist_rwlock.write().unwrap()) = sender_denylist; + + Ok(()) + } + + async fn sender_denylist_watcher( + pgpool: PgPool, + mut pglistener: PgListener, + denylist: Arc>>, + cancel_token: tokio_util::sync::CancellationToken, + #[cfg(test)] notify: std::sync::Arc, + ) { + #[derive(serde::Deserialize)] + struct DenylistNotification { + tg_op: String, + sender_address: Address, + } + + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + break; + } + + pg_notification = pglistener.recv() => { + let pg_notification = pg_notification.expect( + "should be able to receive Postgres Notify events on the channel \ + 'scalar_tap_deny_notification'", + ); + + let denylist_notification: DenylistNotification = + serde_json::from_str(pg_notification.payload()).expect( + "should be able to deserialize the Postgres Notify event payload as a \ + DenylistNotification", + ); + + match denylist_notification.tg_op.as_str() { + "INSERT" => { + denylist + .write() + .unwrap() + .insert(denylist_notification.sender_address); + } + "DELETE" => { + denylist + .write() + .unwrap() + .remove(&denylist_notification.sender_address); + } + // UPDATE and TRUNCATE are not expected to happen. Reload the entire denylist. + _ => { + tracing::error!( + "Received an unexpected denylist table notification: {}. Reloading entire \ + denylist.", + denylist_notification.tg_op + ); + + Self::sender_denylist_reload(pgpool.clone(), denylist.clone()) + .await + .expect("should be able to reload the sender denylist") + } + } + #[cfg(test)] + notify.notify_one(); + } + } + } + } +} + +#[async_trait::async_trait] +impl Check for DenyListCheck { + async fn check( + &self, + ctx: &tap_core::receipt::Context, + _: &ReceiptWithState, + ) -> CheckResult { + let Sender(receipt_sender) = ctx + .get::() + .ok_or(CheckError::Failed(anyhow::anyhow!("Could not find sender")))?; + + // Check that the sender is not denylisted + if self + .sender_denylist + .read() + .unwrap() + .contains(receipt_sender) + { + return Err(CheckError::Failed(anyhow::anyhow!( + "Received a receipt from a denylisted sender: {}", + receipt_sender + ))); + } + + Ok(()) + } +} + +impl Drop for DenyListCheck { + fn drop(&mut self) { + // Clean shutdown for the sender_denylist_watcher + // Though since it's not a critical task, we don't wait for it to finish (join). + self.sender_denylist_watcher_cancel_token.cancel(); + } +} + +#[cfg(test)] +mod tests { + use sqlx::PgPool; + use tap_core_v2::receipt::{checks::Check, Context, ReceiptWithState}; + use test_assets::{self, create_signed_receipt_v2, SignedReceiptV2Request, TAP_SENDER}; + use thegraph_core::alloy::hex::ToHexExt; + + use super::*; + + async fn new_deny_list_check(pgpool: PgPool) -> DenyListCheck { + // Mock escrow accounts + DenyListCheck::new(pgpool).await + } + + #[sqlx::test(migrations = "../../migrations")] + async fn test_sender_denylist(pgpool: PgPool) { + // Add the sender to the denylist + sqlx::query!( + r#" + INSERT INTO scalar_tap_denylist (sender_address) + VALUES ($1) + "#, + TAP_SENDER.1.encode_hex() + ) + .execute(&pgpool) + .await + .unwrap(); + + let signed_receipt = + create_signed_receipt_v2(SignedReceiptV2Request::builder().build()).await; + + let deny_list_check = new_deny_list_check(pgpool.clone()).await; + + let checking_receipt = ReceiptWithState::new(signed_receipt); + + let mut ctx = Context::new(); + ctx.insert(Sender(TAP_SENDER.1)); + + // Check that the receipt is rejected + assert!(deny_list_check + .check(&ctx, &checking_receipt) + .await + .is_err()); + } + + #[sqlx::test(migrations = "../../migrations")] + async fn test_sender_denylist_updates(pgpool: PgPool) { + let signed_receipt = + create_signed_receipt_v2(SignedReceiptV2Request::builder().build()).await; + + let deny_list_check = new_deny_list_check(pgpool.clone()).await; + + // Check that the receipt is valid + let checking_receipt = ReceiptWithState::new(signed_receipt); + + let mut ctx = Context::new(); + ctx.insert(Sender(TAP_SENDER.1)); + deny_list_check + .check(&ctx, &checking_receipt) + .await + .unwrap(); + + // Add the sender to the denylist + sqlx::query!( + r#" + INSERT INTO scalar_tap_denylist (sender_address) + VALUES ($1) + "#, + TAP_SENDER.1.encode_hex() + ) + .execute(&pgpool) + .await + .unwrap(); + + deny_list_check.notify.notified().await; + + // Check that the receipt is rejected + assert!(deny_list_check + .check(&ctx, &checking_receipt) + .await + .is_err()); + + // Remove the sender from the denylist + sqlx::query!( + r#" + DELETE FROM scalar_tap_denylist + WHERE sender_address = $1 + "#, + TAP_SENDER.1.encode_hex() + ) + .execute(&pgpool) + .await + .unwrap(); + + deny_list_check.notify.notified().await; + + // Check that the receipt is valid again + assert!(deny_list_check.check(&ctx, &checking_receipt).await.is_ok()); + } +} diff --git a/crates/service/src/tap/v2/checks/receipt_max_val_check.rs b/crates/service/src/tap/v2/checks/receipt_max_val_check.rs new file mode 100644 index 00000000..757f32b5 --- /dev/null +++ b/crates/service/src/tap/v2/checks/receipt_max_val_check.rs @@ -0,0 +1,124 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 +use anyhow::anyhow; + +pub struct ReceiptMaxValueCheck { + receipt_max_value: u128, +} + +use tap_core_v2::receipt::{ + checks::{Check, CheckError, CheckResult}, + state::Checking, + ReceiptWithState, +}; + +impl ReceiptMaxValueCheck { + pub fn new(receipt_max_value: u128) -> Self { + Self { receipt_max_value } + } +} + +#[async_trait::async_trait] +impl Check for ReceiptMaxValueCheck { + async fn check( + &self, + _: &tap_core::receipt::Context, + receipt: &ReceiptWithState, + ) -> CheckResult { + let receipt_value = receipt.signed_receipt().message.value; + + if receipt_value < self.receipt_max_value { + Ok(()) + } else { + Err(CheckError::Failed(anyhow!( + "Receipt value `{}` is higher than the limit set by the user", + receipt_value + ))) + } + } +} +#[cfg(test)] +mod tests { + use std::time::{Duration, SystemTime}; + + use tap_core_v2::{ + receipt::{checks::Check, state::Checking, Context, Receipt, ReceiptWithState}, + signed_message::EIP712SignedMessage, + tap_eip712_domain, + }; + use thegraph_core::alloy::{ + dyn_abi::Eip712Domain, + primitives::{address, Address}, + signers::local::{coins_bip39::English, MnemonicBuilder, PrivateKeySigner}, + }; + + use super::*; + + fn create_signed_receipt_with_custom_value(value: u128) -> ReceiptWithState { + let index: u32 = 0; + let wallet: PrivateKeySigner = MnemonicBuilder::::default() + .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") + .index(index) + .unwrap() + .build() + .unwrap(); + + let eip712_domain_separator: Eip712Domain = + tap_eip712_domain(1, Address::from([0x11u8; 20])); + + let timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Time went backwards") + .as_nanos() + + Duration::from_secs(33).as_nanos(); + let timestamp_ns = timestamp as u64; + + let value: u128 = value; + let nonce: u64 = 10; + let receipt = EIP712SignedMessage::new( + &eip712_domain_separator, + Receipt { + nonce, + timestamp_ns, + value, + payer: address!("abababababababababababababababababababab"), + data_service: address!("abababababababababababababababababababab"), + service_provider: address!("abababababababababababababababababababab"), + }, + &wallet, + ) + .unwrap(); + ReceiptWithState::::new(receipt) + } + + const RECEIPT_LIMIT: u128 = 10; + #[tokio::test] + async fn test_receipt_lower_than_limit() { + let signed_receipt = create_signed_receipt_with_custom_value(RECEIPT_LIMIT - 1); + let timestamp_check = ReceiptMaxValueCheck::new(RECEIPT_LIMIT); + assert!(timestamp_check + .check(&Context::new(), &signed_receipt) + .await + .is_ok()); + } + + #[tokio::test] + async fn test_receipt_higher_than_limit() { + let signed_receipt = create_signed_receipt_with_custom_value(RECEIPT_LIMIT + 1); + let timestamp_check = ReceiptMaxValueCheck::new(RECEIPT_LIMIT); + assert!(timestamp_check + .check(&Context::new(), &signed_receipt) + .await + .is_err()); + } + + #[tokio::test] + async fn test_receipt_same_as_limit() { + let signed_receipt = create_signed_receipt_with_custom_value(RECEIPT_LIMIT); + let timestamp_check = ReceiptMaxValueCheck::new(RECEIPT_LIMIT); + assert!(timestamp_check + .check(&Context::new(), &signed_receipt) + .await + .is_err()); + } +} diff --git a/crates/service/src/tap/v2/checks/sender_balance_check.rs b/crates/service/src/tap/v2/checks/sender_balance_check.rs new file mode 100644 index 00000000..d6fd823f --- /dev/null +++ b/crates/service/src/tap/v2/checks/sender_balance_check.rs @@ -0,0 +1,52 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::anyhow; +use indexer_monitor::EscrowAccounts; +use tap_core_v2::receipt::{ + checks::{Check, CheckError, CheckResult}, + state::Checking, + ReceiptWithState, +}; +use thegraph_core::alloy::primitives::U256; +use tokio::sync::watch::Receiver; + +use crate::middleware::Sender; + +pub struct SenderBalanceCheck { + escrow_accounts: Receiver, +} + +impl SenderBalanceCheck { + pub fn new(escrow_accounts: Receiver) -> Self { + Self { escrow_accounts } + } +} + +#[async_trait::async_trait] +impl Check for SenderBalanceCheck { + async fn check( + &self, + ctx: &tap_core::receipt::Context, + _: &ReceiptWithState, + ) -> CheckResult { + let escrow_accounts_snapshot = self.escrow_accounts.borrow(); + + let Sender(receipt_sender) = ctx + .get::() + .ok_or(CheckError::Failed(anyhow::anyhow!("Could not find sender")))?; + + // Check that the sender has a non-zero balance -- more advanced accounting is done in + // `tap-agent`. + if !escrow_accounts_snapshot + .get_balance_for_sender(receipt_sender) + .map_or(false, |balance| balance > U256::ZERO) + { + return Err(CheckError::Failed(anyhow!( + "Receipt sender `{}` does not have a sufficient balance", + receipt_sender, + ))); + } + Ok(()) + } +} diff --git a/crates/service/src/tap/v2/checks/timestamp_check.rs b/crates/service/src/tap/v2/checks/timestamp_check.rs new file mode 100644 index 00000000..0a80e9b0 --- /dev/null +++ b/crates/service/src/tap/v2/checks/timestamp_check.rs @@ -0,0 +1,142 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 +use std::time::{Duration, SystemTime}; + +use anyhow::anyhow; + +pub struct TimestampCheck { + timestamp_error_tolerance: Duration, +} + +use tap_core_v2::receipt::{ + checks::{Check, CheckError, CheckResult}, + state::Checking, + ReceiptWithState, +}; + +impl TimestampCheck { + pub fn new(timestamp_error_tolerance: Duration) -> Self { + Self { + timestamp_error_tolerance, + } + } +} + +#[async_trait::async_trait] +impl Check for TimestampCheck { + async fn check( + &self, + _: &tap_core::receipt::Context, + receipt: &ReceiptWithState, + ) -> CheckResult { + let timestamp_now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_err(|e| CheckError::Failed(e.into()))?; + let min_timestamp = timestamp_now - self.timestamp_error_tolerance; + let max_timestamp = timestamp_now + self.timestamp_error_tolerance; + + let receipt_timestamp = Duration::from_nanos(receipt.signed_receipt().message.timestamp_ns); + + if receipt_timestamp < max_timestamp && receipt_timestamp > min_timestamp { + Ok(()) + } else { + Err(CheckError::Failed(anyhow!( + "Receipt timestamp `{}` is outside of current system time +/- timestamp_error_tolerance", + receipt_timestamp.as_secs() + ))) + } + } +} +#[cfg(test)] +mod tests { + use std::time::{Duration, SystemTime}; + + use tap_core_v2::{ + receipt::{checks::Check, state::Checking, Context, Receipt, ReceiptWithState}, + signed_message::EIP712SignedMessage, + tap_eip712_domain, + }; + use thegraph_core::alloy::{ + dyn_abi::Eip712Domain, primitives::{address, Address}, signers::local::{coins_bip39::English, MnemonicBuilder, PrivateKeySigner} + }; + + use super::TimestampCheck; + + fn create_signed_receipt_with_custom_timestamp( + timestamp_ns: u64, + ) -> ReceiptWithState { + let index: u32 = 0; + let wallet: PrivateKeySigner = MnemonicBuilder::::default() + .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") + .index(index) + .unwrap() + .build() + .unwrap(); + let eip712_domain_separator: Eip712Domain = + tap_eip712_domain(1, Address::from([0x11u8; 20])); + let value: u128 = 1234; + let nonce: u64 = 10; + let receipt = EIP712SignedMessage::new( + &eip712_domain_separator, + Receipt { + payer: address!("abababababababababababababababababababab"), + data_service: address!("abababababababababababababababababababab"), + service_provider: address!("abababababababababababababababababababab"), + nonce, + timestamp_ns, + value, + }, + &wallet, + ) + .unwrap(); + ReceiptWithState::::new(receipt) + } + + #[tokio::test] + async fn test_timestamp_inside_tolerance() { + let timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Time went backwards") + .as_nanos() + + Duration::from_secs(15).as_nanos(); + let timestamp_ns = timestamp as u64; + let signed_receipt = create_signed_receipt_with_custom_timestamp(timestamp_ns); + let timestamp_check = TimestampCheck::new(Duration::from_secs(30)); + assert!(timestamp_check + .check(&Context::new(), &signed_receipt) + .await + .is_ok()); + } + + #[tokio::test] + async fn test_timestamp_less_than_tolerance() { + let timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Time went backwards") + .as_nanos() + + Duration::from_secs(33).as_nanos(); + let timestamp_ns = timestamp as u64; + let signed_receipt = create_signed_receipt_with_custom_timestamp(timestamp_ns); + let timestamp_check = TimestampCheck::new(Duration::from_secs(30)); + assert!(timestamp_check + .check(&Context::new(), &signed_receipt) + .await + .is_err()); + } + + #[tokio::test] + async fn test_timestamp_more_than_tolerance() { + let timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Time went backwards") + .as_nanos() + - Duration::from_secs(33).as_nanos(); + let timestamp_ns = timestamp as u64; + let signed_receipt = create_signed_receipt_with_custom_timestamp(timestamp_ns); + let timestamp_check = TimestampCheck::new(Duration::from_secs(30)); + assert!(timestamp_check + .check(&Context::new(), &signed_receipt) + .await + .is_err()); + } +} diff --git a/crates/service/src/tap/v2/checks/value_check.rs b/crates/service/src/tap/v2/checks/value_check.rs new file mode 100644 index 00000000..1f2fa204 --- /dev/null +++ b/crates/service/src/tap/v2/checks/value_check.rs @@ -0,0 +1,615 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + collections::HashMap, + str::FromStr, + sync::{Arc, RwLock}, + time::{Duration, Instant}, +}; + +use ::cost_model::CostModel; +use anyhow::anyhow; +use bigdecimal::ToPrimitive; +use sqlx::{ + postgres::{PgListener, PgNotification}, + PgPool, +}; +use tap_core_v2::receipt::{ + checks::{Check, CheckError, CheckResult}, + state::Checking, + Context, ReceiptWithState, +}; +use thegraph_core::DeploymentId; + +use crate::{database::cost_model, tap::AgoraQuery}; + +// we only accept receipts with minimal 1 wei grt +const MINIMAL_VALUE: u128 = 1; + +type CostModelMap = Arc>>; +type GlobalModel = Arc>>; +type GracePeriod = Arc>; + +/// Represents the check for minimum for a receipt +/// +/// It contains all information needed in memory to +/// make it as fast as possible +pub struct MinimumValue { + cost_model_map: CostModelMap, + global_model: GlobalModel, + watcher_cancel_token: tokio_util::sync::CancellationToken, + updated_at: GracePeriod, + grace_period: Duration, + + #[cfg(test)] + notify: std::sync::Arc, +} + +struct CostModelWatcher { + pgpool: PgPool, + + cost_models: CostModelMap, + global_model: GlobalModel, + updated_at: GracePeriod, + + #[cfg(test)] + notify: std::sync::Arc, +} + +impl CostModelWatcher { + async fn cost_models_watcher( + pgpool: PgPool, + mut pglistener: PgListener, + cost_models: CostModelMap, + global_model: GlobalModel, + cancel_token: tokio_util::sync::CancellationToken, + grace_period: GracePeriod, + #[cfg(test)] notify: std::sync::Arc, + ) { + let cost_model_watcher = CostModelWatcher { + pgpool, + global_model, + cost_models, + updated_at: grace_period, + #[cfg(test)] + notify, + }; + + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + break; + } + Ok(pg_notification) = pglistener.recv() => { + cost_model_watcher.new_notification( + pg_notification, + ).await; + } + } + } + } + + async fn new_notification(&self, pg_notification: PgNotification) { + let payload = pg_notification.payload(); + let cost_model_notification: Result = + serde_json::from_str(payload); + + match cost_model_notification { + Ok(CostModelNotification::Insert { + deployment, + model, + variables, + }) => self.handle_insert(deployment, model, variables), + Ok(CostModelNotification::Delete { deployment }) => self.handle_delete(deployment), + // UPDATE and TRUNCATE are not expected to happen. Reload the entire cost + // model cache. + Err(_) => self.handle_unexpected_notification(payload).await, + } + #[cfg(test)] + self.notify.notify_one(); + } + + fn handle_insert(&self, deployment: String, model: String, variables: String) { + let model = compile_cost_model(model, variables).unwrap(); + + match deployment.as_str() { + "global" => { + *self.global_model.write().unwrap() = Some(model); + } + deployment_id => match DeploymentId::from_str(deployment_id) { + Ok(deployment_id) => { + let mut cost_model_write = self.cost_models.write().unwrap(); + cost_model_write.insert(deployment_id, model); + } + Err(_) => { + tracing::error!( + "Received insert request for an invalid deployment_id: {}", + deployment_id + ) + } + }, + }; + + *self.updated_at.write().unwrap() = Instant::now(); + } + + fn handle_delete(&self, deployment: String) { + match deployment.as_str() { + "global" => { + *self.global_model.write().unwrap() = None; + } + deployment_id => match DeploymentId::from_str(deployment_id) { + Ok(deployment_id) => { + self.cost_models.write().unwrap().remove(&deployment_id); + } + Err(_) => { + tracing::error!( + "Received delete request for an invalid deployment_id: {}", + deployment_id + ) + } + }, + }; + *self.updated_at.write().unwrap() = Instant::now(); + } + + async fn handle_unexpected_notification(&self, payload: &str) { + tracing::error!( + "Received an unexpected cost model table notification: {}. Reloading entire \ + cost model.", + payload + ); + + MinimumValue::value_check_reload( + &self.pgpool, + self.cost_models.clone(), + self.global_model.clone(), + ) + .await + .expect("should be able to reload cost models"); + + *self.updated_at.write().unwrap() = Instant::now(); + } +} + +impl Drop for MinimumValue { + fn drop(&mut self) { + // Clean shutdown for the minimum value check + // Though since it's not a critical task, we don't wait for it to finish (join). + self.watcher_cancel_token.cancel(); + } +} + +impl MinimumValue { + pub async fn new(pgpool: PgPool, grace_period: Duration) -> Self { + let cost_model_map: CostModelMap = Default::default(); + let global_model: GlobalModel = Default::default(); + let updated_at: GracePeriod = Arc::new(RwLock::new(Instant::now())); + Self::value_check_reload(&pgpool, cost_model_map.clone(), global_model.clone()) + .await + .expect("should be able to reload cost models"); + + let mut pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap(); + pglistener + .listen("cost_models_update_notification") + .await + .expect( + "should be able to subscribe to Postgres Notify events on the channel \ + 'cost_models_update_notification'", + ); + + #[cfg(test)] + let notify = std::sync::Arc::new(tokio::sync::Notify::new()); + + let watcher_cancel_token = tokio_util::sync::CancellationToken::new(); + tokio::spawn(CostModelWatcher::cost_models_watcher( + pgpool.clone(), + pglistener, + cost_model_map.clone(), + global_model.clone(), + watcher_cancel_token.clone(), + updated_at.clone(), + #[cfg(test)] + notify.clone(), + )); + Self { + global_model, + cost_model_map, + watcher_cancel_token, + updated_at, + grace_period, + #[cfg(test)] + notify, + } + } + + fn inside_grace_period(&self) -> bool { + let time_elapsed = Instant::now().duration_since(*self.updated_at.read().unwrap()); + time_elapsed < self.grace_period + } + + fn expected_value(&self, agora_query: &AgoraQuery) -> anyhow::Result { + // get agora model for the deployment_id + let model = self.cost_model_map.read().unwrap(); + let subgraph_model = model.get(&agora_query.deployment_id); + let global_model = self.global_model.read().unwrap(); + + let expected_value = match (subgraph_model, global_model.as_ref()) { + (Some(model), _) | (_, Some(model)) => model + .cost(&agora_query.query, &agora_query.variables) + .map(|fee| fee.to_u128()) + .ok() + .flatten(), + _ => None, + }; + + Ok(expected_value.unwrap_or(MINIMAL_VALUE)) + } + + async fn value_check_reload( + pgpool: &PgPool, + cost_model_map: CostModelMap, + global_model: GlobalModel, + ) -> anyhow::Result<()> { + let models = sqlx::query!( + r#" + SELECT deployment, model, variables + FROM "CostModels" + WHERE deployment != 'global' + ORDER BY deployment ASC + "# + ) + .fetch_all(pgpool) + .await?; + let models = models + .into_iter() + .flat_map(|record| { + let deployment_id = DeploymentId::from_str(&record.deployment).ok()?; + let model = compile_cost_model( + record.model?, + record.variables.map(|v| v.to_string()).unwrap_or_default(), + ) + .ok()?; + Some((deployment_id, model)) + }) + .collect::>(); + + *cost_model_map.write().unwrap() = models; + + *global_model.write().unwrap() = + cost_model::global_cost_model(pgpool) + .await? + .and_then(|model| { + compile_cost_model( + model.model.unwrap_or_default(), + model.variables.map(|v| v.to_string()).unwrap_or_default(), + ) + .ok() + }); + + Ok(()) + } +} + +#[async_trait::async_trait] +impl Check for MinimumValue { + async fn check(&self, ctx: &Context, receipt: &ReceiptWithState) -> CheckResult { + let agora_query = ctx + .get() + .ok_or(CheckError::Failed(anyhow!("Could not find agora query")))?; + // get value + let value = receipt.signed_receipt().message.value; + + if self.inside_grace_period() && value >= MINIMAL_VALUE { + return Ok(()); + } + + let expected_value = self + .expected_value(agora_query) + .map_err(CheckError::Failed)?; + + let should_accept = value >= expected_value; + + tracing::trace!( + value, + expected_value, + should_accept, + "Evaluating mininum query fee." + ); + + if should_accept { + Ok(()) + } else { + return Err(CheckError::Failed(anyhow!( + "Query receipt does not have the minimum value. Expected value: {}. Received value: {}.", + expected_value, value, + ))); + } + } +} + +fn compile_cost_model(model: String, variables: String) -> anyhow::Result { + if model.len() > (1 << 16) { + return Err(anyhow!("CostModelTooLarge")); + } + let model = CostModel::compile(&model, &variables)?; + Ok(model) +} + +#[derive(serde::Deserialize)] +#[serde(tag = "tg_op")] +enum CostModelNotification { + #[serde(rename = "INSERT")] + Insert { + deployment: String, + model: String, + variables: String, + }, + #[serde(rename = "DELETE")] + Delete { deployment: String }, +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use sqlx::PgPool; + use tap_core_v2::receipt::{checks::Check, Context, ReceiptWithState}; + use test_assets::{create_signed_receipt_v2, flush_messages, SignedReceiptV2Request}; + use tokio::time::sleep; + + use super::{AgoraQuery, MinimumValue}; + use crate::database::cost_model::test::{ + self, add_cost_models, global_cost_model, to_db_models, + }; + + #[sqlx::test(migrations = "../../migrations")] + async fn initialize_check(pgpool: PgPool) { + let check = MinimumValue::new(pgpool, Duration::from_secs(0)).await; + assert_eq!(check.cost_model_map.read().unwrap().len(), 0); + } + + #[sqlx::test(migrations = "../../migrations")] + async fn should_initialize_check_with_models(pgpool: PgPool) { + // insert 2 cost models for different deployment_id + let test_models = test::test_data(); + + add_cost_models(&pgpool, to_db_models(test_models.clone())).await; + + let check = MinimumValue::new(pgpool, Duration::from_secs(0)).await; + assert_eq!(check.cost_model_map.read().unwrap().len(), 2); + + // no global model + assert!(check.global_model.read().unwrap().is_none()); + } + + #[sqlx::test(migrations = "../../migrations")] + async fn should_watch_model_insert(pgpool: PgPool) { + let check = MinimumValue::new(pgpool.clone(), Duration::from_secs(0)).await; + assert_eq!(check.cost_model_map.read().unwrap().len(), 0); + + // insert 2 cost models for different deployment_id + let test_models = test::test_data(); + add_cost_models(&pgpool, to_db_models(test_models.clone())).await; + + flush_messages(&check.notify).await; + + assert_eq!( + check.cost_model_map.read().unwrap().len(), + test_models.len() + ); + } + + #[sqlx::test(migrations = "../../migrations")] + async fn should_watch_model_remove(pgpool: PgPool) { + // insert 2 cost models for different deployment_id + let test_models = test::test_data(); + add_cost_models(&pgpool, to_db_models(test_models.clone())).await; + + let check = MinimumValue::new(pgpool.clone(), Duration::from_secs(0)).await; + assert_eq!(check.cost_model_map.read().unwrap().len(), 2); + + // remove + sqlx::query!(r#"DELETE FROM "CostModels""#) + .execute(&pgpool) + .await + .unwrap(); + + check.notify.notified().await; + + assert_eq!(check.cost_model_map.read().unwrap().len(), 0); + } + + #[sqlx::test(migrations = "../../migrations")] + async fn should_start_global_model(pgpool: PgPool) { + let global_model = global_cost_model(); + add_cost_models(&pgpool, vec![global_model.clone()]).await; + + let check = MinimumValue::new(pgpool.clone(), Duration::from_secs(0)).await; + assert!(check.global_model.read().unwrap().is_some()); + } + + #[sqlx::test(migrations = "../../migrations")] + async fn should_watch_global_model(pgpool: PgPool) { + let check = MinimumValue::new(pgpool.clone(), Duration::from_secs(0)).await; + + let global_model = global_cost_model(); + add_cost_models(&pgpool, vec![global_model.clone()]).await; + + check.notify.notified().await; + + assert!(check.global_model.read().unwrap().is_some()); + } + + #[sqlx::test(migrations = "../../migrations")] + async fn should_remove_global_model(pgpool: PgPool) { + let global_model = global_cost_model(); + add_cost_models(&pgpool, vec![global_model.clone()]).await; + + let check = MinimumValue::new(pgpool.clone(), Duration::from_secs(0)).await; + assert!(check.global_model.read().unwrap().is_some()); + + sqlx::query!(r#"DELETE FROM "CostModels""#) + .execute(&pgpool) + .await + .unwrap(); + + check.notify.notified().await; + + assert_eq!(check.cost_model_map.read().unwrap().len(), 0); + } + + #[sqlx::test(migrations = "../../migrations")] + async fn should_check_minimal_value(pgpool: PgPool) { + // insert cost models for different deployment_id + let test_models = test::test_data(); + + add_cost_models(&pgpool, to_db_models(test_models.clone())).await; + + let grace_period = Duration::from_secs(1); + + let check = MinimumValue::new(pgpool, grace_period).await; + + let deployment_id = test_models[0].deployment; + let mut ctx = Context::new(); + ctx.insert(AgoraQuery { + deployment_id, + query: "query { a(skip: 10), b(bob: 5) }".into(), + variables: "".into(), + }); + + let signed_receipt = + create_signed_receipt_v2(SignedReceiptV2Request::builder().value(0).build()).await; + let receipt = ReceiptWithState::new(signed_receipt); + + assert!( + check.check(&ctx, &receipt).await.is_err(), + "Should deny if value is 0 for any query" + ); + + let signed_receipt = + create_signed_receipt_v2(SignedReceiptV2Request::builder().value(1).build()).await; + let receipt = ReceiptWithState::new(signed_receipt); + assert!( + check.check(&ctx, &receipt).await.is_ok(), + "Should accept if value is more than 0 for any query" + ); + + let deployment_id = test_models[1].deployment; + let mut ctx = Context::new(); + ctx.insert(AgoraQuery { + deployment_id, + query: "query { a(skip: 10), b(bob: 5) }".into(), + variables: "".into(), + }); + let minimal_value = 500000000000000; + + let signed_receipt = create_signed_receipt_v2( + SignedReceiptV2Request::builder() + .value(minimal_value - 1) + .build(), + ) + .await; + + let receipt = ReceiptWithState::new(signed_receipt); + + assert!( + check.check(&ctx, &receipt).await.is_ok(), + "Should accept since its inside grace period " + ); + + sleep(grace_period + Duration::from_millis(10)).await; + + assert!( + check.check(&ctx, &receipt).await.is_err(), + "Should require minimal value" + ); + + let signed_receipt = create_signed_receipt_v2( + SignedReceiptV2Request::builder() + .value(minimal_value) + .build(), + ) + .await; + + let receipt = ReceiptWithState::new(signed_receipt); + check + .check(&ctx, &receipt) + .await + .expect("should accept equals minimal"); + + let signed_receipt = create_signed_receipt_v2( + SignedReceiptV2Request::builder() + .value(minimal_value + 1) + .build(), + ) + .await; + + let receipt = ReceiptWithState::new(signed_receipt); + check + .check(&ctx, &receipt) + .await + .expect("should accept more than minimal"); + } + + #[sqlx::test(migrations = "../../migrations")] + async fn should_check_using_global(pgpool: PgPool) { + // insert cost models for different deployment_id + let test_models = test::test_data(); + let global_model = global_cost_model(); + + add_cost_models(&pgpool, vec![global_model.clone()]).await; + add_cost_models(&pgpool, to_db_models(test_models.clone())).await; + + let check = MinimumValue::new(pgpool, Duration::from_secs(0)).await; + + let deployment_id = test_models[0].deployment; + let mut ctx = Context::new(); + ctx.insert(AgoraQuery { + deployment_id, + query: "query { a(skip: 10), b(bob: 5) }".into(), + variables: "".into(), + }); + + let minimal_global_value = 20000000000000; + + let signed_receipt = create_signed_receipt_v2( + SignedReceiptV2Request::builder() + .value(minimal_global_value - 1) + .build(), + ) + .await; + + let receipt = ReceiptWithState::new(signed_receipt); + + assert!( + check.check(&ctx, &receipt).await.is_err(), + "Should deny less than global" + ); + + let signed_receipt = create_signed_receipt_v2( + SignedReceiptV2Request::builder() + .value(minimal_global_value) + .build(), + ) + .await; + let receipt = ReceiptWithState::new(signed_receipt); + check + .check(&ctx, &receipt) + .await + .expect("should accept equals global"); + + let signed_receipt = create_signed_receipt_v2( + SignedReceiptV2Request::builder() + .value(minimal_global_value + 1) + .build(), + ) + .await; + let receipt = ReceiptWithState::new(signed_receipt); + check + .check(&ctx, &receipt) + .await + .expect("should accept more than global"); + } +} diff --git a/crates/service/src/tap/v2/receipt_store.rs b/crates/service/src/tap/v2/receipt_store.rs new file mode 100644 index 00000000..202fc12b --- /dev/null +++ b/crates/service/src/tap/v2/receipt_store.rs @@ -0,0 +1,171 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::anyhow; +use bigdecimal::num_bigint::BigInt; +use sqlx::{types::BigDecimal, PgPool}; +use tap_core_v2::{ + manager::adapters::ReceiptStore, + receipt::{state::Checking, ReceiptWithState}, +}; +use thegraph_core::alloy::{hex::ToHexExt, sol_types::Eip712Domain}; +use tokio::{sync::mpsc::Receiver, task::JoinHandle}; +use tokio_util::sync::CancellationToken; + +use super::{AdapterError, IndexerTapContext}; + +#[derive(Clone)] +pub struct InnerContext { + pub pgpool: PgPool, +} + +impl InnerContext { + async fn store_receipts(&self, receipts: Vec) -> Result<(), AdapterError> { + let receipts_len = receipts.len(); + let mut signers = Vec::with_capacity(receipts_len); + let mut signatures = Vec::with_capacity(receipts_len); + let mut payers = Vec::with_capacity(receipts_len); + let mut data_services = Vec::with_capacity(receipts_len); + let mut service_providers = Vec::with_capacity(receipts_len); + let mut timestamps = Vec::with_capacity(receipts_len); + let mut nonces = Vec::with_capacity(receipts_len); + let mut values = Vec::with_capacity(receipts_len); + + for receipt in receipts { + signers.push(receipt.signer_address); + signatures.push(receipt.signature); + payers.push(receipt.payer); + data_services.push(receipt.data_service); + service_providers.push(receipt.service_provider); + timestamps.push(receipt.timestamp_ns); + nonces.push(receipt.nonce); + values.push(receipt.value); + } + sqlx::query!( + r#"INSERT INTO tap_v2_receipts ( + signer_address, + signature, + payer, + data_service, + service_provider, + timestamp_ns, + nonce, + value + ) SELECT * FROM UNNEST( + $1::CHAR(40)[], + $2::BYTEA[], + $3::CHAR(40)[], + $4::CHAR(40)[], + $5::CHAR(40)[], + $6::NUMERIC(20)[], + $7::NUMERIC(20)[], + $8::NUMERIC(40)[] + )"#, + &signers, + &signatures, + &payers, + &data_services, + &service_providers, + ×tamps, + &nonces, + &values, + ) + .execute(&self.pgpool) + .await + .map_err(|e| { + tracing::error!("Failed to store receipt: {}", e); + anyhow!(e) + })?; + + Ok(()) + } +} + +impl IndexerTapContext { + pub fn spawn_store_receipt_task( + inner_context: InnerContext, + mut receiver: Receiver, + cancelation_token: CancellationToken, + ) -> JoinHandle<()> { + const BUFFER_SIZE: usize = 100; + tokio::spawn(async move { + loop { + let mut buffer = Vec::with_capacity(BUFFER_SIZE); + tokio::select! { + biased; + _ = receiver.recv_many(&mut buffer, BUFFER_SIZE) => { + if let Err(e) = inner_context.store_receipts(buffer).await { + tracing::error!("Failed to store receipts: {}", e); + } + } + _ = cancelation_token.cancelled() => { break }, + } + } + }) + } +} + +#[async_trait::async_trait] +impl ReceiptStore for IndexerTapContext { + type AdapterError = AdapterError; + + async fn store_receipt( + &self, + receipt: ReceiptWithState, + ) -> Result { + let db_receipt = DatabaseReceipt::from_receipt(receipt, &self.domain_separator)?; + self.receipt_producer.send(db_receipt).await.map_err(|e| { + tracing::error!("Failed to queue receipt for storage: {}", e); + anyhow!(e) + })?; + + // We don't need receipt_ids + Ok(0) + } +} + +pub struct DatabaseReceipt { + signer_address: String, + signature: Vec, + payer: String, + data_service: String, + service_provider: String, + timestamp_ns: BigDecimal, + nonce: BigDecimal, + value: BigDecimal, +} + +impl DatabaseReceipt { + fn from_receipt( + receipt: ReceiptWithState, + separator: &Eip712Domain, + ) -> anyhow::Result { + let receipt = receipt.signed_receipt(); + let payer = receipt.message.payer.encode_hex(); + let data_service = receipt.message.data_service.encode_hex(); + let service_provider = receipt.message.service_provider.encode_hex(); + let signature = receipt.signature.as_bytes().to_vec(); + + let signer_address = receipt + .recover_signer(separator) + .map_err(|e| { + tracing::error!("Failed to recover receipt signer: {}", e); + anyhow!(e) + })? + .encode_hex(); + + let timestamp_ns = BigDecimal::from(receipt.message.timestamp_ns); + let nonce = BigDecimal::from(receipt.message.nonce); + let value = BigDecimal::from(BigInt::from(receipt.message.value)); + Ok(Self { + nonce, + signature, + signer_address, + timestamp_ns, + value, + payer, + data_service, + service_provider, + }) + } +} diff --git a/crates/service/tests/router_test.rs b/crates/service/tests/router_test.rs index 34ea66c3..eb2099b1 100644 --- a/crates/service/tests/router_test.rs +++ b/crates/service/tests/router_test.rs @@ -14,7 +14,7 @@ use indexer_service_rs::{ use reqwest::{Method, StatusCode, Url}; use sqlx::PgPool; use test_assets::{ - create_signed_receipt, SignedReceiptRequest, INDEXER_ALLOCATIONS, TAP_EIP712_DOMAIN, + create_signed_receipt, create_signed_receipt_v2, SignedReceiptRequest, SignedReceiptV2Request, INDEXER_ALLOCATIONS, TAP_EIP712_DOMAIN }; use thegraph_core::alloy::primitives::Address; use tokio::sync::watch; @@ -110,6 +110,7 @@ async fn full_integration_test(database: PgPool) { let res = String::from_utf8(bytes.into()).unwrap(); insta::assert_snapshot!(res); + // v1 receipt let receipt = create_signed_receipt( SignedReceiptRequest::builder() .allocation_id(allocation.id) @@ -140,6 +141,38 @@ async fn full_integration_test(database: PgPool) { insta::assert_snapshot!(res); + // v2 receipt + let receipt = create_signed_receipt_v2( + SignedReceiptV2Request::builder() + .value(100) + .build(), + ) + .await; + + let query = QueryBody { + query: "query".into(), + variables: None, + }; + + let request = Request::builder() + .method(Method::POST) + .uri(format!("/subgraphs/id/{deployment}")) + .header(TapReceipt::name(), serde_json::to_string(&receipt).unwrap()) + .body(serde_json::to_string(&query).unwrap()) + .unwrap(); + + // with deployment + let res = app.call(request).await.unwrap(); + assert_eq!(res.status(), StatusCode::OK); + + let graphql_response = res.into_body(); + let bytes = to_bytes(graphql_response, usize::MAX).await.unwrap(); + let res = String::from_utf8(bytes.into()).unwrap(); + + insta::assert_snapshot!(res); + + + let request = Request::builder() .method(Method::POST) .uri(format!("/subgraphs/id/{deployment}")) diff --git a/crates/service/tests/snapshots/router_test__full_integration_test-3.snap b/crates/service/tests/snapshots/router_test__full_integration_test-3.snap index 2c5ec1f8..bfa8bc41 100644 --- a/crates/service/tests/snapshots/router_test__full_integration_test-3.snap +++ b/crates/service/tests/snapshots/router_test__full_integration_test-3.snap @@ -3,4 +3,4 @@ source: crates/service/tests/router_test.rs expression: res snapshot_kind: text --- -{"message":"No Tap receipt was found in the request"} +{"graphQLResponse":"\n {\n \"data\": {\n \"graphNetwork\": {\n \"currentEpoch\": 960\n }\n }\n }\n ","attestation":null} diff --git a/crates/service/tests/snapshots/router_test__full_integration_test-4.snap b/crates/service/tests/snapshots/router_test__full_integration_test-4.snap new file mode 100644 index 00000000..2c5ec1f8 --- /dev/null +++ b/crates/service/tests/snapshots/router_test__full_integration_test-4.snap @@ -0,0 +1,6 @@ +--- +source: crates/service/tests/router_test.rs +expression: res +snapshot_kind: text +--- +{"message":"No Tap receipt was found in the request"} diff --git a/crates/tap-agent/src/agent.rs b/crates/tap-agent/src/agent.rs index f570241a..5b00c42b 100644 --- a/crates/tap-agent/src/agent.rs +++ b/crates/tap-agent/src/agent.rs @@ -1,141 +1,5 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use indexer_config::{ - Config, EscrowSubgraphConfig, GraphNodeConfig, IndexerConfig, NetworkSubgraphConfig, - SubgraphConfig, SubgraphsConfig, TapConfig, -}; -use indexer_monitor::{escrow_accounts, indexer_allocations, DeploymentDetails, SubgraphClient}; -use ractor::{concurrency::JoinHandle, Actor, ActorRef}; -use sender_account::SenderAccountConfig; -use sender_accounts_manager::SenderAccountsManager; - -use crate::{ - agent::sender_accounts_manager::{SenderAccountsManagerArgs, SenderAccountsManagerMessage}, - database, CONFIG, EIP_712_DOMAIN, -}; - -pub mod sender_account; -pub mod sender_accounts_manager; -pub mod sender_allocation; -pub mod unaggregated_receipts; - -pub async fn start_agent() -> (ActorRef, JoinHandle<()>) { - let Config { - indexer: IndexerConfig { - indexer_address, .. - }, - graph_node: - GraphNodeConfig { - status_url: graph_node_status_endpoint, - query_url: graph_node_query_endpoint, - }, - database, - subgraphs: - SubgraphsConfig { - network: - NetworkSubgraphConfig { - config: - SubgraphConfig { - query_url: network_query_url, - query_auth_token: network_query_auth_token, - deployment_id: network_deployment_id, - syncing_interval_secs: network_sync_interval, - }, - recently_closed_allocation_buffer_secs: recently_closed_allocation_buffer, - }, - escrow: - EscrowSubgraphConfig { - config: - SubgraphConfig { - query_url: escrow_query_url, - query_auth_token: escrow_query_auth_token, - deployment_id: escrow_deployment_id, - syncing_interval_secs: escrow_sync_interval, - }, - }, - }, - tap: - TapConfig { - // TODO: replace with a proper implementation once the gateway registry contract is ready - sender_aggregator_endpoints, - .. - }, - .. - } = &*CONFIG; - let pgpool = database::connect(database.clone()).await; - - let http_client = reqwest::Client::new(); - - let network_subgraph = Box::leak(Box::new( - SubgraphClient::new( - http_client.clone(), - network_deployment_id.map(|deployment| { - DeploymentDetails::for_graph_node_url( - graph_node_status_endpoint.clone(), - graph_node_query_endpoint.clone(), - deployment, - ) - }), - DeploymentDetails::for_query_url_with_token( - network_query_url.clone(), - network_query_auth_token.clone(), - ), - ) - .await, - )); - - let indexer_allocations = indexer_allocations( - network_subgraph, - *indexer_address, - *network_sync_interval, - *recently_closed_allocation_buffer, - ) - .await - .expect("Failed to initialize indexer_allocations watcher"); - - let escrow_subgraph = Box::leak(Box::new( - SubgraphClient::new( - http_client.clone(), - escrow_deployment_id.map(|deployment| { - DeploymentDetails::for_graph_node_url( - graph_node_status_endpoint.clone(), - graph_node_query_endpoint.clone(), - deployment, - ) - }), - DeploymentDetails::for_query_url_with_token( - escrow_query_url.clone(), - escrow_query_auth_token.clone(), - ), - ) - .await, - )); - - let escrow_accounts = escrow_accounts( - escrow_subgraph, - *indexer_address, - *escrow_sync_interval, - false, - ) - .await - .expect("Error creating escrow_accounts channel"); - - let config = Box::leak(Box::new(SenderAccountConfig::from_config(&CONFIG))); - - let args = SenderAccountsManagerArgs { - config, - domain_separator: EIP_712_DOMAIN.clone(), - pgpool, - indexer_allocations, - escrow_accounts, - escrow_subgraph, - network_subgraph, - sender_aggregator_endpoints: sender_aggregator_endpoints.clone(), - prefix: None, - }; - - SenderAccountsManager::spawn(None, SenderAccountsManager, args) - .await - .expect("Failed to start sender accounts manager actor.") -} +pub mod v1; +pub mod v2; diff --git a/crates/tap-agent/src/agent/v1.rs b/crates/tap-agent/src/agent/v1.rs new file mode 100644 index 00000000..817bda9a --- /dev/null +++ b/crates/tap-agent/src/agent/v1.rs @@ -0,0 +1,140 @@ +// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use indexer_config::{ + Config, EscrowSubgraphConfig, GraphNodeConfig, IndexerConfig, NetworkSubgraphConfig, + SubgraphConfig, SubgraphsConfig, TapConfig, +}; +use indexer_monitor::{escrow_accounts, indexer_allocations, DeploymentDetails, SubgraphClient}; +use ractor::{concurrency::JoinHandle, Actor, ActorRef}; +use sender_account::SenderAccountConfig; +use sender_accounts_manager::SenderAccountsManager; + +use crate::{ + agent::v1::sender_accounts_manager::{SenderAccountsManagerArgs, SenderAccountsManagerMessage}, + database, CONFIG, EIP_712_DOMAIN, +}; + +pub mod sender_account; +pub mod sender_accounts_manager; +pub mod sender_allocation; + +pub async fn start_agent() -> (ActorRef, JoinHandle<()>) { + let Config { + indexer: IndexerConfig { + indexer_address, .. + }, + graph_node: + GraphNodeConfig { + status_url: graph_node_status_endpoint, + query_url: graph_node_query_endpoint, + }, + database, + subgraphs: + SubgraphsConfig { + network: + NetworkSubgraphConfig { + config: + SubgraphConfig { + query_url: network_query_url, + query_auth_token: network_query_auth_token, + deployment_id: network_deployment_id, + syncing_interval_secs: network_sync_interval, + }, + recently_closed_allocation_buffer_secs: recently_closed_allocation_buffer, + }, + escrow: + EscrowSubgraphConfig { + config: + SubgraphConfig { + query_url: escrow_query_url, + query_auth_token: escrow_query_auth_token, + deployment_id: escrow_deployment_id, + syncing_interval_secs: escrow_sync_interval, + }, + }, + }, + tap: + TapConfig { + // TODO: replace with a proper implementation once the gateway registry contract is ready + sender_aggregator_endpoints, + .. + }, + .. + } = &*CONFIG; + let pgpool = database::connect(database.clone()).await; + + let http_client = reqwest::Client::new(); + + let network_subgraph = Box::leak(Box::new( + SubgraphClient::new( + http_client.clone(), + network_deployment_id.map(|deployment| { + DeploymentDetails::for_graph_node_url( + graph_node_status_endpoint.clone(), + graph_node_query_endpoint.clone(), + deployment, + ) + }), + DeploymentDetails::for_query_url_with_token( + network_query_url.clone(), + network_query_auth_token.clone(), + ), + ) + .await, + )); + + let indexer_allocations = indexer_allocations( + network_subgraph, + *indexer_address, + *network_sync_interval, + *recently_closed_allocation_buffer, + ) + .await + .expect("Failed to initialize indexer_allocations watcher"); + + let escrow_subgraph = Box::leak(Box::new( + SubgraphClient::new( + http_client.clone(), + escrow_deployment_id.map(|deployment| { + DeploymentDetails::for_graph_node_url( + graph_node_status_endpoint.clone(), + graph_node_query_endpoint.clone(), + deployment, + ) + }), + DeploymentDetails::for_query_url_with_token( + escrow_query_url.clone(), + escrow_query_auth_token.clone(), + ), + ) + .await, + )); + + let escrow_accounts = escrow_accounts( + escrow_subgraph, + *indexer_address, + *escrow_sync_interval, + false, + ) + .await + .expect("Error creating escrow_accounts channel"); + + let config = Box::leak(Box::new(SenderAccountConfig::from_config(&CONFIG))); + + let args = SenderAccountsManagerArgs { + config, + domain_separator: EIP_712_DOMAIN.clone(), + pgpool, + indexer_allocations, + escrow_accounts, + escrow_subgraph, + network_subgraph, + sender_aggregator_endpoints: sender_aggregator_endpoints.clone(), + prefix: None, + }; + + SenderAccountsManager::spawn(None, SenderAccountsManager, args) + .await + .expect("Failed to start sender accounts manager actor.") +} diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/v1/sender_account.rs similarity index 99% rename from crates/tap-agent/src/agent/sender_account.rs rename to crates/tap-agent/src/agent/v1/sender_account.rs index 26eef826..2be18ddb 100644 --- a/crates/tap-agent/src/agent/sender_account.rs +++ b/crates/tap-agent/src/agent/v1/sender_account.rs @@ -35,9 +35,9 @@ use super::sender_allocation::{ }; use crate::{ adaptative_concurrency::AdaptiveLimiter, - agent::unaggregated_receipts::UnaggregatedReceipts, backoff::BackoffInfo, tracker::{SenderFeeTracker, SimpleFeeTracker}, + unaggregated_receipts::UnaggregatedReceipts, }; lazy_static! { @@ -1063,15 +1063,13 @@ pub mod tests { use super::{SenderAccount, SenderAccountArgs, SenderAccountMessage}; use crate::{ - agent::{ - sender_account::ReceiptFees, sender_allocation::SenderAllocationMessage, - unaggregated_receipts::UnaggregatedReceipts, - }, + agent::v1::{sender_account::ReceiptFees, sender_allocation::SenderAllocationMessage}, assert_not_triggered, assert_triggered, test::{ actors::{create_mock_sender_allocation, MockSenderAllocation, TestableActor}, create_rav, store_rav_with_options, INDEXER, TAP_EIP712_DOMAIN_SEPARATOR, }, + unaggregated_receipts::UnaggregatedReceipts, }; // we implement the PartialEq and Eq traits for SenderAccountMessage to be able to compare diff --git a/crates/tap-agent/src/agent/sender_accounts_manager.rs b/crates/tap-agent/src/agent/v1/sender_accounts_manager.rs similarity index 99% rename from crates/tap-agent/src/agent/sender_accounts_manager.rs rename to crates/tap-agent/src/agent/v1/sender_accounts_manager.rs index 615633fd..06efbc70 100644 --- a/crates/tap-agent/src/agent/sender_accounts_manager.rs +++ b/crates/tap-agent/src/agent/v1/sender_accounts_manager.rs @@ -28,7 +28,7 @@ use tokio::{ use super::sender_account::{ SenderAccount, SenderAccountArgs, SenderAccountConfig, SenderAccountMessage, }; -use crate::{agent::sender_allocation::SenderAllocationMessage, lazy_static}; +use crate::{agent::v1::sender_allocation::SenderAllocationMessage, lazy_static}; lazy_static! { static ref RECEIPTS_CREATED: CounterVec = register_counter_vec!( @@ -618,7 +618,7 @@ mod tests { SenderAccountsManagerMessage, State, }; use crate::{ - agent::{ + agent::v1::{ sender_account::{tests::PREFIX_ID, SenderAccountConfig, SenderAccountMessage}, sender_accounts_manager::{handle_notification, NewReceiptNotification}, }, diff --git a/crates/tap-agent/src/agent/sender_allocation.rs b/crates/tap-agent/src/agent/v1/sender_allocation.rs similarity index 99% rename from crates/tap-agent/src/agent/sender_allocation.rs rename to crates/tap-agent/src/agent/v1/sender_allocation.rs index decb5c4c..ad21cf78 100644 --- a/crates/tap-agent/src/agent/sender_allocation.rs +++ b/crates/tap-agent/src/agent/v1/sender_allocation.rs @@ -30,10 +30,9 @@ use tokio::sync::watch::Receiver; use super::sender_account::SenderAccountConfig; use crate::{ - agent::{ + agent::v1::{ sender_account::{ReceiptFees, SenderAccountMessage}, sender_accounts_manager::NewReceiptNotification, - unaggregated_receipts::UnaggregatedReceipts, }, lazy_static, tap::{ @@ -43,6 +42,7 @@ use crate::{ }, signers_trimmed, }, + unaggregated_receipts::UnaggregatedReceipts, }; lazy_static! { @@ -899,16 +899,16 @@ pub mod tests { SenderAllocation, SenderAllocationArgs, SenderAllocationMessage, SenderAllocationState, }; use crate::{ - agent::{ + agent::v1::{ sender_account::{ReceiptFees, SenderAccountMessage}, sender_accounts_manager::NewReceiptNotification, - unaggregated_receipts::UnaggregatedReceipts, }, test::{ actors::{create_mock_sender_account, TestableActor}, create_rav, create_received_receipt, store_invalid_receipt, store_rav, store_receipt, INDEXER, }, + unaggregated_receipts::UnaggregatedReceipts, }; const DUMMY_URL: &str = "http://localhost:1234"; @@ -1258,8 +1258,7 @@ pub mod tests { )); // Stop the TAP aggregator server. - handle.stop().unwrap(); - handle.stopped().await; + handle.abort(); } #[sqlx::test(migrations = "../../migrations")] diff --git a/crates/tap-agent/src/agent/v2.rs b/crates/tap-agent/src/agent/v2.rs new file mode 100644 index 00000000..e69de29b diff --git a/crates/tap-agent/src/agent/v2/manager.rs b/crates/tap-agent/src/agent/v2/manager.rs new file mode 100644 index 00000000..e69de29b diff --git a/crates/tap-agent/src/lib.rs b/crates/tap-agent/src/lib.rs index d0f6452e..eea6021f 100644 --- a/crates/tap-agent/src/lib.rs +++ b/crates/tap-agent/src/lib.rs @@ -22,6 +22,7 @@ pub mod database; pub mod metrics; pub mod tap; pub mod tracker; +pub mod unaggregated_receipts; #[cfg(test)] pub mod test; diff --git a/crates/tap-agent/src/main.rs b/crates/tap-agent/src/main.rs index 3627af8a..fffafc4f 100644 --- a/crates/tap-agent/src/main.rs +++ b/crates/tap-agent/src/main.rs @@ -10,7 +10,7 @@ async fn main() -> anyhow::Result<()> { // Parse basic configurations, also initializes logging. lazy_static::initialize(&CONFIG); - let (manager, handler) = agent::start_agent().await; + let (manager, handler) = agent::v1::start_agent().await; tracing::info!("TAP Agent started."); tokio::spawn(metrics::run_server(CONFIG.metrics.port)); diff --git a/crates/tap-agent/src/test.rs b/crates/tap-agent/src/test.rs index 8413198d..fb33dde9 100644 --- a/crates/tap-agent/src/test.rs +++ b/crates/tap-agent/src/test.rs @@ -184,10 +184,12 @@ pub mod actors { use tokio::sync::{mpsc, watch, Notify}; use super::create_rav; - use crate::agent::{ - sender_account::{ReceiptFees, SenderAccountMessage}, - sender_accounts_manager::NewReceiptNotification, - sender_allocation::SenderAllocationMessage, + use crate::{ + agent::v1::{ + sender_account::{ReceiptFees, SenderAccountMessage}, + sender_accounts_manager::NewReceiptNotification, + sender_allocation::SenderAllocationMessage, + }, unaggregated_receipts::UnaggregatedReceipts, }; diff --git a/crates/tap-agent/src/tracker.rs b/crates/tap-agent/src/tracker.rs index 7af7fde5..273f1a00 100644 --- a/crates/tap-agent/src/tracker.rs +++ b/crates/tap-agent/src/tracker.rs @@ -14,7 +14,7 @@ mod tracker_tests; pub use generic_tracker::GlobalFeeTracker; -use crate::agent::unaggregated_receipts::UnaggregatedReceipts; +use crate::unaggregated_receipts::UnaggregatedReceipts; pub type SimpleFeeTracker = GenericTracker; pub type SenderFeeTracker = diff --git a/crates/tap-agent/src/tracker/generic_tracker.rs b/crates/tap-agent/src/tracker/generic_tracker.rs index d81a392b..778635d6 100644 --- a/crates/tap-agent/src/tracker/generic_tracker.rs +++ b/crates/tap-agent/src/tracker/generic_tracker.rs @@ -12,7 +12,7 @@ use thegraph_core::alloy::primitives::Address; use super::{ global_tracker::GlobalTracker, AllocationStats, DefaultFromExtra, DurationInfo, SenderFeeStats, }; -use crate::agent::unaggregated_receipts::UnaggregatedReceipts; +use crate::unaggregated_receipts::UnaggregatedReceipts; #[derive(Debug, Clone, Copy, Default, PartialEq)] pub struct GlobalFeeTracker { diff --git a/crates/tap-agent/src/tracker/global_tracker.rs b/crates/tap-agent/src/tracker/global_tracker.rs index a29df32d..e4d148b3 100644 --- a/crates/tap-agent/src/tracker/global_tracker.rs +++ b/crates/tap-agent/src/tracker/global_tracker.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use super::GlobalFeeTracker; -use crate::agent::unaggregated_receipts::UnaggregatedReceipts; +use crate::unaggregated_receipts::UnaggregatedReceipts; pub trait GlobalTracker: Sized { fn get_total_fee(&self) -> u128; diff --git a/crates/tap-agent/src/tracker/sender_fee_stats.rs b/crates/tap-agent/src/tracker/sender_fee_stats.rs index dcef47dd..34a5769b 100644 --- a/crates/tap-agent/src/tracker/sender_fee_stats.rs +++ b/crates/tap-agent/src/tracker/sender_fee_stats.rs @@ -7,7 +7,7 @@ use std::{ }; use super::{AllocationStats, DefaultFromExtra, DurationInfo}; -use crate::{agent::unaggregated_receipts::UnaggregatedReceipts, backoff::BackoffInfo}; +use crate::{backoff::BackoffInfo, unaggregated_receipts::UnaggregatedReceipts}; #[derive(Debug, Clone, Default)] pub struct SenderFeeStats { diff --git a/crates/tap-agent/src/tracker/tracker_tests.rs b/crates/tap-agent/src/tracker/tracker_tests.rs index 9904c7ed..2d774523 100644 --- a/crates/tap-agent/src/tracker/tracker_tests.rs +++ b/crates/tap-agent/src/tracker/tracker_tests.rs @@ -9,7 +9,7 @@ use std::{ use thegraph_core::alloy::primitives::address; use super::SimpleFeeTracker; -use crate::{agent::unaggregated_receipts::UnaggregatedReceipts, tracker::SenderFeeTracker}; +use crate::{tracker::SenderFeeTracker, unaggregated_receipts::UnaggregatedReceipts}; #[test] fn test_allocation_id_tracker() { diff --git a/crates/tap-agent/src/agent/unaggregated_receipts.rs b/crates/tap-agent/src/unaggregated_receipts.rs similarity index 100% rename from crates/tap-agent/src/agent/unaggregated_receipts.rs rename to crates/tap-agent/src/unaggregated_receipts.rs diff --git a/crates/test-assets/Cargo.toml b/crates/test-assets/Cargo.toml index 568c14b9..c0ecc1c3 100644 --- a/crates/test-assets/Cargo.toml +++ b/crates/test-assets/Cargo.toml @@ -8,6 +8,7 @@ indexer-allocation = { path = "../allocation" } bip39 = "2.0.0" lazy_static.workspace = true tap_core.workspace = true +tap_core_v2.workspace = true thegraph-core.workspace = true typed-builder.workspace = true tokio.workspace = true diff --git a/crates/test-assets/src/lib.rs b/crates/test-assets/src/lib.rs index a475d3fa..b8bd10ae 100644 --- a/crates/test-assets/src/lib.rs +++ b/crates/test-assets/src/lib.rs @@ -15,6 +15,11 @@ use tap_core::{ signed_message::EIP712SignedMessage, tap_eip712_domain, }; +use tap_core_v2::{ + receipt::{Receipt as ReceiptV2, SignedReceipt as SignedReceiptV2}, + signed_message::EIP712SignedMessage as EIP712SignedMessageV2, + // tap_eip712_domain as tap_eip712_domain_v2, +}; use thegraph_core::{ alloy::{ primitives::{address, Address, U256}, @@ -353,6 +358,49 @@ pub async fn create_signed_receipt( .unwrap() } +#[derive(TypedBuilder)] +pub struct SignedReceiptV2Request { + #[builder(default = Address::ZERO)] + data_service: Address, + #[builder(default = Address::ZERO)] + service_provider: Address, + #[builder(default)] + nonce: u64, + #[builder(default_code = r#"SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() as u64"#)] + timestamp_ns: u64, + #[builder(default = 1)] + value: u128, +} + +pub async fn create_signed_receipt_v2( + SignedReceiptV2Request { + data_service, + service_provider, + nonce, + timestamp_ns, + value, + }: SignedReceiptV2Request, +) -> SignedReceiptV2 { + let (wallet, payer) = &*self::TAP_SIGNER; + + EIP712SignedMessageV2::new( + &self::TAP_EIP712_DOMAIN, + ReceiptV2 { + payer: *payer, + data_service, + service_provider, + nonce, + timestamp_ns, + value, + }, + wallet, + ) + .unwrap() +} + pub async fn flush_messages(notify: &Notify) { loop { if tokio::time::timeout(Duration::from_millis(10), notify.notified()) diff --git a/migrations/20250102201717_add_tap_v2_receipts.down.sql b/migrations/20250102201717_add_tap_v2_receipts.down.sql new file mode 100644 index 00000000..f95f22af --- /dev/null +++ b/migrations/20250102201717_add_tap_v2_receipts.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP TABLE IF EXISTS tap_v2_receipts CASCADE; diff --git a/migrations/20250102201717_add_tap_v2_receipts.up.sql b/migrations/20250102201717_add_tap_v2_receipts.up.sql new file mode 100644 index 00000000..0684efda --- /dev/null +++ b/migrations/20250102201717_add_tap_v2_receipts.up.sql @@ -0,0 +1,15 @@ +-- Add up migration script here + +CREATE TABLE IF NOT EXISTS tap_v2_receipts ( + id BIGSERIAL PRIMARY KEY, -- id being SERIAL is important for the function of tap-agent + signer_address CHAR(40) NOT NULL, + + -- Values below are the individual fields of the EIP-712 receipt + signature BYTEA NOT NULL, + payer CHAR(40) NOT NULL, + data_service CHAR(40) NOT NULL, + service_provider CHAR(40) NOT NULL, + timestamp_ns NUMERIC(20) NOT NULL, + nonce NUMERIC(20) NOT NULL, + value NUMERIC(39) NOT NULL +); diff --git a/migrations/20250108103554_add_tap_v2_ravs.down.sql b/migrations/20250108103554_add_tap_v2_ravs.down.sql new file mode 100644 index 00000000..4dc9ce66 --- /dev/null +++ b/migrations/20250108103554_add_tap_v2_ravs.down.sql @@ -0,0 +1,4 @@ +-- Add down migration script here + +DROP TABLE IF EXISTS tap_v2_ravs CASCADE; +DROP TABLE IF EXISTS tap_v2_rav_requests_failed CASCADE; diff --git a/migrations/20250108103554_add_tap_v2_ravs.up.sql b/migrations/20250108103554_add_tap_v2_ravs.up.sql new file mode 100644 index 00000000..f49181f6 --- /dev/null +++ b/migrations/20250108103554_add_tap_v2_ravs.up.sql @@ -0,0 +1,32 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS tap_v2_ravs ( + -- Values below are the individual fields of the EIP-712 RAV + signature BYTEA NOT NULL, + + payer CHAR(40) NOT NULL, + data_service CHAR(40) NOT NULL, + service_provider CHAR(40) NOT NULL, + + timestamp_ns NUMERIC(20) NOT NULL, + value_aggregate NUMERIC(39) NOT NULL, + + PRIMARY KEY (payer, data_service), + + -- To make indexer-agent's sequelize happy + created_at TIMESTAMP WITH TIME ZONE, + updated_at TIMESTAMP WITH TIME ZONE +); + +-- This table is used to store failed RAV requests. +-- Used for logging and debugging purposes. +CREATE TABLE IF NOT EXISTS tap_v2_rav_requests_failed ( + id BIGSERIAL PRIMARY KEY, + + payer CHAR(40) NOT NULL, + data_service CHAR(40) NOT NULL, + service_provider CHAR(40) NOT NULL, + + expected_rav JSON NOT NULL, + rav_response JSON NOT NULL, + reason TEXT NOT NULL +);