From 3959843a725d1170796e6e9905ef001ae1a24dab Mon Sep 17 00:00:00 2001 From: Samuel Gomez Date: Thu, 18 Apr 2024 16:47:59 +0100 Subject: [PATCH] Migrating from http 0.* to 1.*, and IOS v.3.0.0 --- Cargo.lock | 561 +++++++++++++++--- Cargo.toml | 22 +- api/Cargo.toml | 9 +- api/src/config.rs | 7 +- api/src/endpoints/common_model.rs | 38 +- api/src/endpoints/connection.rs | 22 +- api/src/endpoints/connection_definition.rs | 66 +-- .../endpoints/connection_model_definition.rs | 103 ++-- api/src/endpoints/connection_model_schema.rs | 62 +- .../endpoints/connection_oauth_definition.rs | 144 ++--- api/src/endpoints/event_access.rs | 33 +- api/src/endpoints/events.rs | 18 +- api/src/endpoints/mod.rs | 40 +- api/src/endpoints/oauth.rs | 8 +- api/src/endpoints/passthrough.rs | 4 +- api/src/endpoints/pipeline.rs | 54 +- api/src/endpoints/transactions.rs | 18 +- api/src/endpoints/unified.rs | 4 +- api/src/main.rs | 23 +- api/src/middleware/auth.rs | 8 +- api/src/middleware/extractor.rs | 21 + api/src/middleware/jwt_auth.rs | 32 +- api/src/middleware/mod.rs | 2 +- api/src/middleware/rate_limiter.rs | 122 ---- api/src/routes/protected.rs | 81 +-- api/src/server.rs | 36 +- api/src/util/openapi_builder.rs | 4 +- api/src/util/shape_mongo_filter.rs | 6 +- api/tests/api_tests/get_tests.rs | 2 +- api/tests/api_tests/pagination_tests.rs | 2 +- api/tests/api_tests/passthrough_tests.rs | 2 +- api/tests/api_tests/schema_tests.rs | 4 +- api/tests/api_tests/test_crud.rs | 2 +- api/tests/api_tests/test_server/mod.rs | 8 +- api/tests/api_tests/test_server/test_core.rs | 4 +- .../api_tests/test_server/test_gateway.rs | 5 +- api/tests/api_tests/transaction_tests.rs | 2 +- api/tests/api_tests/unified_tests.rs | 4 +- event-core/src/config.rs | 2 +- event-core/src/dispatcher.rs | 8 +- event-core/src/event_handler.rs | 2 +- event-core/src/main.rs | 13 +- event-core/src/mongo_control_data_store.rs | 13 +- event-core/src/store.rs | 2 +- event-core/tests/mock_destination.rs | 10 +- event-core/tests/mock_storage.rs | 8 +- gateway/Cargo.toml | 5 +- gateway/benches/bench.rs | 2 +- gateway/src/config.rs | 2 +- gateway/src/finalize_event.rs | 2 +- gateway/src/finalizer.rs | 2 +- gateway/src/main.rs | 13 +- gateway/src/mock_finalizer.rs | 2 +- gateway/src/server.rs | 31 +- gateway/src/state.rs | 2 +- watchdog/src/config.rs | 25 - watchdog/src/lib.rs | 1 - watchdog/src/main.rs | 274 +-------- 58 files changed, 964 insertions(+), 1038 deletions(-) create mode 100644 api/src/middleware/extractor.rs delete mode 100644 api/src/middleware/rate_limiter.rs delete mode 100644 watchdog/src/config.rs delete mode 100644 watchdog/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index e1b15af1..1fbe1168 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -100,9 +100,9 @@ dependencies = [ "futures-util", "gateway", "handlebars", - "http", + "http 1.1.0", "http-serde-ext", - "hyper", + "hyper 0.14.28", "indexmap 2.2.5", "integrationos-domain", "jsonwebtoken", @@ -113,7 +113,7 @@ dependencies = [ "openapiv3", "rand", "redis", - "reqwest", + "reqwest 0.12.3", "segment", "semver 1.0.22", "serde", @@ -123,6 +123,7 @@ dependencies = [ "tokio", "tower", "tower-http", + "tower_governor", "tracing", "tracing-subscriber", "uuid", @@ -190,18 +191,20 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.20" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", "axum-core", - "bitflags 1.3.2", + "axum-macros", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-util", "itoa", "matchit", "memchr", @@ -213,35 +216,40 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.1", "tokio", "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] name = "axum-core" -version = "0.3.4" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", "mime", + "pin-project-lite", "rustversion", + "sync_wrapper 0.1.2", "tower-layer", "tower-service", + "tracing", ] [[package]] name = "axum-macros" -version = "0.3.8" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" +checksum = "00c055ee2d014ae5981ce1016374e8213682aa14d9bf40e48ab48b5f3ef20eaa" dependencies = [ "heck", "proc-macro2", @@ -251,20 +259,19 @@ dependencies = [ [[package]] name = "axum-prometheus" -version = "0.4.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97def327c5481791abb57ac295bfc70f2e1a0727675b7dbf74bd1b27a72b6fd8" +checksum = "b683cbc43010e9a3d72c2f31ca464155ff4f95819e88a32924b0f47a43898978" dependencies = [ "axum", - "axum-core", "bytes", "futures", "futures-core", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.0", "matchit", - "metrics", - "metrics-exporter-prometheus", + "metrics 0.22.3", + "metrics-exporter-prometheus 0.13.1", "once_cell", "pin-project", "tokio", @@ -299,6 +306,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" + [[package]] name = "base64ct" version = "1.6.0" @@ -750,6 +763,19 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.3", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.5.0" @@ -977,16 +1003,16 @@ dependencies = [ "fake", "futures", "handlebars", - "http", + "http 1.1.0", "integrationos-domain", "js-sandbox-ios", - "metrics", - "metrics-exporter-prometheus", + "metrics 0.21.1", + "metrics-exporter-prometheus 0.12.2", "mockito", "moka", "mongodb", "redis", - "reqwest", + "reqwest 0.12.3", "serde", "serde_json", "testcontainers-modules", @@ -1005,14 +1031,14 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" [[package]] name = "fake" -version = "2.9.1" +version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26221445034074d46b276e13eb97a265ebdb8ed8da705c4dddd3dd20b66b45d2" +checksum = "1c25829bde82205da46e1823b2259db6273379f626fc211f126f65654a2669be" dependencies = [ "chrono", "deunicode", "dummy", - "http", + "http 1.1.0", "rand", "rand_core", "semver 1.0.22", @@ -1048,6 +1074,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "forwarded-header-value" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8835f84f38484cc86f110a805655697908257fb9a7af005234060891557198e9" +dependencies = [ + "nonempty", + "thiserror", +] + [[package]] name = "fslock" version = "0.1.8" @@ -1135,6 +1171,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.30" @@ -1160,12 +1202,13 @@ dependencies = [ "anyhow", "async-trait", "axum", - "axum-macros", "axum-prometheus", "criterion", "dotenvy", "envconfig", - "http", + "futures", + "http 1.1.0", + "http-body-util", "http-serde-ext", "integrationos-domain", "moka", @@ -1190,6 +1233,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "gethostname" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1ebd34e35c46e00bb73e81363248d627782724609fe1b6396f553f68fe3862e" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "getrandom" version = "0.2.12" @@ -1213,6 +1266,26 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "governor" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" +dependencies = [ + "cfg-if", + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "portable-atomic", + "quanta 0.12.2", + "rand", + "smallvec", + "spinning_top", +] + [[package]] name = "h2" version = "0.3.24" @@ -1224,7 +1297,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap 2.2.5", "slab", "tokio", @@ -1276,6 +1349,9 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", +] [[package]] name = "heck" @@ -1335,6 +1411,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1342,23 +1429,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", "pin-project-lite", ] [[package]] -name = "http-range-header" -version = "0.3.1" +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +dependencies = [ + "bytes", + "futures-core", + "http 1.1.0", + "http-body 1.0.0", + "pin-project-lite", +] [[package]] name = "http-serde-ext" -version = "0.1.8" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c94a985b23074de11e6d99712873131c3a3d12cf482406de7e0a2ec8a4cd1943" +checksum = "665c24b8e7e21688dc74edb228f07c1815bbc7ff3b48a3ee72fa20937fbde095" dependencies = [ - "http", + "http 1.1.0", "serde", ] @@ -1385,8 +1489,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1398,6 +1502,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -1405,11 +1529,48 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", - "hyper", - "rustls", + "http 0.2.12", + "hyper 0.14.28", + "rustls 0.21.10", + "tokio", + "tokio-rustls 0.24.1", +] + +[[package]] +name = "hyper-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" +dependencies = [ + "futures-util", + "http 1.1.0", + "hyper 1.3.1", + "hyper-util", + "rustls 0.22.3", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.25.0", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.3.1", + "pin-project-lite", + "socket2 0.5.6", "tokio", - "tokio-rustls", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -1510,9 +1671,9 @@ dependencies = [ [[package]] name = "integrationos-domain" -version = "1.2.0" +version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54b659a2663d9b21703fcac95895c05104289b25da1efe95de0c9a6c5ffe225b" +checksum = "eb24610e25746745657ee39e75aac2faea7300a9f570e58e6096647c3e1421cb" dependencies = [ "aes", "anyhow", @@ -1529,7 +1690,7 @@ dependencies = [ "fake", "futures", "handlebars", - "http", + "http 1.1.0", "http-serde-ext", "indexmap 2.2.5", "js-sandbox-ios", @@ -1541,7 +1702,7 @@ dependencies = [ "prost", "rand", "redis", - "reqwest", + "reqwest 0.12.3", "semver 1.0.22", "serde", "serde_json", @@ -1551,6 +1712,8 @@ dependencies = [ "thiserror", "tokio", "tracing", + "tracing-bunyan-formatter", + "tracing-log 0.2.0", "tracing-subscriber", "uuid", ] @@ -1564,7 +1727,7 @@ dependencies = [ "socket2 0.5.6", "widestring", "windows-sys 0.48.0", - "winreg", + "winreg 0.50.0", ] [[package]] @@ -1786,6 +1949,16 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "metrics" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2be3cbd384d4e955b231c895ce10685e3d8260c5ccffae898c96c723b0772835" +dependencies = [ + "ahash", + "portable-atomic", +] + [[package]] name = "metrics-exporter-prometheus" version = "0.12.2" @@ -1793,17 +1966,34 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d4fa7ce7c4862db464a37b0b31d89bca874562f034bd7993895572783d02950" dependencies = [ "base64 0.21.7", - "hyper", + "hyper 0.14.28", "indexmap 1.9.3", "ipnet", - "metrics", - "metrics-util", + "metrics 0.21.1", + "metrics-util 0.15.1", "quanta 0.11.1", "thiserror", "tokio", "tracing", ] +[[package]] +name = "metrics-exporter-prometheus" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bf4e7146e30ad172c42c39b3246864bd2d3c6396780711a1baf749cfe423e21" +dependencies = [ + "base64 0.21.7", + "hyper 0.14.28", + "indexmap 2.2.5", + "ipnet", + "metrics 0.22.3", + "metrics-util 0.16.3", + "quanta 0.12.2", + "thiserror", + "tokio", +] + [[package]] name = "metrics-macros" version = "0.7.1" @@ -1824,12 +2014,27 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.13.1", - "metrics", + "metrics 0.21.1", "num_cpus", "quanta 0.11.1", "sketches-ddsketch", ] +[[package]] +name = "metrics-util" +version = "0.16.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b07a5eb561b8cbc16be2d216faf7757f9baf3bfb94dbb0fae3df8387a5bb47f" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.14.3", + "metrics 0.22.3", + "num_cpus", + "quanta 0.12.2", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -1865,7 +2070,7 @@ dependencies = [ "assert-json-diff", "colored", "futures-core", - "hyper", + "hyper 0.14.28", "log", "rand", "regex", @@ -1924,8 +2129,8 @@ dependencies = [ "percent-encoding", "rand", "rustc_version_runtime", - "rustls", - "rustls-pemfile", + "rustls 0.21.10", + "rustls-pemfile 1.0.4", "serde", "serde_bytes", "serde_with", @@ -1937,15 +2142,33 @@ dependencies = [ "take_mut", "thiserror", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", "tokio-util", "trust-dns-proto", "trust-dns-resolver", "typed-builder", "uuid", - "webpki-roots", + "webpki-roots 0.25.4", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + +[[package]] +name = "nonempty" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7" + +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2509,10 +2732,10 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", - "hyper-rustls", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", + "hyper-rustls 0.24.2", "ipnet", "js-sys", "log", @@ -2520,22 +2743,63 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls", - "rustls-pemfile", + "rustls 0.21.10", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 0.1.2", "system-configuration", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots 0.25.4", + "winreg 0.50.0", +] + +[[package]] +name = "reqwest" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e6cc1e89e689536eb5aeede61520e874df5a4707df811cd5da4aa5fbb2aae19" +dependencies = [ + "base64 0.22.0", + "bytes", + "futures-core", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-rustls 0.26.0", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls 0.22.3", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 0.1.2", + "tokio", + "tokio-rustls 0.25.0", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", - "winreg", + "webpki-roots 0.26.1", + "winreg 0.52.0", ] [[package]] @@ -2633,10 +2897,24 @@ checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" dependencies = [ "log", "ring 0.17.8", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99008d7ad0bbbea527ec27bddbc0e432c5b87d8175178cee68d2eec9c4a1813c" +dependencies = [ + "log", + "ring 0.17.8", + "rustls-pki-types", + "rustls-webpki 0.102.2", + "subtle", + "zeroize", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -2646,6 +2924,22 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +dependencies = [ + "base64 0.22.0", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -2656,6 +2950,17 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "rustls-webpki" +version = "0.102.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" +dependencies = [ + "ring 0.17.8", + "rustls-pki-types", + "untrusted 0.9.0", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -2700,7 +3005,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12485833e00457a6bbba60397d3f19362751a0caefe27f6755fff1a2be4fd601" dependencies = [ "async-trait", - "reqwest", + "reqwest 0.11.24", "serde", "serde_json", "thiserror", @@ -2989,6 +3294,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -3068,6 +3382,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "system-configuration" version = "0.5.1" @@ -3287,7 +3607,18 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.10", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.3", + "rustls-pki-types", "tokio", ] @@ -3324,17 +3655,15 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.4.4" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ "bitflags 2.4.2", "bytes", - "futures-core", - "futures-util", - "http", - "http-body", - "http-range-header", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", "pin-project-lite", "tower-layer", "tower-service", @@ -3353,6 +3682,22 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +[[package]] +name = "tower_governor" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3790eac6ad3fb8d9d96c2b040ae06e2517aa24b067545d1078b96ae72f7bb9a7" +dependencies = [ + "axum", + "forwarded-header-value", + "governor", + "http 1.1.0", + "pin-project", + "thiserror", + "tower", + "tracing", +] + [[package]] name = "tracing" version = "0.1.40" @@ -3376,6 +3721,24 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "tracing-bunyan-formatter" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5c266b9ac83dedf0e0385ad78514949e6d89491269e7065bee51d2bb8ec7373" +dependencies = [ + "ahash", + "gethostname", + "log", + "serde", + "serde_json", + "time", + "tracing", + "tracing-core", + "tracing-log 0.1.4", + "tracing-subscriber", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -3386,6 +3749,17 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-log" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + [[package]] name = "tracing-log" version = "0.2.0" @@ -3412,7 +3786,7 @@ dependencies = [ "thread_local", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.2.0", ] [[package]] @@ -3774,6 +4148,15 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "webpki-roots" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" @@ -3974,6 +4357,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "winreg" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "wyz" version = "0.5.1" @@ -4002,3 +4395,9 @@ dependencies = [ "quote", "syn 2.0.52", ] + +[[package]] +name = "zeroize" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" diff --git a/Cargo.toml b/Cargo.toml index 320b1f13..01ffaa8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,19 +1,17 @@ [workspace] - resolver = "2" - members = [ "api", "event-core", "gateway", - "watchdog" + "watchdog", ] [workspace.dependencies] anyhow = "1.0.75" +async-recursion = "1.0.5" async-trait = "0.1.74" -axum = "0.6.20" -axum-macros = "0.3.8" +axum = {version = "0.7", features = ["macros"]} base64 = "0.21.5" base64ct = { version = "1.6.0", features = ["alloc"] } bson = "2.7.0" @@ -21,7 +19,7 @@ chrono = { version = "0.4.31", features = ["serde"] } convert_case = "0.6.0" dotenvy = "0.15.7" envconfig = "0.10.0" -fake = { version = "=2.9.1", features = [ +fake = { version = "2.9.2", features = [ "uuid", "derive", "dummy", @@ -34,9 +32,9 @@ fake = { version = "=2.9.1", features = [ futures = "0.3.28" futures-util = "0.3.28" handlebars = "4.4.0" -http = "0.2.9" -http-serde-ext = "0.1.8" -integrationos-domain = "1.0.0" +http = "1.1.0" +http-serde-ext = "1.0.2" +integrationos-domain = "3.0.0" js-sandbox-ios = "0.1.0" jsonpath_lib = "0.3.0" jsonwebtoken = "8.3.0" @@ -48,7 +46,7 @@ openapiv3 = { version = "2.0.0", features = ["skip_serializing_defaults"] } rand = "0.8.5" redis = { version = "0.23.3", features = ["connection-manager", "tokio-comp"] } regex = "1.10.2" -reqwest = { version = "0.11.22", features = [ +reqwest = { version = "0.12.3", features = [ "json", "rustls-tls", ], default-features = false } @@ -64,7 +62,7 @@ tokio = { version = "1.33.0", features = [ "time", "sync", ] } -tower-http = { version = "0.4.4", features = [ +tower-http = { version = "0.5", features = [ "trace", "cors", "sensitive-headers", @@ -76,4 +74,4 @@ uuid = { version = "1.5.0", features = ["v4", "serde"] } validator = { version = "0.16.1", features = ["derive"] } [profile.release] -lto = true +lto = "thin" diff --git a/api/Cargo.toml b/api/Cargo.toml index f1017211..817bd3b4 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -15,18 +15,14 @@ convert_case.workspace = true dotenvy.workspace = true envconfig.workspace = true fake = { workspace = true, optional = true } -futures.workspace = true futures-util.workspace = true +futures.workspace = true handlebars.workspace = true http-serde-ext.workspace = true http.workspace = true hyper = "0.14.27" indexmap = "2.1.0" -integrationos-domain = { workspace = true, features = [ - "unified", - "metrics", - "axum-error", -] } +integrationos-domain = { workspace = true, features = [ "unified", "metrics", "axum-error", ] } jsonwebtoken.workspace = true moka.workspace = true mongodb.workspace = true @@ -43,6 +39,7 @@ strum.workspace = true tokio.workspace = true tower = { version = "0.4.13", features = ["filter"] } tower-http.workspace = true +tower_governor = "0.3.2" tracing-subscriber.workspace = true tracing.workspace = true validator.workspace = true diff --git a/api/src/config.rs b/api/src/config.rs index 496b2598..5089fb22 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -5,7 +5,7 @@ use std::{ use envconfig::Envconfig; use integrationos_domain::cache::CacheConfig as RedisConfig; -use integrationos_domain::common::{ +use integrationos_domain::{ claude::ClaudeConfig, database::DatabaseConfig, openai::OpenAiConfig, secrets::SecretsConfig, }; @@ -55,6 +55,11 @@ pub struct Config { default = "2thZ2UiOnsibmFtZSI6IlN0YXJ0dXBsa3NoamRma3NqZGhma3NqZGhma3NqZG5jhYtggfaP9ubmVjdGlvbnMiOjUwMDAwMCwibW9kdWxlcyI6NSwiZW5kcG9pbnRzIjo3b4e05e2-f050-401f-9822-44f43f71753c" )] pub jwt_secret: String, + #[envconfig(from = "BURST_RATE_LIMIT", default = "1")] + pub burst_rate_limit: u64, + /// Burst size limit + #[envconfig(from = "BURST_SIZE_LIMIT", default = "30")] + pub burst_size: u32, #[envconfig(from = "API_VERSION", default = "v1")] pub api_version: String, #[envconfig(from = "MOCK_LLM", default = "false")] diff --git a/api/src/endpoints/common_model.rs b/api/src/endpoints/common_model.rs index f663edbe..a1b3baab 100644 --- a/api/src/endpoints/common_model.rs +++ b/api/src/endpoints/common_model.rs @@ -12,12 +12,9 @@ use axum::{ use integrationos_domain::{ algebra::{MongoStore, StoreExt}, api_model_config::Lang, - common::{ - common_model::{CommonModel, Field}, - event_access::EventAccess, - json_schema::JsonSchema, - }, + common_model::{CommonModel, Field}, id::{prefix::IdPrefix, Id}, + json_schema::JsonSchema, IntegrationOSError, }; use mongodb::bson::doc; @@ -83,33 +80,28 @@ impl CrudHook for CreateRequest { impl CrudRequest for CreateRequest { type Output = CommonModel; - type Error = (); - fn into_public(self) -> Result { + fn output(&self) -> Option { let mut record = Self::Output { id: Id::now(IdPrefix::CommonModel), - name: self.name, - fields: self.fields, - sample: self.sample, - category: self.category, + name: self.name.clone(), + fields: self.fields.clone(), + sample: self.sample.clone(), + category: self.category.clone(), primary: self.primary, interface: Default::default(), record_metadata: Default::default(), }; - record.record_metadata.version = self.version; - Ok(record) + record.record_metadata.version = self.version.clone(); + Some(record) } - fn into_with_event_access(self, _event_access: Arc) -> Self::Output { - unimplemented!() - } - - fn update(self, record: &mut Self::Output) { - record.name = self.name; - record.record_metadata.version = self.version; - record.fields = self.fields; - record.category = self.category; - record.sample = self.sample; + fn update(&self, record: &mut Self::Output) { + record.name = self.name.clone(); + record.record_metadata.version = self.version.clone(); + record.fields = self.fields.clone(); + record.category = self.category.clone(); + record.sample = self.sample.clone(); } fn get_store(stores: AppStores) -> MongoStore { diff --git a/api/src/endpoints/connection.rs b/api/src/endpoints/connection.rs index a4c64383..5626744d 100644 --- a/api/src/endpoints/connection.rs +++ b/api/src/endpoints/connection.rs @@ -18,11 +18,12 @@ use convert_case::{Case, Casing}; use http::HeaderMap; use integrationos_domain::{ algebra::{MongoStore, StoreExt}, - common::{ - connection_definition::ConnectionDefinition, event_access::EventAccess, - record_metadata::RecordMetadata, settings::Settings, Connection, Throughput, - }, + connection_definition::ConnectionDefinition, + event_access::EventAccess, id::{prefix::IdPrefix, Id}, + record_metadata::RecordMetadata, + settings::Settings, + Connection, Throughput, }; use mongodb::bson::doc; use mongodb::bson::Regex; @@ -93,23 +94,10 @@ async fn test_connection( impl CrudRequest for CreateConnectionPayload { type Output = Connection; - type Error = (); - - fn into_with_event_access(self, _event_access: Arc) -> Self::Output { - unimplemented!() - } - - fn update(self, _record: &mut Self::Output) { - unimplemented!() - } fn get_store(stores: AppStores) -> MongoStore { stores.connection } - - fn into_public(self) -> Result { - unimplemented!() - } } pub async fn create_connection( diff --git a/api/src/endpoints/connection_definition.rs b/api/src/endpoints/connection_definition.rs index d6ae8b09..9087235e 100644 --- a/api/src/endpoints/connection_definition.rs +++ b/api/src/endpoints/connection_definition.rs @@ -1,5 +1,5 @@ use super::{ - create, delete, update, ApiResult, CachedRequest, CrudHook, CrudRequest, ReadResponse, + create, delete, update, ApiResult, CachedRequest, CrudHook, CrudRequest, ReadResponse, Unit, }; use crate::{ internal_server_error, not_found, @@ -12,19 +12,16 @@ use axum::{ }; use integrationos_domain::{ algebra::{MongoStore, StoreExt}, - common::{ - api_model_config::AuthMethod, - connection_definition::{ - AuthSecret, ConnectionDefinition, ConnectionDefinitionType, ConnectionForm, - FormDataItem, Frontend, Paths, Spec, - }, - event_access::EventAccess, - record_metadata::RecordMetadata, - settings::Settings, - }, + api_model_config::AuthMethod, connection_definition::ConnectionStatus, + connection_definition::{ + AuthSecret, ConnectionDefinition, ConnectionDefinitionType, ConnectionForm, FormDataItem, + Frontend, Paths, Spec, + }, connection_model_definition::{ConnectionModelDefinition, CrudAction}, id::{prefix::IdPrefix, Id}, + record_metadata::RecordMetadata, + settings::Settings, }; use moka::future::Cache; use mongodb::bson::doc; @@ -255,9 +252,8 @@ pub async fn public_get_connection_details( impl CrudRequest for CreateRequest { type Output = ConnectionDefinition; - type Error = (); - fn into_public(self) -> Result { + fn output(&self) -> Option { let auth_secrets: Vec = self .authentication .iter() @@ -287,48 +283,44 @@ impl CrudRequest for CreateRequest { let mut record = Self::Output { id: Id::now(IdPrefix::ConnectionDefinition), - platform_version: self.platform_version, + platform_version: self.platform_version.clone(), platform: self.platform.clone(), - status: self.status, - r#type: self.r#type, + status: self.status.clone(), + r#type: self.r#type.clone(), name: self.name.clone(), key, frontend: Frontend { spec: Spec { - title: self.name, - description: self.description, - platform: self.platform, - category: self.category, - image: self.image, - tags: self.tags, + title: self.name.clone(), + description: self.description.clone(), + platform: self.platform.clone(), + category: self.category.clone(), + image: self.image.clone(), + tags: self.tags.clone(), }, connection_form, }, test_connection: self.test_connection, auth_secrets, - auth_method: self.auth_method, - paths: self.paths, - settings: self.settings, + auth_method: self.auth_method.clone(), + paths: self.paths.clone(), + settings: self.settings.clone(), hidden: false, record_metadata: RecordMetadata::default(), }; record.record_metadata.active = self.active; - Ok(record) - } - - fn into_with_event_access(self, _event_access: Arc) -> Self::Output { - unimplemented!() + Some(record) } - fn update(self, record: &mut Self::Output) { - record.name = self.name; - record.frontend.spec.description = self.description; - record.frontend.spec.category = self.category; - record.frontend.spec.image = self.image; - record.frontend.spec.tags = self.tags; + fn update(&self, record: &mut Self::Output) -> Unit { + record.name = self.name.clone(); + record.frontend.spec.description = self.description.clone(); + record.frontend.spec.category = self.category.clone(); + record.frontend.spec.image = self.image.clone(); + record.frontend.spec.tags = self.tags.clone(); record.test_connection = self.test_connection; - record.platform = self.platform; + record.platform = self.platform.clone(); record.record_metadata.active = self.active; } diff --git a/api/src/endpoints/connection_model_definition.rs b/api/src/endpoints/connection_model_definition.rs index 5b89250d..bbf3cd1f 100644 --- a/api/src/endpoints/connection_model_definition.rs +++ b/api/src/endpoints/connection_model_definition.rs @@ -1,4 +1,4 @@ -use super::{create, delete, read, update, CrudHook, CrudRequest}; +use super::{create, delete, read, update, CrudHook, CrudRequest, Unit}; use crate::{ api_payloads::ErrorResponse, internal_server_error, not_found, @@ -16,16 +16,14 @@ use chrono::Utc; use http::HeaderMap; use integrationos_domain::{ algebra::{MongoStore, StoreExt}, - common::{ - api_model_config::{ - ApiModelConfig, AuthMethod, ModelPaths, ResponseBody, SamplesInput, SchemasInput, - }, - connection_model_definition::{ - ConnectionModelDefinition, CrudAction, CrudMapping, ExtractorConfig, PlatformInfo, - TestConnection, TestConnectionState, - }, - event_access::EventAccess, + api_model_config::{ + ApiModelConfig, AuthMethod, ModelPaths, ResponseBody, SamplesInput, SchemasInput, + }, + connection_model_definition::{ + ConnectionModelDefinition, CrudAction, CrudMapping, ExtractorConfig, PlatformInfo, + TestConnection, TestConnectionState, }, + event_access::EventAccess, get_secret_request::GetSecretRequest, id::{prefix::IdPrefix, Id}, }; @@ -304,9 +302,8 @@ impl CrudHook for CreateRequest {} impl CrudRequest for CreateRequest { type Output = ConnectionModelDefinition; - type Error = (); - fn into_public(self) -> Result { + fn output(&self) -> Option { let key = format!( "api::{}::{}::{}::{}::{}::{}", self.connection_platform, @@ -320,42 +317,38 @@ impl CrudRequest for CreateRequest { let mut record = Self::Output { id: Id::new(IdPrefix::ConnectionModelDefinition, Utc::now()), - connection_platform: self.connection_platform, + connection_platform: self.connection_platform.clone(), connection_definition_id: self.connection_definition_id, - platform_version: self.platform_version, + platform_version: self.platform_version.clone(), key, - title: self.title, - name: self.name, - model_name: self.model_name, + title: self.title.clone(), + name: self.name.clone(), + model_name: self.model_name.clone(), platform_info: PlatformInfo::Api(ApiModelConfig { - base_url: self.base_url, - path: self.path, + base_url: self.base_url.clone(), + path: self.path.clone(), content: Default::default(), - auth_method: self.auth_method, - headers: self.headers, - query_params: self.query_params, - schemas: self.schemas, - samples: self.samples, - responses: self.responses, - paths: self.paths, + auth_method: self.auth_method.clone(), + headers: self.headers.clone(), + query_params: self.query_params.clone(), + schemas: self.schemas.clone(), + samples: self.samples.clone(), + responses: self.responses.clone(), + paths: self.paths.clone(), }), - action: self.http_method, - action_name: self.action_name, - extractor_config: self.extractor_config, + action: self.http_method.clone(), + action_name: self.action_name.clone(), + extractor_config: self.extractor_config.clone(), test_connection_status: TestConnection::default(), is_default_crud_mapping: self.is_default_crud_mapping, - mapping: self.mapping, + mapping: self.mapping.clone(), record_metadata: Default::default(), }; - record.record_metadata.version = self.version; - Ok(record) - } - - fn into_with_event_access(self, _event_access: Arc) -> Self::Output { - unimplemented!() + record.record_metadata.version = self.version.clone(); + Some(record) } - fn update(self, record: &mut Self::Output) { + fn update(&self, record: &mut Self::Output) -> Unit { let key = format!( "api::{}::{}::{}::{}::{}::{}", self.connection_platform, @@ -368,28 +361,28 @@ impl CrudRequest for CreateRequest { .to_lowercase(); record.key = key; - record.connection_platform = self.connection_platform; + record.connection_platform = self.connection_platform.clone(); record.connection_definition_id = self.connection_definition_id; - record.platform_version = self.platform_version; - record.title = self.title; - record.name = self.name; - record.action = self.http_method; - record.action_name = self.action_name; + record.platform_version = self.platform_version.clone(); + record.title = self.title.clone(); + record.name = self.name.clone(); + record.action = self.http_method.clone(); + record.action_name = self.action_name.clone(); record.platform_info = PlatformInfo::Api(ApiModelConfig { - base_url: self.base_url, - path: self.path, + base_url: self.base_url.clone(), + path: self.path.clone(), content: Default::default(), - auth_method: self.auth_method, - headers: self.headers, - query_params: self.query_params, - schemas: self.schemas, - samples: self.samples, - responses: self.responses, - paths: self.paths, + auth_method: self.auth_method.clone(), + headers: self.headers.clone(), + query_params: self.query_params.clone(), + schemas: self.schemas.clone(), + samples: self.samples.clone(), + responses: self.responses.clone(), + paths: self.paths.clone(), }); - record.mapping = self.mapping; - record.extractor_config = self.extractor_config; - record.record_metadata.version = self.version; + record.mapping = self.mapping.clone(); + record.extractor_config = self.extractor_config.clone(); + record.record_metadata.version = self.version.clone(); } fn get_store(stores: AppStores) -> MongoStore { diff --git a/api/src/endpoints/connection_model_schema.rs b/api/src/endpoints/connection_model_schema.rs index aaf71a75..35dd5410 100644 --- a/api/src/endpoints/connection_model_schema.rs +++ b/api/src/endpoints/connection_model_schema.rs @@ -15,14 +15,12 @@ use axum::{ use http::StatusCode; use integrationos_domain::{ algebra::{MongoStore, StoreExt}, - common::{ - connection_model_schema::{ - ConnectionModelSchema, Mappings, PublicConnectionModelSchema, SchemaPaths, - }, - event_access::EventAccess, - json_schema::JsonSchema, + connection_model_schema::{ + ConnectionModelSchema, Mappings, PublicConnectionModelSchema, SchemaPaths, }, + event_access::EventAccess, id::{prefix::IdPrefix, Id}, + json_schema::JsonSchema, }; use mongodb::bson::doc; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -146,19 +144,6 @@ pub async fn public_get_platform_models( impl CrudRequest for PublicGetConnectionModelSchema { type Output = PublicConnectionModelSchema; - type Error = (); - - fn into_public(self) -> Result { - unimplemented!() - } - - fn into_with_event_access(self, _event_access: Arc) -> Self::Output { - unimplemented!() - } - - fn update(self, _record: &mut Self::Output) { - unimplemented!() - } fn get_store(stores: AppStores) -> MongoStore { stores.public_model_schema.clone() @@ -186,47 +171,42 @@ impl CrudHook for CreateRequest {} impl CrudRequest for CreateRequest { type Output = ConnectionModelSchema; - type Error = (); - fn into_public(self) -> Result { + fn output(&self) -> Option { let key = format!( "api::{}::{}::{}", self.connection_platform, self.platform_version, self.model_name ) .to_lowercase(); - Ok(Self::Output { + Some(Self::Output { id: Id::now(IdPrefix::ConnectionModelSchema), platform_id: self.platform_id, platform_page_id: self.platform_page_id, - connection_platform: self.connection_platform, + connection_platform: self.connection_platform.clone(), connection_definition_id: self.connection_definition_id, - platform_version: self.platform_version, + platform_version: self.platform_version.clone(), key, - model_name: self.model_name, - schema: self.schema, - mapping: self.mapping, - sample: self.sample, - paths: self.paths, + model_name: self.model_name.clone(), + schema: self.schema.clone(), + mapping: self.mapping.clone(), + sample: self.sample.clone(), + paths: self.paths.clone(), record_metadata: Default::default(), }) } - fn into_with_event_access(self, _event_access: Arc) -> Self::Output { - unimplemented!() - } - - fn update(self, record: &mut Self::Output) { + fn update(&self, record: &mut Self::Output) { record.platform_id = self.platform_id; record.platform_page_id = self.platform_page_id; - record.connection_platform = self.connection_platform; + record.connection_platform = self.connection_platform.clone(); record.connection_definition_id = self.connection_definition_id; - record.platform_version = self.platform_version; - record.model_name = self.model_name; - record.schema = self.schema; - record.sample = self.sample; - record.paths = self.paths; - record.mapping = self.mapping; + record.platform_version = self.platform_version.clone(); + record.model_name = self.model_name.clone(); + record.schema = self.schema.clone(); + record.sample = self.sample.clone(); + record.paths = self.paths.clone(); + record.mapping = self.mapping.clone(); } fn get_store(stores: AppStores) -> MongoStore { diff --git a/api/src/endpoints/connection_oauth_definition.rs b/api/src/endpoints/connection_oauth_definition.rs index a7db6aa2..8a82edc9 100644 --- a/api/src/endpoints/connection_oauth_definition.rs +++ b/api/src/endpoints/connection_oauth_definition.rs @@ -1,4 +1,6 @@ -use super::{create, delete, read, update, CachedRequest, CrudHook, CrudRequest, ReadResponse}; +use super::{ + create, delete, read, update, CachedRequest, CrudHook, CrudRequest, ReadResponse, Unit, +}; use crate::server::{AppState, AppStores}; use axum::{ routing::{patch, post}, @@ -7,12 +9,9 @@ use axum::{ use chrono::Utc; use integrationos_domain::{ algebra::MongoStore, - common::{ - api_model_config::{ApiModelConfig, Compute, Function, Lang}, - connection_oauth_definition::{ - ComputeRequest, ConnectionOAuthDefinition, Frontend, OAuthApiConfig, OAuthCompute, - }, - event_access::EventAccess, + api_model_config::{ApiModelConfig, Compute, Function, Lang}, + connection_oauth_definition::{ + ComputeRequest, ConnectionOAuthDefinition, Frontend, OAuthApiConfig, OAuthCompute, }, id::{prefix::IdPrefix, Id}, record_metadata::RecordMetadata, @@ -47,6 +46,7 @@ pub struct CreateRequest { pub separator: Option, pub init: RequestParams, pub refresh: RequestParams, + pub is_full_template_enabled: bool, } impl CrudHook for CreateRequest {} @@ -66,102 +66,119 @@ fn default_separator() -> Option { impl CrudRequest for CreateRequest { type Output = ConnectionOAuthDefinition; - type Error = (); - fn into_public(self) -> Result { - Ok(Self::Output { + fn output(&self) -> Option { + Some(Self::Output { id: Id::new(IdPrefix::ConnectionOAuthDefinition, Utc::now()), - connection_platform: self.connection_platform, + connection_platform: self.connection_platform.clone(), configuration: OAuthApiConfig { - init: self.init.configuration, - refresh: self.refresh.configuration, + init: self.init.configuration.clone(), + refresh: self.refresh.configuration.clone(), }, + is_full_template_enabled: self.is_full_template_enabled, compute: OAuthCompute { init: ComputeRequest { response: Function(Compute { entry: "compute".to_string(), - function: self.init.response_compute, + function: self.init.response_compute.clone(), language: Lang::JavaScript, }), - computation: self.init.compute.map(|compute| { - Function(Compute { - entry: "compute".to_string(), - function: compute, - language: Lang::JavaScript, + computation: self + .init + .compute + .iter() + .map(|compute| { + Function(Compute { + entry: "compute".to_string(), + function: compute.clone(), + language: Lang::JavaScript, + }) }) - }), + .next(), }, refresh: ComputeRequest { - computation: self.refresh.compute.map(|compute| { - Function(Compute { - entry: "compute".to_string(), - function: compute, - language: Lang::JavaScript, + computation: self + .refresh + .compute + .iter() + .map(|compute| { + Function(Compute { + entry: "compute".to_string(), + function: compute.clone(), + language: Lang::JavaScript, + }) }) - }), + .next(), response: Function(Compute { entry: "compute".to_string(), - function: self.refresh.response_compute, + function: self.refresh.response_compute.clone(), language: Lang::JavaScript, }), }, }, frontend: Frontend { - platform_redirect_uri: self.platform_redirect_uri, - ios_redirect_uri: self.ios_redirect_uri, - scopes: self.scopes, - separator: self.separator, + platform_redirect_uri: self.platform_redirect_uri.clone(), + ios_redirect_uri: self.ios_redirect_uri.clone(), + scopes: self.scopes.clone(), + separator: self.separator.clone(), }, record_metadata: Default::default(), hooks: Default::default(), }) } - fn into_with_event_access(self, _event_access: Arc) -> Self::Output { - unimplemented!() - } - - fn update(self, record: &mut Self::Output) { - record.connection_platform = self.connection_platform; + fn update(&self, record: &mut Self::Output) -> Unit { + record.connection_platform = self.connection_platform.clone(); record.configuration = OAuthApiConfig { - init: self.init.configuration, - refresh: self.refresh.configuration, + init: self.init.configuration.clone(), + refresh: self.refresh.configuration.clone(), }; + record.is_full_template_enabled = self.is_full_template_enabled; record.compute = OAuthCompute { init: ComputeRequest { - computation: self.init.compute.map(|compute| { - Function(Compute { - entry: "compute".to_string(), - function: compute, - language: Lang::JavaScript, + computation: self + .init + .compute + .iter() + .map(|compute| { + Function(Compute { + entry: "compute".to_string(), + function: compute.clone(), + language: Lang::JavaScript, + }) }) - }), + .next(), response: Function(Compute { entry: "compute".to_string(), - function: self.init.response_compute, + function: self.init.response_compute.clone(), language: Lang::JavaScript, }), }, refresh: ComputeRequest { response: Function(Compute { entry: "compute".to_string(), - function: self.refresh.response_compute, + function: self.refresh.response_compute.clone(), language: Lang::JavaScript, }), - computation: self.refresh.compute.map(|compute| { - Function(Compute { - entry: "compute".to_string(), - function: compute, - language: Lang::JavaScript, + computation: self + .refresh + .compute + .iter() + .map(|compute| { + Function(Compute { + entry: "compute".to_string(), + function: compute.clone(), + language: Lang::JavaScript, + }) }) - }), + .next(), }, }; record.frontend = Frontend { - platform_redirect_uri: self.platform_redirect_uri, - ios_redirect_uri: self.ios_redirect_uri, - scopes: self.scopes, - separator: self.separator, + platform_redirect_uri: self.platform_redirect_uri.clone(), + ios_redirect_uri: self.ios_redirect_uri.clone(), + scopes: self.scopes.clone(), + separator: self.separator.clone(), }; record.record_metadata.updated_at = Utc::now().timestamp_millis(); record.record_metadata.updated = true; @@ -185,19 +202,6 @@ pub struct FrontendOauthConnectionDefinition { impl CrudRequest for FrontendOauthConnectionDefinition { type Output = FrontendOauthConnectionDefinition; - type Error = (); - - fn into_public(self) -> Result { - unimplemented!() - } - - fn into_with_event_access(self, _: Arc) -> Self::Output { - unimplemented!() - } - - fn update(self, _: &mut Self::Output) { - unimplemented!() - } fn get_store(stores: AppStores) -> MongoStore { stores.frontend_oauth_config.clone() diff --git a/api/src/endpoints/event_access.rs b/api/src/endpoints/event_access.rs index fef274b1..77b34ed1 100644 --- a/api/src/endpoints/event_access.rs +++ b/api/src/endpoints/event_access.rs @@ -14,19 +14,17 @@ use axum::{ Extension, Json, Router, }; use integrationos_domain::{ + access_key_data::AccessKeyData, + access_key_prefix::AccessKeyPrefix, algebra::{MongoStore, StoreExt}, - common::{ - access_key_data::AccessKeyData, - access_key_prefix::AccessKeyPrefix, - connection_definition::{ConnectionDefinitionType, Paths}, - environment::Environment, - event_access::EventAccess, - event_type::EventType, - ownership::Ownership, - record_metadata::RecordMetadata, - AccessKey, - }, + connection_definition::{ConnectionDefinitionType, Paths}, + environment::Environment, + event_access::EventAccess, + event_type::EventType, id::{prefix::IdPrefix, Id}, + ownership::Ownership, + record_metadata::RecordMetadata, + AccessKey, }; use mongodb::bson::doc; use rand::Rng; @@ -61,23 +59,10 @@ pub struct CreateEventAccessRequest { impl CrudRequest for CreateEventAccessRequest { type Output = EventAccess; - type Error = (); - - fn into_with_event_access(self, _event_access: Arc) -> Self::Output { - unimplemented!() - } - - fn update(self, _record: &mut Self::Output) { - unimplemented!() - } fn get_store(stores: AppStores) -> MongoStore { stores.event_access } - - fn into_public(self) -> Result { - unimplemented!() - } } #[derive(Debug, Clone, PartialEq, Deserialize, Serialize, Validate)] diff --git a/api/src/endpoints/events.rs b/api/src/endpoints/events.rs index 6d3c8213..59c2e7a5 100644 --- a/api/src/endpoints/events.rs +++ b/api/src/endpoints/events.rs @@ -2,10 +2,7 @@ use super::{read, CrudRequest}; use crate::server::{AppState, AppStores}; use axum::{routing::get, Router}; use bson::doc; -use integrationos_domain::{ - algebra::MongoStore, - common::{event_access::EventAccess, Event}, -}; +use integrationos_domain::{algebra::MongoStore, Event}; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -18,21 +15,8 @@ pub struct CreateEventRequest; impl CrudRequest for CreateEventRequest { type Output = Event; - type Error = (); - - fn into_with_event_access(self, _event_access: Arc) -> Self::Output { - unimplemented!() - } - - fn update(self, _record: &mut Self::Output) { - unimplemented!() - } fn get_store(stores: AppStores) -> MongoStore { stores.event } - - fn into_public(self) -> anyhow::Result { - unimplemented!() - } } diff --git a/api/src/endpoints/mod.rs b/api/src/endpoints/mod.rs index ff09aa28..051a59c5 100644 --- a/api/src/endpoints/mod.rs +++ b/api/src/endpoints/mod.rs @@ -14,8 +14,8 @@ use bson::{doc, SerializerOptions}; use http::{HeaderMap, HeaderValue, StatusCode}; use integrationos_domain::{ algebra::{MongoStore, StoreExt}, - common::{event_access::EventAccess, Connection}, - ApplicationError, IntegrationOSError, InternalError, OAuth, Store, + event_access::EventAccess, + ApplicationError, Connection, IntegrationOSError, InternalError, OAuth, Store, }; use moka::future::Cache; use mongodb::options::FindOneOptions; @@ -43,13 +43,28 @@ pub mod unified; const INTEGRATION_OS_PASSTHROUGH_HEADER: &str = "x-integrationos-passthrough"; +pub type Unit = (); pub trait CrudRequest: Sized { type Output: Serialize + DeserializeOwned + Unpin + Sync + Send + 'static; - type Error: Serialize + DeserializeOwned + Unpin + Sync + Send + 'static + std::fmt::Debug; - fn into_with_event_access(self, event_access: Arc) -> Self::Output; - fn into_public(self) -> Result; - fn update(self, record: &mut Self::Output); + /// Generate the output of the request based on the input and the event access. + /// + /// @param self + /// @param event_access + /// @return Option + fn event_access(&self, _: Arc) -> Option { + None + } + + /// Generate the output of the request based on the input. + /// @param self + /// @return Result, Self::Error> + fn output(&self) -> Option { + None + } + + /// + fn update(&self, _: &mut Self::Output) -> Unit {} fn get_store(stores: AppStores) -> MongoStore; } @@ -106,12 +121,14 @@ where U: Serialize + DeserializeOwned + Unpin + Sync + Send + 'static, { let output = if let Some(Extension(event_access)) = event_access { - req.into_with_event_access(event_access) + req.event_access(event_access) } else { - req.into_public().map_err(|e| { - error!("Error creating object: {:?}", e); - internal_server_error!() - })? + req.output() + }; + + let output = match output { + Some(output) => output, + None => return Err(not_found!("Record")), }; match T::get_store(state.app_stores.clone()) @@ -356,6 +373,7 @@ where struct SparseConnection { oauth: OAuth, } + async fn get_connection( access: &EventAccess, connection_id: &HeaderValue, diff --git a/api/src/endpoints/oauth.rs b/api/src/endpoints/oauth.rs index d5234835..e0535d4c 100644 --- a/api/src/endpoints/oauth.rs +++ b/api/src/endpoints/oauth.rs @@ -9,7 +9,10 @@ use chrono::{Duration, Utc}; use http::{HeaderMap, HeaderName, HeaderValue}; use integrationos_domain::{ algebra::{MongoStore, StoreExt}, - common::{ + get_secret_request::GetSecretRequest, + id::{prefix::IdPrefix, Id}, + oauth_secret::OAuthSecret, + { api_model_config::ContentType, connection_definition::ConnectionDefinition, connection_oauth_definition::{ @@ -19,9 +22,6 @@ use integrationos_domain::{ ownership::Ownership, Connection, OAuth, Throughput, }, - get_secret_request::GetSecretRequest, - id::{prefix::IdPrefix, Id}, - oauth_secret::OAuthSecret, }; use mongodb::bson::doc; use reqwest::Request; diff --git a/api/src/endpoints/passthrough.rs b/api/src/endpoints/passthrough.rs index 68955fd9..463a7603 100644 --- a/api/src/endpoints/passthrough.rs +++ b/api/src/endpoints/passthrough.rs @@ -9,11 +9,11 @@ use axum::{ use http::{header::CONTENT_LENGTH, HeaderMap, HeaderName, Method, Uri}; use hyper::body::Bytes; use integrationos_domain::{ - common::{ + ApplicationError, InternalError, + { destination::{Action, Destination}, event_access::EventAccess, }, - ApplicationError, InternalError, }; use std::{collections::HashMap, sync::Arc}; use tracing::error; diff --git a/api/src/endpoints/pipeline.rs b/api/src/endpoints/pipeline.rs index 11aad767..c802ba5b 100644 --- a/api/src/endpoints/pipeline.rs +++ b/api/src/endpoints/pipeline.rs @@ -4,12 +4,15 @@ use axum::{routing::post, Router}; use bson::doc; use integrationos_domain::{ algebra::MongoStore, - common::{ - configuration::pipeline::PipelineConfig, destination::Destination, - event_access::EventAccess, middleware::Middleware, record_metadata::RecordMetadata, - signature::Signature, source::Source, Pipeline, - }, + configuration::pipeline::PipelineConfig, + destination::Destination, + event_access::EventAccess, id::{prefix::IdPrefix, Id}, + middleware::Middleware, + record_metadata::RecordMetadata, + signature::Signature, + source::Source, + Pipeline, }; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -44,29 +47,24 @@ impl CrudHook for CreatePipelineRequest {} impl CrudRequest for CreatePipelineRequest { type Output = Pipeline; - type Error = (); - fn into_public(self) -> anyhow::Result { - unimplemented!() - } - - fn into_with_event_access(self, event_access: Arc) -> Self::Output { - Self::Output { + fn event_access(&self, event_access: Arc) -> Option { + Some(Self::Output { id: Id::now(IdPrefix::Pipeline).to_string(), environment: event_access.environment, - name: self.name, - key: self.key, - source: self.source, - destination: self.destination, - middleware: self.middleware, + name: self.name.clone(), + key: self.key.clone(), + source: self.source.clone(), + destination: self.destination.clone(), + middleware: self.middleware.clone(), ownership: event_access.ownership.clone(), - signature: self.signature, - config: Some(self.config), + signature: self.signature.clone(), + config: Some(self.config.clone()), record_metadata: RecordMetadata::default(), - } + }) } - fn update(self, record: &mut Self::Output) { + fn update(&self, record: &mut Self::Output) { let CreatePipelineRequest { name, key, @@ -77,13 +75,13 @@ impl CrudRequest for CreatePipelineRequest { config, } = self; - record.name = name; - record.key = key; - record.source = source; - record.destination = destination; - record.middleware = middleware; - record.signature = signature; - record.config = Some(config); + record.name = name.into(); + record.key = key.into(); + record.source = source.clone(); + record.destination = destination.clone(); + record.middleware = middleware.clone(); + record.signature = signature.clone(); + record.config = Some(config.clone()); record.record_metadata.mark_updated(&record.ownership.id); } diff --git a/api/src/endpoints/transactions.rs b/api/src/endpoints/transactions.rs index f7978a5b..c67b10a2 100644 --- a/api/src/endpoints/transactions.rs +++ b/api/src/endpoints/transactions.rs @@ -2,10 +2,7 @@ use super::{read, CrudRequest}; use crate::server::{AppState, AppStores}; use axum::{routing::get, Router}; use bson::doc; -use integrationos_domain::{ - algebra::MongoStore, - common::{event_access::EventAccess, Transaction}, -}; +use integrationos_domain::{algebra::MongoStore, Transaction}; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -18,19 +15,6 @@ pub struct TransactionCrud; impl CrudRequest for TransactionCrud { type Output = Transaction; - type Error = (); - - fn into_public(self) -> anyhow::Result { - unimplemented!() - } - - fn into_with_event_access(self, _event_access: Arc) -> Self::Output { - unimplemented!() - } - - fn update(self, _record: &mut Self::Output) { - unimplemented!() - } fn get_store(stores: AppStores) -> MongoStore { stores.transactions diff --git a/api/src/endpoints/unified.rs b/api/src/endpoints/unified.rs index 5ff93402..bb4030d2 100644 --- a/api/src/endpoints/unified.rs +++ b/api/src/endpoints/unified.rs @@ -9,12 +9,12 @@ use bson::doc; use convert_case::{Case, Casing}; use http::{HeaderMap, HeaderName}; use integrationos_domain::{ - common::{ + ApplicationError, InternalError, + { connection_model_definition::CrudAction, destination::Action, encrypted_access_key::EncryptedAccessKey, encrypted_data::PASSWORD_LENGTH, event_access::EventAccess, AccessKey, Event, }, - ApplicationError, InternalError, }; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; diff --git a/api/src/main.rs b/api/src/main.rs index c1e8fe82..097860d5 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -1,24 +1,23 @@ -use std::sync::Arc; - use anyhow::Result; use api::{config::Config, server::Server}; use dotenvy::dotenv; use envconfig::Envconfig; -use integrationos_domain::service::secrets_client::SecretsClient; +use integrationos_domain::client::secrets_client::SecretsClient; +use integrationos_domain::telemetry::{get_subscriber, init_subscriber}; +use std::sync::Arc; use tracing::info; -use tracing_subscriber::filter::LevelFilter; -use tracing_subscriber::EnvFilter; fn main() -> Result<()> { dotenv().ok(); - - let filter = EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env_lossy(); - - tracing_subscriber::fmt().with_env_filter(filter).init(); - let config = Config::init_from_env()?; + let name = if config.is_admin { + "admin-api" + } else { + "event-api" + }; + + let subscriber = get_subscriber(name.into(), "info".into(), std::io::stdout); + init_subscriber(subscriber); info!("Starting API with config:\n{config}"); diff --git a/api/src/middleware/auth.rs b/api/src/middleware/auth.rs index dc8c4f8e..ffa8ea63 100644 --- a/api/src/middleware/auth.rs +++ b/api/src/middleware/auth.rs @@ -1,17 +1,17 @@ use crate::{ endpoints::ApiError, internal_server_error, not_found, server::AppState, unauthorized, }; -use axum::{extract::State, middleware::Next, response::Response}; +use axum::{body::Body, extract::State, middleware::Next, response::Response}; use http::Request; use integrationos_domain::{algebra::StoreExt, ApplicationError, InternalError}; use mongodb::bson::doc; use std::sync::Arc; use tracing::error; -pub async fn auth( +pub async fn auth( State(state): State>, - mut req: Request, - next: Next, + mut req: Request, + next: Next, ) -> Result { let Some(auth_header) = req.headers().get(&state.config.headers.auth_header) else { return Err(unauthorized!()); diff --git a/api/src/middleware/extractor.rs b/api/src/middleware/extractor.rs new file mode 100644 index 00000000..91c819f9 --- /dev/null +++ b/api/src/middleware/extractor.rs @@ -0,0 +1,21 @@ +use anyhow::Result; +use integrationos_domain::event_access::EventAccess; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tower_governor::{errors::GovernorError, key_extractor::KeyExtractor}; + +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +pub struct OwnershipId; + +impl KeyExtractor for OwnershipId { + type Key = String; + + fn extract(&self, req: &http::request::Request) -> Result { + let event_access = req + .extensions() + .get::>() + .ok_or_else(|| GovernorError::UnableToExtractKey)?; + + Ok(event_access.ownership.id.to_string()) + } +} diff --git a/api/src/middleware/jwt_auth.rs b/api/src/middleware/jwt_auth.rs index 91081773..353edb8c 100644 --- a/api/src/middleware/jwt_auth.rs +++ b/api/src/middleware/jwt_auth.rs @@ -1,31 +1,11 @@ use crate::{api_payloads::ErrorResponse, server::AppState, unauthorized}; -use axum::{extract::State, middleware::Next, response::Response, Json}; +use axum::{body::Body, extract::State, middleware::Next, response::Response, Json}; use http::{Request, StatusCode}; +use integrationos_domain::Claims; use jsonwebtoken::{DecodingKey, Validation}; -use serde::{Deserialize, Serialize}; use std::sync::Arc; use tracing::info; -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Claims { - #[serde(rename = "_id")] - pub id: String, - pub email: String, - pub username: String, - pub user_key: String, - pub first_name: String, - pub last_name: String, - pub buildable_id: String, - pub container_id: String, - pub pointers: Vec, - pub is_buildable_core: bool, - pub iat: i64, - pub exp: i64, - pub aud: String, - pub iss: String, -} - const BEARER_PREFIX: &str = "Bearer "; #[derive(Clone)] @@ -43,10 +23,10 @@ impl JwtState { } } -pub async fn jwt_auth( +pub async fn jwt_auth( State(state): State>, - mut req: Request, - next: Next, + mut req: Request, + next: Next, ) -> Result)> { let Some(auth_header) = req.headers().get(http::header::AUTHORIZATION) else { info!("missing authorization header"); @@ -67,7 +47,7 @@ pub async fn jwt_auth( match jsonwebtoken::decode::(token, &state.decoding_key, &state.validation) { Ok(decoded_token) => { - req.extensions_mut().insert(decoded_token.claims); + req.extensions_mut().insert(Arc::new(decoded_token.claims)); Ok(next.run(req).await) } Err(e) => { diff --git a/api/src/middleware/mod.rs b/api/src/middleware/mod.rs index 627fafb6..2c3b3216 100644 --- a/api/src/middleware/mod.rs +++ b/api/src/middleware/mod.rs @@ -1,7 +1,7 @@ pub mod auth; pub mod blocker; +pub mod extractor; pub mod jwt_auth; -pub mod rate_limiter; pub use auth::auth; pub use jwt_auth::jwt_auth; diff --git a/api/src/middleware/rate_limiter.rs b/api/src/middleware/rate_limiter.rs deleted file mode 100644 index 5606a026..00000000 --- a/api/src/middleware/rate_limiter.rs +++ /dev/null @@ -1,122 +0,0 @@ -use crate::{metrics::Metric, server::AppState, too_many_requests}; -use anyhow::{Context, Result}; -use axum::{ - extract::State, - middleware::Next, - response::{IntoResponse, Response}, - Extension, -}; -use http::{HeaderName, Request}; -use integrationos_domain::{algebra::RedisCache, event_access::EventAccess}; -use redis::AsyncCommands; -use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; -use tracing::warn; - -#[derive(Debug, Clone)] -pub struct RateLimiter { - tx: mpsc::Sender<(Arc, oneshot::Sender)>, - key_header_name: HeaderName, - limit_header_name: HeaderName, - remaining_header_name: HeaderName, - reset_header_name: HeaderName, - metric_tx: mpsc::Sender, -} - -impl RateLimiter { - pub async fn new(state: Arc) -> Result { - let mut redis = RedisCache::new(&state.config.redis_config, 0) - .await - .with_context(|| "Could not connect to redis")?; - - let (tx, mut rx) = mpsc::channel::<(Arc, oneshot::Sender)>(1024); - - let throughput_key = state.config.redis_config.api_throughput_key.clone(); - - tokio::spawn(async move { - while let Some((id, tx)) = rx.recv().await { - let count: u64 = redis - .hincr(&throughput_key, id.as_ref(), 1) - .await - .unwrap_or_default(); - let _ = tx.send(count); - } - }); - - let key_header_name = - HeaderName::from_lowercase(state.config.headers.connection_header.as_bytes()).unwrap(); - - let limit_header_name = - HeaderName::from_lowercase(state.config.headers.rate_limit_limit.as_bytes()).unwrap(); - - let remaining_header_name = - HeaderName::from_lowercase(state.config.headers.rate_limit_remaining.as_bytes()) - .unwrap(); - - let reset_header_name = - HeaderName::from_lowercase(state.config.headers.rate_limit_reset.as_bytes()).unwrap(); - - Ok(RateLimiter { - tx, - metric_tx: state.metric_tx.clone(), - key_header_name, - limit_header_name, - remaining_header_name, - reset_header_name, - }) - } - - pub async fn get_request_count(&self, id: Arc) -> u64 { - let (tx, rx) = oneshot::channel(); - match self.tx.send((id, tx)).await { - Ok(()) => rx.await.unwrap_or_default(), - Err(e) => { - warn!("Could not send to redis task: {e}"); - 0 - } - } - } -} - -pub async fn rate_limiter( - Extension(event_access): Extension>, - State(state): State>, - req: Request, - next: Next, -) -> Result { - let throughput = event_access.throughput; - - let count = state - .get_request_count(event_access.ownership.id.clone()) - .await; - - if count >= throughput { - let _ = state - .metric_tx - .send(Metric::rate_limited( - event_access.clone(), - req.headers().get(&state.key_header_name).cloned(), - )) - .await; - let mut res = too_many_requests!().into_response(); - - let headers = res.headers_mut(); - - headers.insert(state.limit_header_name.clone(), throughput.into()); - headers.insert(state.remaining_header_name.clone(), 0.into()); - headers.insert(state.reset_header_name.clone(), 60.into()); - - Err(res) - } else { - let mut res = next.run(req).await; - let headers = res.headers_mut(); - - headers.insert(state.limit_header_name.clone(), throughput.into()); - headers.insert( - state.remaining_header_name.clone(), - (throughput - count).into(), - ); - headers.insert(state.reset_header_name.clone(), 60.into()); - Ok(res) - } -} diff --git a/api/src/routes/protected.rs b/api/src/routes/protected.rs index 7df300b7..e833dd25 100644 --- a/api/src/routes/protected.rs +++ b/api/src/routes/protected.rs @@ -1,16 +1,3 @@ -use std::{iter::once, sync::Arc}; - -use axum::{ - error_handling::HandleErrorLayer, - routing::{get, post}, - Router, -}; -use http::HeaderName; -use integrationos_domain::common::connection_model_schema::PublicConnectionModelSchema; -use tower::{filter::FilterLayer, ServiceBuilder}; -use tower_http::{sensitive_headers::SetSensitiveRequestHeadersLayer, trace::TraceLayer}; -use tracing::warn; - use crate::{ endpoints::{ connection, @@ -21,13 +8,24 @@ use crate::{ middleware::{ auth, blocker::{handle_blocked_error, BlockInvalidHeaders}, - rate_limiter::{self, RateLimiter}, + extractor::OwnershipId, }, server::AppState, }; +use axum::{ + error_handling::HandleErrorLayer, + routing::{get, post}, + Router, +}; +use http::HeaderName; +use integrationos_domain::connection_model_schema::PublicConnectionModelSchema; +use std::{iter::once, sync::Arc}; +use tower::{filter::FilterLayer, ServiceBuilder}; +use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer}; +use tower_http::{sensitive_headers::SetSensitiveRequestHeadersLayer, trace::TraceLayer}; pub async fn get_router(state: &Arc) -> Router> { - let r = Router::new() + let routes = Router::new() .nest("/pipelines", pipeline::get_router()) .nest("/events", events::get_router()) .nest("/transactions", transactions::get_router()) @@ -48,30 +46,33 @@ pub async fn get_router(state: &Arc) -> Router> { .nest("/oauth", oauth::get_router()) .nest("/unified", unified::get_router()); - let r = match RateLimiter::new(state.clone()).await { - Ok(rate_limiter) => r.layer(axum::middleware::from_fn_with_state( - Arc::new(rate_limiter), - rate_limiter::rate_limiter, - )), - Err(e) => { - warn!("Could not connect to redis: {e}"); - r - } - }; + let config = Box::new( + GovernorConfigBuilder::default() + .per_second(state.config.burst_rate_limit) + .burst_size(state.config.burst_size) + .key_extractor(OwnershipId) + .use_headers() + .finish() + .expect("Failed to build GovernorConfig"), + ); - r.layer(axum::middleware::from_fn_with_state( - state.clone(), - auth::auth, - )) - .layer(TraceLayer::new_for_http()) - .layer(SetSensitiveRequestHeadersLayer::new(once( - HeaderName::from_lowercase(state.config.headers.auth_header.as_bytes()).unwrap(), - ))) - .layer( - ServiceBuilder::new() - .layer(HandleErrorLayer::new(handle_blocked_error)) - .layer(FilterLayer::new( - BlockInvalidHeaders::new(state.clone()).await, - )), - ) + routes + .layer(GovernorLayer { + config: Box::leak(config), + }) + .layer(axum::middleware::from_fn_with_state( + state.clone(), + auth::auth, + )) + .layer(TraceLayer::new_for_http()) + .layer(SetSensitiveRequestHeadersLayer::new(once( + HeaderName::from_lowercase(state.config.headers.auth_header.as_bytes()).unwrap(), + ))) + .layer( + ServiceBuilder::new() + .layer(HandleErrorLayer::new(handle_blocked_error)) + .layer(FilterLayer::new( + BlockInvalidHeaders::new(state.clone()).await, + )), + ) } diff --git a/api/src/server.rs b/api/src/server.rs index 045d076a..40b54dd1 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -11,27 +11,23 @@ use anyhow::{anyhow, Context, Result}; use axum::Router; use http::HeaderValue; use integrationos_domain::{ - algebra::{CryptoExt, MongoStore}, - common::{ - common_model::CommonModel, - connection_definition::ConnectionDefinition, - connection_model_definition::ConnectionModelDefinition, - connection_model_schema::{ConnectionModelSchema, PublicConnectionModelSchema}, - connection_oauth_definition::{ConnectionOAuthDefinition, Settings}, - cursor::Cursor, - event_access::EventAccess, - stage::Stage, - Connection, Event, Pipeline, Store, Transaction, - }, - common_model::CommonEnum, - connection_definition::PublicConnectionDetails, - service::unified_destination::UnifiedDestination, + algebra::{CryptoExt, DefaultTemplate, MongoStore}, + client::unified_destination_client::UnifiedDestination, + common_model::{CommonEnum, CommonModel}, + connection_definition::{ConnectionDefinition, PublicConnectionDetails}, + connection_model_definition::ConnectionModelDefinition, + connection_model_schema::{ConnectionModelSchema, PublicConnectionModelSchema}, + connection_oauth_definition::{ConnectionOAuthDefinition, Settings}, + cursor::Cursor, + event_access::EventAccess, + stage::Stage, + Connection, Event, Pipeline, Store, Transaction, }; use moka::future::Cache; use mongodb::{options::UpdateOptions, Client, Database}; use segment::{AutoBatcher, Batcher, HttpClient}; use std::{collections::BTreeMap, sync::Arc, time::Duration}; -use tokio::{sync::mpsc::Sender, time::timeout, try_join}; +use tokio::{net::TcpListener, sync::mpsc::Sender, time::timeout, try_join}; use tracing::{error, info, trace, warn}; #[derive(Clone)] @@ -76,6 +72,7 @@ pub struct AppState { pub extractor_caller: UnifiedDestination, pub event_tx: Sender, pub metric_tx: Sender, + pub template: DefaultTemplate, } #[derive(Clone)] @@ -202,6 +199,7 @@ impl Server { // Update metrics in separate thread let client = HttpClient::default(); let batcher = Batcher::new(None); + let template = DefaultTemplate::default(); let mut batcher = config .segment_write_key .as_ref() @@ -278,6 +276,7 @@ impl Server { extractor_caller, event_tx, metric_tx, + template, }), }) } @@ -289,8 +288,9 @@ impl Server { info!("Api server listening on {}", self.state.config.address); - axum::Server::bind(&self.state.config.address) - .serve(app.into_make_service()) + let tcp_listener = TcpListener::bind(&self.state.config.address).await?; + + axum::serve(tcp_listener, app.into_make_service()) .await .map_err(|e| anyhow!("Server error: {}", e)) } diff --git a/api/src/util/openapi_builder.rs b/api/src/util/openapi_builder.rs index a693e728..67c741bd 100644 --- a/api/src/util/openapi_builder.rs +++ b/api/src/util/openapi_builder.rs @@ -1,8 +1,6 @@ use convert_case::{Case, Casing}; use indexmap::IndexMap; -use integrationos_domain::common::{ - common_model::CommonModel, connection_model_definition::CrudAction, -}; +use integrationos_domain::{common_model::CommonModel, connection_model_definition::CrudAction}; use openapiv3::*; use strum::IntoEnumIterator; use tracing::debug; diff --git a/api/src/util/shape_mongo_filter.rs b/api/src/util/shape_mongo_filter.rs index f39cde2f..1322103a 100644 --- a/api/src/util/shape_mongo_filter.rs +++ b/api/src/util/shape_mongo_filter.rs @@ -1,6 +1,6 @@ use axum::extract::Query; use http::HeaderMap; -use integrationos_domain::common::event_access::EventAccess; +use integrationos_domain::event_access::EventAccess; use mongodb::bson::{doc, Document}; use std::{collections::BTreeMap, sync::Arc}; @@ -72,14 +72,14 @@ mod test { use axum::extract::Query; use http::HeaderMap; use integrationos_domain::{ - common::{ + id::{prefix::IdPrefix, Id}, + { connection_definition::{ConnectionDefinitionType, Paths}, environment::Environment, event_access::EventAccess, ownership::Ownership, record_metadata::RecordMetadata, }, - id::{prefix::IdPrefix, Id}, }; use crate::util::shape_mongo_filter::{ diff --git a/api/tests/api_tests/get_tests.rs b/api/tests/api_tests/get_tests.rs index 3444b5e2..9aa0af79 100644 --- a/api/tests/api_tests/get_tests.rs +++ b/api/tests/api_tests/get_tests.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, ops::Deref}; use api::endpoints::{common_model, ReadResponse}; use fake::{Fake, Faker}; use http::{Method, StatusCode}; -use integrationos_domain::common::{ +use integrationos_domain::{ common_model::{CommonModel, DataType, Expandable, Field}, json_schema::JsonSchema, }; diff --git a/api/tests/api_tests/pagination_tests.rs b/api/tests/api_tests/pagination_tests.rs index 735977a7..4ad65c45 100644 --- a/api/tests/api_tests/pagination_tests.rs +++ b/api/tests/api_tests/pagination_tests.rs @@ -3,7 +3,7 @@ use std::time::Duration; use api::endpoints::{pipeline::CreatePipelineRequest, ReadResponse}; use fake::{Fake, Faker}; use http::{Method, StatusCode}; -use integrationos_domain::common::Pipeline; +use integrationos_domain::Pipeline; use serde_json::Value; use tokio::time::sleep; diff --git a/api/tests/api_tests/passthrough_tests.rs b/api/tests/api_tests/passthrough_tests.rs index d127add9..9437395d 100644 --- a/api/tests/api_tests/passthrough_tests.rs +++ b/api/tests/api_tests/passthrough_tests.rs @@ -4,7 +4,7 @@ use http::{ header::{AUTHORIZATION, CONTENT_TYPE}, Method, StatusCode, }; -use integrationos_domain::common::{ +use integrationos_domain::{ api_model_config::{AuthMethod, SamplesInput, SchemasInput}, connection_model_definition::CrudAction, environment::Environment, diff --git a/api/tests/api_tests/schema_tests.rs b/api/tests/api_tests/schema_tests.rs index 7a27224a..e9cea3c6 100644 --- a/api/tests/api_tests/schema_tests.rs +++ b/api/tests/api_tests/schema_tests.rs @@ -2,12 +2,12 @@ use api::endpoints::{connection_model_schema::CreateRequest, ReadResponse}; use fake::{Fake, Faker}; use http::{Method, StatusCode}; use integrationos_domain::{ - common::{ + id::{prefix::IdPrefix, Id}, + { connection_model_schema::{ConnectionModelSchema, Mappings}, environment::Environment, json_schema::JsonSchema, }, - id::{prefix::IdPrefix, Id}, }; use serde_json::Value; diff --git a/api/tests/api_tests/test_crud.rs b/api/tests/api_tests/test_crud.rs index 3058a251..2b15a5ec 100644 --- a/api/tests/api_tests/test_crud.rs +++ b/api/tests/api_tests/test_crud.rs @@ -7,7 +7,7 @@ use api::endpoints::{ }; use fake::{Fake, Faker}; use http::{Method, StatusCode}; -use integrationos_domain::common::{ +use integrationos_domain::{ common_model::CommonModel, connection_definition::ConnectionDefinition, connection_model_definition::ConnectionModelDefinition, connection_model_schema::ConnectionModelSchema, diff --git a/api/tests/api_tests/test_server/mod.rs b/api/tests/api_tests/test_server/mod.rs index 04cdf227..893732d7 100644 --- a/api/tests/api_tests/test_server/mod.rs +++ b/api/tests/api_tests/test_server/mod.rs @@ -14,7 +14,10 @@ use http::StatusCode; use http::{header::AUTHORIZATION, Method}; use integrationos_domain::{ algebra::{CryptoExt, MongoStore, StoreExt}, - common::{ + create_secret_response::{CreateSecretAuthor, CreateSecretResponse}, + get_secret_request::GetSecretRequest, + IntegrationOSError, + { access_key_data::AccessKeyData, access_key_prefix::AccessKeyPrefix, api_model_config::{AuthMethod, SamplesInput, SchemasInput}, @@ -27,9 +30,6 @@ use integrationos_domain::{ event_type::EventType, AccessKey, Connection, Store, }, - create_secret_response::{CreateSecretAuthor, CreateSecretResponse}, - get_secret_request::GetSecretRequest, - IntegrationOSError, }; use mockito::{Matcher, Server as MockServer, ServerGuard}; use mongodb::Client; diff --git a/api/tests/api_tests/test_server/test_core.rs b/api/tests/api_tests/test_server/test_core.rs index 019bfb9b..5090a244 100644 --- a/api/tests/api_tests/test_server/test_core.rs +++ b/api/tests/api_tests/test_server/test_core.rs @@ -8,9 +8,7 @@ use event_core::{ }; use gateway::config::Config as GatewayConfig; use http::StatusCode; -use integrationos_domain::common::{ - event_response::EventResponse, event_with_context::EventWithContext, -}; +use integrationos_domain::{event_response::EventResponse, event_with_context::EventWithContext}; use tokio::sync::{ mpsc::{self, Receiver}, Mutex, diff --git a/api/tests/api_tests/test_server/test_gateway.rs b/api/tests/api_tests/test_server/test_gateway.rs index 1fde1d9e..8b9824fd 100644 --- a/api/tests/api_tests/test_server/test_gateway.rs +++ b/api/tests/api_tests/test_server/test_gateway.rs @@ -1,12 +1,11 @@ -use std::collections::HashMap; - use anyhow::Result; use api::config::Config as ApiConfig; use envconfig::Envconfig; use gateway::{config::Config, finalizer::Finalizer, server::Server}; use http::StatusCode; -use integrationos_domain::common::event_response::EventResponse; +use integrationos_domain::event_response::EventResponse; use serde_json::{json, Value}; +use std::collections::HashMap; use testcontainers_modules::{redis::Redis, testcontainers::Container}; use tokio::net::TcpListener; use uuid::Uuid; diff --git a/api/tests/api_tests/transaction_tests.rs b/api/tests/api_tests/transaction_tests.rs index 79bdec6b..6149741b 100644 --- a/api/tests/api_tests/transaction_tests.rs +++ b/api/tests/api_tests/transaction_tests.rs @@ -3,7 +3,7 @@ use std::time::Duration; use api::endpoints::{pipeline::CreatePipelineRequest, ReadResponse}; use fake::{Fake, Faker}; use http::{Method, StatusCode}; -use integrationos_domain::common::{ +use integrationos_domain::{ connection_model_definition::PlatformInfo, destination::Action, environment::Environment, Transaction, }; diff --git a/api/tests/api_tests/unified_tests.rs b/api/tests/api_tests/unified_tests.rs index b80a0c27..11014377 100644 --- a/api/tests/api_tests/unified_tests.rs +++ b/api/tests/api_tests/unified_tests.rs @@ -12,14 +12,14 @@ use http::{ Method, StatusCode, }; use integrationos_domain::{ - common::{ + id::{prefix::IdPrefix, Id}, + { api_model_config::{AuthMethod, SamplesInput, SchemasInput}, connection_model_definition::{ConnectionModelDefinition, CrudAction, CrudMapping}, connection_model_schema::{ConnectionModelSchema, Mappings}, environment::Environment, Connection, }, - id::{prefix::IdPrefix, Id}, }; use mockito::Mock; use serde_json::Value; diff --git a/event-core/src/config.rs b/event-core/src/config.rs index 0a2ac240..c8f9432d 100644 --- a/event-core/src/config.rs +++ b/event-core/src/config.rs @@ -1,5 +1,5 @@ use envconfig::Envconfig; -use integrationos_domain::common::{ +use integrationos_domain::{ cache::CacheConfig as RedisConfig, database::DatabaseConfig, secrets::SecretsConfig, }; use std::fmt::{Display, Formatter}; diff --git a/event-core/src/dispatcher.rs b/event-core/src/dispatcher.rs index 81ba96b1..06d2ea3f 100644 --- a/event-core/src/dispatcher.rs +++ b/event-core/src/dispatcher.rs @@ -10,13 +10,13 @@ use futures::{ }; use integrationos_domain::{ algebra::{PipelineExt, PipelineStatus}, - common::{ - extractor_context::Stage as ExtractorStage, middleware::Middleware, ExtractorContext, - PipelineContext, RootContext, Transaction, - }, pipeline_context::PipelineStage, root_context::RootStage, Event, + { + extractor_context::Stage as ExtractorStage, middleware::Middleware, ExtractorContext, + PipelineContext, RootContext, Transaction, + }, }; use js_sandbox_ios::Script; use serde_json::{json, Value}; diff --git a/event-core/src/event_handler.rs b/event-core/src/event_handler.rs index a8f50954..3e67a90a 100644 --- a/event-core/src/event_handler.rs +++ b/event-core/src/event_handler.rs @@ -3,7 +3,7 @@ use anyhow::{Context, Result}; use integrationos_domain::{ algebra::RedisCache, cache::CacheConfig, - common::{event_with_context::EventWithContext, Event, Transaction}, + {event_with_context::EventWithContext, Event, Transaction}, }; use redis::AsyncCommands; use std::{sync::Arc, time::Duration}; diff --git a/event-core/src/main.rs b/event-core/src/main.rs index 174a1cd3..56313cfa 100644 --- a/event-core/src/main.rs +++ b/event-core/src/main.rs @@ -9,22 +9,19 @@ use event_core::{ mongo_context_store::MongoContextStore, mongo_control_data_store::MongoControlDataStore, }; -use integrationos_domain::service::secrets_client::SecretsClient; +use integrationos_domain::{client::secrets_client::SecretsClient, telemetry::{get_subscriber, init_subscriber}}; use metrics_exporter_prometheus::PrometheusBuilder; use std::sync::Arc; use tokio::sync::Mutex; use tokio_condvar::Condvar; -use tracing::{error, info, metadata::LevelFilter, warn}; -use tracing_subscriber::EnvFilter; +use tracing::{error, info, warn}; -#[tokio::main(flavor = "multi_thread")] +#[tokio::main] async fn main() -> Result<()> { dotenv().ok(); - let filter = EnvFilter::builder() - .with_default_directive(LevelFilter::DEBUG.into()) - .from_env_lossy(); - tracing_subscriber::fmt().with_env_filter(filter).init(); + let suscriber = get_subscriber("event-core".into(), "info".into(), std::io::stdout); + init_subscriber(suscriber); let config = EventCoreConfig::init_from_env()?; diff --git a/event-core/src/mongo_control_data_store.rs b/event-core/src/mongo_control_data_store.rs index c068c449..1a636468 100644 --- a/event-core/src/mongo_control_data_store.rs +++ b/event-core/src/mongo_control_data_store.rs @@ -10,13 +10,14 @@ use handlebars::Handlebars; use http::header::AUTHORIZATION; use integrationos_domain::{ algebra::{CryptoExt, FecherExt, GoogleTokenFetcher, MongoStore, StoreExt}, - common::{ - duplicates::Duplicates, encrypted_access_key::EncryptedAccessKey, - event_access::EventAccess, extractor::HttpExtractor, middleware::Middleware, Connection, - Event, Pipeline, Store, - }, + client::unified_destination_client::UnifiedDestination, + duplicates::Duplicates, + encrypted_access_key::EncryptedAccessKey, + event_access::EventAccess, + extractor::HttpExtractor, id::Id, - service::unified_destination::UnifiedDestination, + middleware::Middleware, + Connection, Event, Pipeline, Store, }; use moka::future::Cache; use mongodb::{options::ClientOptions, Client}; diff --git a/event-core/src/store.rs b/event-core/src/store.rs index 703d201c..0cb242c2 100644 --- a/event-core/src/store.rs +++ b/event-core/src/store.rs @@ -2,8 +2,8 @@ use anyhow::Result; use async_trait::async_trait; use integrationos_domain::{ algebra::PipelineExt, - common::{duplicates::Duplicates, extractor::HttpExtractor, Connection, Event, Pipeline}, id::Id, + {duplicates::Duplicates, extractor::HttpExtractor, Connection, Event, Pipeline}, }; use serde::{Deserialize, Serialize}; use serde_json::Value; diff --git a/event-core/tests/mock_destination.rs b/event-core/tests/mock_destination.rs index a7370c98..71ee537e 100644 --- a/event-core/tests/mock_destination.rs +++ b/event-core/tests/mock_destination.rs @@ -12,7 +12,11 @@ use fake::{ use http::Method; use integrationos_domain::{ algebra::CryptoExt, - common::{ + create_secret_response::CreateSecretResponse, + get_secret_request::GetSecretRequest, + id::{prefix::IdPrefix, Id}, + IntegrationOSError, + { api_model_config::{ApiModelConfig, AuthMethod, SamplesInput, SchemasInput}, connection_model_definition::{ ConnectionModelDefinition, CrudAction, PlatformInfo, TestConnection, @@ -24,10 +28,6 @@ use integrationos_domain::{ settings::Settings, Connection, ConnectionType, Pipeline, Throughput, }, - create_secret_response::CreateSecretResponse, - get_secret_request::GetSecretRequest, - id::{prefix::IdPrefix, Id}, - IntegrationOSError, }; use mockito::Server; use mongodb::Client; diff --git a/event-core/tests/mock_storage.rs b/event-core/tests/mock_storage.rs index 6d9d3ff9..0efc4c89 100644 --- a/event-core/tests/mock_storage.rs +++ b/event-core/tests/mock_storage.rs @@ -8,13 +8,13 @@ use event_core::{ use fake::{Fake, Faker}; use integrationos_domain::{ algebra::PipelineExt, - common::{ - duplicates::Duplicates, extractor::HttpExtractor, Connection, Event, ExtractorContext, - Pipeline, PipelineContext, RootContext, - }, id::{prefix::IdPrefix, Id}, pipeline_context::PipelineStage, root_context::RootStage, + { + duplicates::Duplicates, extractor::HttpExtractor, Connection, Event, ExtractorContext, + Pipeline, PipelineContext, RootContext, + }, }; use serde_json::Value; use std::{ diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index 29399e3e..6b9646c3 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -6,11 +6,11 @@ edition = "2021" [dependencies] anyhow.workspace = true async-trait.workspace = true -axum-macros.workspace = true -axum-prometheus = "0.4.0" +axum-prometheus = "0.6.1" axum.workspace = true dotenvy.workspace = true envconfig.workspace = true +futures.workspace = true http-serde-ext.workspace = true http.workspace = true integrationos-domain.workspace = true @@ -26,6 +26,7 @@ tracing.workspace = true [dev-dependencies] criterion = { version = "0.5.1", features = ["async_tokio"] } +http-body-util = "0.1.1" tower = { version = "0.4", features = ["util"] } [[bench]] diff --git a/gateway/benches/bench.rs b/gateway/benches/bench.rs index c838c8e6..f3370add 100644 --- a/gateway/benches/bench.rs +++ b/gateway/benches/bench.rs @@ -1,7 +1,7 @@ use axum::{body::Bytes, http::HeaderMap}; use criterion::{criterion_group, criterion_main, Criterion}; use gateway::{config::Config, mock_finalizer::MockFinalizer, server::Server, state::AppState}; -use integrationos_domain::common::{ +use integrationos_domain::{ encrypted_access_key::EncryptedAccessKey, encrypted_data::PASSWORD_LENGTH, AccessKey, Event, }; use std::{collections::HashMap, hint::black_box, sync::Arc}; diff --git a/gateway/src/config.rs b/gateway/src/config.rs index 68fd6843..a5cc0179 100644 --- a/gateway/src/config.rs +++ b/gateway/src/config.rs @@ -1,7 +1,7 @@ use envconfig::Envconfig; use integrationos_domain::{ cache::CacheConfig, - common::{database::DatabaseConfig, environment::Environment}, + {database::DatabaseConfig, environment::Environment}, }; use std::{ fmt::{Display, Formatter}, diff --git a/gateway/src/finalize_event.rs b/gateway/src/finalize_event.rs index d74d94c7..3c67d75c 100644 --- a/gateway/src/finalize_event.rs +++ b/gateway/src/finalize_event.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use integrationos_domain::common::{encrypted_access_key::EncryptedAccessKey, Event}; +use integrationos_domain::{encrypted_access_key::EncryptedAccessKey, Event}; #[async_trait] pub trait FinalizeEvent { diff --git a/gateway/src/finalizer.rs b/gateway/src/finalizer.rs index 097ae730..f694a536 100644 --- a/gateway/src/finalizer.rs +++ b/gateway/src/finalizer.rs @@ -3,7 +3,7 @@ use anyhow::{bail, Context, Result}; use async_trait::async_trait; use integrationos_domain::{ algebra::{MongoStore, RedisCache, StoreExt}, - common::{ + { encrypted_access_key::EncryptedAccessKey, event_with_context::EventWithContext, Event, RootContext, Store, }, diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 5e38e032..0bdfaf97 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -3,19 +3,16 @@ use dotenvy::dotenv; use envconfig::Envconfig; use gateway::finalizer::Finalizer; use gateway::{config::Config, server::Server}; -use integrationos_domain::common::encrypted_data::PASSWORD_LENGTH; +use integrationos_domain::encrypted_data::PASSWORD_LENGTH; +use integrationos_domain::telemetry::{get_subscriber, init_subscriber}; use tracing::info; -use tracing::metadata::LevelFilter; -use tracing_subscriber::EnvFilter; -#[tokio::main(flavor = "multi_thread")] +#[tokio::main] async fn main() -> Result<()> { dotenv().ok(); - let filter = EnvFilter::builder() - .with_default_directive(LevelFilter::DEBUG.into()) - .from_env_lossy(); - tracing_subscriber::fmt().with_env_filter(filter).init(); + let suscriber = get_subscriber("gateway".into(), "info".into(), std::io::stdout); + init_subscriber(suscriber); let config = Config::init_from_env()?; if config.secret_key.len() != PASSWORD_LENGTH { diff --git a/gateway/src/mock_finalizer.rs b/gateway/src/mock_finalizer.rs index 60b1c2c7..88e72883 100644 --- a/gateway/src/mock_finalizer.rs +++ b/gateway/src/mock_finalizer.rs @@ -1,6 +1,6 @@ use super::finalize_event::FinalizeEvent; use async_trait::async_trait; -use integrationos_domain::common::{encrypted_access_key::EncryptedAccessKey, Event}; +use integrationos_domain::{encrypted_access_key::EncryptedAccessKey, Event}; pub struct MockFinalizer; diff --git a/gateway/src/server.rs b/gateway/src/server.rs index 6f9967ed..b108289b 100644 --- a/gateway/src/server.rs +++ b/gateway/src/server.rs @@ -5,17 +5,19 @@ use crate::{ use anyhow::{anyhow, Result}; use axum::{ body::Bytes, + debug_handler, extract::{Path, Query, State}, http::{HeaderMap, HeaderName, StatusCode}, routing::{get, post}, Json, Router, }; use axum_prometheus::PrometheusMetricLayer; -use integrationos_domain::common::{ +use integrationos_domain::{ encrypted_access_key::EncryptedAccessKey, event_response::EventResponse, event_type::EventType, AccessKey, Event, }; use std::{collections::HashMap, iter::once, sync::Arc}; +use tokio::net::TcpListener; use tower_http::{ cors::{Any, CorsLayer}, sensitive_headers::SetSensitiveRequestHeadersLayer, @@ -55,8 +57,12 @@ impl Server { pub async fn run(&self) -> Result<()> { let app = self.get_router(); info!("Gateway server listening on {}", self.config.address); - axum::Server::bind(&self.config.address) - .serve(app.into_make_service()) + + let tcp_listener = TcpListener::bind(&self.config.address) + .await + .map_err(|e| anyhow!("Failed to bind to address: {}", e))?; + + axum::serve(tcp_listener, app.into_make_service()) .await .map_err(|e| anyhow!("Server error: {}", e)) } @@ -157,7 +163,7 @@ impl Server { } } -#[axum_macros::debug_handler] +#[debug_handler] async fn post_event_sk( headers: HeaderMap, State(state): State>, @@ -188,7 +194,7 @@ async fn post_event_sk( Server::handle_event(encrypted_key, body, None, headers.clone(), state).await } -#[axum_macros::debug_handler] +#[debug_handler] async fn post_event_id( headers: HeaderMap, query: Option>>, @@ -219,10 +225,11 @@ async fn get_root() {} #[cfg(test)] mod tests { use axum::{ - body::{Body, HttpBody}, + body::Body, http::{header::CONTENT_TYPE, Method, Request, StatusCode}, }; - use integrationos_domain::common::{ + use http_body_util::BodyExt; + use integrationos_domain::{ event_state::EventState, hashes::{HashType, HashValue}, }; @@ -249,7 +256,7 @@ mod tests { .unwrap(); assert_eq!(response.status(), StatusCode::OK); - let body = response.into_body().data().await.unwrap().unwrap(); + let body = response.into_body().collect().await.unwrap().to_bytes(); let resp = serde_json::from_slice::(&body).unwrap(); assert_eq!(resp.status, EventState::Acknowledged); assert_eq!( @@ -289,8 +296,8 @@ mod tests { .await .unwrap(); assert_eq!(response.status(), StatusCode::BAD_REQUEST); - let body = response.into_body().data().await.unwrap().unwrap(); - assert_eq!(body, "Invalid access key"); + let body = response.into_body().collect().await.unwrap().to_bytes(); + assert_eq!(&body[..], b"Invalid access key"); } #[tokio::test] @@ -310,7 +317,7 @@ mod tests { .unwrap(); assert_eq!(response.status(), StatusCode::OK); - let body = response.into_body().data().await.unwrap().unwrap(); + let body = response.into_body().collect().await.unwrap().to_bytes(); let resp = serde_json::from_slice::(&body).unwrap(); assert_eq!(resp.status, EventState::Acknowledged); assert_eq!( @@ -351,7 +358,7 @@ mod tests { .await .unwrap(); assert_eq!(response.status(), StatusCode::BAD_REQUEST); - let body = response.into_body().data().await.unwrap().unwrap(); + let body = response.into_body().collect().await.unwrap().to_bytes(); assert_eq!(body, "Invalid access key"); } diff --git a/gateway/src/state.rs b/gateway/src/state.rs index 9d55c15f..730b6e9f 100644 --- a/gateway/src/state.rs +++ b/gateway/src/state.rs @@ -1,6 +1,6 @@ use super::finalize_event::FinalizeEvent; use crate::config::Config; -use integrationos_domain::common::{ +use integrationos_domain::{ encrypted_access_key::EncryptedAccessKey, encrypted_data::PASSWORD_LENGTH, AccessKey, }; use moka::future::Cache; diff --git a/watchdog/src/config.rs b/watchdog/src/config.rs deleted file mode 100644 index 3de725cc..00000000 --- a/watchdog/src/config.rs +++ /dev/null @@ -1,25 +0,0 @@ -use std::fmt::{Display, Formatter}; - -use envconfig::Envconfig; -use integrationos_domain::{cache::CacheConfig, common::database::DatabaseConfig}; - -#[derive(Envconfig, Clone)] // Intentionally no Debug so secret is not printed -pub struct Config { - #[envconfig(from = "EVENT_TIMEOUT", default = "300")] // 300 seconds/ 5 minutes - pub event_timeout: u64, - #[envconfig(from = "POLL_DURATION", default = "10")] // 10 seconds - pub poll_duration: u64, - #[envconfig(nested = true)] - pub redis: CacheConfig, - #[envconfig(nested = true)] - pub db: DatabaseConfig, -} - -impl Display for Config { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - writeln!(f, "POLL_DURATION: {}", self.poll_duration)?; - writeln!(f, "EVENT_TIMEOUT: {}", self.event_timeout)?; - writeln!(f, "{}", self.redis)?; - writeln!(f, "{}", self.db) - } -} diff --git a/watchdog/src/lib.rs b/watchdog/src/lib.rs deleted file mode 100644 index ef68c369..00000000 --- a/watchdog/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod config; diff --git a/watchdog/src/main.rs b/watchdog/src/main.rs index 367da520..2b97d9e3 100644 --- a/watchdog/src/main.rs +++ b/watchdog/src/main.rs @@ -1,272 +1,30 @@ use anyhow::{Context, Result}; -use chrono::Utc; use dotenvy::dotenv; use envconfig::Envconfig; -use futures::{future::join_all, TryStreamExt}; use integrationos_domain::{ - algebra::{MongoStore, RedisCache, StoreExt}, - common::{ - event_with_context::EventWithContext, ExtractorContext, PipelineContext, RootContext, Store, - }, - pipeline_context::PipelineStage, - root_context::RootStage, + cache::CacheConfig, + client::watchdog_client::WatchdogClient, + database::DatabaseConfig, + telemetry::{get_subscriber, init_subscriber}, + watchdog::WatchdogConfig, }; -use mongodb::{ - bson::{doc, Bson, Document}, - options::FindOneOptions, -}; -use redis::{AsyncCommands, LposOptions, RedisResult}; -use std::time::Duration; -use tracing::{debug, error, info, metadata::LevelFilter, warn}; -use tracing_subscriber::EnvFilter; -use watchdog::config::Config; +use tracing::info; -#[tokio::main(flavor = "multi_thread")] +#[tokio::main] async fn main() -> Result<()> { dotenv().ok(); - let filter = EnvFilter::builder() - .with_default_directive(LevelFilter::DEBUG.into()) - .from_env_lossy(); - tracing_subscriber::fmt().with_env_filter(filter).init(); - - let config = Config::init_from_env()?; - - info!("Starting watchdog with config: {config}"); - - let mut redis = RedisCache::new(&config.redis, 100).await?; - - let key = config.redis.event_throughput_key.clone(); - let mut redis_clone = redis.clone(); - tokio::spawn(async move { - loop { - let _: RedisResult = async { redis_clone.del(key.clone()).await }.await; - tokio::time::sleep(Duration::from_secs(1)).await; - } - }); - - let key = config.redis.api_throughput_key.clone(); - let mut redis_clone = redis.clone(); - tokio::spawn(async move { - loop { - let _: RedisResult = async { redis_clone.del(key.clone()).await }.await; - tokio::time::sleep(Duration::from_secs(60)).await; - } - }); - - let mongo = mongodb::Client::with_uri_str(config.db.context_db_url) - .await - .with_context(|| "Could not connect to mongodb")?; - let db = mongo.database(&config.db.context_db_name); - let coll = db.collection::(&config.db.context_collection_name); - let root_coll = db.collection::(&config.db.context_collection_name); - let pipeline_coll = db.collection::(&config.db.context_collection_name); - let extractor_coll = db.collection::(&config.db.context_collection_name); - - let event_client = mongodb::Client::with_uri_str(config.db.event_db_url) - .await - .with_context(|| "Could not connect to events db")?; - - let event_db = event_client.database(&config.db.event_db_name); - let event_store = MongoStore::new(&event_db, &Store::Events) - .await - .with_context(|| { - format!( - "Could not connect to event db at {}", - config.db.event_db_name - ) - })?; - - loop { - let mut count = 0; - let timestamp = Utc::now().timestamp_millis() - (config.event_timeout * 1_000) as i64; - - let pipeline = vec![ - // Sort by timestamp to get latest contexts first - doc! { - "$sort": { - "timestamp": -1 - }, - }, - // Group by event_key - // Get the first (latest) context's stage and status - // Count any contexts that are later than the poll duration cutoff - // If there are any that are later then this context is still not dead - doc! { - "$group": { - "_id": "$eventKey", - "stage": { - "$first": "$stage" - }, - "status": { - "$first": "$status" - }, - "count": { - "$sum": { - "$cond": [{ - "$gt": [ - "$timestamp", timestamp - ] - }, 1, 0] - }, - }, - }, - }, - // Match any contexts that have no contexts after our cutoff date, so presumed dead - // And also not finished and status is succeeded (not dropped) - // These contexts are unfinished and dead, so need to be republished to redis - doc! { - "$match": { - "count": { "$eq": 0 }, - "stage": { "$ne": "Finished" }, - "status": { "$eq": "Succeeded" } - } - }, - ]; - - let mut event_keys = match coll.clone().aggregate(pipeline, None).await { - Ok(e) => e, - Err(e) => { - error!("Failed to fetch event keys: {e}"); - continue; - } - }; - - 'outer: while let Some(event_key) = event_keys.try_next().await? { - let Some(Bson::String(event_key)) = event_key.get("_id") else { - error!("Could not get _id out of event keys response"); - continue; - }; - // Sort by earliest timestamp to get latest context - let options = FindOneOptions::builder() - .sort(doc! { "timestamp": -1 }) - .build(); - - // Get the latest root context, then also get all latest pipeline contexts and extractor contexts if applicable - let root_context = match root_coll - .clone() - .find_one( - doc! { - "eventKey": event_key, - "type": "root" - }, - options.clone(), - ) - .await - { - Ok(c) => c, - Err(e) => { - error!("Failed to fetch root context: {e}"); - continue; - } - }; - let Some(mut root_context) = root_context else { - error!("Did not find root context for {event_key}"); - continue; - }; - - if let RootStage::ProcessingPipelines(ref mut pipelines) = root_context.stage { - let futs = pipelines.values().map(|p| { - pipeline_coll.find_one( - doc! { - "eventKey": p.event_key.to_string(), - "pipelineKey": p.pipeline_key.clone(), - "type": "pipeline" - }, - options.clone(), - ) - }); - - let results = join_all(futs).await; - for result in results { - match result { - Ok(context) => { - let Some(mut context) = context else { - error!("Did not find pipeline context for {event_key}"); - continue 'outer; - }; - if let PipelineStage::ExecutingExtractors(ref mut extractors) = - context.stage - { - let futs = extractors.values().map(|e| { - let filter = doc! { - "eventKey": e.event_key.to_string(), - "pipelineKey": e.pipeline_key.clone(), - "extractorKey": e.extractor_key.to_string(), - "type": "extractor" - }; - extractor_coll.find_one(filter, options.clone()) - }); - let results = join_all(futs).await; - for result in results { - match result { - Ok(context) => { - let Some(context) = context else { - error!("Did not find extractor context for {event_key}"); - continue 'outer; - }; - extractors - .insert(context.extractor_key.clone(), context); - } - Err(e) => { - error!("Did not find extractor context for {event_key}: {e}"); - continue 'outer; - } - } - } - } - pipelines.insert(context.pipeline_key.clone(), context); - } - Err(e) => { - error!("Could not fetch pipeline context for {event_key}: {e}"); - continue 'outer; - } - } - } - } - - debug!("Republishing unresponsive context {event_key}"); - - let Some(event) = event_store - .get_one_by_id(event_key) - .await - .with_context(|| "could not fetch event for context {event_key}")? - else { - error!("Event does not exist {event_key}"); - continue; - }; + let suscriber = get_subscriber("watchdog".into(), "info".into(), std::io::stdout); + init_subscriber(suscriber); - let event_with_context = EventWithContext::new(event, root_context); + let watchdog_config = WatchdogConfig::init_from_env().context("Could not load config")?; + let cache_config = CacheConfig::init_from_env().context("Could not load config")?; + let database_config = DatabaseConfig::init_from_env().context("Could not load config")?; - let payload = match serde_json::to_vec(&event_with_context) { - Ok(c) => c, - Err(e) => { - error!("Could not serialize payload {event_with_context:?}: {e}"); - continue; - } - }; - if redis - .lpos::<&str, &[u8], Option>( - &config.redis.queue_name, - &payload, - LposOptions::default(), - ) - .await? - .is_some() - { - warn!("Unresponsive context is already in redis {event_key}"); - continue; - } - match redis.lpush(&config.redis.queue_name, payload).await { - Ok(()) => count += 1, - Err(e) => error!("Could not publish event to redis: {e}"), - } - } + info!("Starting watchdog with config: {watchdog_config}{cache_config}{database_config}"); - if count > 0 { - info!("Republished {count} new events"); - } + let client = WatchdogClient::new(watchdog_config, cache_config, database_config); - tokio::time::sleep(Duration::from_secs(config.poll_duration)).await; - } + client.start().await??; + Ok(()) }