From 079e2a842692cd2b2c161739e11a0ebe467cb5c2 Mon Sep 17 00:00:00 2001 From: Santiago Fraire Willemoes Date: Tue, 19 Nov 2024 16:40:26 +0100 Subject: [PATCH 1/2] feat!: add benchmarks BREAKING CHANGE: python 3.8 is no longer supported --- poetry.lock | 143 +++++++++++++++++++++++++------------------------ pyproject.toml | 2 +- scripts/bench | 12 +++++ 3 files changed, 85 insertions(+), 72 deletions(-) create mode 100755 scripts/bench diff --git a/poetry.lock b/poetry.lock index 9c5d364..d5718fe 100644 --- a/poetry.lock +++ b/poetry.lock @@ -305,13 +305,13 @@ files = [ [[package]] name = "commitizen" -version = "3.31.0" +version = "4.0.0" description = "Python commitizen client tool" optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" files = [ - {file = "commitizen-3.31.0-py3-none-any.whl", hash = "sha256:a28df7ab5b8665d48796c422a97dcfae0d0fce7e2d28404c0e386cf1ebd42c8f"}, - {file = "commitizen-3.31.0.tar.gz", hash = "sha256:6ab973e91d07c1e745c6c0efe6dd0708b1f6d8fd7e4ab5e7c773b5ceb3df4ff0"}, + {file = "commitizen-4.0.0-py3-none-any.whl", hash = "sha256:52873ee589a64cf77fc55570dbd3f987c6ffcd33132d179eb625c4d06ae935f7"}, + {file = "commitizen-4.0.0.tar.gz", hash = "sha256:16aff27e01b43015eab1c74eabbca3e284b4988dd1b146a0963282db241dc2c0"}, ] [package.dependencies] @@ -326,76 +326,77 @@ pyyaml = ">=3.08" questionary = ">=2.0,<3.0" termcolor = ">=1.1,<3" tomlkit = ">=0.5.3,<1.0.0" +typing-extensions = {version = ">=4.0.1,<5.0.0", markers = "python_version < \"3.11\""} [[package]] name = "coverage" -version = "7.6.7" +version = "7.6.8" description = "Code coverage measurement for Python" optional = false python-versions = ">=3.9" files = [ - {file = "coverage-7.6.7-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:108bb458827765d538abcbf8288599fee07d2743357bdd9b9dad456c287e121e"}, - {file = "coverage-7.6.7-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c973b2fe4dc445cb865ab369df7521df9c27bf40715c837a113edaa2aa9faf45"}, - {file = "coverage-7.6.7-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3c6b24007c4bcd0b19fac25763a7cac5035c735ae017e9a349b927cfc88f31c1"}, - {file = "coverage-7.6.7-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:acbb8af78f8f91b3b51f58f288c0994ba63c646bc1a8a22ad072e4e7e0a49f1c"}, - {file = "coverage-7.6.7-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ad32a981bcdedb8d2ace03b05e4fd8dace8901eec64a532b00b15217d3677dd2"}, - {file = "coverage-7.6.7-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:34d23e28ccb26236718a3a78ba72744212aa383141961dd6825f6595005c8b06"}, - {file = "coverage-7.6.7-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:e25bacb53a8c7325e34d45dddd2f2fbae0dbc230d0e2642e264a64e17322a777"}, - {file = "coverage-7.6.7-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:af05bbba896c4472a29408455fe31b3797b4d8648ed0a2ccac03e074a77e2314"}, - {file = "coverage-7.6.7-cp310-cp310-win32.whl", hash = "sha256:796c9b107d11d2d69e1849b2dfe41730134b526a49d3acb98ca02f4985eeff7a"}, - {file = "coverage-7.6.7-cp310-cp310-win_amd64.whl", hash = "sha256:987a8e3da7da4eed10a20491cf790589a8e5e07656b6dc22d3814c4d88faf163"}, - {file = "coverage-7.6.7-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:7e61b0e77ff4dddebb35a0e8bb5a68bf0f8b872407d8d9f0c726b65dfabe2469"}, - {file = "coverage-7.6.7-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:1a5407a75ca4abc20d6252efeb238377a71ce7bda849c26c7a9bece8680a5d99"}, - {file = "coverage-7.6.7-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:df002e59f2d29e889c37abd0b9ee0d0e6e38c24f5f55d71ff0e09e3412a340ec"}, - {file = "coverage-7.6.7-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:673184b3156cba06154825f25af33baa2671ddae6343f23175764e65a8c4c30b"}, - {file = "coverage-7.6.7-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e69ad502f1a2243f739f5bd60565d14a278be58be4c137d90799f2c263e7049a"}, - {file = "coverage-7.6.7-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:60dcf7605c50ea72a14490d0756daffef77a5be15ed1b9fea468b1c7bda1bc3b"}, - {file = "coverage-7.6.7-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:9c2eb378bebb2c8f65befcb5147877fc1c9fbc640fc0aad3add759b5df79d55d"}, - {file = "coverage-7.6.7-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:3c0317288f032221d35fa4cbc35d9f4923ff0dfd176c79c9b356e8ef8ef2dff4"}, - {file = "coverage-7.6.7-cp311-cp311-win32.whl", hash = "sha256:951aade8297358f3618a6e0660dc74f6b52233c42089d28525749fc8267dccd2"}, - {file = "coverage-7.6.7-cp311-cp311-win_amd64.whl", hash = "sha256:5e444b8e88339a2a67ce07d41faabb1d60d1004820cee5a2c2b54e2d8e429a0f"}, - {file = "coverage-7.6.7-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:f07ff574986bc3edb80e2c36391678a271d555f91fd1d332a1e0f4b5ea4b6ea9"}, - {file = "coverage-7.6.7-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:49ed5ee4109258973630c1f9d099c7e72c5c36605029f3a91fe9982c6076c82b"}, - {file = "coverage-7.6.7-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f3e8796434a8106b3ac025fd15417315d7a58ee3e600ad4dbcfddc3f4b14342c"}, - {file = "coverage-7.6.7-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a3b925300484a3294d1c70f6b2b810d6526f2929de954e5b6be2bf8caa1f12c1"}, - {file = "coverage-7.6.7-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3c42ec2c522e3ddd683dec5cdce8e62817afb648caedad9da725001fa530d354"}, - {file = "coverage-7.6.7-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:0266b62cbea568bd5e93a4da364d05de422110cbed5056d69339bd5af5685433"}, - {file = "coverage-7.6.7-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:e5f2a0f161d126ccc7038f1f3029184dbdf8f018230af17ef6fd6a707a5b881f"}, - {file = "coverage-7.6.7-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:c132b5a22821f9b143f87446805e13580b67c670a548b96da945a8f6b4f2efbb"}, - {file = "coverage-7.6.7-cp312-cp312-win32.whl", hash = "sha256:7c07de0d2a110f02af30883cd7dddbe704887617d5c27cf373362667445a4c76"}, - {file = "coverage-7.6.7-cp312-cp312-win_amd64.whl", hash = "sha256:fd49c01e5057a451c30c9b892948976f5d38f2cbd04dc556a82743ba8e27ed8c"}, - {file = "coverage-7.6.7-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:46f21663e358beae6b368429ffadf14ed0a329996248a847a4322fb2e35d64d3"}, - {file = "coverage-7.6.7-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:40cca284c7c310d622a1677f105e8507441d1bb7c226f41978ba7c86979609ab"}, - {file = "coverage-7.6.7-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:77256ad2345c29fe59ae861aa11cfc74579c88d4e8dbf121cbe46b8e32aec808"}, - {file = "coverage-7.6.7-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:87ea64b9fa52bf395272e54020537990a28078478167ade6c61da7ac04dc14bc"}, - {file = "coverage-7.6.7-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2d608a7808793e3615e54e9267519351c3ae204a6d85764d8337bd95993581a8"}, - {file = "coverage-7.6.7-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:cdd94501d65adc5c24f8a1a0eda110452ba62b3f4aeaba01e021c1ed9cb8f34a"}, - {file = "coverage-7.6.7-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:82c809a62e953867cf57e0548c2b8464207f5f3a6ff0e1e961683e79b89f2c55"}, - {file = "coverage-7.6.7-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:bb684694e99d0b791a43e9fc0fa58efc15ec357ac48d25b619f207c41f2fd384"}, - {file = "coverage-7.6.7-cp313-cp313-win32.whl", hash = "sha256:963e4a08cbb0af6623e61492c0ec4c0ec5c5cf74db5f6564f98248d27ee57d30"}, - {file = "coverage-7.6.7-cp313-cp313-win_amd64.whl", hash = "sha256:14045b8bfd5909196a90da145a37f9d335a5d988a83db34e80f41e965fb7cb42"}, - {file = "coverage-7.6.7-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:f2c7a045eef561e9544359a0bf5784b44e55cefc7261a20e730baa9220c83413"}, - {file = "coverage-7.6.7-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:5dd4e4a49d9c72a38d18d641135d2fb0bdf7b726ca60a103836b3d00a1182acd"}, - {file = "coverage-7.6.7-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5c95e0fa3d1547cb6f021ab72f5c23402da2358beec0a8e6d19a368bd7b0fb37"}, - {file = "coverage-7.6.7-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f63e21ed474edd23f7501f89b53280014436e383a14b9bd77a648366c81dce7b"}, - {file = "coverage-7.6.7-cp313-cp313t-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ead9b9605c54d15be228687552916c89c9683c215370c4a44f1f217d2adcc34d"}, - {file = "coverage-7.6.7-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:0573f5cbf39114270842d01872952d301027d2d6e2d84013f30966313cadb529"}, - {file = "coverage-7.6.7-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:e2c8e3384c12dfa19fa9a52f23eb091a8fad93b5b81a41b14c17c78e23dd1d8b"}, - {file = "coverage-7.6.7-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:70a56a2ec1869e6e9fa69ef6b76b1a8a7ef709972b9cc473f9ce9d26b5997ce3"}, - {file = "coverage-7.6.7-cp313-cp313t-win32.whl", hash = "sha256:dbba8210f5067398b2c4d96b4e64d8fb943644d5eb70be0d989067c8ca40c0f8"}, - {file = "coverage-7.6.7-cp313-cp313t-win_amd64.whl", hash = "sha256:dfd14bcae0c94004baba5184d1c935ae0d1231b8409eb6c103a5fd75e8ecdc56"}, - {file = "coverage-7.6.7-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:37a15573f988b67f7348916077c6d8ad43adb75e478d0910957394df397d2874"}, - {file = "coverage-7.6.7-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:b6cce5c76985f81da3769c52203ee94722cd5d5889731cd70d31fee939b74bf0"}, - {file = "coverage-7.6.7-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a1ab9763d291a17b527ac6fd11d1a9a9c358280adb320e9c2672a97af346ac2c"}, - {file = "coverage-7.6.7-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6cf96ceaa275f071f1bea3067f8fd43bec184a25a962c754024c973af871e1b7"}, - {file = "coverage-7.6.7-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:aee9cf6b0134d6f932d219ce253ef0e624f4fa588ee64830fcba193269e4daa3"}, - {file = "coverage-7.6.7-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:2bc3e45c16564cc72de09e37413262b9f99167803e5e48c6156bccdfb22c8327"}, - {file = "coverage-7.6.7-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:623e6965dcf4e28a3debaa6fcf4b99ee06d27218f46d43befe4db1c70841551c"}, - {file = "coverage-7.6.7-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:850cfd2d6fc26f8346f422920ac204e1d28814e32e3a58c19c91980fa74d8289"}, - {file = "coverage-7.6.7-cp39-cp39-win32.whl", hash = "sha256:c296263093f099da4f51b3dff1eff5d4959b527d4f2f419e16508c5da9e15e8c"}, - {file = "coverage-7.6.7-cp39-cp39-win_amd64.whl", hash = "sha256:90746521206c88bdb305a4bf3342b1b7316ab80f804d40c536fc7d329301ee13"}, - {file = "coverage-7.6.7-pp39.pp310-none-any.whl", hash = "sha256:0ddcb70b3a3a57581b450571b31cb774f23eb9519c2aaa6176d3a84c9fc57671"}, - {file = "coverage-7.6.7.tar.gz", hash = "sha256:d79d4826e41441c9a118ff045e4bccb9fdbdcb1d02413e7ea6eb5c87b5439d24"}, + {file = "coverage-7.6.8-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b39e6011cd06822eb964d038d5dff5da5d98652b81f5ecd439277b32361a3a50"}, + {file = "coverage-7.6.8-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:63c19702db10ad79151a059d2d6336fe0c470f2e18d0d4d1a57f7f9713875dcf"}, + {file = "coverage-7.6.8-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3985b9be361d8fb6b2d1adc9924d01dec575a1d7453a14cccd73225cb79243ee"}, + {file = "coverage-7.6.8-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:644ec81edec0f4ad17d51c838a7d01e42811054543b76d4ba2c5d6af741ce2a6"}, + {file = "coverage-7.6.8-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1f188a2402f8359cf0c4b1fe89eea40dc13b52e7b4fd4812450da9fcd210181d"}, + {file = "coverage-7.6.8-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:e19122296822deafce89a0c5e8685704c067ae65d45e79718c92df7b3ec3d331"}, + {file = "coverage-7.6.8-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:13618bed0c38acc418896005732e565b317aa9e98d855a0e9f211a7ffc2d6638"}, + {file = "coverage-7.6.8-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:193e3bffca48ad74b8c764fb4492dd875038a2f9925530cb094db92bb5e47bed"}, + {file = "coverage-7.6.8-cp310-cp310-win32.whl", hash = "sha256:3988665ee376abce49613701336544041f2117de7b7fbfe91b93d8ff8b151c8e"}, + {file = "coverage-7.6.8-cp310-cp310-win_amd64.whl", hash = "sha256:f56f49b2553d7dd85fd86e029515a221e5c1f8cb3d9c38b470bc38bde7b8445a"}, + {file = "coverage-7.6.8-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:86cffe9c6dfcfe22e28027069725c7f57f4b868a3f86e81d1c62462764dc46d4"}, + {file = "coverage-7.6.8-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:d82ab6816c3277dc962cfcdc85b1efa0e5f50fb2c449432deaf2398a2928ab94"}, + {file = "coverage-7.6.8-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:13690e923a3932e4fad4c0ebfb9cb5988e03d9dcb4c5150b5fcbf58fd8bddfc4"}, + {file = "coverage-7.6.8-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4be32da0c3827ac9132bb488d331cb32e8d9638dd41a0557c5569d57cf22c9c1"}, + {file = "coverage-7.6.8-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:44e6c85bbdc809383b509d732b06419fb4544dca29ebe18480379633623baafb"}, + {file = "coverage-7.6.8-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:768939f7c4353c0fac2f7c37897e10b1414b571fd85dd9fc49e6a87e37a2e0d8"}, + {file = "coverage-7.6.8-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:e44961e36cb13c495806d4cac67640ac2866cb99044e210895b506c26ee63d3a"}, + {file = "coverage-7.6.8-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:3ea8bb1ab9558374c0ab591783808511d135a833c3ca64a18ec927f20c4030f0"}, + {file = "coverage-7.6.8-cp311-cp311-win32.whl", hash = "sha256:629a1ba2115dce8bf75a5cce9f2486ae483cb89c0145795603d6554bdc83e801"}, + {file = "coverage-7.6.8-cp311-cp311-win_amd64.whl", hash = "sha256:fb9fc32399dca861584d96eccd6c980b69bbcd7c228d06fb74fe53e007aa8ef9"}, + {file = "coverage-7.6.8-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:e683e6ecc587643f8cde8f5da6768e9d165cd31edf39ee90ed7034f9ca0eefee"}, + {file = "coverage-7.6.8-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:1defe91d41ce1bd44b40fabf071e6a01a5aa14de4a31b986aa9dfd1b3e3e414a"}, + {file = "coverage-7.6.8-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d7ad66e8e50225ebf4236368cc43c37f59d5e6728f15f6e258c8639fa0dd8e6d"}, + {file = "coverage-7.6.8-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3fe47da3e4fda5f1abb5709c156eca207eacf8007304ce3019eb001e7a7204cb"}, + {file = "coverage-7.6.8-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:202a2d645c5a46b84992f55b0a3affe4f0ba6b4c611abec32ee88358db4bb649"}, + {file = "coverage-7.6.8-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:4674f0daa1823c295845b6a740d98a840d7a1c11df00d1fd62614545c1583787"}, + {file = "coverage-7.6.8-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:74610105ebd6f33d7c10f8907afed696e79c59e3043c5f20eaa3a46fddf33b4c"}, + {file = "coverage-7.6.8-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:37cda8712145917105e07aab96388ae76e787270ec04bcb9d5cc786d7cbb8443"}, + {file = "coverage-7.6.8-cp312-cp312-win32.whl", hash = "sha256:9e89d5c8509fbd6c03d0dd1972925b22f50db0792ce06324ba069f10787429ad"}, + {file = "coverage-7.6.8-cp312-cp312-win_amd64.whl", hash = "sha256:379c111d3558272a2cae3d8e57e6b6e6f4fe652905692d54bad5ea0ca37c5ad4"}, + {file = "coverage-7.6.8-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:0b0c69f4f724c64dfbfe79f5dfb503b42fe6127b8d479b2677f2b227478db2eb"}, + {file = "coverage-7.6.8-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:c15b32a7aca8038ed7644f854bf17b663bc38e1671b5d6f43f9a2b2bd0c46f63"}, + {file = "coverage-7.6.8-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:63068a11171e4276f6ece913bde059e77c713b48c3a848814a6537f35afb8365"}, + {file = "coverage-7.6.8-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6f4548c5ead23ad13fb7a2c8ea541357474ec13c2b736feb02e19a3085fac002"}, + {file = "coverage-7.6.8-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3b4b4299dd0d2c67caaaf286d58aef5e75b125b95615dda4542561a5a566a1e3"}, + {file = "coverage-7.6.8-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:c9ebfb2507751f7196995142f057d1324afdab56db1d9743aab7f50289abd022"}, + {file = "coverage-7.6.8-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:c1b4474beee02ede1eef86c25ad4600a424fe36cff01a6103cb4533c6bf0169e"}, + {file = "coverage-7.6.8-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:d9fd2547e6decdbf985d579cf3fc78e4c1d662b9b0ff7cc7862baaab71c9cc5b"}, + {file = "coverage-7.6.8-cp313-cp313-win32.whl", hash = "sha256:8aae5aea53cbfe024919715eca696b1a3201886ce83790537d1c3668459c7146"}, + {file = "coverage-7.6.8-cp313-cp313-win_amd64.whl", hash = "sha256:ae270e79f7e169ccfe23284ff5ea2d52a6f401dc01b337efb54b3783e2ce3f28"}, + {file = "coverage-7.6.8-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:de38add67a0af869b0d79c525d3e4588ac1ffa92f39116dbe0ed9753f26eba7d"}, + {file = "coverage-7.6.8-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:b07c25d52b1c16ce5de088046cd2432b30f9ad5e224ff17c8f496d9cb7d1d451"}, + {file = "coverage-7.6.8-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:62a66ff235e4c2e37ed3b6104d8b478d767ff73838d1222132a7a026aa548764"}, + {file = "coverage-7.6.8-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:09b9f848b28081e7b975a3626e9081574a7b9196cde26604540582da60235fdf"}, + {file = "coverage-7.6.8-cp313-cp313t-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:093896e530c38c8e9c996901858ac63f3d4171268db2c9c8b373a228f459bbc5"}, + {file = "coverage-7.6.8-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:9a7b8ac36fd688c8361cbc7bf1cb5866977ece6e0b17c34aa0df58bda4fa18a4"}, + {file = "coverage-7.6.8-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:38c51297b35b3ed91670e1e4efb702b790002e3245a28c76e627478aa3c10d83"}, + {file = "coverage-7.6.8-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:2e4e0f60cb4bd7396108823548e82fdab72d4d8a65e58e2c19bbbc2f1e2bfa4b"}, + {file = "coverage-7.6.8-cp313-cp313t-win32.whl", hash = "sha256:6535d996f6537ecb298b4e287a855f37deaf64ff007162ec0afb9ab8ba3b8b71"}, + {file = "coverage-7.6.8-cp313-cp313t-win_amd64.whl", hash = "sha256:c79c0685f142ca53256722a384540832420dff4ab15fec1863d7e5bc8691bdcc"}, + {file = "coverage-7.6.8-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:3ac47fa29d8d41059ea3df65bd3ade92f97ee4910ed638e87075b8e8ce69599e"}, + {file = "coverage-7.6.8-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:24eda3a24a38157eee639ca9afe45eefa8d2420d49468819ac5f88b10de84f4c"}, + {file = "coverage-7.6.8-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e4c81ed2820b9023a9a90717020315e63b17b18c274a332e3b6437d7ff70abe0"}, + {file = "coverage-7.6.8-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bd55f8fc8fa494958772a2a7302b0354ab16e0b9272b3c3d83cdb5bec5bd1779"}, + {file = "coverage-7.6.8-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f39e2f3530ed1626c66e7493be7a8423b023ca852aacdc91fb30162c350d2a92"}, + {file = "coverage-7.6.8-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:716a78a342679cd1177bc8c2fe957e0ab91405bd43a17094324845200b2fddf4"}, + {file = "coverage-7.6.8-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:177f01eeaa3aee4a5ffb0d1439c5952b53d5010f86e9d2667963e632e30082cc"}, + {file = "coverage-7.6.8-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:912e95017ff51dc3d7b6e2be158dedc889d9a5cc3382445589ce554f1a34c0ea"}, + {file = "coverage-7.6.8-cp39-cp39-win32.whl", hash = "sha256:4db3ed6a907b555e57cc2e6f14dc3a4c2458cdad8919e40b5357ab9b6db6c43e"}, + {file = "coverage-7.6.8-cp39-cp39-win_amd64.whl", hash = "sha256:428ac484592f780e8cd7b6b14eb568f7c85460c92e2a37cb0c0e5186e1a0d076"}, + {file = "coverage-7.6.8-pp39.pp310-none-any.whl", hash = "sha256:5c52a036535d12590c32c49209e79cabaad9f9ad8aa4cbd875b68c4d67a9cbce"}, + {file = "coverage-7.6.8.tar.gz", hash = "sha256:8b2b8503edb06822c86d82fa64a4a5cb0760bb8f31f26e138ec743f422f37cfc"}, ] [package.dependencies] @@ -751,13 +752,13 @@ pyyaml = ">=5.1" [[package]] name = "mkdocs-material" -version = "9.5.45" +version = "9.5.46" description = "Documentation that simply works" optional = false python-versions = ">=3.8" files = [ - {file = "mkdocs_material-9.5.45-py3-none-any.whl", hash = "sha256:a9be237cfd0be14be75f40f1726d83aa3a81ce44808dc3594d47a7a592f44547"}, - {file = "mkdocs_material-9.5.45.tar.gz", hash = "sha256:286489cf0beca4a129d91d59d6417419c63bceed1ce5cd0ec1fc7e1ebffb8189"}, + {file = "mkdocs_material-9.5.46-py3-none-any.whl", hash = "sha256:98f0a2039c62e551a68aad0791a8d41324ff90c03a6e6cea381a384b84908b83"}, + {file = "mkdocs_material-9.5.46.tar.gz", hash = "sha256:ae2043f4238e572f9a40e0b577f50400d6fc31e2fef8ea141800aebf3bd273d7"}, ] [package.dependencies] @@ -1761,4 +1762,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "fb817b5731f1847ba088912b2654b09a083be1dd1c70f59d7edfa61cd3607351" +content-hash = "6ebc30facbae1ff72fa99b1e741ad7bd1e9c366334caec913ac7a7ed0872a794" diff --git a/pyproject.toml b/pyproject.toml index 7ad832b..fc8519e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ aiorun = "^2024.5.1" jedi = "^0.19.1" mkdocs = "^1.1.2" uvicorn = "<1.0" -commitizen = "^3.29.1" +commitizen = "^4" fastapi = "^0.115.0" mkdocs-material = "^9.5.39" starlette-prometheus = "^0.10.0" diff --git a/scripts/bench b/scripts/bench new file mode 100755 index 0000000..36025a1 --- /dev/null +++ b/scripts/bench @@ -0,0 +1,12 @@ +# When you run the bench script, it will create a "bench" report under the `.benchmarks` folder. +# This way we can keep track of the regressions in performance. +# Normally, you would run the benchmarks locally, and commit them to git, and then the +# normal `test` script, can check for an accepted regression penalty. +#!/bin/bash -e + +export PREFIX="" +if [ -d '.venv' ] ; then + export PREFIX=".venv/bin/" +fi + +${PREFIX}pytest tests/test_benchmarks.py --benchmark-autosave From 3083d43f866c053e63d6ee8e9414cd86a337051f Mon Sep 17 00:00:00 2001 From: Santiago Fraire Date: Sat, 17 Sep 2022 09:45:33 +0200 Subject: [PATCH 2/2] feat: add dependency injection framework --- .github/workflows/bench-release.yml | 4 +- .github/workflows/pr-tests.yaml | 40 ++++++-- README.md | 1 + kstreams/__init__.py | 6 ++ kstreams/_di/binders/api.py | 68 +++++++++++++ kstreams/_di/binders/header.py | 44 +++++++++ kstreams/_di/dependencies/core.py | 113 +++++++++++++++++++++ kstreams/_di/dependencies/hooks.py | 31 ++++++ kstreams/_di/parameters.py | 37 +++++++ kstreams/engine.py | 12 ++- kstreams/exceptions.py | 3 + kstreams/middleware/di_middleware.py | 30 ++++++ kstreams/middleware/middleware.py | 29 +++++- kstreams/middleware/udf_middleware.py | 3 + kstreams/streams.py | 17 +++- kstreams/types.py | 3 +- kstreams/typing.py | 8 ++ poetry.lock | 130 +++++++++++++++++++++++- pyproject.toml | 4 +- scripts/bench-compare | 2 +- scripts/test | 4 +- tests/_di/test_dependency_manager.py | 109 ++++++++++++++++++++ tests/_di/test_hooks.py | 75 ++++++++++++++ tests/_di/test_param_headers.py | 76 ++++++++++++++ tests/conftest.py | 61 +++++++++++- tests/test_benchmarks.py | 26 +++++ tests/test_monitor.py | 137 ++++++++++++++------------ tests/test_streams.py | 1 - 28 files changed, 981 insertions(+), 93 deletions(-) create mode 100644 kstreams/_di/binders/api.py create mode 100644 kstreams/_di/binders/header.py create mode 100644 kstreams/_di/dependencies/core.py create mode 100644 kstreams/_di/dependencies/hooks.py create mode 100644 kstreams/_di/parameters.py create mode 100644 kstreams/middleware/di_middleware.py create mode 100644 kstreams/typing.py create mode 100644 tests/_di/test_dependency_manager.py create mode 100644 tests/_di/test_hooks.py create mode 100644 tests/_di/test_param_headers.py diff --git a/.github/workflows/bench-release.yml b/.github/workflows/bench-release.yml index 1d15de8..4bea850 100644 --- a/.github/workflows/bench-release.yml +++ b/.github/workflows/bench-release.yml @@ -1,4 +1,4 @@ -name: Bump version +name: Benchmark latest release on: push: @@ -46,5 +46,5 @@ jobs: git config --global user.email "action@github.com" git config --global user.name "GitHub Action" git add .benchmarks/ - git commit -m "bench: bench: add benchmark current release" + git commit -m "bench: current release" git push origin master diff --git a/.github/workflows/pr-tests.yaml b/.github/workflows/pr-tests.yaml index e4920cd..6d6756f 100644 --- a/.github/workflows/pr-tests.yaml +++ b/.github/workflows/pr-tests.yaml @@ -17,7 +17,7 @@ on: required: true jobs: - build_test_bench: + test: runs-on: ubuntu-latest strategy: matrix: @@ -56,11 +56,6 @@ jobs: git config --global user.email "action@github.com" git config --global user.name "GitHub Action" ./scripts/test - - - name: Benchmark regression test - run: | - ./scripts/bench-compare - - name: Upload coverage to Codecov uses: codecov/codecov-action@v5.0.2 with: @@ -68,3 +63,36 @@ jobs: name: kstreams fail_ci_if_error: true token: ${{secrets.CODECOV_TOKEN}} + bench: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - name: Setup python + uses: actions/setup-python@v5 + with: + python-version: '3.13' + architecture: x64 + - name: Set Cache + uses: actions/cache@v4 + id: cache # name for referring later + with: + path: .venv/ + # The cache key depends on poetry.lock + key: ${{ runner.os }}-cache-${{ hashFiles('poetry.lock') }} + restore-keys: | + ${{ runner.os }}-cache- + ${{ runner.os }}- + - name: Install Dependencies + # if: steps.cache.outputs.cache-hit != 'true' + run: | + python -m pip install -U pip poetry + poetry --version + poetry config --local virtualenvs.in-project true + poetry install + - name: Benchmark regression test + run: | + ./scripts/bench-current + ./scripts/bench-compare + diff --git a/README.md b/README.md index cfbb6dc..4f80f04 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,7 @@ if __name__ == "__main__": - [ ] Store (kafka streams pattern) - [ ] Stream Join - [ ] Windowing +- [ ] PEP 593 ## Development diff --git a/kstreams/__init__.py b/kstreams/__init__.py index 54e8247..c8b045e 100644 --- a/kstreams/__init__.py +++ b/kstreams/__init__.py @@ -1,5 +1,7 @@ from aiokafka.structs import RecordMetadata, TopicPartition +from ._di.parameters import FromHeader, Header +from .backends.kafka import Kafka from .clients import Consumer, Producer from .create import StreamEngine, create_engine from .prometheus.monitor import PrometheusMonitor, PrometheusMonitorType @@ -31,4 +33,8 @@ "TestStreamClient", "TopicPartition", "TopicPartitionOffset", + "Kafka", + "StreamDependencyManager", + "FromHeader", + "Header", ] diff --git a/kstreams/_di/binders/api.py b/kstreams/_di/binders/api.py new file mode 100644 index 0000000..02b959e --- /dev/null +++ b/kstreams/_di/binders/api.py @@ -0,0 +1,68 @@ +import inspect +from typing import Any, AsyncIterator, Awaitable, Protocol, TypeVar, Union + +from di.api.dependencies import CacheKey +from di.dependent import Dependent, Marker + +from kstreams.types import ConsumerRecord + + +class ExtractorTrait(Protocol): + """Implement to extract data from incoming `ConsumerRecord`. + + Consumers will always work with a consumer Record. + Implementing this would let you extract information from the `ConsumerRecord`. + """ + + def __hash__(self) -> int: + """Required by di in order to cache the deps""" + ... + + def __eq__(self, __o: object) -> bool: + """Required by di in order to cache the deps""" + ... + + async def extract( + self, consumer_record: ConsumerRecord + ) -> Union[Awaitable[Any], AsyncIterator[Any]]: + """This is where the magic should happen. + + For example, you could "extract" here a json from the `ConsumerRecord.value` + """ + ... + + +T = TypeVar("T", covariant=True) + + +class MarkerTrait(Protocol[T]): + def register_parameter(self, param: inspect.Parameter) -> T: ... + + +class Binder(Dependent[Any]): + def __init__( + self, + *, + extractor: ExtractorTrait, + ) -> None: + super().__init__(call=extractor.extract, scope="consumer_record") + self.extractor = extractor + + @property + def cache_key(self) -> CacheKey: + return self.extractor + + +class BinderMarker(Marker): + """Bind together the different dependencies. + + NETX: Add asyncapi marker here, like `MarkerTrait[AsyncApiTrait]`. + Recommendation to wait until 3.0: + - [#618](https://github.com/asyncapi/spec/issues/618) + """ + + def __init__(self, *, extractor_marker: MarkerTrait[ExtractorTrait]) -> None: + self.extractor_marker = extractor_marker + + def register_parameter(self, param: inspect.Parameter) -> Binder: + return Binder(extractor=self.extractor_marker.register_parameter(param)) diff --git a/kstreams/_di/binders/header.py b/kstreams/_di/binders/header.py new file mode 100644 index 0000000..c0f46de --- /dev/null +++ b/kstreams/_di/binders/header.py @@ -0,0 +1,44 @@ +import inspect +from typing import Any, NamedTuple, Optional + +from kstreams.exceptions import HeaderNotFound +from kstreams.types import ConsumerRecord + + +class HeaderExtractor(NamedTuple): + name: str + + def __hash__(self) -> int: + return hash((self.__class__, self.name)) + + def __eq__(self, __o: object) -> bool: + return isinstance(__o, HeaderExtractor) and __o.name == self.name + + async def extract(self, consumer_record: ConsumerRecord) -> Any: + headers = dict(consumer_record.headers) + try: + header = headers[self.name] + except KeyError as e: + message = ( + f"No header `{self.name}` found.\n" + "Check if your broker is sending the header.\n" + "Try adding a default value to your parameter like `None`.\n" + "Or set `convert_underscores = False`." + ) + raise HeaderNotFound(message) from e + else: + return header + + +class HeaderMarker(NamedTuple): + alias: Optional[str] + convert_underscores: bool + + def register_parameter(self, param: inspect.Parameter) -> HeaderExtractor: + if self.alias is not None: + name = self.alias + elif self.convert_underscores: + name = param.name.replace("_", "-") + else: + name = param.name + return HeaderExtractor(name=name) diff --git a/kstreams/_di/dependencies/core.py b/kstreams/_di/dependencies/core.py new file mode 100644 index 0000000..4503f6e --- /dev/null +++ b/kstreams/_di/dependencies/core.py @@ -0,0 +1,113 @@ +from typing import Any, Callable, Optional + +from di import Container, bind_by_type +from di.dependent import Dependent +from di.executors import AsyncExecutor + +from kstreams._di.dependencies.hooks import bind_by_generic +from kstreams.streams import Stream +from kstreams.types import ConsumerRecord, Send + +LayerFn = Callable[..., Any] + + +class StreamDependencyManager: + """Core of dependency injection on kstreams. + + This is an internal class of kstreams that manages the dependency injection, + as a user you should not use this class directly. + + Attributes: + container: dependency store. + stream: the stream wrapping the user function. Optional to improve testability. + When instanciating this class you must provide a stream, otherwise users + won't be able to use the `stream` parameter in their functions. + send: send object. Optional to improve testability, same as stream. + + Usage: + + stream and send are ommited for simplicity + + ```python + def user_func(cr: ConsumerRecord): + ... + + sdm = StreamDependencyManager() + sdm.solve(user_func) + sdm.execute(consumer_record) + ``` + """ + + container: Container + + def __init__( + self, + container: Optional[Container] = None, + stream: Optional[Stream] = None, + send: Optional[Send] = None, + ): + self.container = container or Container() + self.async_executor = AsyncExecutor() + self.stream = stream + self.send = send + + def solve_user_fn(self, fn: LayerFn) -> None: + """Build the dependency graph for the given function. + + Objects must be injected before this function is called. + + Attributes: + fn: user defined function, using allowed kstreams params + """ + self._register_consumer_record() + + if isinstance(self.stream, Stream): + self._register_stream(self.stream) + + if self.send is not None: + self._register_send(self.send) + + self.solved_user_fn = self.container.solve( + Dependent(fn, scope="consumer_record"), + scopes=["consumer_record", "stream", "application"], + ) + + async def execute(self, consumer_record: ConsumerRecord) -> Any: + """Execute the dependencies graph with external values. + + Attributes: + consumer_record: A kafka record containing `values`, `headers`, etc. + """ + async with self.container.enter_scope("consumer_record") as state: + return await self.solved_user_fn.execute_async( + executor=self.async_executor, + state=state, + values={ConsumerRecord: consumer_record}, + ) + + def _register_stream(self, stream: Stream): + """Register the stream with the container.""" + hook = bind_by_type( + Dependent(lambda: stream, scope="consumer_record", wire=False), Stream + ) + self.container.bind(hook) + + def _register_consumer_record(self): + """Register consumer record with the container. + + We bind_by_generic because we want to bind the `ConsumerRecord` type which + is generic. + + The value must be injected at runtime. + """ + hook = bind_by_generic( + Dependent(ConsumerRecord, scope="consumer_record", wire=False), + ConsumerRecord, + ) + self.container.bind(hook) + + def _register_send(self, send: Send): + hook = bind_by_type( + Dependent(lambda: send, scope="consumer_record", wire=False), Send + ) + self.container.bind(hook) diff --git a/kstreams/_di/dependencies/hooks.py b/kstreams/_di/dependencies/hooks.py new file mode 100644 index 0000000..11e0799 --- /dev/null +++ b/kstreams/_di/dependencies/hooks.py @@ -0,0 +1,31 @@ +import inspect +from typing import Any, get_origin + +from di._container import BindHook +from di._utils.inspect import get_type +from di.api.dependencies import DependentBase + + +def bind_by_generic( + provider: DependentBase[Any], + dependency: type, +) -> BindHook: + """Hook to substitute the matched dependency based on its generic.""" + + def hook( + param: inspect.Parameter | None, dependent: DependentBase[Any] + ) -> DependentBase[Any] | None: + if dependent.call == dependency: + return provider + if param is None: + return None + + type_annotation_option = get_type(param) + if type_annotation_option is None: + return None + type_annotation = type_annotation_option.value + if get_origin(type_annotation) is dependency: + return provider + return None + + return hook diff --git a/kstreams/_di/parameters.py b/kstreams/_di/parameters.py new file mode 100644 index 0000000..b16a1b1 --- /dev/null +++ b/kstreams/_di/parameters.py @@ -0,0 +1,37 @@ +from typing import Optional, TypeVar + +from kstreams._di.binders.api import BinderMarker +from kstreams._di.binders.header import HeaderMarker +from kstreams.typing import Annotated + + +def Header( + *, alias: Optional[str] = None, convert_underscores: bool = True +) -> BinderMarker: + """Construct another type from the headers of a kafka record. + + Args: + alias: Use a different header name + convert_underscores: If True, convert underscores to dashes. + + Usage: + + ```python + from kstream import Header, Annotated + + def user_fn(event_type: Annotated[str, Header(alias="EventType")]): + ... + ``` + """ + header_marker = HeaderMarker(alias=alias, convert_underscores=convert_underscores) + binder = BinderMarker(extractor_marker=header_marker) + return binder + + +T = TypeVar("T") + +FromHeader = Annotated[T, Header()] +FromHeader.__doc__ = """General purpose convenient header type. + +Use `Annotated` to provide custom params. +""" diff --git a/kstreams/engine.py b/kstreams/engine.py index b69095f..5b8b905 100644 --- a/kstreams/engine.py +++ b/kstreams/engine.py @@ -5,13 +5,13 @@ from aiokafka.structs import RecordMetadata +from kstreams.middleware.di_middleware import DependencyInjectionHandler from kstreams.structs import TopicPartitionOffset from .backends.kafka import Kafka from .clients import Consumer, Producer from .exceptions import DuplicateStreamException, EngineNotStartedException from .middleware import Middleware -from .middleware.udf_middleware import UdfHandler from .prometheus.monitor import PrometheusMonitor from .rebalance_listener import MetricsRebalanceListener, RebalanceListener from .serializers import Deserializer, Serializer @@ -389,7 +389,13 @@ def add_stream( stream.rebalance_listener.stream = stream stream.rebalance_listener.engine = self - stream.udf_handler = UdfHandler( + # stream.udf_handler = UdfHandler( + # next_call=stream.func, + # send=self.send, + # stream=stream, + # ) + + stream.udf_handler = DependencyInjectionHandler( next_call=stream.func, send=self.send, stream=stream, @@ -397,7 +403,7 @@ def add_stream( # NOTE: When `no typing` support is deprecated this check can # be removed - if stream.udf_handler.type != UDFType.NO_TYPING: + if stream.udf_handler.get_type() != UDFType.NO_TYPING: stream.func = self._build_stream_middleware_stack(stream=stream) def _build_stream_middleware_stack(self, *, stream: Stream) -> NextMiddlewareCall: diff --git a/kstreams/exceptions.py b/kstreams/exceptions.py index 249f2db..dd5a65d 100644 --- a/kstreams/exceptions.py +++ b/kstreams/exceptions.py @@ -25,3 +25,6 @@ def __str__(self) -> str: class BackendNotSet(StreamException): ... + + +class HeaderNotFound(StreamException): ... diff --git a/kstreams/middleware/di_middleware.py b/kstreams/middleware/di_middleware.py new file mode 100644 index 0000000..2e53f5a --- /dev/null +++ b/kstreams/middleware/di_middleware.py @@ -0,0 +1,30 @@ +import inspect +import typing + +from kstreams import types +from kstreams._di.dependencies.core import StreamDependencyManager +from kstreams.streams_utils import UDFType, setup_type + +from .middleware import BaseMiddleware + + +class DependencyInjectionHandler(BaseMiddleware): + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + self.dependecy_manager = StreamDependencyManager( + stream=self.stream, send=self.send + ) + + # To be deprecated once streams with type hints are deprecated + signature = inspect.signature(self.next_call) + self.params = list(signature.parameters.values()) + self.type: UDFType = setup_type(self.params) + if self.type == UDFType.WITH_TYPING: + self.dependecy_manager.solve_user_fn(fn=self.next_call) + + def get_type(self) -> UDFType: + return self.type + + async def __call__(self, cr: types.ConsumerRecord) -> typing.Any: + return await self.dependecy_manager.execute(cr) diff --git a/kstreams/middleware/middleware.py b/kstreams/middleware/middleware.py index f5b164b..0173c15 100644 --- a/kstreams/middleware/middleware.py +++ b/kstreams/middleware/middleware.py @@ -4,7 +4,7 @@ import typing from kstreams import types -from kstreams.streams_utils import StreamErrorPolicy +from kstreams.streams_utils import StreamErrorPolicy, UDFType if typing.TYPE_CHECKING: from kstreams import Stream, StreamEngine # pragma: no cover @@ -14,6 +14,10 @@ class MiddlewareProtocol(typing.Protocol): + next_call: types.NextMiddlewareCall + send: types.Send + stream: "Stream" + def __init__( self, *, @@ -44,7 +48,11 @@ def __repr__(self) -> str: return f"{middleware_name}({extra_options})" -class BaseMiddleware: +class BaseMiddleware(MiddlewareProtocol): + next_call: types.NextMiddlewareCall + send: types.Send + stream: "Stream" + def __init__( self, *, @@ -145,3 +153,20 @@ async def cleanup_policy(self, exc: Exception) -> None: await self.engine.stop() await self.stream.is_processing.acquire() signal.raise_signal(signal.SIGTERM) + + # acquire the asyncio.Lock `is_processing` again to resume the processing + # and avoid `RuntimeError: Lock is not acquired.` + await self.stream.is_processing.acquire() + + +class BaseDependcyMiddleware(MiddlewareProtocol, typing.Protocol): + """Base class for Dependency Injection Middleware. + + Both old and new DI middlewares make use of the type. + + The `type` is used to identify the way to call the user defined function. + + On top of that, this middleware helps avoid circular dependencies. + """ + + def get_type(self) -> UDFType: ... diff --git a/kstreams/middleware/udf_middleware.py b/kstreams/middleware/udf_middleware.py index 2bc1f29..f12b1f9 100644 --- a/kstreams/middleware/udf_middleware.py +++ b/kstreams/middleware/udf_middleware.py @@ -21,6 +21,9 @@ def __init__(self, *args, **kwargs) -> None: self.params = list(signature.parameters.values()) self.type: UDFType = setup_type(self.params) + def get_type(self) -> UDFType: + return self.type + def bind_udf_params(self, cr: types.ConsumerRecord) -> typing.List: # NOTE: When `no typing` support is deprecated then this can # be more eficient as the CR will be always there. diff --git a/kstreams/streams.py b/kstreams/streams.py index bf17682..e6112a2 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -10,12 +10,12 @@ from kstreams import TopicPartition from kstreams.exceptions import BackendNotSet -from kstreams.middleware.middleware import ExceptionMiddleware +from kstreams.middleware.middleware import BaseDependcyMiddleware, ExceptionMiddleware from kstreams.structs import TopicPartitionOffset from .backends.kafka import Kafka from .clients import Consumer -from .middleware import Middleware, udf_middleware +from .middleware import Middleware from .rebalance_listener import RebalanceListener from .serializers import Deserializer from .streams_utils import StreamErrorPolicy, UDFType @@ -172,11 +172,14 @@ def __init__( self.seeked_initial_offsets = False self.rebalance_listener = rebalance_listener self.middlewares = middlewares or [] - self.udf_handler: typing.Optional[udf_middleware.UdfHandler] = None + self.udf_handler: typing.Optional[BaseDependcyMiddleware] = None self.topics = [topics] if isinstance(topics, str) else topics self.subscribe_by_pattern = subscribe_by_pattern self.error_policy = error_policy + def __name__(self) -> str: + return self.name + def _create_consumer(self) -> Consumer: if self.backend is None: raise BackendNotSet("A backend has not been set for this stream") @@ -342,7 +345,7 @@ async def start(self) -> None: self.running = True if self.udf_handler is not None: - if self.udf_handler.type == UDFType.NO_TYPING: + if self.udf_handler.get_type() == UDFType.NO_TYPING: # deprecated use case msg = ( "Streams with `async for in` loop approach are deprecated.\n" @@ -356,6 +359,10 @@ async def start(self) -> None: await func else: # Typing cases + + # If it's an async generator, then DON'T await the function + # because we want to start ONLY and let the user retrieve the + # values while iterating the stream if not inspect.isasyncgenfunction(self.udf_handler.next_call): # Is not an async_generator, then create `await` the func await self.func_wrapper_with_typing() @@ -436,7 +443,7 @@ async def __anext__(self) -> ConsumerRecord: if ( self.udf_handler is not None - and self.udf_handler.type == UDFType.NO_TYPING + and self.udf_handler.get_type() == UDFType.NO_TYPING ): return cr return await self.func(cr) diff --git a/kstreams/types.py b/kstreams/types.py index 3562f3b..9010772 100644 --- a/kstreams/types.py +++ b/kstreams/types.py @@ -8,8 +8,7 @@ Headers = typing.Dict[str, str] EncodedHeaders = typing.Sequence[typing.Tuple[str, bytes]] -StreamFunc = typing.Callable - +StreamFunc = typing.Callable[..., typing.Any] EngineHooks = typing.Sequence[typing.Callable[[], typing.Any]] diff --git a/kstreams/typing.py b/kstreams/typing.py new file mode 100644 index 0000000..1707d56 --- /dev/null +++ b/kstreams/typing.py @@ -0,0 +1,8 @@ +"""Remove this file when python3.8 support is dropped.""" + +import sys + +if sys.version_info < (3, 9): + from typing_extensions import Annotated as Annotated # noqa: F401 +else: + from typing import Annotated as Annotated # noqa: F401 diff --git a/poetry.lock b/poetry.lock index d5718fe..1c7b738 100644 --- a/poetry.lock +++ b/poetry.lock @@ -416,6 +416,24 @@ files = [ {file = "decli-0.6.2.tar.gz", hash = "sha256:36f71eb55fd0093895efb4f416ec32b7f6e00147dda448e3365cf73ceab42d6f"}, ] +[[package]] +name = "di" +version = "0.79.2" +description = "Dependency injection toolkit" +optional = false +python-versions = ">=3.8,<4" +files = [ + {file = "di-0.79.2-py3-none-any.whl", hash = "sha256:4b2ac7c46d4d9e941ca47d37c2029ba739c1f8a0e19e5288731224870f00d6e6"}, + {file = "di-0.79.2.tar.gz", hash = "sha256:0c65b9ccb984252dadbdcdb39743eeddef0c1f167f791c59fcd70e97bb0d3af8"}, +] + +[package.dependencies] +anyio = {version = ">=3.5.0", optional = true, markers = "extra == \"anyio\""} +graphlib2 = ">=0.4.1,<0.5.0" + +[package.extras] +anyio = ["anyio (>=3.5.0)"] + [[package]] name = "exceptiongroup" version = "1.2.2" @@ -430,6 +448,21 @@ files = [ [package.extras] test = ["pytest (>=6)"] +[[package]] +name = "faker" +version = "33.0.0" +description = "Faker is a Python package that generates fake data for you." +optional = false +python-versions = ">=3.8" +files = [ + {file = "Faker-33.0.0-py3-none-any.whl", hash = "sha256:68e5580cb6b4226710886e595eabc13127149d6e71e9d1db65506a7fbe2c7fce"}, + {file = "faker-33.0.0.tar.gz", hash = "sha256:9b01019c1ddaf2253ca2308c0472116e993f4ad8fc9905f82fa965e0c6f932e9"}, +] + +[package.dependencies] +python-dateutil = ">=2.4" +typing-extensions = "*" + [[package]] name = "fastapi" version = "0.115.5" @@ -478,6 +511,40 @@ python-dateutil = ">=2.8.1" [package.extras] dev = ["flake8", "markdown", "twine", "wheel"] +[[package]] +name = "graphlib2" +version = "0.4.7" +description = "Rust port of the Python stdlib graphlib modules" +optional = false +python-versions = ">=3.7" +files = [ + {file = "graphlib2-0.4.7-cp37-abi3-macosx_10_7_x86_64.whl", hash = "sha256:483710733215783cdc76452ccde1247af8f697685c9c1dfd9bb9ff4f52d990ee"}, + {file = "graphlib2-0.4.7-cp37-abi3-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:3619c7d3c5aca95e6cbbfc283aa6bf42ffa5b59d7f39c8d0ad615bce65dc406f"}, + {file = "graphlib2-0.4.7-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5b19f1b91d0f22ca3d1cfb2965478db98cf5916a5c6cea5fdc7caf4bf1bfbc33"}, + {file = "graphlib2-0.4.7-cp37-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:624020f6808ee21ffbb2e455f8dd4196bbb37032a35aa3327f0f5b65fb6a35d1"}, + {file = "graphlib2-0.4.7-cp37-abi3-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:6efc6a197a619a97f1b105aea14b202101241c1db9014bd100ad19cf29288cbf"}, + {file = "graphlib2-0.4.7-cp37-abi3-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d7cc38b68775cb2cdfc487bbaca2f7991da0d76d42a68f412c2ca61461e6e026"}, + {file = "graphlib2-0.4.7-cp37-abi3-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b06bed98d42f4e10adfe2a8332efdca06b5bac6e7c86dd1d22a4dea4de9b275a"}, + {file = "graphlib2-0.4.7-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3c9ec3a5645bdf020d8bd9196b2665e26090d60e523fd498df29628f2c5fbecc"}, + {file = "graphlib2-0.4.7-cp37-abi3-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:824df87f767471febfd785a05a2cc77c0c973e0112a548df827763ca0aa8c126"}, + {file = "graphlib2-0.4.7-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:2de5e32ca5c0b06d442d2be4b378cc0bc335c5fcbc14a7d531a621eb8294d019"}, + {file = "graphlib2-0.4.7-cp37-abi3-musllinux_1_2_armv7l.whl", hash = "sha256:13a23fcf07c7bef8a5ad0e04ab826d3a2a2bcb493197005300c68b4ea7b8f581"}, + {file = "graphlib2-0.4.7-cp37-abi3-musllinux_1_2_i686.whl", hash = "sha256:15a8a6daa28c1fb5c518d387879f3bbe313264fbbc2fab5635b718bc71a24913"}, + {file = "graphlib2-0.4.7-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:0cb6c4449834077972c3cea4602f86513b4b75fcf2d40b12e4fe4bf1aa5c8da2"}, + {file = "graphlib2-0.4.7-cp37-abi3-win32.whl", hash = "sha256:31b40cea537845d80b69403ae306d7c6a68716b76f5171f68daed1804aadefec"}, + {file = "graphlib2-0.4.7-cp37-abi3-win_amd64.whl", hash = "sha256:d40935a9da81a046ebcaa0216ad593ef504ae8a5425a59bdbd254c0462adedc8"}, + {file = "graphlib2-0.4.7-pp37-pypy37_pp73-macosx_10_7_x86_64.whl", hash = "sha256:9cef08a50632e75a9e11355e68fa1f8c9371d0734642855f8b5c4ead1b058e6f"}, + {file = "graphlib2-0.4.7-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:aeecb604d70317c20ca6bc3556f7f5c40146ad1f0ded837e978b2fe6edf3e567"}, + {file = "graphlib2-0.4.7-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cb4ae9df7ed895c6557619049c9f73e1c2e6d1fbed568010fd5d4af94e2f0692"}, + {file = "graphlib2-0.4.7-pp38-pypy38_pp73-macosx_10_7_x86_64.whl", hash = "sha256:3ee3a99fc39df948fef340b01254709cc603263f8b176f72ed26f1eea44070a4"}, + {file = "graphlib2-0.4.7-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5873480df8991273bd1585122df232acd0f946c401c254bd9f0d661c72589dcf"}, + {file = "graphlib2-0.4.7-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:297c817229501255cd3a744c62c8f91e5139ee79bc550488f5bc765ffa33f7c5"}, + {file = "graphlib2-0.4.7-pp39-pypy39_pp73-macosx_10_7_x86_64.whl", hash = "sha256:853ef22df8e9f695706e0b8556cda9342d4d617f7d7bd02803e824bcc0c30b20"}, + {file = "graphlib2-0.4.7-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ee62ff1042fde980adf668e30393eca79aee8f1fa1274ab3b98d69091c70c5e8"}, + {file = "graphlib2-0.4.7-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b16e21e70938132d4160c2591fed59f79b5f8b702e4860c8933111b5fedb55c2"}, + {file = "graphlib2-0.4.7.tar.gz", hash = "sha256:a951c18cb4c2c2834eec898b4c75d3f930d6f08beb37496f0e0ce56eb3f571f5"}, +] + [[package]] name = "griffe" version = "1.5.1" @@ -1023,13 +1090,13 @@ files = [ [[package]] name = "pydantic" -version = "2.10.1" +version = "2.10.2" description = "Data validation using Python type hints" optional = false python-versions = ">=3.8" files = [ - {file = "pydantic-2.10.1-py3-none-any.whl", hash = "sha256:a8d20db84de64cf4a7d59e899c2caf0fe9d660c7cfc482528e7020d7dd189a7e"}, - {file = "pydantic-2.10.1.tar.gz", hash = "sha256:a4daca2dc0aa429555e0656d6bf94873a7dc5f54ee42b1f5873d666fb3f35560"}, + {file = "pydantic-2.10.2-py3-none-any.whl", hash = "sha256:cfb96e45951117c3024e6b67b25cdc33a3cb7b2fa62e239f7af1378358a1d99e"}, + {file = "pydantic-2.10.2.tar.gz", hash = "sha256:2bc2d7f17232e0841cbba4641e65ba1eb6fafb3a08de3a091ff3ce14a197c4fa"}, ] [package.dependencies] @@ -1153,6 +1220,38 @@ files = [ [package.dependencies] typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0" +[[package]] +name = "pygal" +version = "3.0.5" +description = "A Python svg graph plotting library" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pygal-3.0.5-py3-none-any.whl", hash = "sha256:a3268a5667b470c8fbbb0eca7e987561a7321caeba589d40e4c1bc16dbe71393"}, + {file = "pygal-3.0.5.tar.gz", hash = "sha256:c0a0f34e5bc1c01975c2bfb8342ad521e293ad42e525699dd00c4d7a52c14b71"}, +] + +[package.dependencies] +importlib-metadata = "*" + +[package.extras] +docs = ["pygal-sphinx-directives", "sphinx", "sphinx-rtd-theme"] +lxml = ["lxml"] +moulinrouge = ["flask", "pygal-maps-ch", "pygal-maps-fr", "pygal-maps-world"] +png = ["cairosvg"] +test = ["cairosvg", "coveralls", "lxml", "pyquery", "pytest", "pytest-cov", "ruff (>=0.5.6)"] + +[[package]] +name = "pygaljs" +version = "1.0.2" +description = "Python package providing assets from https://github.com/Kozea/pygal.js" +optional = false +python-versions = "*" +files = [ + {file = "pygaljs-1.0.2-py2.py3-none-any.whl", hash = "sha256:d75e18cb21cc2cda40c45c3ee690771e5e3d4652bf57206f20137cf475c0dbe8"}, + {file = "pygaljs-1.0.2.tar.gz", hash = "sha256:0b71ee32495dcba5fbb4a0476ddbba07658ad65f5675e4ad409baf154dec5111"}, +] + [[package]] name = "pygments" version = "2.18.0" @@ -1238,7 +1337,10 @@ files = [ [package.dependencies] py-cpuinfo = "*" +pygal = {version = "*", optional = true, markers = "extra == \"histogram\""} +pygaljs = {version = "*", optional = true, markers = "extra == \"histogram\""} pytest = ">=8.1" +setuptools = {version = "*", optional = true, markers = "extra == \"histogram\""} [package.extras] aspect = ["aspectlib"] @@ -1532,6 +1634,26 @@ files = [ {file = "ruff-0.7.4.tar.gz", hash = "sha256:cd12e35031f5af6b9b93715d8c4f40360070b2041f81273d0527683d5708fce2"}, ] +[[package]] +name = "setuptools" +version = "75.6.0" +description = "Easily download, build, install, upgrade, and uninstall Python packages" +optional = false +python-versions = ">=3.9" +files = [ + {file = "setuptools-75.6.0-py3-none-any.whl", hash = "sha256:ce74b49e8f7110f9bf04883b730f4765b774ef3ef28f722cce7c273d253aaf7d"}, + {file = "setuptools-75.6.0.tar.gz", hash = "sha256:8199222558df7c86216af4f84c30e9b34a61d8ba19366cc914424cdbd28252f6"}, +] + +[package.extras] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)", "ruff (>=0.7.0)"] +core = ["importlib_metadata (>=6)", "jaraco.collections", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1)", "wheel (>=0.43.0)"] +cover = ["pytest-cov"] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"] +enabler = ["pytest-enabler (>=2.2)"] +test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] +type = ["importlib_metadata (>=7.0.2)", "jaraco.develop (>=7.21)", "mypy (>=1.12,<1.14)", "pytest-mypy"] + [[package]] name = "six" version = "1.16.0" @@ -1762,4 +1884,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "6ebc30facbae1ff72fa99b1e741ad7bd1e9c366334caec913ac7a7ed0872a794" +content-hash = "c2d58e5f2bf2508c5a867d61d8f8dfaf79133e6d6b93f45fc4920547252b5e6f" diff --git a/pyproject.toml b/pyproject.toml index fc8519e..9fb34f1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,11 +29,12 @@ prometheus-client = "<1.0" future = "^1.0.0" PyYAML = ">=5.4,<7.0.0" pydantic = ">=2.0.0,<3.0.0" +di = {version = "^0.79.2", extras = ["anyio"]} [tool.poetry.group.dev.dependencies] pytest = "^8.3.3" pytest-asyncio = "^0.24.0" -pytest-benchmark = "^5.1.0" +pytest-benchmark = { version = "^5.1.0", extras = ["histogram"] } pytest-cov = "^6" pytest-httpserver = "^1.1.0" mypy = "^1.11.2" @@ -48,6 +49,7 @@ mkdocs-material = "^9.5.39" starlette-prometheus = "^0.10.0" codecov = "^2.1.12" mkdocstrings = { version = "^0.27", extras = ["python"] } +Faker = "^33" [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/scripts/bench-compare b/scripts/bench-compare index 99e4482..4f58d6d 100755 --- a/scripts/bench-compare +++ b/scripts/bench-compare @@ -16,4 +16,4 @@ if [ -d '.venv' ] ; then fi # Commented out until after merge, so there will be date to compare with. -# ${PREFIX}pytest tests/test_benchmarks.py --benchmark-compare --benchmark-compare-fail=min:5% +${PREFIX}pytest tests/test_benchmarks.py --benchmark-compare --benchmark-compare-fail=mean:5% diff --git a/scripts/test b/scripts/test index a20ebb1..5cff726 100755 --- a/scripts/test +++ b/scripts/test @@ -5,7 +5,7 @@ if [ -d '.venv' ] ; then export PREFIX=".venv/bin/" fi -${PREFIX}pytest -x --cov-report term-missing --cov-report=xml:coverage.xml --cov=kstreams ${1-"./tests"} $2 +${PREFIX}pytest --cov-report term-missing --cov-report=xml:coverage.xml --cov=kstreams ${1-"./tests"} $2 ${PREFIX}ruff check kstreams tests ${PREFIX}ruff format --check kstreams tests examples -${PREFIX}mypy kstreams/ +${PREFIX}mypy kstreams/ tests diff --git a/tests/_di/test_dependency_manager.py b/tests/_di/test_dependency_manager.py new file mode 100644 index 0000000..820059c --- /dev/null +++ b/tests/_di/test_dependency_manager.py @@ -0,0 +1,109 @@ +from typing import Any, AsyncGenerator, Generator + +import pytest + +from kstreams._di.dependencies.core import StreamDependencyManager +from kstreams.streams import Stream +from kstreams.types import ConsumerRecord + + +class AppWrapper: + """This is a fake class used to check if the ConsumerRecord is injected""" + + def __init__(self) -> None: + self.foo = "bar" + + async def consume(self, cr: ConsumerRecord) -> str: + return self.foo + + +@pytest.fixture +def di_cr(rand_consumer_record) -> Generator[ConsumerRecord, Any, None]: + """Dependency injected ConsumerRecord""" + yield rand_consumer_record() + + +async def test_cr_is_injected(di_cr: ConsumerRecord): + async def user_fn(cr: ConsumerRecord) -> str: + cr.value = "hello" + return cr.value + + stream_manager = StreamDependencyManager() + stream_manager._register_consumer_record() + stream_manager.solve_user_fn(user_fn) + content = await stream_manager.execute(di_cr) + assert content == "hello" + + +async def test_cr_is_injected_in_class(di_cr: ConsumerRecord): + app = AppWrapper() + stream_manager = StreamDependencyManager() + stream_manager._register_consumer_record() + stream_manager.solve_user_fn(app.consume) + content = await stream_manager.execute(di_cr) + assert content == app.foo + + +async def test_cr_generics_is_injected(di_cr: ConsumerRecord): + async def user_fn(cr: ConsumerRecord[Any, Any]) -> str: + cr.value = "hello" + return cr.value + + stream_manager = StreamDependencyManager() + stream_manager._register_consumer_record() + stream_manager.solve_user_fn(user_fn) + content = await stream_manager.execute(di_cr) + assert content == "hello" + + +async def test_cr_generics_str_is_injected(di_cr: ConsumerRecord): + async def user_fn(cr: ConsumerRecord[str, str]) -> str: + cr.value = "hello" + return cr.value + + stream_manager = StreamDependencyManager() + stream_manager._register_consumer_record() + stream_manager.solve_user_fn(user_fn) + content = await stream_manager.execute(di_cr) + assert content == "hello" + + +async def test_cr_with_generator(di_cr: ConsumerRecord): + async def user_fn(cr: ConsumerRecord) -> AsyncGenerator[str, None]: + cr.value = "hello" + yield cr.value + + stream_manager = StreamDependencyManager() + stream_manager._register_consumer_record() + stream_manager.solve_user_fn(user_fn) + content = await stream_manager.execute(di_cr) + + assert content == "hello" + + +async def test_stream(di_cr: ConsumerRecord): + async def user_fn(stream: Stream) -> str: + return stream.name + + stream = Stream("my-topic", func=user_fn, name="stream_name") + stream_manager = StreamDependencyManager() + stream_manager._register_stream(stream) + stream_manager._register_consumer_record() + stream_manager.solve_user_fn(user_fn) + content = await stream_manager.execute(di_cr) + assert content == "stream_name" + + +async def test_stream_and_consumer_record(di_cr: ConsumerRecord): + async def user_fn(stream: Stream, record: ConsumerRecord) -> tuple[str, str]: + return (stream.name, record.topic) + + stream = Stream("my-topic", func=user_fn, name="stream_name") + stream_manager = StreamDependencyManager() + stream_manager._register_stream(stream) + stream_manager._register_consumer_record() + stream_manager.solve_user_fn(user_fn) + (stream_name, topic_name) = await stream_manager.execute(di_cr) + + assert stream_name == "stream_name" + assert topic_name == di_cr.topic diff --git a/tests/_di/test_hooks.py b/tests/_di/test_hooks.py new file mode 100644 index 0000000..30e83ad --- /dev/null +++ b/tests/_di/test_hooks.py @@ -0,0 +1,75 @@ +import typing + +import pytest +from di import Container +from di.dependent import Dependent +from di.executors import SyncExecutor + +from kstreams._di.dependencies.hooks import bind_by_generic + +KT = typing.TypeVar("KT") +VT = typing.TypeVar("VT") + + +class Record(typing.Generic[KT, VT]): + def __init__(self, key: KT, value: VT): + self.key = key + self.value = value + + +def func_hinted(record: Record[str, int]) -> Record[str, int]: + return record + + +def func_base(record: Record) -> Record: + return record + + +@pytest.mark.parametrize( + "func", + [ + func_hinted, + func_base, + ], +) +def test_bind_generic_ok(func: typing.Callable): + dep = Dependent(func) + container = Container() + container.bind( + bind_by_generic( + Dependent(lambda: Record("foo", 1), wire=False), + Record, + ) + ) + solved = container.solve(dep, scopes=[None]) + with container.enter_scope(None) as state: + instance = solved.execute_sync(executor=SyncExecutor(), state=state) + assert isinstance(instance, Record) + + +def func_str(record: str) -> str: + return record + + +@pytest.mark.parametrize( + "func", + [ + func_str, + ], +) +def test_bind_generic_unrelated(func: typing.Callable): + dep = Dependent(func) + container = Container() + container.bind( + bind_by_generic( + Dependent(lambda: Record("foo", 1), wire=False), + Record, + ) + ) + solved = container.solve(dep, scopes=[None]) + with container.enter_scope(None) as state: + instance = solved.execute_sync(executor=SyncExecutor(), state=state) + print(type(instance)) + print(instance) + assert not isinstance(instance, Record) + assert isinstance(instance, str) diff --git a/tests/_di/test_param_headers.py b/tests/_di/test_param_headers.py new file mode 100644 index 0000000..cef9496 --- /dev/null +++ b/tests/_di/test_param_headers.py @@ -0,0 +1,76 @@ +from typing import Callable + +import pytest + +from kstreams import FromHeader, Header +from kstreams._di.dependencies.core import StreamDependencyManager +from kstreams.exceptions import HeaderNotFound +from kstreams.types import ConsumerRecord +from kstreams.typing import Annotated + +RandConsumerRecordFixture = Callable[..., ConsumerRecord] + + +async def test_from_headers_ok(rand_consumer_record: RandConsumerRecordFixture): + cr = rand_consumer_record(headers=(("event-type", "hello"),)) + + async def user_fn(event_type: FromHeader[str]) -> str: + return event_type + + stream_manager = StreamDependencyManager() + stream_manager.solve_user_fn(user_fn) + header_content = await stream_manager.execute(cr) + assert header_content == "hello" + + +async def test_from_header_not_found(rand_consumer_record: RandConsumerRecordFixture): + cr = rand_consumer_record(headers=(("event-type", "hello"),)) + + def user_fn(a_header: FromHeader[str]) -> str: + return a_header + + stream_manager = StreamDependencyManager() + stream_manager.solve_user_fn(user_fn) + with pytest.raises(HeaderNotFound): + await stream_manager.execute(cr) + + +@pytest.mark.xfail(reason="not implemenetd yet") +async def test_from_headers_numbers(rand_consumer_record: RandConsumerRecordFixture): + cr = rand_consumer_record(headers=(("event-type", "1"),)) + + async def user_fn(event_type: FromHeader[int]) -> int: + return event_type + + stream_manager = StreamDependencyManager() + stream_manager.solve_user_fn(user_fn) + header_content = await stream_manager.execute(cr) + assert header_content == 1 + + +async def test_headers_alias(rand_consumer_record: RandConsumerRecordFixture): + cr = rand_consumer_record(headers=(("EventType", "hello"),)) + + async def user_fn(event_type: Annotated[int, Header(alias="EventType")]) -> int: + return event_type + + stream_manager = StreamDependencyManager() + stream_manager.solve_user_fn(user_fn) + header_content = await stream_manager.execute(cr) + assert header_content == "hello" + + +async def test_headers_convert_underscores( + rand_consumer_record: RandConsumerRecordFixture, +): + cr = rand_consumer_record(headers=(("event_type", "hello"),)) + + async def user_fn( + event_type: Annotated[int, Header(convert_underscores=False)], + ) -> int: + return event_type + + stream_manager = StreamDependencyManager() + stream_manager.solve_user_fn(user_fn) + header_content = await stream_manager.execute(cr) + assert header_content == "hello" diff --git a/tests/conftest.py b/tests/conftest.py index e1bbc43..3ccd649 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,15 +1,22 @@ import asyncio +import logging from collections import namedtuple from dataclasses import field -from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Tuple +from typing import Any, Callable, Dict, List, NamedTuple, Optional, Sequence, Tuple import pytest import pytest_asyncio +from faker import Faker from pytest_httpserver import HTTPServer from kstreams import clients, create_engine +from kstreams.types import ConsumerRecord from kstreams.utils import create_ssl_context_from_mem +# Silence faker DEBUG logs +logger = logging.getLogger("faker") +logger.setLevel(logging.INFO) + class RecordMetadata(NamedTuple): offset: int = 1 @@ -243,3 +250,55 @@ def _(): benchmark(func, *args, **kwargs) return _wrapper + + +@pytest.fixture +def fake(): + return Faker() + + +@pytest.fixture() +def rand_consumer_record(fake: Faker) -> Callable[..., ConsumerRecord]: + """A random consumer record generator. + + You can inject this fixture in your test, + and then you can override the default values. + + Example: + + ```python + def test_my_consumer(rand_consumer_record): + rand_cr = rand_consumer_record() + custom_attrs_cr = rand_consumer_record(topic="my-topic", value="my-value") + # ... + ``` + """ + + def generate( + topic: Optional[str] = None, + headers: Optional[Sequence[Tuple[str, bytes]]] = None, + partition: Optional[int] = None, + offset: Optional[int] = None, + timestamp: Optional[int] = None, + timestamp_type: Optional[int] = None, + key: Optional[Any] = None, + value: Optional[Any] = None, + checksum: Optional[int] = None, + serialized_key_size: Optional[int] = None, + serialized_value_size: Optional[int] = None, + ) -> ConsumerRecord: + return ConsumerRecord( + topic=topic or fake.slug(), + headers=headers or tuple(), + partition=partition or fake.pyint(max_value=10), + offset=offset or fake.pyint(max_value=99999999), + timestamp=timestamp or int(fake.unix_time()), + timestamp_type=timestamp_type or 1, + key=key or fake.pystr(), + value=value or fake.pystr().encode(), + checksum=checksum, + serialized_key_size=serialized_key_size or fake.pyint(max_value=10), + serialized_value_size=serialized_value_size or fake.pyint(max_value=10), + ) + + return generate diff --git a/tests/test_benchmarks.py b/tests/test_benchmarks.py index 592a99e..3d6801d 100644 --- a/tests/test_benchmarks.py +++ b/tests/test_benchmarks.py @@ -124,3 +124,29 @@ async def consume_bench(cr: ConsumerRecord, stream: Stream, send: Send): consume_bench, crs, ) + + +if __name__ == "__main__": + """Section added for profiling purposes.""" + import asyncio + + from kstreams.create import create_engine + + stream_engine = create_engine( + title="test-engine", + ) + cr = ConsumerRecord( + topic="local--kstreams", + value="hello", + partition=0, + offset=0, + key="foo", + timestamp=0, + timestamp_type=0, + checksum=None, + serialized_key_size=0, + serialized_value_size=0, + headers=[], + ) + r= asyncio.run(bench_startup_and_processing_single_consumer_record(stream_engine, cr)) + print(r) \ No newline at end of file diff --git a/tests/test_monitor.py b/tests/test_monitor.py index 0b94361..bb013d9 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -20,62 +20,68 @@ async def my_coroutine(_): stream_engine.add_stream(stream=stream) await stream.start() + assert stream.consumer is not None await stream_engine.monitor.generate_consumer_metrics(stream.consumer) consumer = stream.consumer for topic_partition in consumer.assignment(): # super ugly notation but for now is the only way to get the metrics met_committed = ( - stream_engine.monitor.MET_COMMITTED.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_COMMITTED.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_position = ( - stream_engine.monitor.MET_POSITION.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_POSITION.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_highwater = ( - stream_engine.monitor.MET_HIGHWATER.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_HIGHWATER.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_lag = ( - stream_engine.monitor.MET_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_LAG.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_position_lag = ( - stream_engine.monitor.MET_POSITION_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_POSITION_LAG.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) @@ -135,56 +141,61 @@ async def my_coroutine(_): for topic_partition in consumer.assignment(): # super ugly notation but for now is the only way to get the metrics met_committed = ( - stream_engine.monitor.MET_COMMITTED.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_COMMITTED.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_position = ( - stream_engine.monitor.MET_POSITION.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_POSITION.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_highwater = ( - stream_engine.monitor.MET_HIGHWATER.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_HIGHWATER.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_lag = ( - stream_engine.monitor.MET_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_LAG.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) met_position_lag = ( - stream_engine.monitor.MET_POSITION_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] + list( + stream_engine.monitor.MET_POSITION_LAG.labels( + topic=topic_partition.topic, + partition=topic_partition.partition, + consumer_group=consumer._group_id, + ).collect() + )[0] .samples[0] .value ) @@ -200,9 +211,9 @@ async def my_coroutine(_): met_position_lag == consumer.highwater(topic_partition) - consumer_position ) - assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 2 + assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 2 await stream_engine.remove_stream(stream) - assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 0 + assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 0 @pytest.mark.asyncio @@ -223,6 +234,6 @@ async def my_coroutine(_): stream_engine.add_stream(stream=stream) await stream.start() - assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 0 + assert len(list(stream_engine.monitor.MET_POSITION_LAG.collect())[0].samples) == 0 await stream_engine.remove_stream(stream) assert "Metrics for consumer with group-id: my-group not found" in caplog.text diff --git a/tests/test_streams.py b/tests/test_streams.py index 9f05a5a..29cdd22 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -339,7 +339,6 @@ async def streaming_fn(_): Consumer.stop.assert_awaited() -@pytest.mark.asyncio async def test_stream_decorates_properly(stream_engine: StreamEngine): topic = "local--hello-kpn"