From c03638b18e1c9c088c9cc4f49919cf47e624044a Mon Sep 17 00:00:00 2001 From: Ellie Huxtable Date: Thu, 19 Oct 2023 17:21:49 +0100 Subject: [PATCH] Add billing limiter (#33) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add Redis lib * `cargo update` * fmt * Add base implementation of billing limiter Supports 1. A fixed set of limits, with no redis update 2. A fixed set, subsequently updated from redis 3. No fixed set, updates from redis I still need to figure out how to nicely mock the redis connection that still leaves enough not mocked to be worth testing. I really don't want integration tests on it :( Also still needs connecting to the API. Reading through the python for this is like 😵‍💫 * Rework I've reworked it a bunch. Honestly the background loop worked but it became really horrible and the locking behaviour a little sketchy. While this will slow down some requests a bit, unless it becomes measurably slow let's keep it that way rather than introducing a bit of a horrible pattern. 
* hook it all up * Add redis read timeout * Add non-cluster client * Respond to feedback --- Cargo.lock | 257 +++++++++++++++++++++++---------- capture-server/Cargo.toml | 1 + capture-server/src/main.rs | 27 +++- capture/Cargo.toml | 2 + capture/src/api.rs | 12 ++ capture/src/billing_limits.rs | 188 ++++++++++++++++++++++++ capture/src/capture.rs | 24 ++- capture/src/lib.rs | 2 + capture/src/redis.rs | 80 ++++++++++ capture/src/router.rs | 9 +- capture/tests/django_compat.rs | 11 +- 11 files changed, 527 insertions(+), 86 deletions(-) create mode 100644 capture/src/billing_limits.rs create mode 100644 capture/src/redis.rs diff --git a/Cargo.lock b/Cargo.lock index 6ef5c6c..7ea2f9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,9 +30,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.5" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" dependencies = [ "memchr", ] @@ -61,7 +61,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -104,9 +104,9 @@ dependencies = [ [[package]] name = "axum-client-ip" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df8e81eacc93f36480825da5f46a33b5fb2246ed024eacc9e8933425b80c5807" +checksum = "1ef117890a418b7832678d9ea1e1c08456dd7b2fd1dadb9676cd6f0fe7eb4b21" dependencies = [ "axum", "forwarded-header-value", @@ -165,9 +165,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.21.3" +version = "0.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "414dcefbc63d77c526a76b3afcf6fbb9b5e2791c19c3aa2297733208750c6e53" +checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" [[package]] 
name = "bitflags" @@ -183,15 +183,15 @@ checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" [[package]] name = "bumpalo" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "bytes" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" [[package]] name = "capture" @@ -212,6 +212,8 @@ dependencies = [ "mockall", "rand", "rdkafka", + "redis", + "redis-test", "serde", "serde_json", "serde_urlencoded", @@ -231,6 +233,7 @@ version = "0.1.0" dependencies = [ "axum", "capture", + "time", "tokio", "tracing", "tracing-subscriber", @@ -260,6 +263,42 @@ dependencies = [ "cc", ] +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" + +[[package]] +name = "crc16" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" + [[package]] name = "crc32fast" version = "1.3.2" @@ -298,7 +337,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.0", + "hashbrown 0.14.1", "lock_api", "once_cell", "parking_lot_core", @@ -452,7 +491,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -562,15 +601,15 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" [[package]] name = "hermit-abi" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" [[package]] name = "http" @@ -658,12 +697,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.0" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" +checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.1", ] [[package]] @@ -704,9 +743,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = 
"a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "libz-sys" @@ -756,15 +795,15 @@ dependencies = [ [[package]] name = "matchit" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" [[package]] name = "memchr" -version = "2.6.3" +version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" [[package]] name = "memoffset" @@ -812,7 +851,7 @@ checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -929,9 +968,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", ] @@ -1034,7 +1073,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -1109,9 +1148,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.66" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] @@ -1227,6 +1266,40 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "redis" +version = "0.23.3" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f49cdc0bb3f412bf8e7d1bd90fe1d9eb10bc5c399ba90973c14662a27b3f8ba" +dependencies = [ + "async-trait", + "bytes", + "combine", + "crc16", + "futures", + "futures-util", + "itoa", + "log", + "percent-encoding", + "pin-project-lite", + "rand", + "ryu", + "sha1_smol", + "socket2 0.4.9", + "tokio", + "tokio-util", + "url", +] + +[[package]] +name = "redis-test" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aba266ca48ae66978bf439fd2ac0d7a36a8635823754e2bc73afaf9d2fc25272" +dependencies = [ + "redis", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -1238,9 +1311,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "d119d7c7ca818f8a53c300863d4f87566aac09943aef5b355bb83969dae75d87" dependencies = [ "aho-corasick", "memchr", @@ -1250,9 +1323,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "5d58da636bd923eae52b7e9120271cbefb16f399069ee566ca5ebf9c30e32238" dependencies = [ "aho-corasick", "memchr", @@ -1261,15 +1334,15 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.7.5" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +checksum = "c3cbb081b9784b07cceb8824c8583f86db4814d172ab043f3c23f7dc600bf83d" [[package]] name = "reqwest" -version = "0.11.20" +version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" +checksum = 
"046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ "base64", "bytes", @@ -1291,6 +1364,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "system-configuration", "tokio", "tokio-util", "tower-service", @@ -1343,14 +1417,14 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] name = "serde_json" -version = "1.0.105" +version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360" +checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" dependencies = [ "itoa", "ryu", @@ -1379,11 +1453,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "sharded-slab" -version = "0.1.4" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" dependencies = [ "lazy_static", ] @@ -1414,9 +1494,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" [[package]] name = "socket2" @@ -1430,9 +1510,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +checksum = 
"4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" dependencies = [ "libc", "windows-sys", @@ -1451,9 +1531,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.31" +version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "718fa2415bcb8d8bd775917a1bf12a7931b6dfa890753378538118181e0cb398" +checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ "proc-macro2", "quote", @@ -1466,6 +1546,27 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "termtree" version = "0.4.1" @@ -1474,22 +1575,22 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "thiserror" -version = "1.0.48" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" +checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.48" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" +checksum = 
"10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -1504,9 +1605,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" +checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe" dependencies = [ "deranged", "itoa", @@ -1517,15 +1618,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572" +checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" dependencies = [ "time-core", ] @@ -1547,9 +1648,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", @@ -1559,7 +1660,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.3", + "socket2 0.5.4", "tokio-macros", "windows-sys", ] @@ -1572,14 +1673,14 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] name = "tokio-util" -version = 
"0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", @@ -1597,11 +1698,11 @@ checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" [[package]] name = "toml_edit" -version = "0.19.14" +version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a" +checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.0.0", + "indexmap 2.0.2", "toml_datetime", "winnow", ] @@ -1694,7 +1795,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -1755,9 +1856,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" [[package]] name = "unicode-ident" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "unicode-normalization" @@ -1848,7 +1949,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", "wasm-bindgen-shared", ] @@ -1882,7 +1983,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2006,9 +2107,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.15" +version = "0.5.16" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "7c2e3184b9c4e92ad5167ca73039d0c42476302ab603e2fec4487511f38ccefc" +checksum = "037711d82167854aff2018dfd193aa0fef5370f456732f0d5a0c59b0f1b4b907" dependencies = [ "memchr", ] diff --git a/capture-server/Cargo.toml b/capture-server/Cargo.toml index 04c6182..6378532 100644 --- a/capture-server/Cargo.toml +++ b/capture-server/Cargo.toml @@ -9,3 +9,4 @@ axum = { workspace = true } tokio = { workspace = true } tracing-subscriber = { workspace = true } tracing = { workspace = true } +time = { workspace = true } diff --git a/capture-server/src/main.rs b/capture-server/src/main.rs index e2232d5..9d60f89 100644 --- a/capture-server/src/main.rs +++ b/capture-server/src/main.rs @@ -1,7 +1,9 @@ use std::env; use std::net::SocketAddr; +use std::sync::Arc; -use capture::{router, sink, time}; +use capture::{billing_limits::BillingLimiter, redis::RedisClient, router, sink}; +use time::Duration; use tokio::signal; async fn shutdown() { @@ -23,16 +25,35 @@ async fn shutdown() { async fn main() { let use_print_sink = env::var("PRINT_SINK").is_ok(); let address = env::var("ADDRESS").unwrap_or(String::from("127.0.0.1:3000")); + let redis_addr = env::var("REDIS").expect("redis required; please set the REDIS env var"); + + let redis_client = + Arc::new(RedisClient::new(redis_addr).expect("failed to create redis client")); + + let billing = BillingLimiter::new(Duration::seconds(5), redis_client.clone()) + .expect("failed to create billing limiter"); let app = if use_print_sink { - router::router(time::SystemTime {}, sink::PrintSink {}, true) + router::router( + capture::time::SystemTime {}, + sink::PrintSink {}, + redis_client, + billing, + true, + ) } else { let brokers = env::var("KAFKA_BROKERS").expect("Expected KAFKA_BROKERS"); let topic = env::var("KAFKA_TOPIC").expect("Expected KAFKA_TOPIC"); let sink = sink::KafkaSink::new(topic, brokers).unwrap(); - router::router(time::SystemTime {}, sink, true) + router::router( + capture::time::SystemTime {}, + sink, + redis_client, 
+ billing, + true, + ) }; // initialize tracing diff --git a/capture/Cargo.toml b/capture/Cargo.toml index 60cca70..7c84c6c 100644 --- a/capture/Cargo.toml +++ b/capture/Cargo.toml @@ -29,8 +29,10 @@ rdkafka = { workspace = true } metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } thiserror = { workspace = true } +redis = { version="0.23.3", features=["tokio-comp", "cluster", "cluster-async"] } [dev-dependencies] assert-json-diff = "2.0.2" axum-test-helper = "0.2.0" mockall = "0.11.2" +redis-test = "0.2.3" diff --git a/capture/src/api.rs b/capture/src/api.rs index 319056c..ff245b5 100644 --- a/capture/src/api.rs +++ b/capture/src/api.rs @@ -52,6 +52,12 @@ pub enum CaptureError { EventTooBig, #[error("invalid event could not be processed")] NonRetryableSinkError, + + #[error("billing limit reached")] + BillingLimit, + + #[error("rate limited")] + RateLimited, } impl IntoResponse for CaptureError { @@ -64,10 +70,16 @@ impl IntoResponse for CaptureError { | CaptureError::MissingDistinctId | CaptureError::EventTooBig | CaptureError::NonRetryableSinkError => (StatusCode::BAD_REQUEST, self.to_string()), + CaptureError::NoTokenError | CaptureError::MultipleTokensError | CaptureError::TokenValidationError(_) => (StatusCode::UNAUTHORIZED, self.to_string()), + CaptureError::RetryableSinkError => (StatusCode::SERVICE_UNAVAILABLE, self.to_string()), + + CaptureError::BillingLimit | CaptureError::RateLimited => { + (StatusCode::TOO_MANY_REQUESTS, self.to_string()) + } } .into_response() } diff --git a/capture/src/billing_limits.rs b/capture/src/billing_limits.rs new file mode 100644 index 0000000..44a997c --- /dev/null +++ b/capture/src/billing_limits.rs @@ -0,0 +1,188 @@ +use std::{collections::HashSet, ops::Sub, sync::Arc}; + +use crate::redis::Client; + +/// Limit accounts by team ID if they hit a billing limit +/// +/// We have an async celery worker that regularly checks on accounts + assesses if they are beyond +/// a billing limit. 
If this is the case, a key is set in redis. +/// +/// Requirements +/// +/// 1. Updates from the celery worker should be reflected in capture within a short period of time +/// 2. Capture should cope with redis being _totally down_, and fail open +/// 3. We should not hit redis for every single request +/// +/// The solution here is to read from the cache until a time interval is hit, and then fetch new +/// data. The write requires taking a lock that stalls all readers, though so long as redis reads +/// stay fast we're ok. +/// +/// Some small delay between an account being limited and the limit taking effect is acceptable. +/// However, ideally we should not allow requests from some pods but 429 from others. +use thiserror::Error; +use time::{Duration, OffsetDateTime}; +use tokio::sync::RwLock; + +// todo: fetch from env +const QUOTA_LIMITER_CACHE_KEY: &str = "@posthog/quota-limits/"; + +pub enum QuotaResource { + Events, + Recordings, +} + +impl QuotaResource { + fn as_str(&self) -> &'static str { + match self { + Self::Events => "events", + Self::Recordings => "recordings", + } + } +} + +#[derive(Error, Debug)] +pub enum LimiterError { + #[error("updater already running - there can only be one")] + UpdaterRunning, +} + +#[derive(Clone)] +pub struct BillingLimiter { + limited: Arc>>, + redis: Arc, + interval: Duration, + updated: Arc>, +} + +impl BillingLimiter { + /// Create a new BillingLimiter. + /// + /// This connects to a redis cluster - pass in a vec of addresses for the initial nodes. + /// + /// You can also initialize the limiter with a set of tokens to limit from the very beginning. + /// This may be overridden by Redis, if the sets differ, + /// + /// Pass an empty redis node list to only use this initial set. 
+ pub fn new( + interval: Duration, + redis: Arc, + ) -> anyhow::Result { + let limited = Arc::new(RwLock::new(HashSet::new())); + + // Force an update immediately if we have any reasonable interval + let updated = OffsetDateTime::from_unix_timestamp(0)?; + let updated = Arc::new(RwLock::new(updated)); + + Ok(BillingLimiter { + interval, + limited, + updated, + redis, + }) + } + + async fn fetch_limited( + client: &Arc, + resource: QuotaResource, + ) -> anyhow::Result> { + let now = time::OffsetDateTime::now_utc().unix_timestamp(); + + client + .zrangebyscore( + format!("{QUOTA_LIMITER_CACHE_KEY}{}", resource.as_str()), + now.to_string(), + String::from("+Inf"), + ) + .await + } + + pub async fn is_limited(&self, key: &str, resource: QuotaResource) -> bool { + // hold the read lock to clone it, very briefly. clone is ok because it's very small 🤏 + // rwlock can have many readers, but one writer. the writer will wait in a queue with all + // the readers, so we want to hold read locks for the smallest time possible to avoid + // writers waiting for too long. and vice versa. + let updated = { + let updated = self.updated.read().await; + *updated + }; + + let now = OffsetDateTime::now_utc(); + let since_update = now.sub(updated); + + // If an update is due, fetch the set from redis + cache it until the next update is due. + // Otherwise, return a value from the cache + // + // This update will block readers! Keep it fast. + if since_update > self.interval { + let span = tracing::debug_span!("updating billing cache from redis"); + let _span = span.enter(); + + // a few requests might end up in here concurrently, but I don't think a few extra will + // be a big problem. If it is, we can rework the concurrency a bit. + // On prod atm we call this around 15 times per second at peak times, and it usually + // completes in <1ms. 
+ + let set = Self::fetch_limited(&self.redis, resource).await; + + tracing::debug!("fetched set from redis, caching"); + + if let Ok(set) = set { + let set = HashSet::from_iter(set.iter().cloned()); + + let mut limited = self.limited.write().await; + *limited = set; + + tracing::debug!("updated cache from redis"); + + limited.contains(key) + } else { + tracing::error!("failed to fetch from redis in time, failing open"); + // If we fail to fetch the set, something really wrong is happening. To avoid + // dropping events that we don't mean to drop, fail open and accept data. Better + // than angry customers :) + // + // TODO: Consider backing off our redis checks + false + } + } else { + let l = self.limited.read().await; + + l.contains(key) + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use time::Duration; + + use crate::{ + billing_limits::{BillingLimiter, QuotaResource}, + redis::MockRedisClient, + }; + + #[tokio::test] + async fn test_dynamic_limited() { + let client = MockRedisClient::new().zrangebyscore_ret(vec![String::from("banana")]); + let client = Arc::new(client); + + let limiter = BillingLimiter::new(Duration::microseconds(1), client) + .expect("Failed to create billing limiter"); + + assert_eq!( + limiter + .is_limited("idk it doesn't matter", QuotaResource::Events) + .await, + false + ); + + assert_eq!( + limiter + .is_limited("some_org_hit_limits", QuotaResource::Events) + .await, + false + ); + assert!(limiter.is_limited("banana", QuotaResource::Events).await); + } +} diff --git a/capture/src/capture.rs b/capture/src/capture.rs index 98a61d3..65e64c9 100644 --- a/capture/src/capture.rs +++ b/capture/src/capture.rs @@ -12,6 +12,7 @@ use axum_client_ip::InsecureClientIp; use base64::Engine; use time::OffsetDateTime; +use crate::billing_limits::QuotaResource; use crate::event::ProcessingContext; use crate::token::validate_token; use crate::{ @@ -44,7 +45,7 @@ pub async fn event( _ => RawEvent::from_bytes(&meta, body), }?; - 
println!("Got events {:?}", &events); + tracing::debug!("got events {:?}", &events); if events.is_empty() { return Err(CaptureError::EmptyBatch); @@ -61,6 +62,7 @@ pub async fn event( } None }); + let context = ProcessingContext { lib_version: meta.lib_version.clone(), sent_at, @@ -69,7 +71,25 @@ pub async fn event( client_ip: ip.to_string(), }; - println!("Got context {:?}", &context); + let limited = state + .billing + .is_limited(context.token.as_str(), QuotaResource::Events) + .await; + + if limited { + // for v0 we want to just return ok 🙃 + // this is because the clients are pretty dumb and will just retry over and over and + // over... + // + // for v1, we'll return a meaningful error code and error, so that the clients can do + // something meaningful with that error + + return Ok(Json(CaptureResponse { + status: CaptureResponseCode::Ok, + })); + } + + tracing::debug!("got context {:?}", &context); process_events(state.sink.clone(), &events, &context).await?; diff --git a/capture/src/lib.rs b/capture/src/lib.rs index d4ca041..fcd802b 100644 --- a/capture/src/lib.rs +++ b/capture/src/lib.rs @@ -1,7 +1,9 @@ pub mod api; +pub mod billing_limits; pub mod capture; pub mod event; pub mod prometheus; +pub mod redis; pub mod router; pub mod sink; pub mod time; diff --git a/capture/src/redis.rs b/capture/src/redis.rs new file mode 100644 index 0000000..c83c0ad --- /dev/null +++ b/capture/src/redis.rs @@ -0,0 +1,80 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use redis::AsyncCommands; +use tokio::time::timeout; + +// average for all commands is <10ms, check grafana +const REDIS_TIMEOUT_MILLISECS: u64 = 10; + +/// A simple redis wrapper +/// I'm currently just exposing the commands we use, for ease of implementation +/// Allows for testing + injecting failures +/// We can also swap it out for alternative implementations in the future +/// I tried using redis-rs Connection/ConnectionLike traits but honestly things just got really 
+/// awkward to work with. + +#[async_trait] +pub trait Client { + // A very simplified wrapper, but works for our usage + async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result>; +} + +pub struct RedisClient { + client: redis::Client, +} + +impl RedisClient { + pub fn new(addr: String) -> Result { + let client = redis::Client::open(addr)?; + + Ok(RedisClient { client }) + } +} + +#[async_trait] +impl Client for RedisClient { + async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result> { + let mut conn = self.client.get_async_connection().await?; + + let results = conn.zrangebyscore(k, min, max); + let fut = timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?; + + Ok(fut?) + } +} + +// mockall got really annoying with async and results so I'm just gonna do my own +#[derive(Clone)] +pub struct MockRedisClient { + zrangebyscore_ret: Vec, +} + +impl MockRedisClient { + pub fn new() -> MockRedisClient { + MockRedisClient { + zrangebyscore_ret: Vec::new(), + } + } + + pub fn zrangebyscore_ret(&mut self, ret: Vec) -> Self { + self.zrangebyscore_ret = ret; + + self.clone() + } +} + +impl Default for MockRedisClient { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Client for MockRedisClient { + // A very simplified wrapper, but works for our usage + async fn zrangebyscore(&self, _k: String, _min: String, _max: String) -> Result> { + Ok(self.zrangebyscore_ret.clone()) + } +} diff --git a/capture/src/router.rs b/capture/src/router.rs index 0c40658..757b975 100644 --- a/capture/src/router.rs +++ b/capture/src/router.rs @@ -7,7 +7,7 @@ use axum::{ }; use tower_http::trace::TraceLayer; -use crate::{capture, sink, time::TimeSource}; +use crate::{billing_limits::BillingLimiter, capture, redis::Client, sink, time::TimeSource}; use crate::prometheus::{setup_metrics_recorder, track_metrics}; @@ -15,6 +15,8 @@ use crate::prometheus::{setup_metrics_recorder, track_metrics}; pub struct State { pub sink: 
Arc, pub timesource: Arc, + pub redis: Arc, + pub billing: BillingLimiter, } async fn index() -> &'static str { @@ -24,14 +26,19 @@ async fn index() -> &'static str { pub fn router< TZ: TimeSource + Send + Sync + 'static, S: sink::EventSink + Send + Sync + 'static, + R: Client + Send + Sync + 'static, >( timesource: TZ, sink: S, + redis: Arc, + billing: BillingLimiter, metrics: bool, ) -> Router { let state = State { sink: Arc::new(sink), timesource: Arc::new(timesource), + redis, + billing, }; let router = Router::new() diff --git a/capture/tests/django_compat.rs b/capture/tests/django_compat.rs index 119777d..d418996 100644 --- a/capture/tests/django_compat.rs +++ b/capture/tests/django_compat.rs @@ -5,7 +5,9 @@ use axum_test_helper::TestClient; use base64::engine::general_purpose; use base64::Engine; use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode}; +use capture::billing_limits::BillingLimiter; use capture::event::ProcessedEvent; +use capture::redis::MockRedisClient; use capture::router::router; use capture::sink::EventSink; use capture::time::TimeSource; @@ -15,7 +17,7 @@ use std::fs::File; use std::io::{BufRead, BufReader}; use std::sync::{Arc, Mutex}; use time::format_description::well_known::{Iso8601, Rfc3339}; -use time::OffsetDateTime; +use time::{Duration, OffsetDateTime}; #[derive(Debug, Deserialize)] struct RequestDump { @@ -93,7 +95,12 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> { let sink = MemorySink::default(); let timesource = FixedTime { time: case.now }; - let app = router(timesource, sink.clone(), false); + + let redis = Arc::new(MockRedisClient::new()); + let billing = BillingLimiter::new(Duration::weeks(1), redis.clone()) + .expect("failed to create billing limiter"); + + let app = router(timesource, sink.clone(), redis, billing, false); let client = TestClient::new(app); let mut req = client.post(&format!("/i/v0{}", case.path)).body(raw_body);