diff --git a/Cargo.lock b/Cargo.lock index b63cbed4..e36551c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -178,9 +178,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" [[package]] name = "block-buffer" @@ -211,9 +211,9 @@ checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" [[package]] name = "cc" -version = "1.0.99" +version = "1.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96c51067fd44124faa7f870b4b1c969379ad32b2ba805aa959430ceaa384f695" +checksum = "74b6a57f98764a267ff415d50a25e6e166f3831a5071af4995296ea97d210490" [[package]] name = "cfg-if" @@ -233,7 +233,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -261,6 +261,30 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -303,17 +327,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "displaydoc" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "encoding_rs" version = "0.8.34" @@ -365,6 +378,16 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +[[package]] +name = "flate2" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -593,9 +616,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.9.3" +version = "1.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0e7a4dd27b9476dc40cb050d3632d3bba3a70ddbff012285f7f8559a1e7e545" +checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9" [[package]] name = "httpdate" @@ -605,9 +628,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.3.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +checksum = "c4fe55fb7a772d59a5ff1dfbff4fe0258d19b89fec4b233e75d35d5d2316badc" dependencies = [ "bytes", "futures-channel", @@ -624,6 +647,23 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" +dependencies = [ + "futures-util", + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -642,9 +682,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" dependencies = [ "bytes", "futures-channel", @@ -683,134 +723,14 @@ dependencies = [ "cc", ] -[[package]] -name = "icu_collections" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" -dependencies = [ - "displaydoc", - "yoke", - "zerofrom", - "zerovec", -] - -[[package]] -name = "icu_locid" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" -dependencies = [ - "displaydoc", - "litemap", - "tinystr", - "writeable", - "zerovec", -] - -[[package]] -name = "icu_locid_transform" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" -dependencies = [ - "displaydoc", - "icu_locid", - "icu_locid_transform_data", - "icu_provider", - "tinystr", - "zerovec", -] - -[[package]] -name = "icu_locid_transform_data" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" - -[[package]] -name = "icu_normalizer" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" -dependencies = [ - "displaydoc", - "icu_collections", - "icu_normalizer_data", - "icu_properties", - "icu_provider", - "smallvec", - "utf16_iter", - "utf8_iter", - "write16", - "zerovec", -] - -[[package]] -name = "icu_normalizer_data" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" - -[[package]] -name = "icu_properties" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f8ac670d7422d7f76b32e17a5db556510825b29ec9154f235977c9caba61036" -dependencies = [ - "displaydoc", - "icu_collections", - "icu_locid_transform", - "icu_properties_data", - "icu_provider", - "tinystr", - "zerovec", -] - -[[package]] -name = "icu_properties_data" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" - -[[package]] -name = "icu_provider" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" -dependencies = [ - "displaydoc", - "icu_locid", - "icu_provider_macros", - "stable_deref_trait", - "tinystr", - "writeable", - "yoke", - "zerofrom", - "zerovec", -] - -[[package]] -name = "icu_provider_macros" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "idna" -version = "1.0.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4716a3a0933a1d01c2f72450e89596eb51dd34ef3c211ccd875acdf1f8fe47ed" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ - "icu_normalizer", - "icu_properties", - "smallvec", - "utf8_iter", + "unicode-bidi", + "unicode-normalization", ] [[package]] @@ -844,6 +764,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kafka" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2054ba4edcb4dcda4209e138c7e88caf26d4a325b3db76fbdb6ca5eecc23e426" +dependencies = [ + "byteorder", + "crc", + "flate2", + "fnv", + "openssl", + "openssl-sys", + "ref_slice", + "snap", + "thiserror", + "tracing", + "twox-hash", +] + [[package]] name = "lard_api" version = "0.1.0" @@ -869,6 +808,8 @@ dependencies = [ "chrono", "csv", "futures", + "kafka", + "quick-xml", "regex", "serde", "test-case", @@ -912,12 +853,6 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" -[[package]] -name = "litemap" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704" - [[package]] name = "lock_api" version = "0.4.12" @@ -930,9 +865,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.21" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "matchit" @@ -964,9 +899,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "miniz_oxide" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87dfd01fe195c66b572b37921ad8803d010623c0aca821bea2302239d155cdae" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" dependencies = [ "adler", ] @@ -1021,9 +956,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434" +checksum = "081b846d1d56ddfc18fdf1a922e4f6e07a11768ea1b92dec44e42b72712ccfce" dependencies = [ "memchr", ] @@ -1040,7 +975,7 @@ version = "0.10.64" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.6.0", "cfg-if", "foreign-types", "libc", @@ -1098,7 +1033,7 @@ dependencies = [ "libc", "redox_syscall 0.5.2", "smallvec", - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -1214,13 +1149,23 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.85" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ "unicode-ident", ] +[[package]] +name = "quick-xml" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86e446ed58cef1bbfe847bc2fda0e2e4ea9f0e57b90c507d4781292590d72a4e" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.36" @@ -1285,9 +1230,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.6.0", ] +[[package]] +name = "ref_slice" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4ed1d73fb92eba9b841ba2aef69533a060ccc0d3ec71c90aeda5996d4afb7a9" + [[package]] name = "regex" version = "1.10.5" @@ -1319,9 +1270,9 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "reqwest" -version = "0.12.4" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" +checksum = "c7d6d2a27d57148378eb5e111173f4276ad26340ecc5c49a4a2152167a2d6a37" dependencies = [ "base64 0.22.1", "bytes", @@ -1333,6 +1284,7 @@ dependencies = [ "http-body", "http-body-util", "hyper", + "hyper-rustls", "hyper-tls", "hyper-util", "ipnet", @@ -1347,7 +1299,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper 0.1.2", + "sync_wrapper 1.0.1", "system-configuration", "tokio", "tokio-native-tls", @@ -1359,6 +1311,21 @@ dependencies = [ "winreg", ] +[[package]] +name = "ring" +version = "0.17.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +dependencies = [ + "cc", + "cfg-if", + "getrandom", + "libc", + "spin", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -1371,13 +1338,26 @@ version = "0.38.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.6.0", "errno", "libc", "linux-raw-sys", "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.23.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402" +dependencies = [ + "once_cell", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + [[package]] name = "rustls-pemfile" version = "2.1.2" @@ -1394,6 +1374,17 @@ version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" +[[package]] +name = "rustls-webpki" +version = "0.102.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9a6fccd794a42c2c105b513a2f62bc3fd8f3ba57a4593677ceb0bd035164d78" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.17" @@ -1427,7 +1418,7 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.6.0", "core-foundation", "core-foundation-sys", "libc", @@ -1466,9 +1457,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.117" +version = "1.0.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" +checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" dependencies = [ "itoa", "ryu", @@ -1529,6 +1520,12 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + [[package]] name = "socket2" version = "0.5.7" @@ -1540,10 +1537,16 @@ dependencies = [ ] [[package]] -name = "stable_deref_trait" -version = "1.2.0" +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + +[[package]] +name = "static_assertions" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "stringprep" @@ -1558,15 +1561,15 @@ dependencies = [ [[package]] name = "subtle" -version = "2.5.0" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.66" +version = "2.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5" +checksum = "901fa70d88b9d6c98022e23b4136f9f3e54e4662c3bc1bd1d84a42a9a0f0c1e9" dependencies = [ "proc-macro2", "quote", @@ -1585,17 +1588,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" -[[package]] -name = "synstructure" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "system-configuration" version = "0.5.1" @@ -1682,21 +1674,11 @@ dependencies = [ "syn", ] -[[package]] -name = "tinystr" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" -dependencies = [ - "displaydoc", - "zerovec", -] - [[package]] name = "tinyvec" -version = "1.6.0" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +checksum = "c55115c6fbe2d2bef26eb09ad74bde02d8255476fc0c7b515ef09fbb35742d82" dependencies = [ "tinyvec_macros", ] @@ -1772,6 +1754,17 @@ dependencies = [ "whoami", ] +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -1821,9 +1814,21 @@ checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -1839,6 +1844,17 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "rand", + "static_assertions", +] + [[package]] name = "typenum" version = "1.17.0" @@ -1872,29 +1888,23 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4259d9d4425d9f0661581b804cb85fe66a4c631cadd8f490d1c13a35d5d9291" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" -version = "2.5.1" +version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c25da092f0a868cdf09e8674cd3b7ef3a7d92a24253e663a2fb85e2496de56" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" dependencies = [ "form_urlencoded", "idna", "percent-encoding", ] -[[package]] -name = "utf16_iter" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" - -[[package]] -name = "utf8_iter" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" - [[package]] name = "vcpkg" version = "0.2.15" @@ -2021,7 +2031,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -2039,7 +2049,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.5", + "windows-targets 0.52.6", ] [[package]] @@ -2059,18 +2069,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.5", - "windows_aarch64_msvc 0.52.5", - "windows_i686_gnu 0.52.5", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", "windows_i686_gnullvm", - "windows_i686_msvc 0.52.5", - "windows_x86_64_gnu 0.52.5", - "windows_x86_64_gnullvm 0.52.5", - "windows_x86_64_msvc 0.52.5", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] [[package]] @@ -2081,9 +2091,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" [[package]] name = "windows_aarch64_msvc" @@ -2093,9 +2103,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" [[package]] name = "windows_i686_gnu" @@ -2105,15 +2115,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" [[package]] name = "windows_i686_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" [[package]] name = "windows_i686_msvc" @@ -2123,9 +2133,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" [[package]] name = "windows_x86_64_gnu" @@ -2135,9 +2145,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" [[package]] name = "windows_x86_64_gnullvm" @@ -2147,9 +2157,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" [[package]] name = "windows_x86_64_msvc" @@ -2159,9 +2169,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.5" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winreg" @@ -2174,80 +2184,7 @@ dependencies = [ ] [[package]] -name = "write16" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" - -[[package]] -name = "writeable" -version = "0.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" - -[[package]] -name = "yoke" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" -dependencies = [ - "serde", - "stable_deref_trait", - "yoke-derive", - "zerofrom", -] - -[[package]] -name = "yoke-derive" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "synstructure", -] - -[[package]] -name = "zerofrom" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55" -dependencies = [ - "zerofrom-derive", -] - -[[package]] -name = "zerofrom-derive" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "synstructure", -] - -[[package]] -name = "zerovec" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2cc8827d6c0994478a15c53f374f46fbd41bea663d809b14744bc42e6b109c" -dependencies = [ - "yoke", - "zerofrom", - "zerovec-derive", -] - -[[package]] -name = "zerovec-derive" -version = "0.10.2" +name = "zeroize" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97cf56601ee5052b4417d90c8755c6683473c926039908196cf35d99f893ebe7" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" diff --git a/Cargo.toml b/Cargo.toml index 4bff1c3a..d5af5164 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,9 @@ bytes = "1.5.0" chrono = { version = "0.4.31", features = ["serde"] } csv = "1.3.0" futures = "0.3.28" +kafka = "0.10.0" postgres-types = { version = "0.2.6", features = ["derive", "with-chrono-0_4"] } +quick-xml = { version = "0.35.0", features = [ "serialize", "overlapped-lists" ] } rand = "0.8.5" rand_distr = "0.4.3" regex = "1.10.2" diff --git a/api/src/lib.rs b/api/src/lib.rs index e1120a22..a3059c0e 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -118,11 +118,8 @@ async fn latest_handler( Ok(Json(LatestResp { data })) } -pub async fn run(connect_string: &str) { - // set up postgres connection pool - let manager = PostgresConnectionManager::new_from_stringlike(connect_string, NoTls).unwrap(); - let pool = bb8::Pool::builder().build(manager).await.unwrap(); +pub async fn run(pool: PgConnectionPool) { // build our application with routes let app = Router::new() .route( diff --git a/api/src/main.rs b/api/src/main.rs index 28b4b6a8..fd762231 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -1,3 +1,6 @@ +use bb8_postgres::PostgresConnectionManager; +use tokio_postgres::NoTls; + #[tokio::main] async fn main() { let args: Vec = std::env::args().collect(); @@ -12,5 +15,9 @@ async fn main() { connect_string.push_str(&args[4]) } - lard_api::run(&connect_string).await; + // set up postgres connection pool + let manager = PostgresConnectionManager::new_from_stringlike(connect_string, NoTls).unwrap(); + let pool = bb8::Pool::builder().build(manager).await.unwrap(); + + lard_api::run(pool).await; } diff --git a/db/flags.sql b/db/flags.sql new file mode 100644 index 00000000..bdb8f50f --- /dev/null +++ b/db/flags.sql @@ -0,0 +1,15 @@ +CREATE SCHEMA IF NOT EXISTS flags; + +CREATE TABLE IF NOT EXISTS flags.kvdata ( + timeseries INT4 REFERENCES public.timeseries, + obstime TIMESTAMPTZ NOT NULL, + original REAL NULL, -- could decide not to store this in the future? (KDVH migration will not contain this) + corrected REAL NULL, + controlinfo TEXT NULL, + useinfo TEXT NULL, + cfailed INT4 NULL, + CONSTRAINT unique_kvdata_timeseries_obstime UNIQUE (timeseries, obstime) +); + +CREATE INDEX IF NOT EXISTS kvdata_obtime_index ON flags.kvdata (obstime); +CREATE INDEX IF NOT EXISTS kvdata_timeseries_index ON flags.kvdata USING HASH (timeseries); diff --git a/ingestion/Cargo.toml b/ingestion/Cargo.toml index 5cbfcec3..421b90ba 100644 --- a/ingestion/Cargo.toml +++ b/ingestion/Cargo.toml @@ -14,6 +14,8 @@ bytes.workspace = true chrono.workspace = true csv.workspace = true futures.workspace = true +kafka.workspace = true +quick-xml.workspace = true regex.workspace = true serde.workspace = true thiserror.workspace = true diff --git a/ingestion/src/kvkafka.rs b/ingestion/src/kvkafka.rs new file mode 100644 index 00000000..0a99eebf --- /dev/null +++ b/ingestion/src/kvkafka.rs @@ -0,0 +1,320 @@ +use chrono::{DateTime, NaiveDateTime, Utc}; +use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage}; +use serde::{Deserialize, Deserializer}; +use thiserror::Error; +use tokio::sync::mpsc; + +use crate::PgConnectionPool; + +#[derive(Error, Debug)] +pub enum Error { + #[error("parsing xml error: {0}")] + IssueParsingXML(String), + #[error("parsing time error: {0}")] + IssueParsingTime(#[from] chrono::ParseError), + #[error("kafka returned an error: {0}")] + Kafka(#[from] kafka::Error), + #[error("postgres returned an error: {0}")] + Database(#[from] tokio_postgres::Error), + #[error("no Timeseries ID found for this data - station {0}, param {1}")] + TimeseriesMissing(i32, i32), + #[error("error while deserializing message: {0}")] + Deserialize(#[from] quick_xml::DeError), +} + +#[derive(Debug, Deserialize)] +/// Represents ... +struct KvalobsData { + #[serde(rename = "station")] + stations: Vec, +} +#[derive(Debug, Deserialize)] +/// Represents ... +struct Station { + #[serde(rename = "@val")] + val: i32, + #[serde(rename = "typeid")] + typeids: Vec, +} +#[derive(Debug, Deserialize)] +/// Represents ... +struct Typeid { + #[serde(rename = "@val")] + val: i32, + #[serde(rename = "obstime")] + obstimes: Vec, +} +#[derive(Debug, Deserialize)] +/// Represents ... +struct Obstime { + #[serde(rename = "@val")] + val: String, // avoiding parsing time at this point... + #[serde(rename = "tbtime")] + tbtimes: Vec, +} +#[derive(Debug, Deserialize)] +/// Represents ... +struct Tbtime { + #[serde(rename = "@val")] + _val: String, // avoiding parsing time at this point... + _kvtextdata: Option>, + #[serde(rename = "sensor")] + sensors: Vec, +} +/// Represents ... +#[derive(Debug, Deserialize)] +struct Kvtextdata { + _paramid: Option, + _original: Option, +} +#[derive(Debug, Deserialize)] +/// Represents ... +struct Sensor { + #[serde(rename = "@val", deserialize_with = "zero_to_none")] + val: Option, + #[serde(rename = "level")] + levels: Vec, +} +/// Represents ... +#[derive(Debug, Deserialize)] +struct Level { + #[serde(rename = "@val", deserialize_with = "zero_to_none")] + val: Option, + kvdata: Option>, +} + +// Deserialize sensor and level to null if they are 0 +// 0 is the default for kvalobs, but through obsinn it's actually just missing +fn zero_to_none<'de, D>(des: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + Option::deserialize(des).map(|opt| match opt { + Some("0") | Some("") | None => None, + Some(val) => Some(val.parse::().unwrap()), + }) +} + +/// Represents ... +#[derive(Debug, Deserialize)] +pub struct Kvdata { + #[serde(rename = "@paramid")] + paramid: i32, + #[serde(default, deserialize_with = "optional")] + original: Option, + #[serde(default, deserialize_with = "optional")] + corrected: Option, + #[serde(default, deserialize_with = "optional")] + controlinfo: Option, + #[serde(default, deserialize_with = "optional")] + useinfo: Option, + #[serde(default, deserialize_with = "optional")] + cfailed: Option, +} + +// If the field is either empty or missing it should deserialize to None. +// The latter is ensured by the #[serde(default)] macro, +// while this function takes care of the former case. +fn optional<'de, D, T>(des: D) -> Result, D::Error> +where + D: Deserializer<'de>, + T: std::str::FromStr, + ::Err: std::fmt::Debug, +{ + Option::deserialize(des).map(|opt| match opt { + Some("") | None => None, + Some(val) => Some(val.parse::().unwrap()), + }) +} + +#[derive(Debug, Deserialize)] +struct KvalobsId { + station: i32, + paramid: i32, + typeid: i32, + sensor: Option, + level: Option, +} + +#[derive(Debug)] +pub struct Msg { + kvid: KvalobsId, + obstime: DateTime, + kvdata: Kvdata, +} + +pub async fn read_and_insert(pool: PgConnectionPool, group_string: String) { + let (tx, mut rx) = mpsc::channel(10); + + tokio::spawn(async move { + read_kafka(group_string, tx).await; + }); + + let client = pool.get().await.expect("couldn't connect to database"); + while let Some(msg) = rx.recv().await { + if let Err(e) = insert_kvdata(&client, msg).await { + eprintln!("Database insert error: {e}"); + } + } +} + +pub async fn parse_message(message: &[u8], tx: &mpsc::Sender) -> Result<(), Error> { + // do some basic trimming / processing of the raw message + // received from the kafka queue + let xmlmsg = std::str::from_utf8(message) + .map_err(|_| Error::IssueParsingXML("couldn't convert message from utf8".to_string()))? + .trim() + .replace(['\n', '\\'], ""); + + // do some checking / further processing of message + if !xmlmsg.starts_with("") { + Some(loc) => &xmlmsg[(loc + 2)..], + None => { + return Err(Error::IssueParsingXML( + "couldn't find end of xml tag '?>'".to_string(), + )) + } + }; + let item: KvalobsData = quick_xml::de::from_str(xmlmsg)?; + + // get the useful stuff out of this struct + for station in item.stations { + for typeid in station.typeids { + for obstime in typeid.obstimes { + let obs_time = + match NaiveDateTime::parse_from_str(&obstime.val, "%Y-%m-%d %H:%M:%S") { + Ok(time) => time.and_utc(), + Err(e) => { + eprintln!("{}", Error::IssueParsingTime(e)); + continue; + } + }; + for tbtime in obstime.tbtimes { + // NOTE: tbtime is "table time" which can vary from the actual observation time, + // it's the first time it entered the db in kvalobs. Currently not using it + // TODO: Do we want to handle text data at all? It doesn't seem to be QCed + // if let Some(textdata) = tbtime.kvtextdata {...} + for sensor in tbtime.sensors { + for level in sensor.levels { + if let Some(kvdata) = level.kvdata { + for data in kvdata { + let msg = Msg { + kvid: KvalobsId { + station: station.val, + paramid: data.paramid, + typeid: typeid.val, + sensor: sensor.val, + level: level.val, + }, + obstime: obs_time, + kvdata: data, + }; + tx.send(msg).await.unwrap(); + } + } + } + } + } + } + } + } + + Ok(()) +} + +async fn read_kafka(group_name: String, tx: mpsc::Sender) { + // NOTE: reading from the 4 redundant kafka queues, but only reading the checked data (other topics exists) + let mut consumer = Consumer::from_hosts(vec![ + "kafka2-a1.met.no:9092".to_owned(), + "kafka2-a2.met.no:9092".to_owned(), + "kafka2-b1.met.no:9092".to_owned(), + "kafka2-b2.met.no:9092".to_owned(), + ]) + .with_topic_partitions("kvalobs.production.checked".to_owned(), &[0, 1]) + .with_fallback_offset(FetchOffset::Earliest) + .with_group(group_name) + .with_offset_storage(Some(GroupOffsetStorage::Kafka)) + .create() + .expect("failed to create consumer"); + + // Consume the kafka queue infinitely + loop { + // https://docs.rs/kafka/latest/src/kafka/consumer/mod.rs.html#155 + // poll asks for next available chunk of data as a MessageSet + match consumer.poll() { + Ok(sets) => { + for msgset in sets.iter() { + for msg in msgset.messages() { + if let Err(e) = parse_message(msg.value, &tx).await { + eprintln!("{}", e); + } + } + if let Err(e) = consumer.consume_messageset(msgset) { + eprintln!("{}", e); + } + } + consumer + .commit_consumed() + .expect("could not commit offset in consumer"); // ensure we keep offset + } + Err(e) => { + eprintln!("{}\nRetrying in 5 seconds...", Error::Kafka(e)); + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } + } + } +} + +pub async fn insert_kvdata( + client: &tokio_postgres::Client, + Msg { + kvid, + obstime, + kvdata, + }: Msg, +) -> Result<(), Error> { + // query timeseries ID + // NOTE: alternately could use conn.query_one, since we want exactly one response + let tsid: i32 = client + .query( + "SELECT timeseries FROM labels.met + WHERE station_id = $1 + AND param_id = $2 + AND type_id = $3 + AND (($4::int IS NULL AND lvl IS NULL) OR (lvl = $4)) + AND (($5::int IS NULL AND sensor IS NULL) OR (sensor = $5))", + &[ + &kvid.station, + &kvid.paramid, + &kvid.typeid, + &kvid.level, + &kvid.sensor, + ], + ) + .await? + .first() + .ok_or(Error::TimeseriesMissing(kvid.station, kvid.paramid))? + .get(0); + + // write the data into the db + client.execute( + "INSERT INTO flags.kvdata (timeseries, obstime, original, corrected, controlinfo, useinfo, cfailed) + VALUES($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT ON CONSTRAINT unique_kvdata_timeseries_obstime + DO UPDATE SET + original = EXCLUDED.original, + corrected = EXCLUDED.corrected, + controlinfo = EXCLUDED.controlinfo, + useinfo = EXCLUDED.useinfo, + cfailed = EXCLUDED.cfailed", + &[&tsid, &obstime, &kvdata.original, &kvdata.corrected, &kvdata.controlinfo, &kvdata.useinfo, &kvdata.cfailed], + ).await?; + + Ok(()) +} diff --git a/ingestion/src/lib.rs b/ingestion/src/lib.rs index 9a544407..4c4700db 100644 --- a/ingestion/src/lib.rs +++ b/ingestion/src/lib.rs @@ -16,6 +16,7 @@ use std::{ use thiserror::Error; use tokio_postgres::NoTls; +pub mod kvkafka; pub mod permissions; use permissions::{ParamPermitTable, StationPermitTable}; @@ -185,14 +186,10 @@ async fn handle_kldata( } pub async fn run( - connect_string: &str, + db_pool: PgConnectionPool, param_conversion_path: &str, permit_tables: Arc>, ) -> Result<(), Box> { - // set up postgres connection pool - let manager = PostgresConnectionManager::new_from_stringlike(connect_string, NoTls)?; - let db_pool = bb8::Pool::builder().build(manager).await?; - // set up param conversion map // TODO: extract to separate function? let param_conversions = Arc::new( diff --git a/ingestion/src/main.rs b/ingestion/src/main.rs index 664e3a82..fd4486d0 100644 --- a/ingestion/src/main.rs +++ b/ingestion/src/main.rs @@ -1,5 +1,7 @@ -use lard_ingestion::permissions::fetch_permits; +use bb8_postgres::PostgresConnectionManager; +use lard_ingestion::{kvkafka, permissions}; use std::sync::{Arc, RwLock}; +use tokio_postgres::NoTls; const PARAMCONV: &str = "resources/paramconversions.csv"; @@ -8,18 +10,21 @@ async fn main() -> Result<(), Box> { // TODO: use clap for argument parsing let args: Vec = std::env::args().collect(); - if args.len() < 4 { - panic!("not enough args passed in, at least host, user, dbname needed, optionally password") + if args.len() < 5 { + panic!(concat!( + "not enough args passed in. At least the group for the kafka queue,", + "and host, user, dbname needed, optionally password, for postgres" + )) } - let mut connect_string = format!("host={} user={} dbname={}", &args[1], &args[2], &args[3]); - if args.len() > 4 { + let mut connect_string = format!("host={} user={} dbname={}", &args[2], &args[3], &args[4]); + if args.len() > 5 { connect_string.push_str(" password="); - connect_string.push_str(&args[4]) + connect_string.push_str(&args[5]) }; // Permit tables handling (needs connection to stinfosys database) - let permit_tables = Arc::new(RwLock::new(fetch_permits().await?)); + let permit_tables = Arc::new(RwLock::new(permissions::fetch_permits().await?)); let background_permit_tables = permit_tables.clone(); // background task to refresh permit tables every 30 mins @@ -32,7 +37,7 @@ async fn main() -> Result<(), Box> { // TODO: better error handling here? Nothing is listening to what returns on this task // but we could surface failures in metrics. Also we maybe don't want to bork the task // forever if these functions fail - let new_tables = fetch_permits().await.unwrap(); + let new_tables = permissions::fetch_permits().await.unwrap(); let mut tables = background_permit_tables.write().unwrap(); *tables = new_tables; } @@ -40,6 +45,14 @@ async fn main() -> Result<(), Box> { } }); + // Set up postgres connection pool + let manager = PostgresConnectionManager::new_from_stringlike(connect_string, NoTls)?; + let db_pool = bb8::Pool::builder().build(manager).await?; + + // Spawn kvkafka reader + let kafka_group = args[1].to_string(); + tokio::spawn(kvkafka::read_and_insert(db_pool.clone(), kafka_group)); + // Set up and run our server + database - lard_ingestion::run(&connect_string, PARAMCONV, permit_tables).await + lard_ingestion::run(db_pool, PARAMCONV, permit_tables).await } diff --git a/integration_tests/Makefile b/integration_tests/Makefile index 720b185a..100555e7 100644 --- a/integration_tests/Makefile +++ b/integration_tests/Makefile @@ -6,6 +6,10 @@ end_to_end: _end_to_end clean _end_to_end: setup cargo test --test end_to_end --no-fail-fast -- --nocapture --test-threads=1 +kafka: _kafka clean +_kafka: setup + cargo test --test end_to_end test_kafka --features debug --no-fail-fast -- --nocapture --test-threads=1 + # With the `debug` feature, the database is not cleaned up after running the test, # so it can be inspected with psql. Run with: # TEST= make debug_test diff --git a/integration_tests/src/main.rs b/integration_tests/src/main.rs index 4514f227..5db6afe7 100644 --- a/integration_tests/src/main.rs +++ b/integration_tests/src/main.rs @@ -30,7 +30,7 @@ async fn main() { } }); - let schemas = ["db/public.sql", "db/labels.sql"]; + let schemas = ["db/public.sql", "db/labels.sql", "db/flags.sql"]; for schema in schemas { insert_schema(&client, schema).await.unwrap(); } diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index 409a1d76..6ba02214 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -4,13 +4,16 @@ use std::{ sync::{Arc, RwLock}, }; +use bb8_postgres::PostgresConnectionManager; use chrono::{DateTime, Duration, DurationRound, TimeDelta, TimeZone, Utc}; use futures::{Future, FutureExt}; use test_case::test_case; +use tokio::sync::mpsc; use tokio_postgres::NoTls; use lard_api::timeseries::Timeseries; use lard_api::{LatestResp, TimeseriesResp, TimesliceResp}; +use lard_ingestion::kvkafka; use lard_ingestion::permissions::{ timeseries_is_open, ParamPermit, ParamPermitTable, StationPermitTable, }; @@ -134,23 +137,16 @@ async fn cleanup(client: &tokio_postgres::Client) { } async fn e2e_test_wrapper>(test: T) { - let api_server = tokio::spawn(lard_api::run(CONNECT_STRING)); + let manager = PostgresConnectionManager::new_from_stringlike(CONNECT_STRING, NoTls).unwrap(); + let db_pool = bb8::Pool::builder().build(manager).await.unwrap(); + + let api_server = tokio::spawn(lard_api::run(db_pool.clone())); let ingestor = tokio::spawn(lard_ingestion::run( - CONNECT_STRING, + db_pool.clone(), PARAMCONV_CSV, mock_permit_tables(), )); - let (client, conn) = tokio_postgres::connect(CONNECT_STRING, NoTls) - .await - .unwrap(); - - tokio::spawn(async move { - if let Err(e) = conn.await { - eprintln!("{}", e); - } - }); - tokio::select! { _ = api_server => panic!("API server task terminated first"), _ = ingestor => panic!("Ingestor server task terminated first"), @@ -158,7 +154,10 @@ async fn e2e_test_wrapper>(test: T) { test_result = AssertUnwindSafe(test).catch_unwind() => { // For debugging a specific test, it might be useful to skip cleaning up #[cfg(not(feature = "debug"))] - cleanup(&client).await; + { + let client = db_pool.get().await.unwrap(); + cleanup(&client).await; + } assert!(test_result.is_ok()) } } @@ -379,3 +378,76 @@ async fn test_timeslice_endpoint() { }) .await } + +#[tokio::test] +async fn test_kafka() { + e2e_test_wrapper(async { + let (tx, mut rx) = mpsc::channel(10); + + let (pgclient, conn) = tokio_postgres::connect(CONNECT_STRING, NoTls) + .await + .unwrap(); + + tokio::spawn(async move { + if let Err(e) = conn.await { + eprintln!("{}", e) + } + }); + + // Spawn task to send message + tokio::spawn(async move { + let ts = TestData { + station_id: 20001, + params: &[Param::new(106, "RR_1")], // sum(precipitation_amount PT1H) + start_time: Utc.with_ymd_and_hms(2024, 6, 5, 12, 0, 0).unwrap(), + period: chrono::Duration::hours(1), + type_id: -4, + len: 24, + }; + + let client = reqwest::Client::new(); + let ingestor_resp = ingest_data(&client, ts.obsinn_message()).await; + assert_eq!(ingestor_resp.res, 0); + + // This observation was 2.5 hours late?? + let kafka_xml = r#" + + + + + + + + + 10 + 10 + 1000000000000000 + 9000000000000000 + + + + + + + + + "#; + + kvkafka::parse_message(kafka_xml.as_bytes(), &tx) + .await + .unwrap(); + }); + + // wait for message + if let Some(msg) = rx.recv().await { + kvkafka::insert_kvdata(&pgclient, msg).await.unwrap() + } + + // TODO: we do not have an API endpoint to query the flags.kvdata table + assert!(pgclient + .query_one("SELECT * FROM flags.kvdata", &[]) + .await + .is_ok()); + }) + .await +}