From 9eddc59f5da422804a739a354bb7e7c9088b6e80 Mon Sep 17 00:00:00 2001 From: Jakub Kowalski Date: Fri, 2 Feb 2024 18:31:38 +0100 Subject: [PATCH] OpenTelemetry init (#5426) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --------- Co-authored-by: MichaƂ Bartoszkiewicz GitOrigin-RevId: a18273a568e7ca5c230702cd582e695861380d2f --- Cargo.lock | 538 +++++++++++++++++- Cargo.toml | 10 +- pyproject.toml | 3 + python/pathway/__init__.py | 4 + python/pathway/engine.pyi | 14 + python/pathway/internals/__init__.py | 3 + python/pathway/internals/config.py | 13 +- .../internals/graph_runner/__init__.py | 125 ++-- .../internals/graph_runner/telemetry.py | 67 +++ python/pathway/internals/run.py | 4 + python/pathway/tests/test_telemetry.py | 30 + src/engine/dataflow.rs | 29 +- src/engine/error.rs | 3 + src/engine/license.rs | 39 ++ src/engine/mod.rs | 4 + src/engine/telemetry.rs | 364 ++++++++++++ src/python_api.rs | 68 ++- src/python_api/logging.rs | 4 +- 18 files changed, 1262 insertions(+), 60 deletions(-) create mode 100644 python/pathway/internals/graph_runner/telemetry.py create mode 100644 python/pathway/tests/test_telemetry.py create mode 100644 src/engine/license.rs create mode 100644 src/engine/telemetry.rs diff --git a/Cargo.lock b/Cargo.lock index 04b49ca2..b4cddec1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -87,6 +87,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anyhow" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" + [[package]] name = "arc-swap" version = "1.6.0" @@ -121,6 +127,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "async-trait" version = "0.1.77" @@ -178,6 +206,51 @@ dependencies = [ "thiserror", ] +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -368,6 +441,25 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.19" @@ -872,6 +964,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "hermit-abi" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" + [[package]] name = "hex" version = "0.4.3" @@ -945,6 +1043,18 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1071,6 +1181,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.0" @@ -1192,6 +1311,12 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "matrixmultiply" version = "0.3.8" @@ -1313,6 +1438,15 @@ dependencies = [ "libc", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "num-complex" version = "0.4.4" @@ -1341,6 +1475,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "num_enum" version = "0.5.11" @@ -1446,6 +1590,84 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" +dependencies = [ + "futures-core", + "futures-sink", + "indexmap 2.1.0", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930" +dependencies = [ + "async-trait", + "futures-core", + "http", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "prost", + "thiserror", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2e155ce5cc812ea3d1dffbd1539aed653de4bf4882d60e6e04dcf0901d674e1" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84" +dependencies = [ + "opentelemetry", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f16aec8a98a457a52664d69e0091bac3a0abd18ead9b641cb00202ba4e0efe4" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "ordered-float", + "percent-encoding", + "rand", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "ordered-float" version = "4.2.0" @@ -1524,7 +1746,7 @@ dependencies = [ "hyper", "id-arena", "inotify", - "itertools", + "itertools 0.12.0", "jemallocator", "log", "ndarray", @@ -1532,6 +1754,10 @@ dependencies = [ "num-integer", "numpy", "once_cell", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", "ordered-float", "pipe", "postgres", @@ -1551,10 +1777,12 @@ dependencies = [ "serde_with 3.5.1", "smallvec", "syn 2.0.48", + "sysinfo", "tempfile", "thiserror", "timely", "tokio", + "uuid", "xxhash-rust", ] @@ -1602,6 +1830,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -1729,6 +1977,29 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "pyo3" version = "0.20.2" @@ -1871,6 +2142,26 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" +[[package]] +name = "rayon" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7237101a77a10773db45d62004a272517633fbcc3df19d96455ede1122e051" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "rdkafka" version = "0.36.2" @@ -1993,6 +2284,20 @@ dependencies = [ "winreg", ] +[[package]] +name = "ring" +version = "0.17.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys 0.48.0", +] + [[package]] name = "rusqlite" version = "0.30.0" @@ -2080,6 +2385,55 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.21.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.5", +] + +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "rustversion" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" + [[package]] name = "ryu" version = "1.0.16" @@ -2101,6 +2455,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.9.2" @@ -2291,6 +2655,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "stringprep" version = "0.1.4" @@ -2336,6 +2706,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "synstructure" version = "0.12.6" @@ -2348,6 +2724,21 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "sysinfo" +version = "0.30.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb4f3438c8f6389c864e61221cbc97e9bca98b4daf39a5beb7bea660f528bb2" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "windows", +] + [[package]] name = "system-configuration" version = "0.5.1" @@ -2509,11 +2900,34 @@ dependencies = [ "bytes", "libc", "mio", + "num_cpus", "pin-project-lite", "socket2 0.5.5", + "tokio-macros", "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" @@ -2550,6 +2964,27 @@ dependencies = [ "whoami", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls", + "tokio", +] + +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -2581,6 +3016,64 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.5", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "rustls-native-certs", + "rustls-pemfile", + "tokio", + "tokio-rustls", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -2594,9 +3087,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -2657,6 +3162,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.0" @@ -2668,6 +3179,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + +[[package]] +name = "uuid" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" +dependencies = [ + "getrandom", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -2809,6 +3335,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core", + "windows-targets 0.52.0", +] + [[package]] name = "windows-core" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index 9c696cd5..ae4e11e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,10 +39,14 @@ itertools = "0.12.0" jemallocator = { version = "0.5.4", features = ["stats", "disable_initial_exec_tls"] } log = { version = "0.4.20", features = ["std"] } ndarray = { version = "0.15.6", features = ["serde"] } -nix = { version = "0.27.1", features = ["fs", "user"] } +nix = { version = "0.27.1", features = ["fs", "user", "resource"] } num-integer = "0.1.45" numpy = "0.20.0" once_cell = "1.19.0" +opentelemetry = { version = "0.21.0", features = ["trace", "metrics"] } +opentelemetry-otlp = { version = "0.14.0", features = ["default", "tls", "tls-roots", "metrics"] } +opentelemetry-semantic-conventions = "0.13.0" +opentelemetry_sdk = { version = "0.21.2", features = ["rt-tokio", "rt-tokio-current-thread"] } ordered-float = { version = "4.2.0", features = ["serde"] } pipe = "0.4.0" postgres = { version = "0.19.7", features = ["with-chrono-0_4", "with-serde_json-1"] } @@ -61,10 +65,12 @@ serde_json = "1.0" serde_with = "3.5.1" smallvec = { version = "1.13.1", features = ["union", "const_generics"] } syn = { version = "2.0.48", features = ["default", "full", "visit", "visit-mut"] } # Hack to keep features unified between normal and build deps +sysinfo = "0.30.5" tempfile = "3.9.0" thiserror = "1.0.56" timely = { path = "./external/timely-dataflow/timely", features = ["bincode"] } -tokio = "1.35.1" +tokio = { version = "1.35.1", features = ["rt-multi-thread"] } +uuid = { version = "1.7.0", features = ["v4"] } xxhash-rust = { version = "0.8.8", features = ["xxh3"] } [target.'cfg(target_os = "linux")'.dependencies] diff --git a/pyproject.toml b/pyproject.toml index 8aa54fac..2170123f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,9 @@ dependencies = [ "jmespath >= 1.0.1", "Office365-REST-Python-Client >= 2.5.3", "aiohttp_cors >= 0.7.0", + "opentelemetry-api >= 1.22.0", + "opentelemetry-sdk >= 1.22.0", + "opentelemetry-exporter-otlp-proto-grpc >= 1.22.0", ] [project.optional-dependencies] diff --git a/python/pathway/__init__.py b/python/pathway/__init__.py index 1eb6353b..ae2e1776 100644 --- a/python/pathway/__init__.py +++ b/python/pathway/__init__.py @@ -65,6 +65,8 @@ schema_from_csv, schema_from_dict, schema_from_types, + set_license_key, + set_telemetry_server, sql, table_transformer, this, @@ -184,6 +186,8 @@ "join_outer", "groupby", "persistence", + "set_license_key", + "set_telemetry_server", ] diff --git a/python/pathway/engine.pyi b/python/pathway/engine.pyi index 4439d142..1a5327a8 100644 --- a/python/pathway/engine.pyi +++ b/python/pathway/engine.pyi @@ -615,6 +615,9 @@ def run_with_new_graph( monitoring_level: MonitoringLevel = MonitoringLevel.NONE, with_http_server: bool = False, persistence_config: PersistenceConfig | None = None, + license_key: str | None = None, + telemetry_server: str | None = None, + trace_parent: str | None = None, ) -> list[CapturedStream]: ... def unsafe_make_pointer(arg) -> Pointer: ... @@ -700,3 +703,14 @@ class SnapshotEvent: class LocalBinarySnapshotWriter: def __init__(self, path: str, persistent_id: str, worker_id: int): ... def write(self, events: list[SnapshotEvent]): ... + +class TelemetryConfig: + telemetry_enabled: bool + telemetry_server_endpoint: str | None + service_name: str | None + service_version: str | None + run_id: str + @staticmethod + def create( + *, license_key: str | None = None, telemetry_server: str | None = None + ) -> TelemetryConfig: ... diff --git a/python/pathway/internals/__init__.py b/python/pathway/internals/__init__.py index 6ac0bf54..bcda964b 100644 --- a/python/pathway/internals/__init__.py +++ b/python/pathway/internals/__init__.py @@ -20,6 +20,7 @@ table_transformer, unwrap, ) +from pathway.internals.config import set_license_key, set_telemetry_server from pathway.internals.custom_reducers import BaseCustomAccumulator from pathway.internals.datetime_types import DateTimeNaive, DateTimeUtc, Duration from pathway.internals.decorators import ( @@ -143,4 +144,6 @@ "join_right", "join_outer", "groupby", + "set_license_key", + "set_telemetry_server", ] diff --git a/python/pathway/internals/config.py b/python/pathway/internals/config.py index 60d02f6a..ba2534b0 100644 --- a/python/pathway/internals/config.py +++ b/python/pathway/internals/config.py @@ -57,6 +57,8 @@ class PathwayConfig: persistence_mode: api.PersistenceMode = field(default_factory=_persistence_mode) snapshot_access: api.SnapshotAccess | None = field(default_factory=_snapshot_access) replay_storage: str | None = _env_field("PATHWAY_REPLAY_STORAGE") + license_key: str | None = _env_field("PATHWAY_LICENSE_KEY") + telemetry_server: str | None = _env_field("PATHWAY_TELEMETRY_SERVER") @property def replay_config( @@ -86,4 +88,13 @@ def replay_config( pathway_config = PathwayConfig() -__all__ = ["PathwayConfig", "pathway_config"] + +def set_license_key(key: str) -> None: + pathway_config.license_key = key + + +def set_telemetry_server(endpoint: str) -> None: + pathway_config.telemetry_server = endpoint + + +__all__ = ["PathwayConfig", "pathway_config", "set_license_key", "set_telemetry_server"] diff --git a/python/pathway/internals/graph_runner/__init__.py b/python/pathway/internals/graph_runner/__init__.py index 7ed93bbd..51903067 100644 --- a/python/pathway/internals/graph_runner/__init__.py +++ b/python/pathway/internals/graph_runner/__init__.py @@ -4,6 +4,7 @@ from collections.abc import Callable, Iterable +import pathway.internals.graph_runner.telemetry as telemetry from pathway.internals import api, parse_graph as graph, table, trace from pathway.internals.column_path import ColumnPath from pathway.internals.config import pathway_config @@ -31,6 +32,7 @@ class GraphRunner: debug: bool ignore_asserts: bool runtime_typechecking: bool + telemetry_config: api.TelemetryConfig def __init__( self, @@ -43,6 +45,7 @@ def __init__( default_logging: bool = True, persistence_config: PersistenceConfig | None = None, runtime_typechecking: bool | None = None, + license_key: str | None = None, ) -> None: self._graph = input_graph self.debug = debug @@ -57,6 +60,9 @@ def __init__( self.runtime_typechecking = pathway_config.runtime_typechecking else: self.runtime_typechecking = runtime_typechecking + if license_key is None: + license_key = pathway_config.license_key + self.license_key = license_key def run_tables( self, @@ -106,59 +112,78 @@ def _run( output_tables: Iterable[table.Table] = (), after_build: Callable[[ScopeState, OperatorStorageGraph], None] | None = None, ) -> list[api.CapturedStream]: - storage_graph = OperatorStorageGraph.from_scope_context( - context, self, output_tables + otel = telemetry.Telemetry.create( + license_key=self.license_key, + telemetry_server=pathway_config.telemetry_server, ) - def logic( - scope: api.Scope, - storage_graph: OperatorStorageGraph = storage_graph, - ) -> list[tuple[api.Table, list[ColumnPath]]]: - state = ScopeState(scope) - storage_graph.build_scope(scope, state, self) - if after_build is not None: - after_build(state, storage_graph) - return storage_graph.get_output_tables(output_tables, state) - - node_names = [ - (operator.id, operator.label()) - for operator in context.nodes - if isinstance(operator, ContextualizedIntermediateOperator) - ] - monitoring_level = self.monitoring_level.to_internal() - - with new_event_loop() as event_loop, monitor_stats( - monitoring_level, node_names, self.default_logging - ) as stats_monitor: - if self.persistence_config: - self.persistence_config.on_before_run() - persistence_engine_config = self.persistence_config.engine_config - else: - persistence_engine_config = None - - try: - return api.run_with_new_graph( - logic, - event_loop=event_loop, - ignore_asserts=self.ignore_asserts, - stats_monitor=stats_monitor, - monitoring_level=monitoring_level, - with_http_server=self.with_http_server, - persistence_config=persistence_engine_config, - ) - except api.EngineErrorWithTrace as e: - error, frame = e.args - if frame is not None: - trace.add_pathway_trace_note( - error, - trace.Frame( - filename=frame.file_name, - line_number=frame.line_number, - line=frame.line, - function=frame.function, - ), + with otel.tracer.start_as_current_span("graph_runner.run"): + trace_context, trace_parent = telemetry.get_current_context() + + storage_graph = OperatorStorageGraph.from_scope_context( + context, self, output_tables + ) + + @otel.tracer.start_as_current_span( + "graph_runner.build", + context=trace_context, + attributes=dict( + graph=repr(self._graph), + debug=self.debug, + ), + ) + def logic( + scope: api.Scope, + storage_graph: OperatorStorageGraph = storage_graph, + ) -> list[tuple[api.Table, list[ColumnPath]]]: + state = ScopeState(scope) + storage_graph.build_scope(scope, state, self) + if after_build is not None: + after_build(state, storage_graph) + return storage_graph.get_output_tables(output_tables, state) + + node_names = [ + (operator.id, operator.label()) + for operator in context.nodes + if isinstance(operator, ContextualizedIntermediateOperator) + ] + monitoring_level = self.monitoring_level.to_internal() + + with new_event_loop() as event_loop, monitor_stats( + monitoring_level, node_names, self.default_logging + ) as stats_monitor: + if self.persistence_config: + self.persistence_config.on_before_run() + persistence_engine_config = self.persistence_config.engine_config + else: + persistence_engine_config = None + + try: + return api.run_with_new_graph( + logic, + event_loop=event_loop, + ignore_asserts=self.ignore_asserts, + stats_monitor=stats_monitor, + monitoring_level=monitoring_level, + with_http_server=self.with_http_server, + persistence_config=persistence_engine_config, + license_key=self.license_key, + telemetry_server=pathway_config.telemetry_server, + trace_parent=trace_parent, ) - raise error + except api.EngineErrorWithTrace as e: + error, frame = e.args + if frame is not None: + trace.add_pathway_trace_note( + error, + trace.Frame( + filename=frame.file_name, + line_number=frame.line_number, + line=frame.line, + function=frame.function, + ), + ) + raise error def tree_shake_tables( self, graph_scope: graph.Scope, tables: Iterable[table.Table] diff --git a/python/pathway/internals/graph_runner/telemetry.py b/python/pathway/internals/graph_runner/telemetry.py new file mode 100644 index 00000000..fdff742f --- /dev/null +++ b/python/pathway/internals/graph_runner/telemetry.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +import sys + +from opentelemetry import trace +from opentelemetry.context import Context +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_VERSION, Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry.util.types import AttributeValue + +from pathway.internals import api + +propagator = TraceContextTextMapPropagator() + + +class Telemetry: + config: api.TelemetryConfig + tracer: trace.Tracer + + def __init__(self, telemetry_config: api.TelemetryConfig) -> None: + self.config = telemetry_config + self.tracer = self._init_tracer() + + @classmethod + def create( + cls, license_key: str | None = None, telemetry_server: str | None = None + ) -> Telemetry: + config = api.TelemetryConfig.create( + license_key=license_key, telemetry_server=telemetry_server + ) + return cls(config) + + def _init_tracer(self) -> trace.Tracer: + if self.config.telemetry_enabled: + trace_provider = TracerProvider( + resource=Resource( + attributes={ + SERVICE_NAME: self.config.service_name or "", + SERVICE_VERSION: self.config.service_version or "", + "run.id": self.config.run_id, + "python.version": sys.version, + "otel.scope.name": "python", + } + ) + ) + exporter = OTLPSpanExporter(endpoint=self.config.telemetry_server_endpoint) + trace_provider.add_span_processor(BatchSpanProcessor(exporter)) + return trace_provider.get_tracer("pathway-tracer") + else: + return trace.NoOpTracer() + + +def get_current_context() -> tuple[Context, str | None]: + carrier: dict[str, str | list[str]] = {} + propagator.inject(carrier) + context = propagator.extract(carrier) + trace_parent = carrier.get("traceparent", None) + assert trace_parent is None or isinstance(trace_parent, str) + return context, trace_parent + + +def event(name: str, attributes: dict[str, AttributeValue]): + span = trace.get_current_span() + span.add_event(name, attributes) diff --git a/python/pathway/internals/run.py b/python/pathway/internals/run.py index eaceb37b..cec4eadd 100644 --- a/python/pathway/internals/run.py +++ b/python/pathway/internals/run.py @@ -17,6 +17,7 @@ def run( default_logging: bool = True, persistence_config: PersistenceConfig | None = None, runtime_typechecking: bool | None = None, + license_key: str | None = None, ): """Runs the computation graph. @@ -41,6 +42,7 @@ def run( with_http_server=with_http_server, default_logging=default_logging, persistence_config=persistence_config, + license_key=license_key, runtime_typechecking=runtime_typechecking, ).run_outputs() @@ -54,6 +56,7 @@ def run_all( default_logging: bool = True, persistence_config: PersistenceConfig | None = None, runtime_typechecking: bool | None = None, + license_key: str | None = None, ): """Runs the computation graph with disabled tree-shaking optimization. @@ -79,4 +82,5 @@ def run_all( default_logging=default_logging, persistence_config=persistence_config, runtime_typechecking=runtime_typechecking, + license_key=license_key, ).run_all() diff --git a/python/pathway/tests/test_telemetry.py b/python/pathway/tests/test_telemetry.py new file mode 100644 index 00000000..aaf9c798 --- /dev/null +++ b/python/pathway/tests/test_telemetry.py @@ -0,0 +1,30 @@ +import logging + +import pytest + +import pathway as pw +from pathway.internals import api + + +def test_license_invalid(): + with pytest.raises(api.EngineError, match="invalid license key"): + pw.run_all(license_key="invalid") + + +def test_telemetry_disabled(caplog): + caplog.set_level(level=logging.DEBUG) + + pw.run_all(license_key="", monitoring_level=pw.MonitoringLevel.NONE) + + assert "Telemetry disabled" in caplog.text + + +def test_telemetry_disabled_no_feature_flag(caplog): + caplog.set_level(level=logging.DEBUG) + + pw.run_all( + license_key="EVALUATION-WITH-TELEMETRY", + monitoring_level=pw.MonitoringLevel.NONE, + ) + + assert "Telemetry disabled" in caplog.text diff --git a/src/engine/dataflow.rs b/src/engine/dataflow.rs index 25e22ee2..38827428 100644 --- a/src/engine/dataflow.rs +++ b/src/engine/dataflow.rs @@ -20,6 +20,7 @@ use crate::engine::dataflow::operators::time_column::{ Epsilon, TimeColumnForget, TimeColumnFreeze, }; +use crate::engine::telemetry::Config as TelemetryConfig; use crate::engine::value::HashInto; use crate::persistence::config::{PersistenceManagerConfig, PersistenceManagerOuterConfig}; use crate::persistence::sync::SharedWorkersPersistenceCoordinator; @@ -90,6 +91,7 @@ use super::error::{DynError, DynResult, Trace}; use super::expression::AnyExpression; use super::graph::{DataRow, SubscribeCallbacks}; use super::http_server::maybe_run_http_server_thread; +use super::license::{License, ResourceLimit}; use super::progress_reporter::{maybe_run_reporter, MonitoringLevel}; use super::reduce::{ AnyReducer, ArgMaxReducer, ArgMinReducer, ArraySumReducer, CountReducer, FloatSumReducer, @@ -97,11 +99,13 @@ use super::reduce::{ StatefulCombineFn, StatefulReducer, TupleReducer, UniqueReducer, }; use super::report_error::{ReportError, ReportErrorExt, SpawnWithReporter, UnwrapWithReporter}; +use super::telemetry::maybe_run_telemetry_thread; use super::{ BatchWrapper, ColumnHandle, ColumnPath, ColumnProperties, ComplexColumn, Error, Expression, ExpressionData, Graph, IterationLogic, IxKeyPolicy, JoinType, Key, LegacyTable, OperatorStats, ProberStats, Reducer, ReducerData, Result, TableHandle, TableProperties, UniverseHandle, Value, }; +use nix::sys::resource::{getrlimit, setrlimit}; pub type WakeupReceiver = Receiver DynResult<()> + Send + Sync + 'static>>; @@ -4652,7 +4656,7 @@ impl> Graph for OuterDataflowGraph } } -fn parse_env_var(name: &str) -> Result, String> +pub fn parse_env_var(name: &str) -> Result, String> where T::Err: Display, { @@ -4723,6 +4727,21 @@ pub fn config_from_env() -> Result<(Config, usize), String> { Ok((config, workers)) } +fn set_process_limits(process_limits: Vec) -> Result<()> { + let telemetry_enabled: bool = parse_env_var("PATHWAY_TELEMETRY_ENABLED") + .map_err(DynError::from)? + .unwrap_or(false); + if telemetry_enabled { + for ResourceLimit(resource, limit) in process_limits { + let (_, hard_limit) = getrlimit(resource).map_err(DynError::from)?; + let limit = hard_limit.min(limit); + info!("Setting process limit: {resource:?}, {limit:?}"); + setrlimit(resource, limit, limit).map_err(DynError::from)?; + } + } + Ok(()) +} + #[allow(clippy::too_many_lines)] // XXX #[allow(clippy::too_many_arguments)] // XXX pub fn run_with_new_dataflow_graph( @@ -4736,11 +4755,15 @@ pub fn run_with_new_dataflow_graph( with_http_server: bool, persistence_config: Option, num_workers: usize, + license: License, + telemetry_config: TelemetryConfig, ) -> Result> where R: 'static, R2: Send + 'static, { + set_process_limits(license.get_resource_limits())?; + if !env::var("PATHWAY_SKIP_START_LOG").is_ok_and(|v| v == "1") { info!("Preparing Pathway computation"); } @@ -4782,6 +4805,7 @@ where mut probers, progress_reporter_runner, http_server_runner, + telemetry_runner, ) = worker.dataflow::(|scope| { let graph = OuterDataflowGraph::new( scope.clone(), @@ -4791,6 +4815,7 @@ where global_persistent_storage.clone(), ) .unwrap_with_reporter(&error_reporter); + let telemetry_runner = maybe_run_telemetry_thread(&graph, telemetry_config.clone()); let res = logic(&graph).unwrap_with_reporter(&error_reporter); let progress_reporter_runner = maybe_run_reporter(&monitoring_level, &graph, stats_monitor.clone()); @@ -4808,6 +4833,7 @@ where graph.probers, progress_reporter_runner, http_server_runner, + telemetry_runner, ) }); @@ -4866,6 +4892,7 @@ where drop(http_server_runner); drop(progress_reporter_runner); + drop(telemetry_runner); finish(res) })) diff --git a/src/engine/error.rs b/src/engine/error.rs index 82117d9c..1c73cab4 100644 --- a/src/engine/error.rs +++ b/src/engine/error.rs @@ -156,6 +156,9 @@ pub enum Error { #[error("exception in Python subject: {0}")] ReaderFailed(#[source] ReadError), + + #[error("invalid license key")] + InvalidLicenseKey, } impl Error { diff --git a/src/engine/license.rs b/src/engine/license.rs new file mode 100644 index 00000000..713b02e4 --- /dev/null +++ b/src/engine/license.rs @@ -0,0 +1,39 @@ +use super::{Error, Result}; +use nix::sys::resource::Resource; +const EVALUATION_LICENSE_KEY: &str = "EVALUATION-WITH-TELEMETRY"; +const DEBUG_NO_LIMIT_LICENSE_KEY: &str = "DEBUG-NO-LIMIT"; + +#[derive(Clone, Copy)] +pub struct ResourceLimit(pub Resource, pub u64); + +#[derive(Clone, Copy)] +pub enum License { + NoLicenseKey, + Evaluation, + DebugNoLimit, +} + +impl License { + pub fn new(license_key: Option) -> Result { + let license = if let Some(license_key) = license_key { + match license_key.trim() { + EVALUATION_LICENSE_KEY => License::Evaluation, + DEBUG_NO_LIMIT_LICENSE_KEY => License::DebugNoLimit, + "" => License::NoLicenseKey, + _ => return Err(Error::InvalidLicenseKey), + } + } else { + License::NoLicenseKey + }; + Ok(license) + } + + pub fn get_resource_limits(self) -> Vec { + match self { + License::NoLicenseKey => { + vec![ResourceLimit(Resource::RLIMIT_CPU, 3600)] + } + _ => vec![], + } + } +} diff --git a/src/engine/mod.rs b/src/engine/mod.rs index de1d4342..a9a20dd2 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -4,6 +4,7 @@ #![allow(clippy::type_complexity)] pub mod error; +pub mod license; pub use self::error::{Error, Result}; pub mod report_error; @@ -38,3 +39,6 @@ pub use expression::{ pub mod progress_reporter; pub mod time; pub use time::{DateTimeNaive, DateTimeUtc, Duration}; + +pub mod telemetry; +pub use telemetry::Config; diff --git a/src/engine/telemetry.rs b/src/engine/telemetry.rs new file mode 100644 index 00000000..cafbb03d --- /dev/null +++ b/src/engine/telemetry.rs @@ -0,0 +1,364 @@ +use std::{ + sync::Arc, + thread::{Builder, JoinHandle}, + time::{Duration, SystemTime}, +}; + +use super::{ + dataflow::parse_env_var, error::DynError, license::License, Graph, ProberStats, Result, +}; +use arc_swap::ArcSwapOption; +use log::{debug, info}; +use nix::sys::{ + resource::{getrusage, UsageWho}, + time::TimeValLike, +}; +use opentelemetry::metrics::Unit; +use opentelemetry::{global, Key}; +use opentelemetry_otlp::{Protocol, WithExportConfig}; +use opentelemetry_sdk::{ + metrics::{ + reader::{DefaultAggregationSelector, DefaultTemporalitySelector}, + MeterProvider, PeriodicReader, + }, + propagation::TraceContextPropagator, + runtime, + trace::{self, TracerProvider}, + Resource, +}; +use opentelemetry_semantic_conventions::resource::{ + OTEL_SCOPE_NAME, PROCESS_PID, SERVICE_NAME, SERVICE_VERSION, +}; +use scopeguard::defer; +use sysinfo::{get_current_pid, System}; +use tokio::sync::mpsc; +use uuid::Uuid; + +const DEFAULT_TELEMETRY_SERVER: &str = ""; + +const PERIODIC_READER_INTERVAL: Duration = Duration::from_secs(10); +const OPENTELEMETRY_EXPORT_TIMEOUT: Duration = Duration::from_secs(3); + +const PROCESS_MEMORY_USAGE: &str = "process.memory.usage"; +const PROCESS_CPU_USAGAE: &str = "process.cpu.usage"; +const PROCESS_CPU_USER_TIME: &str = "process.cpu.utime"; +const PROCESS_CPU_SYSTEM_TIME: &str = "process.cpu.stime"; +const INPUT_LATENCY: &str = "latency.input"; +const OUTPUT_LATENCY: &str = "latency.output"; + +const ROOT_TRACE_ID: Key = Key::from_static_str("root.trace.id"); +const RUN_ID: Key = Key::from_static_str("run.id"); + +struct Telemetry { + pub config: TelemetryEnabled, +} + +impl Telemetry { + fn new(config: TelemetryEnabled) -> Self { + Telemetry { config } + } + + fn resource(&self) -> Resource { + let pid: i64 = get_current_pid() + .expect("Failed to get current PID") + .as_u32() + .into(); + + let root_trace_id = root_trace_id(self.config.trace_parent.as_deref()).unwrap_or_default(); + + Resource::new([ + SERVICE_NAME.string(self.config.service_name.clone()), + SERVICE_VERSION.string(self.config.service_version.clone()), + OTEL_SCOPE_NAME.string("rust"), + PROCESS_PID.i64(pid), + ROOT_TRACE_ID.string(root_trace_id.to_string()), + RUN_ID.string(self.config.run_id.clone()), + ]) + } + + fn base_otel_exporter_builder(&self) -> opentelemetry_otlp::TonicExporterBuilder { + opentelemetry_otlp::new_exporter() + .tonic() + .with_protocol(Protocol::Grpc) + .with_endpoint(self.config.telemetry_server_endpoint.clone()) + .with_timeout(OPENTELEMETRY_EXPORT_TIMEOUT) + } + + fn init_tracer_provider(&self) { + global::set_text_map_propagator(TraceContextPropagator::new()); + + let exporter = self + .base_otel_exporter_builder() + .build_span_exporter() + .expect("exporter initialization should not fail"); + + let provider = TracerProvider::builder() + .with_config(trace::Config::default().with_resource(self.resource())) + .with_batch_exporter(exporter, runtime::Tokio) + .build(); + + global::set_tracer_provider(provider.clone()); + } + + fn init_meter_provider(&self) { + let exporter = self + .base_otel_exporter_builder() + .build_metrics_exporter( + Box::new(DefaultAggregationSelector::new()), + Box::new(DefaultTemporalitySelector::new()), + ) + .unwrap(); + + let reader = PeriodicReader::builder(exporter, runtime::Tokio) + .with_interval(PERIODIC_READER_INTERVAL) + .build(); + + let meter_provider = MeterProvider::builder() + .with_resource(self.resource()) + .with_reader(reader) + .build(); + + global::set_meter_provider(meter_provider.clone()); + } + + fn init(&self) { + self.init_meter_provider(); + self.init_tracer_provider(); + } + + fn teardown() { + global::shutdown_meter_provider(); + global::shutdown_tracer_provider(); + } +} + +fn root_trace_id(trace_parent: Option<&str>) -> Option<&str> { + if let Some(trace_parent) = trace_parent { + Some( + trace_parent + .split('-') + .nth(1) + .expect("trace parent should contain the root trace ID"), + ) + } else { + None + } +} + +#[derive(Clone, Debug)] +#[allow(clippy::module_name_repetitions)] +pub struct TelemetryEnabled { + pub telemetry_server_endpoint: String, + pub service_name: String, + pub service_version: String, + pub run_id: String, + pub trace_parent: Option, +} + +#[derive(Clone, Debug)] +pub enum Config { + Enabled(TelemetryEnabled), + Disabled, +} + +impl Config { + pub fn create( + license: License, + telemetry_server: Option, + trace_parent: Option, + ) -> Result { + let telemetry_enabled: bool = parse_env_var("PATHWAY_TELEMETRY_ENABLED") + .map_err(DynError::from)? + .unwrap_or(false); + let run_id: String = parse_env_var("PATHWAY_RUN_ID") + .map_err(DynError::from)? + .unwrap_or(Uuid::new_v4().to_string()); + let config = if telemetry_enabled { + let telemetry_server = + telemetry_server.unwrap_or(String::from(DEFAULT_TELEMETRY_SERVER)); + match license { + License::Evaluation | License::DebugNoLimit => Config::Enabled(TelemetryEnabled { + telemetry_server_endpoint: telemetry_server, + service_name: env!("CARGO_PKG_NAME").to_string(), + service_version: env!("CARGO_PKG_VERSION").to_string(), + run_id, + trace_parent, + }), + License::NoLicenseKey => Config::Disabled, + } + } else { + Config::Disabled + }; + Ok(config) + } +} + +pub struct Runner { + close_sender: mpsc::Sender<()>, + telemetry_thread_handle: Option>, +} + +impl Runner { + fn run(telemetry: Telemetry, stats: Arc>) -> Runner { + let (tx, mut rx) = mpsc::channel::>(1); + let telemetry_thread_handle = start_telemetry_thread(telemetry, tx, stats); + let close_sender = rx.blocking_recv().expect("expecting return sender"); + Runner { + close_sender, + telemetry_thread_handle: Some(telemetry_thread_handle), + } + } +} + +fn start_telemetry_thread( + telemetry: Telemetry, + start_sender: mpsc::Sender>, + stats: Arc>, +) -> JoinHandle<()> { + let handle: JoinHandle<()> = Builder::new() + .name("pathway:telemetry_thread".to_string()) + .spawn(move || { + tokio::runtime::Builder::new_multi_thread() + .enable_time() + .enable_io() + .build() + .unwrap() + .block_on(async { + let (tx, mut rx) = mpsc::channel::<()>(1); + telemetry.init(); + defer! { + Telemetry::teardown(); + } + register_stats_metrics(&stats); + register_sys_metrics(); + start_sender.send(tx).await.expect("should not fail"); + rx.recv().await; + }); + }) + .expect("telemetry thread creation failed"); + handle +} + +fn register_stats_metrics(stats: &Arc>) { + let stats = stats.clone(); + + let meter = global::meter("pathway-stats"); + + let input_latency_gauge = meter + .u64_observable_gauge(INPUT_LATENCY) + .with_unit(Unit::new("ms")) + .init(); + + let output_latency_gauge = meter + .u64_observable_gauge(OUTPUT_LATENCY) + .with_unit(Unit::new("ms")) + .init(); + + meter + .register_callback( + &[input_latency_gauge.as_any(), output_latency_gauge.as_any()], + move |observer| { + let now = SystemTime::now(); + + if let Some(ref stats) = *stats.load() { + if let Some(latency) = stats.input_stats.latency(now) { + observer.observe_u64(&input_latency_gauge, latency, &[]); + } + if let Some(latency) = stats.output_stats.latency(now) { + observer.observe_u64(&output_latency_gauge, latency, &[]); + } + } + }, + ) + .expect("Initializing meter callback should not fail"); +} + +fn register_sys_metrics() { + let meter = global::meter("pathway-sys"); + + let pid = get_current_pid().expect("Failed to get current PID"); + + let memory_usage_gauge = meter + .u64_observable_gauge(PROCESS_MEMORY_USAGE) + .with_unit(Unit::new("byte")) + .init(); + + let cpu_usage_gauge = meter + .f64_observable_gauge(PROCESS_CPU_USAGAE) + .with_unit(Unit::new("%")) + .init(); + + let cpu_user_time_gauge = meter + .i64_observable_gauge(PROCESS_CPU_USER_TIME) + .with_unit(Unit::new("s")) + .init(); + + let cpu_system_time_gauge = meter + .i64_observable_gauge(PROCESS_CPU_SYSTEM_TIME) + .with_unit(Unit::new("s")) + .init(); + + meter + .register_callback( + &[ + memory_usage_gauge.as_any(), + cpu_usage_gauge.as_any(), + cpu_user_time_gauge.as_any(), + cpu_system_time_gauge.as_any(), + ], + move |observer| { + let mut sys: System = System::new(); + let usage = getrusage(UsageWho::RUSAGE_SELF).expect("Failed to call getrusage"); + sys.refresh_process(pid); + + if let Some(process) = sys.process(pid) { + observer.observe_u64(&memory_usage_gauge, process.memory(), &[]); + observer.observe_f64(&cpu_usage_gauge, process.cpu_usage().into(), &[]); + } + observer.observe_i64(&cpu_user_time_gauge, usage.user_time().num_seconds(), &[]); + observer.observe_i64( + &cpu_system_time_gauge, + usage.system_time().num_seconds(), + &[], + ); + }, + ) + .expect("Initializing meter callback should not fail"); +} + +impl Drop for Runner { + fn drop(&mut self) { + self.close_sender.blocking_send(()).unwrap(); + self.telemetry_thread_handle + .take() + .unwrap() + .join() + .expect("telemetry thread drop failed"); + } +} + +pub fn maybe_run_telemetry_thread(graph: &dyn Graph, config: Config) -> Option { + match config { + Config::Enabled(config) => { + info!("Telemetry enabled"); + debug!("Telemetry config: {config:?}"); + let telemetry = Telemetry::new(config); + let stats_shared = Arc::new(ArcSwapOption::from(None)); + let runner = Runner::run(telemetry, stats_shared.clone()); + + graph + .attach_prober( + Box::new(move |prober_stats| stats_shared.store(Some(Arc::new(prober_stats)))), + false, + false, + ) + .expect("failed to start telemetry thread"); + + Some(runner) + } + Config::Disabled => { + debug!("Telemetry disabled"); + None + } + } +} diff --git a/src/python_api.rs b/src/python_api.rs index 0b16fbe3..b1dd0e15 100644 --- a/src/python_api.rs +++ b/src/python_api.rs @@ -5,7 +5,9 @@ // `PyRef`s need to be passed by value #![allow(clippy::needless_pass_by_value)] -use crate::engine::{graph::SubscribeCallbacksBuilder, Computer as EngineComputer, Expressions}; +use crate::engine::{ + graph::SubscribeCallbacksBuilder, license::License, Computer as EngineComputer, Expressions, +}; use csv::ReaderBuilder as CsvReaderBuilder; use elasticsearch::{ auth::Credentials as ESCredentials, @@ -30,6 +32,7 @@ use pyo3::pyclass::CompareOp; use pyo3::sync::GILOnceCell; use pyo3::types::{PyBool, PyBytes, PyDict, PyFloat, PyInt, PyString, PyTuple, PyType}; use pyo3::{AsPyPointer, PyTypeInfo}; +use pyo3_log::ResetHandle; use rdkafka::consumer::{BaseConsumer, Consumer}; use rdkafka::producer::{DefaultProducerContext, ThreadedProducer}; use rdkafka::ClientConfig; @@ -69,6 +72,7 @@ use crate::engine::graph::ScopedContext; use crate::engine::progress_reporter::MonitoringLevel; use crate::engine::reduce::StatefulCombineFn; use crate::engine::time::DateTime; +use crate::engine::Config as EngineTelemetryConfig; use crate::engine::ReducerData; use crate::engine::{ run_with_new_dataflow_graph, BatchWrapper, ColumnHandle, ColumnPath, @@ -2707,7 +2711,10 @@ pub fn make_captured_table(table_data: Vec) -> PyResult, + license_key: Option, + telemetry_server: Option, + trace_parent: Option, ) -> PyResult>> { + LOGGING_RESET_HANDLE.reset(); defer! { log::logger().flush(); } @@ -2731,6 +2742,8 @@ pub fn run_with_new_graph( None } }; + let license = License::new(license_key)?; + let telemetry_config = EngineTelemetryConfig::create(license, telemetry_server, trace_parent)?; let results: Vec> = run_with_wakeup_receiver(py, |wakeup_receiver| { py.allow_threads(|| { run_with_new_dataflow_graph( @@ -2761,6 +2774,8 @@ pub fn run_with_new_graph( with_http_server, persistence_config, num_workers, + license, + telemetry_config, ) }) })??; @@ -3213,6 +3228,49 @@ impl PersistenceConfig { } } +#[derive(Clone, Debug, Default)] +#[pyclass(module = "pathway.engine", frozen, get_all)] +pub struct TelemetryConfig { + telemetry_enabled: bool, + telemetry_server_endpoint: Option, + service_name: Option, + service_version: Option, + run_id: String, +} + +#[pymethods] +impl TelemetryConfig { + #[staticmethod] + #[pyo3(signature = ( + *, + license_key = None, + telemetry_server = None, + ))] + fn create( + license_key: Option, + telemetry_server: Option, + ) -> PyResult { + let license = License::new(license_key)?; + let config = EngineTelemetryConfig::create(license, telemetry_server, None)?; + Ok(config.into()) + } +} + +impl From for TelemetryConfig { + fn from(config: EngineTelemetryConfig) -> Self { + match config { + EngineTelemetryConfig::Enabled(config) => Self { + telemetry_enabled: true, + telemetry_server_endpoint: Some(config.telemetry_server_endpoint), + service_name: Some(config.service_name), + service_version: Some(config.service_version), + run_id: config.run_id, + }, + EngineTelemetryConfig::Disabled => Self::default(), + } + } +} + impl<'source> FromPyObject<'source> for SnapshotEvent { fn extract(ob: &'source PyAny) -> PyResult { Ok(ob.extract::>()?.0.clone()) @@ -4193,10 +4251,13 @@ fn run_with_wakeup_receiver( Ok(logic(Some(wakeup_receiver))) } +static LOGGING_RESET_HANDLE: Lazy = Lazy::new(logging::init); + #[pymodule] #[pyo3(name = "engine")] fn module(_py: Python<'_>, m: &PyModule) -> PyResult<()> { - logging::init(); + // Initialize the logging + let _ = Lazy::force(&LOGGING_RESET_HANDLE); m.add_class::()?; m.add_class::()?; @@ -4231,6 +4292,7 @@ fn module(_py: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/python_api/logging.rs b/src/python_api/logging.rs index 81dfa375..5c1f8dab 100644 --- a/src/python_api/logging.rs +++ b/src/python_api/logging.rs @@ -164,8 +164,8 @@ impl Log for Logger { } } -pub fn init() { +pub fn init() -> ResetHandle { Logger::default() .install() - .expect("initializing the logger should not fail"); + .expect("initializing the logger should not fail") }