diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 8bf6e8f7..76ce3d21 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -4,7 +4,7 @@ jobs:
   fmt:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - uses: dtolnay/rust-toolchain@stable
         with:
          components: rustfmt
@@ -17,7 +17,7 @@ jobs:
   check:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
      - name: Install protoc
        run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
      - uses: dtolnay/rust-toolchain@stable
@@ -30,7 +30,7 @@ jobs:
   clippy:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
      - name: Install protoc
        run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
      - uses: dtolnay/rust-toolchain@stable
@@ -51,7 +51,13 @@ jobs:
         sudo rm -rf /opt/ghc
         sudo rm -rf "/usr/local/share/boost"
         sudo rm -rf "$AGENT_TOOLSDIRECTORY"
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
+      - name: Starting up Docker 🐳
+        run: ls && docker compose -f ./integrationos-emit/tests/resource/docker-compose.yml up -d
+      - name: Install fluvio CLI
+        run: curl -fsS https://hub.infinyon.cloud/install/install.sh | bash
+      - name: Create fluvio topic
+        run: ~/.fluvio/bin/fluvio profile add docker 127.0.0.1:9103 docker && ~/.fluvio/bin/fluvio topic create -p 2 events && ~/.fluvio/bin/fluvio topic create -p 2 dlq
      - name: Install protoc
        run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
      - uses: dtolnay/rust-toolchain@stable
diff --git a/Cargo.lock b/Cargo.lock
index a57f49ec..fec25225 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -307,9 +307,9 @@ dependencies = [
 [[package]]
 name = "async-stream"
-version = "0.3.5"
+version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
+checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
 dependencies = [
  "async-stream-impl",
  "futures-core",
@@ -318,9 +318,9 @@ dependencies = [
 [[package]]
 name = "async-stream-impl"
-version = "0.3.5"
+version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
+checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -387,9 +387,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
 [[package]]
 name = "axum"
-version = "0.7.5"
+version = "0.7.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf"
+checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
 dependencies = [
  "async-trait",
  "axum-core",
@@ -414,7 +414,7 @@ dependencies = [
  "serde_urlencoded",
  "sync_wrapper 1.0.2",
  "tokio",
- "tower",
+ "tower 0.5.1",
  "tower-layer",
  "tower-service",
  "tracing",
@@ -422,9 +422,9 @@
 [[package]]
 name = "axum-core"
-version = "0.4.3"
+version = "0.4.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3"
+checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
 dependencies = [
  "async-trait",
  "bytes",
@@ -435,7 +435,7 @@ dependencies = [
  "mime",
  "pin-project-lite",
  "rustversion",
- "sync_wrapper 0.1.2",
+ "sync_wrapper 1.0.2",
"tower-layer", "tower-service", "tracing", @@ -443,11 +443,10 @@ dependencies = [ [[package]] name = "axum-macros" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00c055ee2d014ae5981ce1016374e8213682aa14d9bf40e48ab48b5f3ef20eaa" +checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" dependencies = [ - "heck 0.4.1", "proc-macro2", "quote", "syn 2.0.89", @@ -471,7 +470,7 @@ dependencies = [ "once_cell", "pin-project", "tokio", - "tower", + "tower 0.4.13", "tower-http", ] @@ -640,14 +639,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed59b5c00048f48d7af971b71f800fdf23e858844a6f9e4d32ca72e9399e7864" dependencies = [ "serde", - "serde_with", + "serde_with 1.14.0", ] [[package]] name = "borsh" -version = "1.5.1" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6362ed55def622cddc70a4746a68554d7b687713770de539e59a739b249f8ed" +checksum = "2506947f73ad44e344215ccd6403ac2ae18cd8e046e581a441bf8d199f257f03" dependencies = [ "borsh-derive", "cfg_aliases", @@ -655,16 +654,15 @@ dependencies = [ [[package]] name = "borsh-derive" -version = "1.5.1" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3ef8005764f53cd4dca619f5bf64cafd4664dada50ece25e4d81de54c80cc0b" +checksum = "c2593a3b8b938bd68373196c9832f516be11fa487ef4ae745eb282e6a56a7244" dependencies = [ "once_cell", "proc-macro-crate", "proc-macro2", "quote", "syn 2.0.89", - "syn_derive", ] [[package]] @@ -1068,16 +1066,6 @@ dependencies = [ "cipher", ] -[[package]] -name = "ctrlc" -version = "3.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90eeab0aa92f3f9b4e87f258c72b139c207d251f9cbc1080a0086b86a8870dd3" -dependencies = [ - "nix", - "windows-sys 0.59.0", -] - [[package]] name = "darling" version = "0.13.4" @@ -1484,14 +1472,14 @@ dependencies = [ [[package]] name = "enum-as-inner" -version = "0.4.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21cdad81446a7f7dc43f6a77409efeb9733d2fa65553efef6018ef257c959b73" +checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.89", ] [[package]] @@ -1653,9 +1641,9 @@ dependencies = [ [[package]] name = "flume" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" dependencies = [ "futures-core", "futures-sink", @@ -1758,7 +1746,7 @@ dependencies = [ "openssl", "openssl-sys", "pin-project", - "socket2 0.5.7", + "socket2", "thiserror 1.0.69", "tracing", "ws_stream_wasm", @@ -2180,9 +2168,9 @@ dependencies = [ [[package]] name = "google-cloud-auth" -version = "0.17.1" +version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "357160f51a60ec3e32169ad687f4abe0ee1e90c73b449aa5d11256c4f1cf2ff6" +checksum = "e57a13fbacc5e9c41ded3ad8d0373175a6b7a6ad430d99e89d314ac121b7ab06" dependencies = [ "async-trait", "base64 0.21.7", @@ -2202,9 +2190,9 @@ dependencies = [ [[package]] name = "google-cloud-gax" -version = "0.19.1" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"c929076122a1839455cfe6c030278f10a400dd4dacc11d2ca46c20c47dc05996" +checksum = "de13e62d7e0ffc3eb40a0113ddf753cf6ec741be739164442b08893db4f9bfca" dependencies = [ "google-cloud-token", "http 1.1.0", @@ -2212,7 +2200,7 @@ dependencies = [ "tokio", "tokio-retry2", "tonic", - "tower", + "tower 0.4.13", "tracing", ] @@ -2452,6 +2440,51 @@ dependencies = [ "serde", ] +[[package]] +name = "hickory-proto" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07698b8420e2f0d6447a436ba999ec85d8fbf2a398bbd737b82cac4a2e96e512" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna 0.4.0", + "ipnet", + "once_cell", + "rand", + "thiserror 1.0.69", + "tinyvec", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "hickory-resolver" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28757f23aa75c98f254cf0405e6d8c25b831b32921b050a66692427679b1f243" +dependencies = [ + "cfg-if", + "futures-util", + "hickory-proto", + "ipconfig", + "lru-cache", + "once_cell", + "parking_lot 0.12.3", + "rand", + "resolv-conf", + "smallvec", + "thiserror 1.0.69", + "tokio", + "tracing", +] + [[package]] name = "hkdf" version = "0.12.4" @@ -2600,7 +2633,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.7", + "socket2", "tokio", "tower-service", "tracing", @@ -2670,9 +2703,9 @@ dependencies = [ [[package]] name = "hyper-timeout" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ "hyper 1.5.1", "hyper-util", @@ -2710,7 +2743,7 @@ dependencies = [ "http-body 1.0.1", "hyper 1.5.1", "pin-project-lite", - "socket2 0.5.7", + "socket2", "tokio", "tower-service", "tracing", @@ -2863,17 +2896,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" -[[package]] -name = "idna" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" -dependencies = [ - "matches", - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "idna" version = "0.4.0" @@ -2925,6 +2947,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -2990,7 +3013,7 @@ dependencies = [ "num_cpus", "openapiv3", "rand", - "redis 0.27.4", + "redis 0.27.5", "regex", "reqwest", "schemars", @@ -3003,7 +3026,7 @@ dependencies = [ "testcontainers-modules", "thiserror 1.0.69", "tokio", - "tower", + "tower 0.4.13", "tower-http", "tracing", "tracing-subscriber", @@ -3051,7 +3074,7 @@ dependencies = [ "integrationos-domain", "moka", "mongodb", - "redis 0.27.4", + "redis 0.27.5", "serde", "serde_json", "tokio", @@ -3086,7 +3109,7 @@ dependencies = [ "strum", "testcontainers-modules", "tokio", - "tower", + "tower 0.4.13", "tower-http", "tracing", "tracing-subscriber", @@ -3122,6 +3145,7 @@ dependencies = [ "indexmap 2.6.0", "js-sandbox-ios", "jsonpath_lib", + "jsonwebtoken 8.3.0", "k8s-openapi", "kube", "mockito", @@ -3165,7 +3189,6 @@ dependencies = [ "async-trait", "axum", "chrono", - "ctrlc", "dotenvy", "envconfig", 
"fluvio", @@ -3174,7 +3197,7 @@ dependencies = [ "http 1.1.0", "http-serde-ext-ios", "integrationos-domain", - "log", + "mockito", "mongodb", "num_cpus", "reqwest", @@ -3216,7 +3239,7 @@ dependencies = [ "mockito", "moka", "mongodb", - "redis 0.27.4", + "redis 0.27.5", "reqwest", "serde", "serde_json", @@ -3244,11 +3267,11 @@ dependencies = [ "integrationos-domain", "moka", "mongodb", - "redis 0.27.4", + "redis 0.27.5", "serde", "serde_json", "tokio", - "tower", + "tower 0.4.13", "tower-http", "tracing", ] @@ -3291,7 +3314,7 @@ dependencies = [ "integrationos-cache", "integrationos-domain", "mongodb", - "redis 0.27.4", + "redis 0.27.5", "serde_json", "tokio", "tracing", @@ -3303,7 +3326,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" dependencies = [ - "socket2 0.5.7", + "socket2", "widestring", "windows-sys 0.48.0", "winreg", @@ -3344,9 +3367,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" [[package]] name = "jobserver" @@ -3470,9 +3493,9 @@ dependencies = [ [[package]] name = "k256" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "956ff9b67e26e1a6a866cb758f12c6f8746208489e3e4a4b5580802f2f0a587b" +checksum = "f6e3919bbaa2945715f0bb6d3934a173d1e9a59ac23767fbaaef277265a7411b" dependencies = [ "cfg-if", "ecdsa", @@ -3560,7 +3583,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-util", - "tower", + "tower 0.4.13", "tower-http", "tracing", ] @@ -3673,9 +3696,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.164" +version = "0.2.165" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" +checksum = "fcb4d3d38eab6c5239a362fa8bae48c03baf980a6e7079f063942d563ef3533e" [[package]] name = "libloading" @@ -3793,12 +3816,6 @@ dependencies = [ "regex-automata 0.1.10", ] -[[package]] -name = "matches" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" - [[package]] name = "matchit" version = "0.7.3" @@ -4058,9 +4075,9 @@ dependencies = [ [[package]] name = "mongodb" -version = "2.8.2" +version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef206acb1b72389b49bc9985efe7eb1f8a9bb18e5680d262fac26c07f44025f1" +checksum = "c857d71f918b38221baf2fdff7207fec9984b4504901544772b1edf0302d669f" dependencies = [ "async-trait", "base64 0.13.1", @@ -4074,9 +4091,12 @@ dependencies = [ "futures-io", "futures-util", "hex", + "hickory-proto", + "hickory-resolver", "hmac", - "lazy_static", "md-5", + "mongodb-internal-macros", + "once_cell", "pbkdf2", "percent-encoding", "rand", @@ -4085,24 +4105,33 @@ dependencies = [ "rustls-pemfile 1.0.4", "serde", "serde_bytes", - "serde_with", + "serde_with 3.11.0", "sha-1", "sha2", - "socket2 0.4.10", + "socket2", "stringprep", - "strsim 0.10.0", + "strsim 0.11.1", "take_mut", "thiserror 1.0.69", "tokio", "tokio-rustls 0.24.1", "tokio-util", - "trust-dns-proto", - "trust-dns-resolver", "typed-builder", "uuid", "webpki-roots 0.25.4", ] +[[package]] +name = "mongodb-internal-macros" 
+version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a6dbc533e93429a71c44a14c04547ac783b56d3f22e6c4f12b1b994cf93844e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.89", +] + [[package]] name = "napi" version = "2.16.13" @@ -4853,9 +4882,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.13.1" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" +checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670" dependencies = [ "prost 0.13.3", ] @@ -4929,7 +4958,7 @@ dependencies = [ "quinn-udp", "rustc-hash 2.0.0", "rustls 0.23.18", - "socket2 0.5.7", + "socket2", "thiserror 2.0.3", "tokio", "tracing", @@ -4964,7 +4993,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.7", + "socket2", "tracing", "windows-sys 0.59.0", ] @@ -5053,9 +5082,9 @@ dependencies = [ [[package]] name = "redis" -version = "0.27.4" +version = "0.27.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc6baebe319ef5e4b470f248335620098d1c2e9261e995be05f56f719ca4bdb2" +checksum = "81cccf17a692ce51b86564334614d72dcae1def0fd5ecebc9f02956da74352b5" dependencies = [ "arc-swap", "async-trait", @@ -5072,7 +5101,7 @@ dependencies = [ "serde", "serde_json", "sha1_smol", - "socket2 0.5.7", + "socket2", "tokio", "tokio-native-tls", "tokio-retry2", @@ -5426,12 +5455,12 @@ dependencies = [ [[package]] name = "rustc_version_runtime" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d31b7153270ebf48bf91c65ae5b0c00e749c4cfad505f66530ac74950249582f" +checksum = "2dd18cd2bae1820af0b6ad5e54f4a51d0f3fcc53b05f845675074efcc7af071d" dependencies = [ - "rustc_version 0.2.3", - "semver 0.9.0", + "rustc_version 0.4.1", + "semver 1.0.23", ] [[package]] @@ -5549,9 +5578,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" [[package]] name = "ryu" @@ -5832,7 +5861,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff" dependencies = [ "serde", - "serde_with_macros", + "serde_with_macros 1.5.2", +] + +[[package]] +name = "serde_with" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e28bdad6db2b8340e449f7108f020b3b092e8583a9e3fb82713e1d4e71fe817" +dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.6.0", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros 3.11.0", + "time", ] [[package]] @@ -5847,6 +5894,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "serde_with_macros" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d846214a9854ef724f3da161b426242d8de7c1fc7de2f89bb1efcb154dca79d" +dependencies = [ + "darling 0.20.10", + "proc-macro2", + "quote", + "syn 2.0.89", +] + [[package]] name = "serde_yaml" version = "0.9.34+deprecated" @@ -6012,16 +6071,6 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" -[[package]] 
-name = "socket2" -version = "0.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.5.7" @@ -6385,18 +6434,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "syn_derive" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1329189c02ff984e9736652b1631330da25eaa6bc639089ed4915d25446cbe7b" -dependencies = [ - "proc-macro-error", - "proc-macro2", - "quote", - "syn 2.0.89", -] - [[package]] name = "sync_wrapper" version = "0.1.2" @@ -6599,7 +6636,7 @@ dependencies = [ "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.7", + "socket2", "tokio-macros", "windows-sys 0.52.0", ] @@ -6653,9 +6690,9 @@ dependencies = [ [[package]] name = "tokio-retry2" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c093bc28674bb8824c31e3c4dd87512d0b9427ce8ec45f0a38a3bf57c426b636" +checksum = "903934dba1c4c2f2e9cb460ef10b5695e0b0ecad3bf9ee7c8675e540c5e8b2d1" dependencies = [ "pin-project", "rand", @@ -6767,11 +6804,11 @@ dependencies = [ "pin-project", "prost 0.13.3", "rustls-pemfile 2.2.0", - "socket2 0.5.7", + "socket2", "tokio", "tokio-rustls 0.26.0", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -6798,6 +6835,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-http" version = "0.5.2" @@ -6872,9 +6925,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", "valuable", @@ -6944,51 +6997,6 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" -[[package]] -name = "trust-dns-proto" -version = "0.21.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c31f240f59877c3d4bb3b3ea0ec5a6a0cff07323580ff8c7a605cd7d08b255d" -dependencies = [ - "async-trait", - "cfg-if", - "data-encoding", - "enum-as-inner", - "futures-channel", - "futures-io", - "futures-util", - "idna 0.2.3", - "ipnet", - "lazy_static", - "log", - "rand", - "smallvec", - "thiserror 1.0.69", - "tinyvec", - "tokio", - "url", -] - -[[package]] -name = "trust-dns-resolver" -version = "0.21.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4ba72c2ea84515690c9fcef4c6c660bb9df3036ed1051686de84605b74fd558" -dependencies = [ - "cfg-if", - "futures-util", - "ipconfig", - "lazy_static", - "log", - "lru-cache", - "parking_lot 0.12.3", - "resolv-conf", - "smallvec", - "thiserror 1.0.69", - "tokio", - "trust-dns-proto", -] - [[package]] name = "try-lock" version = "0.2.5" @@ -7042,9 +7050,9 @@ checksum = "5ab17db44d7388991a428b2ee655ce0c212e862eff1768a455c58f9aad6e7893" 
[[package]] name = "unicode-id-start" -version = "1.2.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc3882f69607a2ac8cc4de3ee7993d8f68bb06f2974271195065b3bd07f2edea" +checksum = "2f322b60f6b9736017344fa0635d64be2f458fbc04eef65f6be22976dd1ffd5b" [[package]] name = "unicode-ident" diff --git a/Cargo.toml b/Cargo.toml index 03b8d998..f1a9e57a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ kube = "0.95.0" k8s-openapi = "0.23.0" mockito = "1.6.1" moka = { version = "0.12.8", features = ["future"] } -mongodb = "2.8.2" +mongodb = "3.1.0" num_cpus = "1" openapiv3 = { version = "2.0.0", features = ["skip_serializing_defaults"] } rand = "0.8.5" diff --git a/integrationos-api/src/domain/config.rs b/integrationos-api/src/domain/config.rs index ffe99d39..29592120 100644 --- a/integrationos-api/src/domain/config.rs +++ b/integrationos-api/src/domain/config.rs @@ -59,6 +59,8 @@ pub struct ConnectionsConfig { /// This is the admin secret for the API. Be sure this value is not the one use to generate /// tokens for the users as it gives access to sensitive admin endpoints. pub jwt_secret: String, + #[envconfig(from = "EMIT_URL", default = "http://localhost:3001")] + pub emit_url: String, /// Burst size limit #[envconfig(from = "API_VERSION", default = "v1")] pub api_version: String, @@ -114,6 +116,7 @@ impl Display for ConnectionsConfig { "CONNECTION_CACHE_TTL_SECS: {}", self.connection_cache_ttl_secs )?; + writeln!(f, "EMIT_URL: {}", self.emit_url)?; writeln!( f, "CONNECTION_DEFINITION_CACHE_TTL_SECS: {}", diff --git a/integrationos-api/src/helper/k8s_driver.rs b/integrationos-api/src/helper/k8s_driver.rs index abfd0d59..2f750516 100644 --- a/integrationos-api/src/helper/k8s_driver.rs +++ b/integrationos-api/src/helper/k8s_driver.rs @@ -26,6 +26,21 @@ pub enum NamespaceScope { Production, } +impl TryFrom<&str> for NamespaceScope { + type Error = IntegrationOSError; + + fn try_from(value: &str) -> Result { + match value { + "development-db-conns" => Ok(NamespaceScope::Development), + "production-db-conns" => Ok(NamespaceScope::Production), + _ => Err(InternalError::invalid_argument( + &format!("Invalid namespace scope: {}", value), + None, + )), + } + } +} + impl AsRef for NamespaceScope { fn as_ref(&self) -> &str { match self { @@ -367,6 +382,40 @@ pub async fn coordinator_impl( #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct ServiceName(String); +impl ServiceName { + pub fn from_id(connection_id: Id) -> Result { + let connection_id = connection_id.to_string(); + // Sanitize the connection id by removing underscores + let connection_id = connection_id.replace('_', ""); + + // Create regex to match non-alphanumeric characters + let regex = regex::Regex::new(r"[^a-zA-Z0-9]+").map_err(|e| { + tracing::error!("Failed to create regex for connection id: {}", e); + InternalError::invalid_argument("Invalid connection id", None) + })?; + + // Convert connection_id to lowercase and replace special characters with '-' + let mut service_name = regex + .replace_all(&connection_id.to_lowercase(), "-") + .to_string(); + + // Trim leading/trailing '-' and ensure it starts with a letter + service_name = service_name.trim_matches('-').to_string(); + + // Ensure it starts with a letter + if !service_name.chars().next().unwrap_or(' ').is_alphabetic() { + service_name.insert(0, 'a'); // Prepend 'a' if it doesn't start with a letter + } + + // Truncate to meet Kubernetes' max DNS-1035 label length (63 characters) + if 
service_name.len() > 63 { + service_name = service_name[..63].to_string(); + } + + Ok(ServiceName(service_name)) + } +} + impl AsRef for ServiceName { fn as_ref(&self) -> &str { &self.0 @@ -378,32 +427,3 @@ impl Display for ServiceName { write!(f, "{}", self.as_ref()) } } - -pub fn generate_service_name(connection_id: &Id) -> Result { - let connection_id = connection_id.to_string(); - // Create regex to match non-alphanumeric characters - let regex = regex::Regex::new(r"[^a-zA-Z0-9]+").map_err(|e| { - tracing::error!("Failed to create regex for connection id: {}", e); - InternalError::invalid_argument("Invalid connection id", None) - })?; - - // Convert connection_id to lowercase and replace special characters with '-' - let mut service_name = regex - .replace_all(&connection_id.to_lowercase(), "-") - .to_string(); - - // Trim leading/trailing '-' and ensure it starts with a letter - service_name = service_name.trim_matches('-').to_string(); - - // Ensure it starts with a letter - if !service_name.chars().next().unwrap_or(' ').is_alphabetic() { - service_name.insert(0, 'a'); // Prepend 'a' if it doesn't start with a letter - } - - // Truncate to meet Kubernetes' max DNS-1035 label length (63 characters) - if service_name.len() > 63 { - service_name = service_name[..63].to_string(); - } - - Ok(ServiceName(service_name)) -} diff --git a/integrationos-api/src/logic/connection.rs b/integrationos-api/src/logic/connection.rs index 5302f1bb..8732b418 100644 --- a/integrationos-api/src/logic/connection.rs +++ b/integrationos-api/src/logic/connection.rs @@ -1,6 +1,6 @@ use super::{delete, event_access::DEFAULT_NAMESPACE, read, PublicExt, RequestExt}; use crate::{ - helper::{generate_service_name, DeploymentSpecParams, NamespaceScope, ServiceSpecParams}, + helper::{DeploymentSpecParams, NamespaceScope, ServiceName, ServiceSpecParams}, logic::event_access::{ generate_event_access, get_client_throughput, CreateEventAccessPayloadWithOwnership, }, @@ -398,7 +398,7 @@ async fn generate_k8s_specs_and_secret( ]) .collect(); - let service_name = generate_service_name(connection_id)?; + let service_name = ServiceName::from_id(*connection_id)?; let namespace = match state.config.environment { Environment::Test | Environment::Development => NamespaceScope::Development, @@ -651,7 +651,7 @@ pub async fn delete_connection( Environment::Test | Environment::Development => NamespaceScope::Development, Environment::Live | Environment::Production => NamespaceScope::Production, }; - let service_name = generate_service_name(&connection.args.id)?; + let service_name = ServiceName::from_id(connection.args.id)?; state.k8s_client.delete_all(namespace, service_name).await?; }; diff --git a/integrationos-api/src/logic/event_callback.rs b/integrationos-api/src/logic/event_callback.rs new file mode 100644 index 00000000..631dcae0 --- /dev/null +++ b/integrationos-api/src/logic/event_callback.rs @@ -0,0 +1,88 @@ +use super::connection::DatabaseConnectionSecret; +use crate::{ + helper::{NamespaceScope, ServiceName}, + server::AppState, +}; +use axum::{ + extract::{Path, State}, + routing::post, + Json, Router, +}; +use bson::doc; +use integrationos_domain::{ApplicationError, Connection, Id, IntegrationOSError}; +use std::sync::Arc; + +pub fn get_router() -> Router> { + Router::new().route( + "/database-connection-lost/:connection_id", + post(database_connection_lost_callback), + ) +} + +// TODO: Write tests for this endpoint +async fn database_connection_lost_callback( + State(state): State>, + Path(connection_id): Path, +) -> 
Result, IntegrationOSError> { + // Instead of direcly updating we're getting the record first so that we can + // modify the active and deprecated fields from the record metadata + // without having to update the whole record + let id = connection_id.to_string(); + let connection = state + .app_stores + .connection + .get_one_by_id(id.as_str()) + .await?; + + match connection { + None => Err(ApplicationError::not_found( + &format!("Connection with id {} not found", id), + None, + )), + Some(mut conn) => { + if conn.record_metadata.active { + conn.record_metadata.mark_deprecated("system"); + conn.record_metadata.mark_inactive("system"); + conn.record_metadata.mark_updated("system"); + + let secret = state + .secrets_client + .get(&conn.secrets_service_id, &conn.ownership.id) + .await?; + + // This means that there's a pod resource that is not running + // and we need to delete these resources + if let Ok(secret) = secret.decode::() { + let namespace: NamespaceScope = secret.namespace.as_str().try_into()?; + let service_name = ServiceName::from_id(connection_id)?; + + tracing::info!( + "Deleting all resources for connection {id} in namespace {}", + namespace + ); + + tracing::info!("service_name: {service_name}"); + + state.k8s_client.delete_all(namespace, service_name).await?; + + tracing::info!("Deleted all resources for connection {id}"); + } + + let updated = bson::to_document(&conn).map_err(|e| { + ApplicationError::bad_request( + &format!("Could not serialize connection: {e}"), + None, + ) + })?; + + state + .app_stores + .connection + .update_one(id.as_str(), doc! { "$set": updated }) + .await?; + } + + Ok(Json(conn)) + } + } +} diff --git a/integrationos-api/src/logic/metrics.rs b/integrationos-api/src/logic/metrics.rs index 1ed3e38e..5d43af44 100644 --- a/integrationos-api/src/logic/metrics.rs +++ b/integrationos-api/src/logic/metrics.rs @@ -68,10 +68,7 @@ pub async fn get_full_record( .collection::(&Store::Metrics.to_string()); let doc = match coll - .find_one( - bson::doc! { "clientId": access.ownership.client_id.clone()}, - None, - ) + .find_one(bson::doc! { "clientId": access.ownership.client_id.clone()}) .await { Ok(Some(doc)) => doc, @@ -113,10 +110,7 @@ pub async fn get_metrics( .map(|p| p.0) .unwrap_or(state.config.metric_system_id.clone()); - let doc = match coll - .find_one(bson::doc! { "clientId": &client_id }, None) - .await - { + let doc = match coll.find_one(bson::doc! 
{ "clientId": &client_id }).await { Ok(Some(doc)) => doc, Ok(None) => { return Err(ApplicationError::not_found( diff --git a/integrationos-api/src/logic/mod.rs b/integrationos-api/src/logic/mod.rs index 48f163a3..6c670ad6 100644 --- a/integrationos-api/src/logic/mod.rs +++ b/integrationos-api/src/logic/mod.rs @@ -29,6 +29,7 @@ pub mod connection_model_definition; pub mod connection_model_schema; pub mod connection_oauth_definition; pub mod event_access; +pub mod event_callback; pub mod events; pub mod metrics; pub mod oauth; @@ -354,7 +355,7 @@ async fn get_connection( }) .build(); - let sparse_connection = match collection.find_one(filter, options).await { + let sparse_connection = match collection.find_one(filter).with_options(options).await { Ok(Some(data)) => data, Ok(None) => return Err(ApplicationError::not_found("Connection", None)), Err(e) => { diff --git a/integrationos-api/src/logic/openapi/mod.rs b/integrationos-api/src/logic/openapi/mod.rs index bf4c6437..2d1adf88 100644 --- a/integrationos-api/src/logic/openapi/mod.rs +++ b/integrationos-api/src/logic/openapi/mod.rs @@ -191,7 +191,7 @@ fn spawn_openapi_generation( tokio::spawn(async move { let stream: StreamResult = cm_store .collection - .find(Some(doc! { "primary": true }), None) + .find(doc! { "primary": true }) .await .map_err(|e| { error!("Could not fetch common model: {:?}", e); diff --git a/integrationos-api/src/logic/schema_generator.rs b/integrationos-api/src/logic/schema_generator.rs index 258fd02c..45229a22 100644 --- a/integrationos-api/src/logic/schema_generator.rs +++ b/integrationos-api/src/logic/schema_generator.rs @@ -184,7 +184,7 @@ pub async fn get_common_models_projections( .projection(doc! { "_id": 1, "name": 1 }) .build(); - let mut cursor = collection.find(filter, options).await?; + let mut cursor = collection.find(filter).with_options(options).await?; let mut common_models: Vec = Vec::new(); while let Some(result) = cursor.next().await { diff --git a/integrationos-api/src/middleware/blocker.rs b/integrationos-api/src/middleware/blocker.rs index e3c98dfc..fa78c7f9 100644 --- a/integrationos-api/src/middleware/blocker.rs +++ b/integrationos-api/src/middleware/blocker.rs @@ -24,7 +24,7 @@ pub struct BlockInvalidHeaders { } impl BlockInvalidHeaders { - pub async fn new(state: Arc) -> Self { + pub async fn from_state(state: Arc) -> Self { let whitelist = Arc::new(RwLock::new(BTreeSet::new())); let header_name = @@ -45,8 +45,8 @@ impl BlockInvalidHeaders { .app_stores .db .collection::(&Store::EventAccess.to_string()) - .find( - bson::doc! { "deleted": false }, + .find(bson::doc! { "deleted": false }) + .with_options( FindOptions::builder() .projection(bson::doc! 
{ "accessKey": 1 diff --git a/integrationos-api/src/middleware/extractor.rs b/integrationos-api/src/middleware/extractor.rs index c9a2df3f..dfeb70e4 100644 --- a/integrationos-api/src/middleware/extractor.rs +++ b/integrationos-api/src/middleware/extractor.rs @@ -29,7 +29,7 @@ pub struct RateLimiter { } impl RateLimiter { - pub async fn new(state: Arc) -> Result { + pub async fn from_state(state: Arc) -> Result { if !state.config.rate_limit_enabled { return Err(anyhow::anyhow!("Rate limiting is disabled")); }; @@ -94,7 +94,7 @@ impl RateLimiter { } } -pub async fn rate_limit( +pub async fn rate_limit_middleware( Extension(event_access): Extension>, State(state): State>, req: Request, diff --git a/integrationos-api/src/middleware/header_auth.rs b/integrationos-api/src/middleware/header_auth.rs index 38914e1e..eafac32c 100644 --- a/integrationos-api/src/middleware/header_auth.rs +++ b/integrationos-api/src/middleware/header_auth.rs @@ -6,7 +6,7 @@ use mongodb::bson::doc; use std::sync::Arc; use tracing::error; -pub async fn header_auth( +pub async fn header_auth_middleware( State(state): State>, mut req: Request, next: Next, diff --git a/integrationos-api/src/middleware/jwt_auth.rs b/integrationos-api/src/middleware/jwt_auth.rs index f18fe9ae..316ca153 100644 --- a/integrationos-api/src/middleware/jwt_auth.rs +++ b/integrationos-api/src/middleware/jwt_auth.rs @@ -1,7 +1,10 @@ use crate::server::AppState; use axum::{body::Body, extract::State, middleware::Next, response::Response}; use http::Request; -use integrationos_domain::{ApplicationError, Claims, IntegrationOSError}; +use integrationos_domain::{ + ApplicationError, Claims, IntegrationOSError, DEFAULT_AUDIENCE, DEFAULT_ISSUER, + FALLBACK_AUDIENCE, FALLBACK_ISSUER, +}; use jsonwebtoken::{DecodingKey, Validation}; use std::sync::Arc; use tracing::info; @@ -15,10 +18,10 @@ pub struct JwtState { } impl JwtState { - pub fn new(state: &Arc) -> Self { + pub fn from_state(state: &Arc) -> Self { let mut validation = Validation::default(); - validation.set_audience(&["integrationos-users", "buildable-users"]); - validation.set_issuer(&["integrationos", "buildable"]); + validation.set_audience(&[DEFAULT_AUDIENCE, FALLBACK_AUDIENCE]); + validation.set_issuer(&[DEFAULT_ISSUER, FALLBACK_ISSUER]); Self { validation, decoding_key: DecodingKey::from_secret(state.config.jwt_secret.as_ref()), @@ -26,7 +29,7 @@ impl JwtState { } } -pub async fn jwt_auth( +pub async fn jwt_auth_middleware( State(state): State>, mut req: Request, next: Next, diff --git a/integrationos-api/src/middleware/mod.rs b/integrationos-api/src/middleware/mod.rs index f0c9d3fd..36d7ffa2 100644 --- a/integrationos-api/src/middleware/mod.rs +++ b/integrationos-api/src/middleware/mod.rs @@ -3,5 +3,5 @@ pub mod extractor; pub mod header_auth; pub mod jwt_auth; -pub use header_auth::header_auth; -pub use jwt_auth::jwt_auth; +pub use header_auth::header_auth_middleware; +pub use jwt_auth::jwt_auth_middleware; diff --git a/integrationos-api/src/router/public.rs b/integrationos-api/src/router/public.rs index a8a84f35..5d249939 100644 --- a/integrationos-api/src/router/public.rs +++ b/integrationos-api/src/router/public.rs @@ -27,8 +27,8 @@ pub fn get_router(state: &Arc) -> Router> { .route( "/event-access/default", post(create_event_access_for_new_user).layer(from_fn_with_state( - Arc::new(JwtState::new(state)), - jwt_auth::jwt_auth, + Arc::new(JwtState::from_state(state)), + jwt_auth::jwt_auth_middleware, )), ) .route( diff --git a/integrationos-api/src/router/secured_jwt.rs 
b/integrationos-api/src/router/secured_jwt.rs index 21eebbf4..77471ca4 100644 --- a/integrationos-api/src/router/secured_jwt.rs +++ b/integrationos-api/src/router/secured_jwt.rs @@ -2,7 +2,8 @@ use crate::{ logic::{ common_enum, common_model, connection_definition, connection_model_definition::{self}, - connection_model_schema, connection_oauth_definition, openapi, platform, platform_page, + connection_model_schema, connection_oauth_definition, event_callback, openapi, platform, + platform_page, }, middleware::jwt_auth::{self, JwtState}, server::AppState, @@ -38,12 +39,13 @@ pub async fn get_router(state: &Arc) -> Router> { .nest("/platforms", platform::get_router()) .nest("/platform-pages", platform_page::get_router()) .nest("/common-models", common_model::get_router()) - .nest("/common-enums", common_enum::get_router()); + .nest("/common-enums", common_enum::get_router()) + .nest("/event-callbacks", event_callback::get_router()); routes .layer(from_fn_with_state( - Arc::new(JwtState::new(state)), - jwt_auth::jwt_auth, + Arc::new(JwtState::from_state(state)), + jwt_auth::jwt_auth_middleware, )) .layer(from_fn(log_request_middleware)) .layer(TraceLayer::new_for_http()) diff --git a/integrationos-api/src/router/secured_key.rs b/integrationos-api/src/router/secured_key.rs index 1f3eb83a..b97a9b57 100644 --- a/integrationos-api/src/router/secured_key.rs +++ b/integrationos-api/src/router/secured_key.rs @@ -10,7 +10,7 @@ use crate::{ }, middleware::{ blocker::{handle_blocked_error, BlockInvalidHeaders}, - extractor::{rate_limit, RateLimiter}, + extractor::{rate_limit_middleware, RateLimiter}, header_auth, }, server::AppState, @@ -56,10 +56,10 @@ pub async fn get_router(state: &Arc) -> Router> { .layer(TraceLayer::new_for_http()) .nest("/metrics", metrics::get_router()); - let routes = match RateLimiter::new(state.clone()).await { + let routes = match RateLimiter::from_state(state.clone()).await { Ok(rate_limiter) => routes.layer(axum::middleware::from_fn_with_state( Arc::new(rate_limiter), - rate_limit, + rate_limit_middleware, )), Err(e) => { warn!("Could not connect to redis: {e}"); @@ -68,7 +68,10 @@ pub async fn get_router(state: &Arc) -> Router> { }; routes - .layer(from_fn_with_state(state.clone(), header_auth::header_auth)) + .layer(from_fn_with_state( + state.clone(), + header_auth::header_auth_middleware, + )) .layer(from_fn(log_request_middleware)) .layer(TraceLayer::new_for_http()) .layer(SetSensitiveRequestHeadersLayer::new(once( @@ -78,7 +81,7 @@ pub async fn get_router(state: &Arc) -> Router> { ServiceBuilder::new() .layer(HandleErrorLayer::new(handle_blocked_error)) .layer(FilterLayer::new( - BlockInvalidHeaders::new(state.clone()).await, + BlockInvalidHeaders::from_state(state.clone()).await, )), ) } diff --git a/integrationos-api/src/server.rs b/integrationos-api/src/server.rs index 45d96baa..a2353a20 100644 --- a/integrationos-api/src/server.rs +++ b/integrationos-api/src/server.rs @@ -230,7 +230,7 @@ impl Server { ); let events = events.clone(); tokio::spawn(async move { - if let Err(e) = events.insert_many(to_save, None).await { + if let Err(e) = events.insert_many(to_save).await { error!("Could not save buffer of events: {e}"); } }); @@ -262,20 +262,22 @@ impl Server { .await; if let Ok(Some(metric)) = res { let doc = metric.update_doc(); - let client = metrics.update_one( - bson::doc! { - "clientId": &metric.ownership().client_id, - }, - doc.clone(), - options.clone(), - ); - let system = metrics.update_one( - bson::doc! 
{ - "clientId": metric_system_id.as_str(), - }, - doc, - options.clone(), - ); + let client = metrics + .update_one( + bson::doc! { + "clientId": &metric.ownership().client_id, + }, + doc.clone(), + ) + .with_options(options.clone()); + let system = metrics + .update_one( + bson::doc! { + "clientId": metric_system_id.as_str(), + }, + doc, + ) + .with_options(options.clone()); if let Err(e) = try_join!(client, system) { error!("Could not upsert metric: {e}"); } diff --git a/integrationos-api/tests/context.rs b/integrationos-api/tests/context.rs index 7f00099b..4701ce25 100644 --- a/integrationos-api/tests/context.rs +++ b/integrationos-api/tests/context.rs @@ -319,7 +319,7 @@ impl TestServer { }) } - pub async fn send_request_with_auth_headers( + async fn send_request_with_auth_headers( &self, path: &str, method: http::Method, diff --git a/integrationos-api/tests/http/callback.rs b/integrationos-api/tests/http/callback.rs new file mode 100644 index 00000000..8a5f6214 --- /dev/null +++ b/integrationos-api/tests/http/callback.rs @@ -0,0 +1,41 @@ +use crate::context::TestServer; +use http::{Method, StatusCode}; +use integrationos_domain::{environment::Environment, prefix::IdPrefix, Connection, Id}; +use serde_json::Value; + +#[tokio::test] +async fn test_database_connection_lost_callback() { + let mut server = TestServer::new(None).await; + + let (mut connection, _) = server.create_connection(Environment::Live).await; + connection.group = server.live_access_key.data.group.clone(); + + let connection_id = connection.id.to_string(); + + let path = format!("v1/event-callbacks/database-connection-lost/{connection_id}"); + + let request = server + .send_request::(&path, Method::POST, None, None) + .await + .expect("Failed to send request"); + + assert_eq!(request.code, StatusCode::OK); + assert!(request.data.record_metadata.deprecated); + assert!(!request.data.record_metadata.deleted); + assert!(!request.data.record_metadata.active); +} + +#[tokio::test] +async fn test_database_connection_lost_callback_404() { + let server = TestServer::new(None).await; + + let connection_id = Id::now(IdPrefix::Connection).to_string(); + let path = format!("v1/event-callbacks/database-connection-lost/{connection_id}"); + + let request = server + .send_request::(&path, Method::POST, None, None) + .await + .expect("Failed to send request"); + + assert_eq!(request.code, StatusCode::NOT_FOUND); +} diff --git a/integrationos-api/tests/http/mod.rs b/integrationos-api/tests/http/mod.rs index 0d4a6505..3478eba2 100644 --- a/integrationos-api/tests/http/mod.rs +++ b/integrationos-api/tests/http/mod.rs @@ -1,4 +1,5 @@ pub mod auth; +pub mod callback; pub mod connection; pub mod crud; pub mod pagination; diff --git a/integrationos-archiver/src/main.rs b/integrationos-archiver/src/main.rs index c1a107ed..c97ec658 100644 --- a/integrationos-archiver/src/main.rs +++ b/integrationos-archiver/src/main.rs @@ -110,14 +110,12 @@ async fn dump( let document = target_store .collection - .find_one( - doc! {}, - Some( - mongodb::options::FindOneOptions::builder() - .sort(doc! { "createdAt": 1 }) // Sort by `createdAt` in ascending order - .projection(doc! { "createdAt": 1 }) // Only retrieve the `createdAt` field - .build(), - ), + .find_one(doc! {}) + .with_options( + FindOneOptions::builder() + .sort(doc! { "createdAt": 1 }) // Sort by `createdAt` in ascending order + .projection(doc! 
{ "createdAt": 1 }) // Only retrieve the `createdAt` field + .build(), ) .await .map_err(|e| anyhow!("Failed to find first event in collection: {e}"))?; @@ -139,10 +137,10 @@ async fn dump( let last_chosen_date_event = archives .collection - .find_one( - doc! { - "type": "DateChosen" - }, + .find_one(doc! { + "type": "DateChosen" + }) + .with_options( FindOneOptions::builder() .sort(doc! { "endsAt": -1 }) .build(), @@ -156,13 +154,10 @@ async fn dump( Event::DateChosen(e) => { let finished = archives .collection - .find_one( - doc! { - "type": "Finished", - "reference": e.reference().to_string() - }, - None, - ) + .find_one(doc! { + "type": "Finished", + "reference": e.reference().to_string() + }) .await? .map(|e| e.is_finished()) .unwrap_or(false); @@ -253,7 +248,7 @@ async fn dump( } }; - target_store.collection.delete_many(filter, None).await?; + target_store.collection.delete_many(filter).await?; tracing::warn!("Old events deleted successfully"); } Ok::<_, anyhow::Error>(()) @@ -302,7 +297,7 @@ async fn save( }; let count = target_store .collection - .count_documents(filter.clone(), None) + .count_documents(filter.clone()) .await?; tracing::info!( @@ -320,7 +315,7 @@ async fn save( // Run this only on debug mode if cfg!(debug_assertions) { - let events = target_store.collection.find(filter.clone(), None).await?; + let events = target_store.collection.find(filter.clone()).await?; let events = events.try_collect::>().await?; diff --git a/integrationos-cache/src/local/connection_model_schema_cache.rs b/integrationos-cache/src/local/connection_model_schema_cache.rs index 7f7c0258..e5d19b2d 100644 --- a/integrationos-cache/src/local/connection_model_schema_cache.rs +++ b/integrationos-cache/src/local/connection_model_schema_cache.rs @@ -40,7 +40,11 @@ impl ConnectionModelSchemaCache { } None => { tracing::debug!("Cache miss for key: {:?}", key); - let value = store.collection.find_one(filter, options).await?; + let value = store + .collection + .find_one(filter) + .with_options(options) + .await?; if let Some(value) = value { self.set(key, &value).await?; Ok(value) diff --git a/integrationos-domain/Cargo.toml b/integrationos-domain/Cargo.toml index 50d5ca2b..8682962c 100644 --- a/integrationos-domain/Cargo.toml +++ b/integrationos-domain/Cargo.toml @@ -45,6 +45,7 @@ http.workspace = true indexmap = "2.6.0" js-sandbox-ios.workspace = true jsonpath_lib.workspace = true +jsonwebtoken.workspace = true kube.workspace = true k8s-openapi = { workspace = true, features = ["latest"] } mongodb.workspace = true diff --git a/integrationos-domain/src/algebra/store.rs b/integrationos-domain/src/algebra/store.rs index e4b1b415..74a4cde5 100644 --- a/integrationos-domain/src/algebra/store.rs +++ b/integrationos-domain/src/algebra/store.rs @@ -9,7 +9,7 @@ use serde::de::DeserializeOwned; use serde::Serialize; #[derive(Debug, Clone)] -pub struct MongoStore { +pub struct MongoStore { pub collection: Collection, } @@ -23,29 +23,19 @@ impl MongoStore &self, pipeline: Vec, ) -> Result, IntegrationOSError> { - let cursor = self.collection.aggregate(pipeline, None).await?; + let cursor = self.collection.aggregate(pipeline).await?; let results = cursor.try_collect().await?; Ok(results) } pub async fn get_one(&self, filter: Document) -> Result, IntegrationOSError> { - Ok(self.collection.find_one(filter, None).await?) + Ok(self.collection.find_one(filter).await?) } pub async fn get_one_by_id(&self, id: &str) -> Result, IntegrationOSError> { let filter = doc! 
{ "_id": id }; - Ok(self.collection.find_one(filter, None).await?) - } - - /// Get all records from the collection - /// - /// Use this method with caution, as it can be very slow for large collections. - pub async fn get_all(&self) -> Result, IntegrationOSError> { - let cursor = self.collection.find(None, None).await?; - let records = cursor.try_collect().await?; - - Ok(records) + Ok(self.collection.find_one(filter).await?) } pub async fn get_many( @@ -66,20 +56,25 @@ impl MongoStore filter_options.sort = Some(doc! { "createdAt": -1 }); } - let cursor = self.collection.find(filter, filter_options).await?; + let cursor = self + .collection + .find(filter.unwrap_or_default()) + .with_options(filter_options) + .await?; + let records = cursor.try_collect().await?; Ok(records) } pub async fn create_one(&self, data: &T) -> Result<(), IntegrationOSError> { - self.collection.insert_one(data, None).await?; + self.collection.insert_one(data).await?; Ok(()) } pub async fn create_many(&self, data: &[T]) -> Result<(), IntegrationOSError> { - self.collection.insert_many(data, None).await?; + self.collection.insert_many(data).await?; Ok(()) } @@ -87,7 +82,7 @@ impl MongoStore pub async fn update_one(&self, id: &str, data: Document) -> Result<(), IntegrationOSError> { let filter = doc! { "_id": id }; - self.collection.update_one(filter, data, None).await?; + self.collection.update_one(filter, data).await?; Ok(()) } @@ -96,7 +91,7 @@ impl MongoStore filter: Document, data: Document, ) -> Result<(), IntegrationOSError> { - self.collection.update_many(filter, data, None).await?; + self.collection.update_many(filter, data).await?; Ok(()) } @@ -106,9 +101,7 @@ impl MongoStore filter: Document, data: &[Document], ) -> Result<(), IntegrationOSError> { - self.collection - .update_many(filter, data.to_vec(), None) - .await?; + self.collection.update_many(filter, data.to_vec()).await?; Ok(()) } @@ -120,7 +113,8 @@ impl MongoStore ) -> Result { Ok(self .collection - .count_documents(filter, CountOptions::builder().limit(limit).build()) + .count_documents(filter) + .with_options(CountOptions::builder().limit(limit).build()) .await?) 
} } diff --git a/integrationos-domain/src/domain/configuration/database.rs b/integrationos-domain/src/domain/configuration/database.rs index cfd82642..9c5ff49b 100644 --- a/integrationos-domain/src/domain/configuration/database.rs +++ b/integrationos-domain/src/domain/configuration/database.rs @@ -79,6 +79,8 @@ pub struct DatabaseConnectionConfig { pub address: SocketAddr, #[envconfig(from = "ENVIRONMENT", default = "development")] pub environment: Environment, + #[envconfig(from = "EMIT_URL", default = "http://localhost:3001")] + pub emit_url: String, #[envconfig(nested = true)] pub postgres_config: PostgresConfig, #[envconfig(from = "DATABASE_CONNECTION_TYPE", default = "postgres")] @@ -206,6 +208,7 @@ impl Default for DatabaseConnectionConfig { fn default() -> Self { Self { worker_threads: Some(1), + emit_url: "http://localhost:3001".to_string(), address: SocketAddr::new("0.0.0.0".parse().expect("Invalid address"), 5005), environment: Environment::Development, postgres_config: PostgresConfig::default(), @@ -218,6 +221,7 @@ impl Display for DatabaseConnectionConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { writeln!(f, "WORKER_THREADS: {:?}", self.worker_threads)?; writeln!(f, "INTERNAL_SERVER_ADDRESS: {}", self.address)?; + writeln!(f, "EMIT_URL: {}", self.emit_url)?; writeln!(f, "{}", self.environment)?; match self.database_connection_type { DatabaseConnectionType::PostgreSql => writeln!(f, "{}", self.postgres_config), diff --git a/integrationos-domain/src/domain/error/mod.rs b/integrationos-domain/src/domain/error/mod.rs index ee5e87b0..8a51e38a 100644 --- a/integrationos-domain/src/domain/error/mod.rs +++ b/integrationos-domain/src/domain/error/mod.rs @@ -1038,8 +1038,8 @@ impl From for IntegrationOSError { mongodb::error::ErrorKind::BulkWrite(error) => InternalError::unknown( &error .write_errors - .into_iter() - .map(|e| e.into_iter().map(|e| e.message).collect()) + .into_values() + .map(|e| e.message) .collect::>() .join(", "), Some("Bulk write error"), diff --git a/integrationos-domain/src/domain/http/mod.rs b/integrationos-domain/src/domain/http/mod.rs index 7349f4d2..5825a854 100644 --- a/integrationos-domain/src/domain/http/mod.rs +++ b/integrationos-domain/src/domain/http/mod.rs @@ -1,6 +1,16 @@ +use super::IntegrationOSError; +use crate::InternalError; +use chrono::Utc; +use jsonwebtoken::{EncodingKey, Header}; use serde::{Deserialize, Serialize}; -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash)] +pub const DEFAULT_AUDIENCE: &str = "integrationos-users"; +pub const DEFAULT_ISSUER: &str = "integrationos"; + +pub const FALLBACK_AUDIENCE: &str = "buildable-users"; +pub const FALLBACK_ISSUER: &str = "buildable"; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash, Default)] #[serde(rename_all = "camelCase")] pub struct Claims { #[serde(rename = "_id")] @@ -19,3 +29,25 @@ pub struct Claims { pub aud: String, pub iss: String, } + +impl Claims { + pub fn from_secret(secret: &str) -> Result { + let now = Utc::now(); + + let header = Header::default(); + let claims = Claims { + is_buildable_core: true, + iat: now.timestamp(), + exp: now.timestamp() + 60, + aud: DEFAULT_AUDIENCE.to_string(), + iss: DEFAULT_ISSUER.to_string(), + ..Default::default() + }; + let key = EncodingKey::from_secret(secret.as_bytes()); + + jsonwebtoken::encode(&header, &claims, &key).map_err(|e| { + tracing::error!("Failed to encode token: {e}"); + InternalError::invalid_argument("Failed to encode token", None) + }) + } +} diff --git 
a/integrationos-domain/src/domain/shared/record_metadata.rs b/integrationos-domain/src/domain/shared/record_metadata.rs index c1932ca0..c933efed 100644 --- a/integrationos-domain/src/domain/shared/record_metadata.rs +++ b/integrationos-domain/src/domain/shared/record_metadata.rs @@ -72,6 +72,22 @@ impl RecordMetadata { self.change_log.insert(log_entry, now); } + // Mark record as inactive + pub fn mark_inactive(&mut self, modifier: &str) { + let now = Utc::now().timestamp_millis(); + self.active = false; + let log_entry = format!("Marked as inactive by {}", modifier); + self.change_log.insert(log_entry, now); + } + + // Mark record as deprecated + pub fn mark_deprecated(&mut self, modifier: &str) { + let now = Utc::now().timestamp_millis(); + self.deprecated = true; + let log_entry = format!("Marked as deprecated by {}", modifier); + self.change_log.insert(log_entry, now); + } + // Add tag to record pub fn add_tag(&mut self, tag: &str) { self.tags.push(tag.to_string()); diff --git a/integrationos-emit/Cargo.toml b/integrationos-emit/Cargo.toml index 66ee1cf6..327c635a 100644 --- a/integrationos-emit/Cargo.toml +++ b/integrationos-emit/Cargo.toml @@ -5,8 +5,8 @@ edition = "2021" [dependencies] anyhow.workspace = true -axum.workspace = true async-trait.workspace = true +axum.workspace = true chrono.workspace = true dotenvy.workspace = true envconfig.workspace = true @@ -26,13 +26,12 @@ serde = { workspace = true , features = ["derive"] } serde_json.workspace = true strum.workspace = true tokio = { workspace = true, features = ["full"] } +tokio-graceful-shutdown = "0.15.2" tower-http.workspace = true tracing.workspace = true -log = "0.4.22" -ctrlc = { version = "3.4.5", features = ["termination"] } -tokio-graceful-shutdown = "0.15.2" [dev-dependencies] +mockito.workspace = true testcontainers-modules = { workspace = true, features = ["mongo"] } tracing-subscriber.workspace = true uuid = { workspace = true, features = ["v4", "serde"] } diff --git a/integrationos-emit/src/algebra/event.rs b/integrationos-emit/src/algebra/event.rs new file mode 100644 index 00000000..005a3b77 --- /dev/null +++ b/integrationos-emit/src/algebra/event.rs @@ -0,0 +1,48 @@ +use crate::{domain::event::Event, server::AppState}; +use async_trait::async_trait; +use http::header::AUTHORIZATION; +use integrationos_domain::{ApplicationError, Claims, Id, IntegrationOSError, InternalError, Unit}; + +#[async_trait] +pub trait EventExt { + async fn side_effect(&self, ctx: &AppState, entity_id: Id) -> Result; +} + +#[async_trait] +impl EventExt for Event { + async fn side_effect(&self, ctx: &AppState, entity_id: Id) -> Result { + match self { + Event::DatabaseConnectionLost { connection_id, .. } => { + let base_path = &ctx.config.event_callback_url; + let path = format!("{base_path}/database-connection-lost/{connection_id}"); + + let authorization = Claims::from_secret(ctx.config.jwt_secret.as_str())?; + + ctx.http_client + .post(path) + .header(AUTHORIZATION, format!("Bearer {authorization}")) + .send() + .await + .inspect(|res| { + tracing::info!("Response: {:?}", res); + }) + .map_err(|e| { + tracing::error!("Failed to build request for entity id {entity_id}: {e}"); + InternalError::io_err( + &format!("Failed to build request for entity id {entity_id}"), + None, + ) + })? 
+ .error_for_status() + .map_err(|e| { + tracing::error!("Failed to execute request for entity id {entity_id}: {e}"); + ApplicationError::bad_request( + &format!("Failed to execute request for entity id {entity_id}"), + None, + ) + }) + .map(|res| tracing::info!("Response: {:?}", res)) + } + } + } +} diff --git a/integrationos-emit/src/algebra/mod.rs b/integrationos-emit/src/algebra/mod.rs new file mode 100644 index 00000000..53f11265 --- /dev/null +++ b/integrationos-emit/src/algebra/mod.rs @@ -0,0 +1 @@ +pub mod event; diff --git a/integrationos-emit/src/domain/config.rs b/integrationos-emit/src/domain/config.rs index 76750b30..0d512dcd 100644 --- a/integrationos-emit/src/domain/config.rs +++ b/integrationos-emit/src/domain/config.rs @@ -30,14 +30,32 @@ pub struct EmitterConfig { pub event_stream_provider: EventStreamProvider, #[envconfig(from = "EVENT_PROCESSING_MAX_RETRIES", default = "5")] pub event_processing_max_retries: u32, + #[envconfig(from = "EVENT_MAX_SPAN_FOR_RETRY_SECS", default = "86400")] + pub event_max_span_for_retry_secs: i64, #[envconfig(from = "SCHEDULED_MAX_CONCURRENT_TASKS", default = "10")] pub scheduled_max_concurrent_tasks: usize, #[envconfig(from = "SCHEDULED_SLEEP_DURATION_IN_MILLIS", default = "1000")] pub scheduled_sleep_duration_millis: u64, #[envconfig(from = "SCHEDULED_MAX_CHUNK_SIZE", default = "100")] pub scheduled_max_chunk_size: usize, - #[envconfig(from = "SHUTDOWN_TIMEOUT_SECS", default = "10")] - pub shutdown_timeout_secs: u64, + #[envconfig(from = "PUSHER_MAX_CONCURRENT_TASKS", default = "10")] + pub pusher_max_concurrent_tasks: usize, + #[envconfig(from = "PUSHER_SLEEP_DURATION_IN_MILLIS", default = "1000")] + pub pusher_sleep_duration_millis: u64, + #[envconfig(from = "PUSHER_MAX_CHUNK_SIZE", default = "100")] + pub pusher_max_chunk_size: usize, + #[envconfig(from = "SHUTDOWN_TIMEOUT_MILLIS", default = "20000")] + pub shutdown_timeout_millis: u64, + #[envconfig( + from = "JWT_SECRET", + default = "2thZ2UiOnsibmFtZSI6IlN0YXJ0dXBsa3NoamRma3NqZGhma3NqZGhma3NqZG5jhYtggfaP9ubmVjdGlvbnMiOjUwMDAwMCwibW9kdWxlcyI6NSwiZW5kcG9pbnRzIjo3b4e05e2-f050-401f-9822-44f43f71753c" + )] + pub jwt_secret: String, + #[envconfig( + from = "EVENT_CALLBACK_URL", + default = "http://localhost:3005/v1/event-callbacks" + )] + pub event_callback_url: String, #[envconfig(nested = true)] pub fluvio: EventStreamConfig, #[envconfig(nested = true)] @@ -63,12 +81,30 @@ impl Display for EmitterConfig { self.http_client_max_retries )?; writeln!(f, "EVENT_STREAM_PROVIDER: {}", self.event_stream_provider)?; + writeln!( + f, + "EVENT_MAX_SPAN_FOR_RETRY_DAYS: {}", + self.event_max_span_for_retry_secs + )?; + writeln!( + f, + "PUSHER_MAX_CONCURRENT_TASKS: {}", + self.pusher_max_concurrent_tasks + )?; + writeln!( + f, + "PUSHER_SLEEP_DURATION_IN_MILLIS: {}", + self.pusher_sleep_duration_millis + )?; + writeln!(f, "PUSHER_MAX_CHUNK_SIZE: {}", self.pusher_max_chunk_size)?; + writeln!(f, "JWT_SECRET: ****")?; + writeln!(f, "EVENT_CALLBACK_URL: {}", self.event_callback_url)?; writeln!( f, "EVENT_PROCESSING_MAX_RETRIES: {}", self.event_processing_max_retries )?; - writeln!(f, "SHUTDOWN_TIMEOUT_SECS: {}", self.shutdown_timeout_secs)?; + writeln!(f, "SHUTDOWN_TIMEOUT_SECS: {}", self.shutdown_timeout_millis)?; writeln!(f, "{}", self.fluvio)?; writeln!(f, "{}", self.cache)?; writeln!(f, "{}", self.db_config) diff --git a/integrationos-emit/src/domain/event.rs b/integrationos-emit/src/domain/event.rs index 017469de..7f57755f 100644 --- a/integrationos-emit/src/domain/event.rs +++ 
b/integrationos-emit/src/domain/event.rs @@ -1,16 +1,11 @@ -use crate::server::AppState; -use async_trait::async_trait; +use crate::{algebra::event::EventExt, server::AppState}; +use chrono::Utc; use integrationos_domain::{ prefix::IdPrefix, record_metadata::RecordMetadata, Id, IntegrationOSError, Unit, }; use serde::{Deserialize, Serialize}; use strum::{AsRefStr, EnumString}; -#[async_trait] -pub trait EventExt { - async fn side_effect(&self, ctx: &AppState, entity_id: Id) -> Result<Unit, IntegrationOSError>; -} - #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] #[serde(rename_all = "PascalCase", tag = "type")] pub enum Event { @@ -21,28 +16,12 @@ pub enum Event { }, } -#[async_trait] -impl EventExt for Event { - async fn side_effect( - &self, - _ctx: &AppState, - entity_id: Id, - ) -> Result<Unit, IntegrationOSError> { - match self { - Event::DatabaseConnectionLost { .. } => Ok(tracing::info!( - "Received event: {:?}. With id: {entity_id}", - self - )), - } - } -} - impl Event { pub fn as_entity(&self) -> EventEntity { EventEntity { entity: self.clone(), entity_id: Id::now(IdPrefix::PipelineEvent), - outcome: None, + outcome: EventStatus::Created, metadata: RecordMetadata::default(), } } @@ -60,13 +39,13 @@ pub struct EventEntity { #[serde(rename = "_id")] pub entity_id: Id, pub entity: Event, - pub outcome: Option<EventOutcome>, - #[serde(flatten)] + pub outcome: EventStatus, + #[serde(flatten, default)] pub metadata: RecordMetadata, } impl EventEntity { - pub fn with_outcome(&self, outcome: Option<EventOutcome>) -> Self { + pub fn with_outcome(&self, outcome: EventStatus) -> Self { let mut metadata = self.metadata.clone(); metadata.mark_updated("system"); Self { @@ -77,52 +56,59 @@ impl EventEntity { } } - pub fn partition_key(&self) -> String { - match self.entity { - Event::DatabaseConnectionLost { .. } => "connection-broken".to_string(), - } - } - pub async fn side_effect(&self, ctx: &AppState) -> Result<Unit, IntegrationOSError> { self.entity.side_effect(ctx, self.entity_id).await } pub fn retries(&self) -> u32 { - self.outcome.iter().map(|o| o.retries()).sum() + self.outcome.retries() } pub fn error(&self) -> Option<String> { - self.outcome.iter().filter_map(|o| o.err()).next() + self.outcome.error() + } + + pub fn is_created(&self) -> bool { + matches!(self.outcome, EventStatus::Created) } } #[derive(Debug, Clone, PartialEq, Deserialize, Serialize, EnumString, AsRefStr)] #[serde(rename_all = "kebab-case", tag = "type")] #[strum(serialize_all = "kebab-case")] -pub enum EventOutcome { - Success, - Error { error: String, retries: u32 }, +pub enum EventStatus { + Created, + Executed { timestamp: i64 }, + Succeeded { retries: u32 }, + Errored { error: String, retries: u32 }, } -impl EventOutcome { - pub fn success() -> Self { - Self::Success +impl EventStatus { + pub fn succeeded(retries: u32) -> Self { + Self::Succeeded { retries } } - pub fn error(error: String, retries: u32) -> Self { - Self::Error { error, retries } + pub fn errored(error: String, retries: u32) -> Self { + Self::Errored { error, retries } + } + + pub fn executed() -> Self { + Self::Executed { + timestamp: Utc::now().timestamp_millis(), + } } fn retries(&self) -> u32 { match self { - Self::Error { retries, .. } => *retries, + Self::Errored { retries, .. } => *retries, + Self::Succeeded { retries, .. } => *retries, _ => 0, } } - fn err(&self) -> Option<String> { + fn error(&self) -> Option<String> { match self { - Self::Error { error, .. } => Some(error.clone()), + Self::Errored { error, ..
} => Some(error.clone()), _ => None, } } } diff --git a/integrationos-emit/src/domain/idempotency.rs b/integrationos-emit/src/domain/idempotency.rs index 3a35c85f..0d5789e1 100644 --- a/integrationos-emit/src/domain/idempotency.rs +++ b/integrationos-emit/src/domain/idempotency.rs @@ -3,15 +3,15 @@ use serde::{Deserialize, Serialize}; use std::fmt::Display; #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct IdempotencyKey(String); +pub struct IdempotencyKey(Id); impl IdempotencyKey { - pub fn new(key: String) -> Self { + pub fn new(key: Id) -> Self { Self(key) } - pub fn inner(&self) -> &str { - &self.0 + pub fn inner(&self) -> Id { + self.0 } } @@ -29,7 +29,6 @@ pub struct Idempotency { /// the box and we can use it as a conflict generation key #[serde(rename = "_id")] pub key: IdempotencyKey, - pub indexable: Id, #[serde(flatten)] pub metadata: RecordMetadata, } diff --git a/integrationos-emit/src/lib.rs b/integrationos-emit/src/lib.rs index 5724e216..9b7d20d5 100644 --- a/integrationos-emit/src/lib.rs +++ b/integrationos-emit/src/lib.rs @@ -1,3 +1,4 @@ +pub mod algebra; pub mod domain; pub mod logic; pub mod middleware; diff --git a/integrationos-emit/src/logic/emitter.rs b/integrationos-emit/src/logic/emitter.rs index 1ea977fd..a0663ce2 100644 --- a/integrationos-emit/src/logic/emitter.rs +++ b/integrationos-emit/src/logic/emitter.rs @@ -29,6 +29,7 @@ pub fn get_router() -> Router<Arc<AppState>> { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct EntityIdResponse { + pub idempotency_key: Id, pub entity_id: Id, } @@ -42,7 +43,7 @@ pub async fn emit( .app_stores .idempotency .get_one(doc! { - "_id": idempotency_key.inner() + "_id": idempotency_key.inner().to_string() }) .await .map(|idempotency| idempotency.is_some()) @@ -56,7 +57,6 @@ pub async fn emit( } else { let idempotency = Idempotency { key: idempotency_key.clone(), - indexable: Id::now(IdPrefix::Idempotency), metadata: RecordMetadata::default(), }; @@ -73,7 +73,10 @@ pub async fn emit( .publish(event.as_entity(), EventStreamTopic::Target) .await?; - Ok(Json(EntityIdResponse { entity_id: id })) + Ok(Json(EntityIdResponse { + entity_id: id, + idempotency_key: idempotency_key.inner(), + })) } Some(schedule_on) => { let scheduled = ScheduledEvent { @@ -91,6 +94,7 @@ pub async fn emit( Ok(Json(EntityIdResponse { entity_id: scheduled.id, + idempotency_key: idempotency_key.inner(), })) } } diff --git a/integrationos-emit/src/main.rs b/integrationos-emit/src/main.rs index 4b0f28b3..dd325029 100644 --- a/integrationos-emit/src/main.rs +++ b/integrationos-emit/src/main.rs @@ -5,47 +5,15 @@ use integrationos_domain::{ telemetry::{get_subscriber, init_subscriber}, Unit, }; -use integrationos_emit::{domain::config::EmitterConfig, server::Server, stream::EventStreamTopic}; +use integrationos_emit::{domain::config::EmitterConfig, server::Server}; use std::time::Duration; -use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle, Toplevel}; -use tracing::info; +use tokio_graceful_shutdown::{SubsystemHandle, Toplevel}; -async fn subsystem( - server: Server, - config: &EmitterConfig, - subsys: SubsystemHandle, -) -> Result<Unit> { - info!("Starting Emitter API with config:\n{config}"); - - let state = server.state.clone(); - let stream = server.state.event_stream.clone(); - let scheduler = server.scheduler.clone(); - - subsys.start(SubsystemBuilder::new( - EventStreamTopic::Dlq.as_ref(), - |h| async move { stream.consume(EventStreamTopic::Dlq, h, &state).await }, - )); - - let state =
server.state.clone(); - let stream = server.state.event_stream.clone(); - subsys.start(SubsystemBuilder::new( - EventStreamTopic::Target.as_ref(), - |h| async move { stream.consume(EventStreamTopic::Target, h, &state).await }, - )); - - subsys.start(SubsystemBuilder::new( - "Scheduler Subsystem", - |_| async move { scheduler.start().await }, - )); - - server.run().await -} - -fn main() -> Result<()> { +fn main() -> Result<Unit> { dotenv().ok(); let config = EmitterConfig::init_from_env()?; - let shutdown_timeout_secs = config.shutdown_timeout_secs; + let shutdown_timeout_millis = config.shutdown_timeout_millis; let subscriber = get_subscriber("emitter".into(), "info".into(), std::io::stdout, None); init_subscriber(subscriber); @@ -55,17 +23,15 @@ fn main() -> Result<Unit> { .enable_all() .build()? .block_on(async move { - Toplevel::new(|s| async move { + Toplevel::new(|subsys: SubsystemHandle| async move { let server = Server::init(config.clone()) .await .expect("Failed to initialize server"); - s.start(SubsystemBuilder::new("ServerSubsys", |handle| async move { - subsystem(server, &config, handle).await - })); + Server::subsystem(server, &config, subsys).await; }) .catch_signals() - .handle_shutdown_requests(Duration::from_millis(shutdown_timeout_secs)) + .handle_shutdown_requests(Duration::from_millis(shutdown_timeout_millis)) .await .map_err(Into::into) }) diff --git a/integrationos-emit/src/middleware/idempotency.rs b/integrationos-emit/src/middleware/idempotency.rs index 7bf7e969..1811ffd6 100644 --- a/integrationos-emit/src/middleware/idempotency.rs +++ b/integrationos-emit/src/middleware/idempotency.rs @@ -1,42 +1,34 @@ use crate::domain::idempotency::IdempotencyKey; use axum::{body::Body, middleware::Next, response::Response}; use http::Request; -use integrationos_domain::{ApplicationError, IntegrationOSError}; +use integrationos_domain::{prefix::IdPrefix, ApplicationError, Id, IntegrationOSError}; pub const IDEMPOTENCY_HEADER_STR: &str = "x-integrationos-idempotency-key"; -const MAX_LENGTH: usize = 50; pub async fn header_idempotency( mut req: Request<Body>, next: Next, ) -> Result<Response, IntegrationOSError> { - let Some(idempotency_key) = req.headers().get(IDEMPOTENCY_HEADER_STR) else { - return Err(ApplicationError::bad_request( - "Please provide an idempotency key", - None, - )); - }; + if let Some(idempotency_key) = req.headers().get(IDEMPOTENCY_HEADER_STR) { + let idempotency_key = idempotency_key + .to_str() + .map_err(|_| ApplicationError::bad_request("Invalid idempotency key", None))?; - let idempotency_key = idempotency_key - .to_str() - .map_err(|_| ApplicationError::bad_request("Invalid idempotency key", None))?; + if idempotency_key.is_empty() { + return Err(ApplicationError::bad_request( + "Invalid idempotency key, cannot be empty", + None, + )); + } - if idempotency_key.is_empty() { - return Err(ApplicationError::bad_request( - "Invalid idempotency key, cannot be empty", - None, - )); - } + let id = Id::try_from(idempotency_key.to_owned()) + .map_err(|_| ApplicationError::bad_request("Invalid idempotency key", None))?; - if idempotency_key.len() > MAX_LENGTH { - return Err(ApplicationError::bad_request( - "Idempotency key is too long, max length is 50", - None, - )); + let data = IdempotencyKey::new(id); + req.extensions_mut().insert(data); + } else { + let data = IdempotencyKey::new(Id::now(IdPrefix::Idempotency)); + req.extensions_mut().insert(data); } - - let data = IdempotencyKey::new(idempotency_key.to_owned()); - - req.extensions_mut().insert(data); Ok(next.run(req).await) } diff --git
a/integrationos-emit/src/server.rs b/integrationos-emit/src/server.rs index a027acd6..c6f7cdc2 100644 --- a/integrationos-emit/src/server.rs +++ b/integrationos-emit/src/server.rs @@ -7,8 +7,8 @@ use crate::{ }, router, stream::{ - fluvio_driver::FluvioDriverImpl, logger_driver::LoggerDriverImpl, - scheduler::PublishScheduler, EventStreamExt, EventStreamProvider, + fluvio_driver::FluvioDriverImpl, logger_driver::LoggerDriverImpl, pusher::EventPusher, + scheduler::PublishScheduler, EventStreamExt, EventStreamProvider, EventStreamTopic, }, }; use anyhow::Result as AnyhowResult; @@ -19,7 +19,8 @@ use reqwest_middleware::{reqwest, ClientBuilder, ClientWithMiddleware}; use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use reqwest_tracing::TracingMiddleware; use std::{sync::Arc, time::Duration}; -use tokio::net::TcpListener; +use tokio::{net::TcpListener, signal}; +use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle}; #[derive(Clone)] pub struct AppStores { @@ -42,6 +43,7 @@ pub struct Server { pub state: Arc<AppState>, pub event_stream: Arc<dyn EventStreamExt + Sync + Send>, pub scheduler: Arc<PublishScheduler>, + pub pusher: Arc<EventPusher>, } impl Server { @@ -49,16 +51,6 @@ let client = Client::with_uri_str(&config.db_config.event_db_url).await?; let database = client.database(&config.db_config.event_db_name); - let retry_policy = - ExponentialBackoff::builder().build_with_max_retries(config.http_client_max_retries); - let client = reqwest::Client::builder() - .timeout(Duration::from_secs(config.http_client_timeout_secs)) - .build()?; - let http_client = ClientBuilder::new(client) - .with(RetryTransientMiddleware::new_with_policy(retry_policy)) - .with(TracingMiddleware::default()) - .build(); - let app_stores = AppStores { events: MongoStore::new(&database, &Store::PipelineEvents).await?, idempotency: MongoStore::new(&database, &Store::Idempotency).await?, @@ -72,6 +64,15 @@ EventStreamProvider::Fluvio => Arc::new(FluvioDriverImpl::new(&config).await?), }; + let pusher = Arc::new(EventPusher { + event_stream: Arc::clone(&event_stream), + events: app_stores.events.clone(), + deduplication: app_stores.deduplication.clone(), + max_concurrent_tasks: config.pusher_max_concurrent_tasks, + max_chunk_size: config.pusher_max_chunk_size, + sleep_duration: config.pusher_sleep_duration_millis, + }); + let scheduler = Arc::new(PublishScheduler { event_stream: Arc::clone(&event_stream), scheduled: app_stores.scheduled.clone(), @@ -80,6 +81,16 @@ sleep_duration: config.scheduled_sleep_duration_millis, }); + let retry_policy = + ExponentialBackoff::builder().build_with_max_retries(config.http_client_max_retries); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(config.http_client_timeout_secs)) + .build()?; + let http_client = ClientBuilder::new(client) + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .with(TracingMiddleware::default()) + .build(); + let state = Arc::new(AppState { config: config.clone(), app_stores, @@ -91,10 +102,11 @@ state, event_stream, scheduler, + pusher, }) } - pub async fn run(&self) -> AnyhowResult<()> { + pub async fn run(&self, subsys: SubsystemHandle) -> AnyhowResult<Unit> { let app = router::get_router(&self.state).await; let app: Router = app.with_state(self.state.clone()); @@ -104,7 +116,76 @@ let tcp_listener = TcpListener::bind(&self.state.config.address).await?; axum::serve(tcp_listener, app) + .with_graceful_shutdown(Self::shutdown(subsys)) .await .map_err(|e| anyhow::anyhow!("Server error: {}", e))
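+            // Once `Self::shutdown` resolves, axum stops accepting new connections and drains in-flight requests before this future completes.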
} + + async fn shutdown(subsys: SubsystemHandle) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + subsys.on_shutdown_requested().await; + }, + _ = terminate => { + subsys.on_shutdown_requested().await; + }, + } + tracing::info!("Starting server shutdown ..."); + } + + pub async fn subsystem( + server: Server, + config: &EmitterConfig, + subsys: SubsystemHandle, + ) -> Unit { + tracing::info!("Starting Emitter API with config:\n{config}"); + + let state = server.state.clone(); + let stream = server.state.event_stream.clone(); + let scheduler = server.scheduler.clone(); + let pusher = server.pusher.clone(); + + subsys.start(SubsystemBuilder::new( + EventStreamTopic::Dlq.as_ref(), + |h| async move { stream.consume(EventStreamTopic::Dlq, h, &state).await }, + )); + + let state = server.state.clone(); + let stream = server.state.event_stream.clone(); + subsys.start(SubsystemBuilder::new( + EventStreamTopic::Target.as_ref(), + |s| async move { stream.consume(EventStreamTopic::Target, s, &state).await }, + )); + + let config = server.state.config.clone(); + subsys.start(SubsystemBuilder::new("PusherSubsystem", |s| async move { + pusher.start(&config, s).await + })); + + subsys.start(SubsystemBuilder::new( + "SchedulerSubsystem", + |s| async move { scheduler.start(s).await }, + )); + + subsys.start(SubsystemBuilder::new("ServerSubsystem", |s| async move { + server.run(s).await + })); + } } diff --git a/integrationos-emit/src/stream/fluvio_driver.rs b/integrationos-emit/src/stream/fluvio_driver.rs index 155fe6eb..23c2f10d 100644 --- a/integrationos-emit/src/stream/fluvio_driver.rs +++ b/integrationos-emit/src/stream/fluvio_driver.rs @@ -3,7 +3,7 @@ use crate::{ domain::{ config::{EmitterConfig, EventStreamConfig}, deduplication::Deduplication, - event::{EventEntity, EventOutcome}, + event::{EventEntity, EventStatus}, }, server::AppState, }; @@ -14,15 +14,20 @@ use fluvio::{ ConsumerConfigExt, ConsumerConfigExtBuilder, ConsumerStream, OffsetManagementStrategy, Record, }, + dataplane::link::ErrorCode, spu::SpuSocketPool, Compression, Fluvio, FluvioConfig, Offset, RetryPolicy, TopicProducer, TopicProducerConfigBuilder, }; use futures::StreamExt; -use integrationos_domain::{Id, IntegrationOSError, InternalError, Unit}; +use integrationos_domain::{Id, IntegrationOSError, InternalError, TimedExt, Unit}; use mongodb::bson::doc; use std::boxed::Box; -use std::time::Duration; +use std::{ + sync::atomic::{AtomicBool, AtomicU64, Ordering}, + time::Duration, +}; +use tokio::time::interval; use tokio_graceful_shutdown::SubsystemHandle; pub struct ConsumerConfig { @@ -30,8 +35,8 @@ ext: ConsumerConfigExt, app: EventStreamConfig, } -pub type TargetProducer = TopicProducer<SpuSocketPool>; -pub type DlqProducer = TopicProducer<SpuSocketPool>; +type TargetProducer = TopicProducer<SpuSocketPool>; +type DlqProducer = TopicProducer<SpuSocketPool>; pub struct FluvioDriverImpl { pub client: Fluvio, @@ -159,10 +164,81 @@ dlq_producer, }) } + + async fn consume_topic( + &self, + target: EventStreamTopic, + subsys: &SubsystemHandle, + ctx: &AppState, + consumer: &ConsumerConfig, + stream: &mut impl ConsumerStream<Item = Result<Record, ErrorCode>>, + ) -> Result<Unit, IntegrationOSError> { + let mut interval = interval(Duration::from_millis(consumer.app.consumer_linger_time));
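+        // A tokio `interval` yields its first tick immediately; consume it up front so the commit branch below only fires after a full linger period.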
interval.tick().await; + + // We don't really need it but we may use a different approach if something comes out of https://github.com/infinyon/fluvio/issues/4267#issuecomment-2489354987 + let count = AtomicU64::new(0); + let is_processing = AtomicBool::new(true); + + loop { + is_processing.store(false, Ordering::SeqCst); + tokio::select! { + timeout = interval.tick() => { + if count.load(std::sync::atomic::Ordering::SeqCst) > 0 { + tracing::info!("Committing offsets after {:?} for topic {}", timeout.elapsed(), target.as_ref()); + stream.offset_commit().map_err(|err| anyhow::anyhow!(err))?; + stream.offset_flush().await.map_err(|err| anyhow::anyhow!(err))?; + tracing::info!("Periodic offset commit completed for topic {}", target.as_ref()); + count.store(0, std::sync::atomic::Ordering::SeqCst); + } + + if subsys.is_shutdown_requested() && !is_processing.load(Ordering::SeqCst) { + tracing::info!("Consumer for {} cancelled by external request. Breaking the loop", target.as_ref()); + break Ok(()); + } + }, + record = stream.next() => { + count.fetch_add(1, Ordering::Relaxed); + + match record { + Some(Ok(record)) => { + let event: EventEntity = serde_json::from_slice(record.get_value()).context("Could not deserialize event")?; + is_processing.store(true, Ordering::SeqCst); + self.process(ctx, target, &event).await?; + is_processing.store(false, Ordering::SeqCst); + }, + Some(Err(err)) => return Err(InternalError::io_err(&format!("Error consuming record: {err}"), None)), + None => { + tracing::info!("Consumer stream closed"); + subsys.request_shutdown(); + } + } + + if count.load(std::sync::atomic::Ordering::SeqCst) >= consumer.app.consumer_batch_size as u64 { + count.store(0, Ordering::SeqCst); + stream.offset_commit().map_err(|err| anyhow::anyhow!(err))?; + stream.offset_flush().await.map_err(|err| anyhow::anyhow!(err))?; + } + + if subsys.is_shutdown_requested() { + tracing::info!("Consumer for {} cancelled by external request. Breaking the loop", target.as_ref()); + break Ok(()); + } + } + } + } + } } #[async_trait] impl EventStreamExt for FluvioDriverImpl { + /** + * Publishes an event to the specified topic + * @param event - The event to publish + * @param target - The target topic of the event + * + * It serializes the event using serde_json and sends it to the specified topic. + */ async fn publish( &self, event: EventEntity, @@ -175,7 +251,7 @@ impl EventStreamExt for FluvioDriverImpl { match target { EventStreamTopic::Target => { self.tgt_producer - .send(event.partition_key(), payload) + .send(event.entity_id.to_string(), payload) .await .map_err(|e| { InternalError::io_err(&format!("Could not send event to fluvio: {e}"), None) @@ -183,7 +259,7 @@ impl EventStreamExt for FluvioDriverImpl { } EventStreamTopic::Dlq => { self.dlq_producer - .send(event.partition_key(), payload) + .send(event.entity_id.to_string(), payload) .await .map_err(|e| { InternalError::io_err(&format!("Could not send event to fluvio: {e}"), None) @@ -194,6 +270,15 @@ impl EventStreamExt for FluvioDriverImpl { Ok(event.entity_id) } + /** + * Consumes events from the specified topic + * @param target - The target topic of the event + * @param subsys - The subsystem handle + * @param ctx - The application state + * + * It consumes events from the specified topic using the consumer stream. + * It processes each event and updates the event outcome in the events collection. 
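+     * Offsets are committed after `consumer_batch_size` records or on the linger interval, whichever comes first; a crash between commits only replays records the deduplication store already knows about.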
+ */ async fn consume( &self, target: EventStreamTopic, @@ -210,63 +295,9 @@ .consumer_with_config(consumer.ext.clone()) .await?; - let mut count = 0; - let mut interval = - tokio::time::interval(Duration::from_millis(consumer.app.consumer_linger_time)); - interval.tick().await; - - loop { - tokio::select! { - timeout = interval.tick() => { - - if count > 0 || subsys.is_shutdown_requested() { - tracing::info!("Committing offsets after {timeout:?}"); - stream.offset_commit().map_err(|err| anyhow::anyhow!(err))?; - stream.offset_flush().await.map_err(|err| anyhow::anyhow!(err))?; - tracing::info!("Periodic offset commit completed."); - count = 0; // Reset count after Committing - - } - - if subsys.is_shutdown_requested() { - tracing::info!("Consumer cancelled, gracefully shutting down. Committing pending offsets"); - return Ok(()); - } - - }, - record = stream.next() => { - count += 1; - - match record { - Some(Ok(record)) => { - self.process(ctx, target, &record).await?; - }, - Some(Err(err)) => return Err(InternalError::io_err(&format!("Error consuming record: {err}"), None)), - None => { - tracing::info!("Consumer stream closed"); - return Ok(()); - } - } - - if count >= consumer.app.consumer_batch_size || subsys.is_shutdown_requested() { - count = 0; - stream.offset_commit().map_err(|err| anyhow::anyhow!(err))?; - stream.offset_flush().await.map_err(|err| anyhow::anyhow!(err))?; - } - - if subsys.is_shutdown_requested() { - tracing::info!("Consumer cancelled, gracefully shutting down. Committing pending offsets"); - return Ok(()); - } - } - _ = subsys.on_shutdown_requested() => { - tracing::info!("Consumer cancelled, gracefully shutting down. Committing pending offsets"); - stream.offset_commit().map_err(|err| anyhow::anyhow!(err))?; - stream.offset_flush().await.map_err(|err| anyhow::anyhow!(err))?; - return Ok(()); - }, - } - } + self.consume_topic(target, &subsys, ctx, consumer, &mut stream) + .await } /** @@ -284,11 +315,8 @@ &self, ctx: &AppState, target: EventStreamTopic, - event: &Record, + event: &EventEntity, ) -> Result<Unit, IntegrationOSError> { - let event: EventEntity = - serde_json::from_slice(event.get_value()).context("Could not deserialize event")?; let is_processed = ctx .app_stores .deduplication @@ -319,24 +347,31 @@ match target { EventStreamTopic::Target => { - ctx.app_stores - .events - .create_one(&event) - .await - .map_err(|e| { - tracing::error!("Could not create event record: {e}"); - InternalError::unknown("Could not create event record", None) - })?; + ctx.app_stores.events.create_one(event).await.map_err(|e| { + tracing::error!("Could not create event record: {e}"); + InternalError::unknown("Could not create event record", None) + })?; tracing::info!("Event with id {} is ready to be processed", event.entity_id); - let result = event.side_effect(ctx).await; + let result = event + .side_effect(ctx) + .timed(|_, elapsed| { + tracing::info!( + "Side effect for entity id {} took {}ms", + event.entity_id, + elapsed.as_millis() + ) + }) + .await; + + update_event_outcome(ctx, event, EventStatus::executed()).await?; if let Err(e) = result { tracing::error!("Error processing event: {e}, removing deduplication record"); - delete_deduplication_record(ctx, &event).await?; + delete_deduplication_record(ctx, event).await?; - let outcome = EventOutcome::error(e.to_string(), 1); - let event = event.with_outcome(Some(outcome.clone())); + let outcome =
EventStatus::errored(e.to_string(), 1); + let event = event.with_outcome(outcome.clone()); self.publish(event.clone(), EventStreamTopic::Dlq).await?; @@ -345,7 +380,7 @@ return Ok(()); } - update_event_outcome(ctx, &event, EventOutcome::success()).await?; + update_event_outcome(ctx, event, EventStatus::succeeded(event.retries())).await?; } EventStreamTopic::Dlq => { tracing::info!("Event with id {} is in DLQ", event.entity_id); @@ -356,10 +391,10 @@ tracing::error!( "Error processing event: {e}, removing deduplication record" ); - delete_deduplication_record(ctx, &event).await?; + delete_deduplication_record(ctx, event).await?; - let outcome = EventOutcome::error(e.to_string(), event.retries() + 1); - let event = event.with_outcome(Some(outcome.clone())); + let outcome = EventStatus::errored(e.to_string(), event.retries() + 1); + let event = event.with_outcome(outcome.clone()); self.publish(event.clone(), EventStreamTopic::Dlq).await?; @@ -368,7 +403,8 @@ return Ok(()); } - update_event_outcome(ctx, &event, EventOutcome::success()).await?; + update_event_outcome(ctx, event, EventStatus::succeeded(event.retries())) + .await?; } else { tracing::info!("Giving up on event with id {}", event.entity_id); // this is the case where we exhausted the retries, now @@ -376,7 +412,7 @@ let error = event.error().unwrap_or_default() + ".\n Exhausted retries, cannot process event"; - update_event_outcome(ctx, &event, EventOutcome::error(error, event.retries())) + update_event_outcome(ctx, event, EventStatus::errored(error, event.retries())) .await?; // TODO: create an alert on grafana @@ -395,12 +431,9 @@ async fn delete_deduplication_record( ctx.app_stores .deduplication .collection - .delete_one( - doc! { - "_id": event.entity_id.to_string() - }, - None, - ) + .delete_one(doc!
{ + "_id": event.entity_id.to_string() + }) .await?; Ok(()) @@ -409,7 +442,7 @@ async fn delete_deduplication_record( async fn update_event_outcome( ctx: &AppState, event: &EventEntity, - outcome: EventOutcome, + outcome: EventStatus, ) -> Result { let outcome = mongodb::bson::to_bson(&outcome).context("Could not serialize event")?; diff --git a/integrationos-emit/src/stream/logger_driver.rs b/integrationos-emit/src/stream/logger_driver.rs index b0896f7b..4a1406bb 100644 --- a/integrationos-emit/src/stream/logger_driver.rs +++ b/integrationos-emit/src/stream/logger_driver.rs @@ -1,7 +1,6 @@ use super::{EventStreamExt, EventStreamTopic}; use crate::{domain::event::EventEntity, server::AppState}; use async_trait::async_trait; -use fluvio::consumer::Record; use integrationos_domain::{prefix::IdPrefix, Id, IntegrationOSError, Unit}; use std::boxed::Box; use tokio_graceful_shutdown::SubsystemHandle; @@ -38,7 +37,7 @@ impl EventStreamExt for LoggerDriverImpl { &self, _ctx: &AppState, target: EventStreamTopic, - _event: &Record, + _event: &EventEntity, ) -> Result { tracing::info!( "Processing records from {} using logger handler", diff --git a/integrationos-emit/src/stream/mod.rs b/integrationos-emit/src/stream/mod.rs index 58478d61..2b6c178b 100644 --- a/integrationos-emit/src/stream/mod.rs +++ b/integrationos-emit/src/stream/mod.rs @@ -1,16 +1,16 @@ pub mod fluvio_driver; pub mod logger_driver; +pub mod pusher; pub mod scheduler; use crate::{domain::event::EventEntity, server::AppState}; use async_trait::async_trait; -use fluvio::consumer::Record; use integrationos_domain::{Id, IntegrationOSError, Unit}; use strum::{AsRefStr, Display, EnumIter, EnumString}; use tokio_graceful_shutdown::SubsystemHandle; #[async_trait] -pub trait EventStreamExt { +pub trait EventStreamExt { async fn publish( &self, event: EventEntity, @@ -26,7 +26,7 @@ pub trait EventStreamExt { &self, ctx: &AppState, target: EventStreamTopic, - event: &T, + events: &T, ) -> Result; } diff --git a/integrationos-emit/src/stream/pusher.rs b/integrationos-emit/src/stream/pusher.rs new file mode 100644 index 00000000..fda181aa --- /dev/null +++ b/integrationos-emit/src/stream/pusher.rs @@ -0,0 +1,172 @@ +use super::EventStreamExt; +use crate::{ + domain::{config::EmitterConfig, deduplication::Deduplication, event::EventEntity}, + stream::EventStreamTopic, +}; +use chrono::{Duration as CDuration, Utc}; +use futures::{StreamExt, TryStreamExt}; +use integrationos_domain::{IntegrationOSError, InternalError, MongoStore, Unit}; +use mongodb::bson::doc; +use std::{sync::Arc, time::Duration}; +use tokio_graceful_shutdown::{FutureExt, SubsystemHandle}; + +#[derive(Clone)] +pub struct EventPusher { + pub event_stream: Arc, + pub events: MongoStore, + pub deduplication: MongoStore, + pub max_concurrent_tasks: usize, + pub max_chunk_size: usize, + pub sleep_duration: u64, +} + +impl EventPusher { + pub async fn start( + &self, + config: &EmitterConfig, + subsys: SubsystemHandle, + ) -> Result { + match self.process(config).cancel_on_shutdown(&subsys).await { + Ok(result) => { + tracing::info!("Scheduled event publisher finished"); + subsys.on_shutdown_requested().await; + + result + } + Err(_) => { + tracing::warn!("EventPusher was cancelled due to shutdown"); + subsys.on_shutdown_requested().await; + Ok(()) + } + } + } + + async fn process(&self, config: &EmitterConfig) -> Result { + let events_store = self.events.clone(); + let deduplication_store = self.deduplication.clone(); + let event_stream = Arc::clone(&self.event_stream); + + let 
max_concurrent_tasks = self.max_concurrent_tasks; + let max_chunk_size = self.max_chunk_size; + let sleep_duration = self.sleep_duration; + + tracing::info!("Starting event pusher"); + loop { + let now = Utc::now(); + let before = now - CDuration::seconds(config.event_max_span_for_retry_secs); + + tracing::debug!("Polling for events at {}", now); + + let query = doc! { + "$or": [ + {"$and": [ + { + "outcome.type": "errored" + }, + { + "outcome.retries": { "$lt": config.event_processing_max_retries} + }, + { + "createdAt": { "$lt": before.timestamp_millis() } + }, + ]}, + {"$and": [ + { + "outcome.type": "executed" + }, + { + "createdAt": { "$lt": before.timestamp_millis() } + } + ]}, + {"$and": [ + { + "outcome.type": "created" + }, + { + "createdAt": { "$lt": before.timestamp_millis() } + } + ]} + ] + }; + + let events = events_store.collection.find(query).await; + + if let Ok(events) = events { + let event_stream = Arc::clone(&event_stream); + let deduplication_store = deduplication_store.clone(); + + let result = + events + .try_chunks(max_chunk_size) + .map(|result| { + let event_stream = Arc::clone(&event_stream); + let deduplication_store = deduplication_store.clone(); + + let result = + result.map_err(|e| InternalError::io_err(&e.to_string(), None)); + async move { + process_chunk(result, &event_stream, &deduplication_store).await + } + }) + .buffer_unordered(max_concurrent_tasks) + .collect::<Vec<_>>() + .await + .into_iter() + .collect::<Result<Vec<Unit>, IntegrationOSError>>(); + + if let Err(e) = result { + tracing::error!("Failed to publish one or more event chunks: {e}"); + } + } else if let Err(e) = events { + tracing::error!("Failed to fetch events: {e}"); + } + + tokio::time::sleep(Duration::from_millis(sleep_duration)).await; + } + } +} + +async fn process_chunk( + result: Result<Vec<EventEntity>, IntegrationOSError>, + event_stream: &Arc<dyn EventStreamExt + Sync + Send>, + deduplication_store: &MongoStore<Deduplication>, +) -> Result<Unit, IntegrationOSError> { + match result { + Ok(chunk) => { + tracing::info!("Publishing {} event(s)", chunk.len()); + for event in chunk { + let entity_id = event.entity_id; + let topic = if event.is_created() { + EventStreamTopic::Target + } else { + EventStreamTopic::Dlq + }; + + let deleted = deduplication_store + .collection + .delete_one(doc! { "_id": entity_id.to_string() }) + .await?; + + tracing::info!( + "Deleted deduplication record for event {entity_id}: {:?}", + deleted + ); + + event_stream + .publish(event, topic) + .await + .inspect(|_| { + tracing::info!("Event with id {} is published", entity_id); + }) + .inspect_err(|e| { + tracing::error!("Failed to publish event: {e}"); + })?; + } + Ok(()) + } + Err(e) => { + tracing::error!("Failed to chunk events: {e}"); + Err(e) + } + } +} diff --git a/integrationos-emit/src/stream/scheduler.rs b/integrationos-emit/src/stream/scheduler.rs index f12ed2f6..2f4c6864 100644 --- a/integrationos-emit/src/stream/scheduler.rs +++ b/integrationos-emit/src/stream/scheduler.rs @@ -5,6 +5,7 @@ use futures::{StreamExt, TryStreamExt}; use integrationos_domain::{IntegrationOSError, InternalError, MongoStore, Unit}; use mongodb::bson::doc; use std::{sync::Arc, time::Duration}; +use tokio_graceful_shutdown::{FutureExt, SubsystemHandle}; // Simple scheduler.
Heavily relies on the database for scheduling events #[derive(Clone)] @@ -17,7 +18,23 @@ pub struct PublishScheduler { } impl PublishScheduler { - pub async fn start(&self) -> Result<Unit, IntegrationOSError> { + pub async fn start(&self, subsys: SubsystemHandle) -> Result<Unit, IntegrationOSError> { + match self.process().cancel_on_shutdown(&subsys).await { + Ok(result) => { + tracing::info!("Scheduled event publisher finished"); + subsys.on_shutdown_requested().await; + + result + } + Err(_) => { + tracing::warn!("PublishScheduler was cancelled due to shutdown"); + subsys.on_shutdown_requested().await; + Ok(()) + } + } + } + + async fn process(&self) -> Result<Unit, IntegrationOSError> { let scheduled = self.scheduled.clone(); let event_stream = Arc::clone(&self.event_stream); @@ -31,12 +48,13 @@ impl PublishScheduler { "Polling for scheduled events at {}", Utc::now().timestamp_millis() ); + let events = scheduled .collection - .find( - doc! { "scheduleOn": { "$lte": Utc::now().timestamp_millis() } }, - None, - ) + .find(doc! { + "scheduleOn": { "$lte": Utc::now().timestamp_millis() } + }) .await; if let Ok(events) = events { @@ -47,7 +65,7 @@ .map(|result| { let event_stream = Arc::clone(&event_stream); let scheduled = scheduled.clone(); - // + let result = result.map_err(|e| InternalError::io_err(&e.to_string(), None)); async move { process_chunk(result, &event_stream, &scheduled).await } @@ -88,7 +106,7 @@ async fn process_chunk( tracing::info!("Event with id {} is published", entity_id); scheduled .collection - .delete_one(doc! { "_id": id.to_string() }, None) + .delete_one(doc! { "_id": id.to_string() }) .await?; } } diff --git a/integrationos-emit/tests/context.rs b/integrationos-emit/tests/context.rs index b6e3e9e1..6994464b 100644 --- a/integrationos-emit/tests/context.rs +++ b/integrationos-emit/tests/context.rs @@ -1,8 +1,9 @@ use envconfig::Envconfig; use http::{Method, StatusCode}; -use integrationos_domain::{IntegrationOSError, InternalError}; +use integrationos_domain::{IntegrationOSError, InternalError, Unit}; use integrationos_emit::domain::config::EmitterConfig; use integrationos_emit::server::Server; +use mockito::{Server as MockServer, ServerGuard}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use std::error::Error; @@ -13,17 +14,19 @@ use testcontainers_modules::{ testcontainers::{clients::Cli as Docker, Container}, }; use tokio::net::TcpListener; +use tokio_graceful_shutdown::Toplevel; use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; use uuid::Uuid; static DOCKER: OnceLock<Docker> = OnceLock::new(); static MONGO: OnceLock<Container<'static, Mongo>> = OnceLock::new(); -static TRACING: OnceLock<()> = OnceLock::new(); +static TRACING: OnceLock<Unit> = OnceLock::new(); pub struct TestServer { pub port: u16, pub client: reqwest::Client, + pub mock_server: ServerGuard, } #[derive(Debug, Clone, Eq, PartialEq)] @@ -33,7 +36,7 @@ pub struct ApiResponse { } impl TestServer { - pub async fn new() -> Result<Self, IntegrationOSError> { + pub async fn new(stream: bool) -> Result<Self, IntegrationOSError> { TRACING.get_or_init(|| { let filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) @@ -55,7 +58,7 @@ .expect("Failed to get local address") .port(); - let config = EmitterConfig::init_from_hashmap(&HashMap::from([ + let mut config = vec![ ( "INTERNAL_SERVER_ADDRESS".to_string(), format!("0.0.0.0:{port}"), @@ -66,20 +69,58 @@ ("CONTEXT_DATABASE_NAME".to_string(), database_name.clone()), ("EVENT_DATABASE_URL".to_string(), database_uri.clone()), ("EVENT_DATABASE_NAME".to_string(), database_name.clone()), - ])) -
.expect("Failed to initialize storage config"); + ]; + + let mock_server = MockServer::new_async().await; + + if stream { + let uri = mock_server.url(); + + config.push(("EVENT_STREAM_PROVIDER".to_string(), "fluvio".to_string())); + config.push(("EVENT_STREAM_PORT".to_string(), "9103".to_string())); + config.push(( + "EVENT_STREAM_PRODUCER_TOPIC".to_string(), + "events".to_string(), + )); + config.push(( + "EVENT_STREAM_CONSUMER_TOPIC".to_string(), + "events".to_string(), + )); + config.push(( + "EVENT_STREAM_CONSUMER_GROUP".to_string(), + "event-all-partitions-consumer".to_string(), + )); + config.push(( + "EVENT_CALLBACK_URL".to_string(), + format!("{uri}/v1/event-callbacks"), + )); + } + + let config = EmitterConfig::init_from_hashmap(&HashMap::from_iter(config)) + .expect("Failed to initialize storage config"); let server = Server::init(config.clone()) .await .expect("Failed to initialize storage"); - tokio::task::spawn(async move { server.run().await }); + tokio::task::spawn(async move { + Toplevel::new(|s| async move { + Server::subsystem(server, &config, s).await; + }) + .catch_signals() + .handle_shutdown_requests(Duration::from_secs(5)) + .await + }); - tokio::time::sleep(Duration::from_millis(50)).await; + tokio::time::sleep(Duration::from_secs(1)).await; let client = reqwest::Client::new(); - Ok(Self { port, client }) + Ok(Self { + port, + client, + mock_server, + }) } pub async fn send_request( @@ -90,7 +131,6 @@ impl TestServer { header: Option<&HashMap>, ) -> Result, IntegrationOSError> { let uri = format!("http://localhost:{}/{path}", self.port); - println!("Sending request to {uri}"); let mut req = self.client.request(method, uri); if let Some(payload) = payload { req = req.json(payload); diff --git a/integrationos-emit/tests/http/emitter.rs b/integrationos-emit/tests/http/emitter.rs index f9cecf8f..6eae74ca 100644 --- a/integrationos-emit/tests/http/emitter.rs +++ b/integrationos-emit/tests/http/emitter.rs @@ -1,24 +1,38 @@ use crate::context::TestServer; use futures::{stream, StreamExt}; -use http::{Method, StatusCode}; -use integrationos_domain::{IntegrationOSError, Unit}; +use http::{ + header::{ACCEPT, AUTHORIZATION, HOST}, + Method, StatusCode, +}; +use integrationos_domain::{prefix::IdPrefix, Id, IntegrationOSError, Unit}; +use integrationos_emit::logic::emitter::EntityIdResponse; +use mockito::Matcher; use serde_json::{json, Value}; -use std::collections::HashMap; -use uuid::Uuid; +use std::{collections::HashMap, time::Duration}; const PARALLEL_REQUESTS: usize = 10; #[tokio::test] async fn test_concurrent_requests() -> Result { - let server = TestServer::new().await?; + let server = TestServer::new(true).await?; let payload = json!({ "type": "DatabaseConnectionLost", "connectionId": "conn::GAL2svWJp9k::MtmXaau5Qf6R5n3Y-L9ejQ" }); + let response = server + .send_request::("v1/emit", Method::POST, Some(&payload), None) + .await; + + assert!(response.is_ok()); + let headers = HashMap::from_iter(vec![( "x-integrationos-idempotency-key".to_string(), - Uuid::new_v4().to_string(), + response + .expect("Failed to get response") + .data + .entity_id + .to_string(), )]); let reqs = vec!["v1/emit"; PARALLEL_REQUESTS]; @@ -55,5 +69,72 @@ async fn test_concurrent_requests() -> Result { 1 ); + tokio::time::sleep(Duration::from_secs(10)).await; + + Ok(()) +} + +#[tokio::test] +async fn test_event_processed() -> Result { + let mut server = TestServer::new(true).await?; + + let id = Id::now(IdPrefix::Connection).to_string(); + let payload = json!({ + "type": 
"DatabaseConnectionLost", + "connectionId": id.clone() + }); + let path = format!("/v1/event-callbacks/database-connection-lost/{}", id); + let mock_server = server + .mock_server + .mock("POST", path.as_str()) + .match_header(AUTHORIZATION, Matcher::Any) + .match_header(ACCEPT, "*/*") + .match_header(HOST, server.mock_server.host_with_port().as_str()) + .with_status(200) + .with_body("{}") + .with_header("content-type", "application/json") + .create_async() + .await; + + let res = server + .send_request::("v1/emit", Method::POST, Some(&payload), None) + .await + .expect("Failed to send request"); + + assert_eq!(res.code, StatusCode::OK); + + // Giving it some time for the commit to happen + tokio::time::sleep(Duration::from_secs(10)).await; + + mock_server.expect_at_most(1).assert_async().await; + + let id = Id::now(IdPrefix::Connection).to_string(); + let payload = json!({ + "type": "DatabaseConnectionLost", + "connectionId": id.clone() + }); + let path = format!("/v1/event-callbacks/database-connection-lost/{}", id); + let mock_server = server + .mock_server + .mock("POST", path.as_str()) + .match_header(AUTHORIZATION, Matcher::Any) + .match_header(ACCEPT, "*/*") + .match_header(HOST, server.mock_server.host_with_port().as_str()) + .with_status(500) + .with_body("{}") + .with_header("content-type", "application/json") + .create_async() + .await; + + let res = server + .send_request::("v1/emit", Method::POST, Some(&payload), None) + .await + .expect("Failed to send request"); + + assert_eq!(res.code, StatusCode::OK); + + tokio::time::sleep(Duration::from_secs(3)).await; + + mock_server.expect_at_least(3).assert_async().await; Ok(()) } diff --git a/integrationos-emit/tests/resource/Dockerfile b/integrationos-emit/tests/resource/Dockerfile new file mode 100644 index 00000000..c66da9dc --- /dev/null +++ b/integrationos-emit/tests/resource/Dockerfile @@ -0,0 +1,8 @@ +FROM ubuntu:20.04 + +RUN apt-get update +RUN apt-get install -y curl unzip +RUN curl -fsS https://hub.infinyon.cloud/install/install.sh?ctx=dc | bash + +ENV PATH "$PATH:/root/.fluvio/bin" +ENV PATH "$PATH:/root/.fvm/bin" diff --git a/integrationos-emit/tests/resource/docker-compose.yml b/integrationos-emit/tests/resource/docker-compose.yml new file mode 100644 index 00000000..1197a5aa --- /dev/null +++ b/integrationos-emit/tests/resource/docker-compose.yml @@ -0,0 +1,47 @@ +services: + cache: + image: redis:7-alpine + ports: + - '6379:6379' + volumes: + - ./redis-data:/data + sc: + image: infinyon/fluvio:stable + container_name: sc + hostname: sc + ports: + - "9103:9003" + environment: + - RUST_LOG=debug + command: "./fluvio-run sc --local /fluvio/metadata" + volumes: + - ./fluvio-metadata:/fluvio/metadata + sc-setup: + build: + context: . 
+ dockerfile: Dockerfile + container_name: sc-setup + environment: + - RUST_LOG=debug + entrypoint: > + /bin/sh -c " + fluvio profile add docker sc:9003 docker; + fluvio cluster spu register --id 5001 -p 0.0.0.0:9110 -l spu:9010 --private-server spu:9011; + exit 0; + " + depends_on: + - sc + spu: + image: infinyon/fluvio:stable + container_name: spu + hostname: spu + volumes: + - ./fluvio-data:/fluvio/data + environment: + - RUST_LOG=debug + ports: + - "9110:9010" + - "9111:9011" + command: "./fluvio-run spu -i 5001 -p spu:9010 -v spu:9011 --sc-addr sc:9004 --log-base-dir /fluvio/data" + depends_on: + - sc diff --git a/integrationos-event/src/mongo_context_store.rs b/integrationos-event/src/mongo_context_store.rs index eb165c5a..2f5cea72 100644 --- a/integrationos-event/src/mongo_context_store.rs +++ b/integrationos-event/src/mongo_context_store.rs @@ -32,7 +32,7 @@ impl ContextStore for MongoContextStore { ) -> Result<T> { let coll = self.db.collection(&self.collection_name); let context = coll - .find_one(doc! { "id": context_key.to_string() }, None) + .find_one(doc! { "id": context_key.to_string() }) .await?; Ok(context.ok_or_else(|| anyhow!("No context found"))?) } @@ -40,7 +40,7 @@ impl ContextStore for MongoContextStore { async fn set(&self, context: T) -> Result<()> { let instant = Instant::now(); let coll = self.db.collection(&self.collection_name); - if let Err(e) = coll.insert_one(context, None).await { + if let Err(e) = coll.insert_one(context).await { error!("PipelineExt insertion error {e}"); } trace!( diff --git a/integrationos-event/tests/mock_destination.rs b/integrationos-event/tests/mock_destination.rs index 11972596..65b410ee 100644 --- a/integrationos-event/tests/mock_destination.rs +++ b/integrationos-event/tests/mock_destination.rs @@ -88,7 +88,6 @@ pub async fn seed_db(config: &EventCoreConfig, base_url: String) -> Id { .insert_one( bson::to_bson_with_options(&stripe_model_config, Default::default()) .expect("Unable to serialize connection model definition"), - None, ) .await .unwrap(); @@ -122,7 +121,6 @@ pub async fn seed_db(config: &EventCoreConfig, base_url: String) -> Id { .insert_one( bson::to_bson_with_options(&conn, Default::default()) .expect("Unable to serialize connection"), - None, ) .await .unwrap(); diff --git a/integrationos-gateway/src/finalizer/mod.rs b/integrationos-gateway/src/finalizer/mod.rs index 78a64522..57c999b2 100644 --- a/integrationos-gateway/src/finalizer/mod.rs +++ b/integrationos-gateway/src/finalizer/mod.rs @@ -71,7 +71,7 @@ impl FinalizeEvent for Finalizer { } } let context = RootContext::new(event.id); - match self.context_collection.insert_one(&context, None).await { + match self.context_collection.insert_one(&context).await { Err(e) => { error!("Failed to save event context: {e}"); bail!(e); diff --git a/integrationos-unified/src/unified.rs b/integrationos-unified/src/unified.rs index 5e1386ae..a2341089 100644 --- a/integrationos-unified/src/unified.rs +++ b/integrationos-unified/src/unified.rs @@ -175,12 +175,12 @@ impl UnifiedDestination { Action::Unified { name, action, .. } => Ok(self .connection_model_definitions_store .collection - .find_one( - doc! { - "connectionPlatform": destination.platform.as_ref(), - "mapping.commonModelName": name.as_ref(), - "actionName": action.to_string() - }, + .find_one(doc!
{ + "connectionPlatform": destination.platform.as_ref(), + "mapping.commonModelName": name.as_ref(), + "actionName": action.to_string() + }) + .with_options( FindOneOptions::builder() .collation(Some( Collation::builder() diff --git a/integrationos-watchdog/src/client.rs b/integrationos-watchdog/src/client.rs index 576285bb..b9da1052 100644 --- a/integrationos-watchdog/src/client.rs +++ b/integrationos-watchdog/src/client.rs @@ -156,7 +156,7 @@ impl WatchdogClient { }, ]; - let mut event_keys = match coll.clone().aggregate(pipeline, None).await { + let mut event_keys = match coll.clone().aggregate(pipeline).await { Ok(e) => e, Err(e) => { error!("Failed to fetch event keys: {e}"); @@ -179,13 +179,11 @@ impl WatchdogClient { // Get the latest root context, then also get all latest pipeline contexts and extractor contexts if applicable let root_context = match root_coll .clone() - .find_one( - doc! { - "eventKey": event_key, - "type": "root" - }, - options.clone(), - ) + .find_one(doc! { + "eventKey": event_key, + "type": "root" + }) + .with_options(options.clone()) .await { Ok(c) => c, @@ -200,15 +198,15 @@ impl WatchdogClient { }; if let RootStage::ProcessingPipelines(ref mut pipelines) = root_context.stage { - let futs = pipelines.values().map(|p| { - pipeline_coll.find_one( - doc! { + let futs = pipelines.values().map(|p| async { + pipeline_coll + .find_one(doc! { "eventKey": p.event_key.to_string(), "pipelineKey": p.pipeline_key.clone(), "type": "pipeline" - }, - options.clone(), - ) + }) + .with_options(options.clone()) + .await }); let results = join_all(futs).await; @@ -222,14 +220,18 @@ impl WatchdogClient { if let PipelineStage::ExecutingExtractors(ref mut extractors) = context.stage { - let futs = extractors.values().map(|e| { + let futs = extractors.values().map(|e| async { let filter = doc! { "eventKey": e.event_key.to_string(), "pipelineKey": e.pipeline_key.clone(), "extractorKey": e.extractor_key.to_string(), "type": "extractor" }; - extractor_coll.find_one(filter, options.clone()) + + extractor_coll + .find_one(filter) + .with_options(options.clone()) + .await }); let results = join_all(futs).await; for result in results {