diff --git a/dask_planner/Cargo.lock b/dask_planner/Cargo.lock index f94498d8c..9c42d0a9a 100644 --- a/dask_planner/Cargo.lock +++ b/dask_planner/Cargo.lock @@ -343,9 +343,9 @@ dependencies = [ [[package]] name = "block-buffer" -version = "0.10.3" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" dependencies = [ "generic-array", ] @@ -427,9 +427,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.23" +version = "0.4.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" +checksum = "4e3c5919066adf22df73762e50cffcde3a758f2a848b113b586d1f86728b673b" dependencies = [ "iana-time-zone", "num-integer", @@ -482,9 +482,9 @@ dependencies = [ [[package]] name = "constant_time_eq" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3ad85c1f65dc7b37604eb0e89748faf0b9653065f2a8ef69f96a687ec1e9279" +checksum = "13418e745008f7349ec7e449155f419a61b92b58a99cc3616942b926825ec76b" [[package]] name = "core-foundation-sys" @@ -528,9 +528,9 @@ dependencies = [ [[package]] name = "csv" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af91f40b7355f82b0a891f50e70399475945bb0b0da4f1700ce60761c9d3e359" +checksum = "0b015497079b9a9d69c02ad25de6c0a6edef051ea6360a327d0bd05802ef64ad" dependencies = [ "csv-core", "itoa", @@ -549,9 +549,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86d3488e7665a7a483b57e25bdd90d0aeb2bc7608c8d0346acf2ad3f1caf1d62" +checksum = "9a140f260e6f3f79013b8bfc65e7ce630c9ab4388c6a89c71e07226f49487b72" dependencies = [ "cc", "cxxbridge-flags", @@ -561,9 +561,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48fcaf066a053a41a81dfb14d57d99738b767febb8b735c3016e469fac5da690" +checksum = "da6383f459341ea689374bf0a42979739dc421874f112ff26f829b8040b8e613" dependencies = [ "cc", "codespan-reporting", @@ -576,15 +576,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2ef98b8b717a829ca5603af80e1f9e2e48013ab227b68ef37872ef84ee479bf" +checksum = "90201c1a650e95ccff1c8c0bb5a343213bdd317c6e600a93075bca2eff54ec97" [[package]] name = "cxxbridge-macro" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "086c685979a698443656e5cf7856c95c642295a38599f12fb1ff76fb28d19892" +checksum = "0b75aed41bb2e6367cae39e6326ef817a851db13c13e4f3263714ca3cfb8de56" dependencies = [ "proc-macro2", "quote", @@ -868,9 +868,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e2792b0ff0340399d58445b88fd9770e3489eff258a4cbc1523418f12abf84" +checksum = "531ac96c6ff5fd7c62263c5e3c67a603af4fcaee2e1a0ae5565ba3a11e69e549" dependencies = [ "futures-channel", "futures-core", @@ -883,9 +883,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e5317663a9089767a1ec00a487df42e0ca174b61b4483213ac24448e4664df5" +checksum = "164713a5a0dcc3e7b4b1ed7d3b433cabc18025386f9339346e8daf15963cf7ac" dependencies = [ "futures-core", "futures-sink", @@ -893,15 +893,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608" +checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd" [[package]] name = "futures-executor" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8de0a35a6ab97ec8869e32a2473f4b1324459e14c29275d14b10cb1fd19b50e" +checksum = "1997dd9df74cdac935c76252744c1ed5794fac083242ea4fe77ef3ed60ba0f83" dependencies = [ "futures-core", "futures-task", @@ -910,15 +910,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfb8371b6fb2aeb2d280374607aeabfc99d95c72edfe51692e42d3d7f0d08531" +checksum = "89d422fa3cbe3b40dca574ab087abb5bc98258ea57eea3fd6f1fa7162c778b91" [[package]] name = "futures-macro" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70" +checksum = "3eb14ed937631bd8b8b8977f2c198443447a8355b6e3ca599f38c975e5a963b6" dependencies = [ "proc-macro2", "quote", @@ -927,21 +927,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f310820bb3e8cfd46c80db4d7fb8353e15dfff853a127158425f31e0be6c8364" +checksum = "ec93083a4aecafb2a80a885c9de1f0ccae9dbd32c2bb54b0c3a65690e0b8d2f2" [[package]] name = "futures-task" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcf79a1bf610b10f42aea489289c5a2c478a786509693b80cd39c44ccd936366" +checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879" [[package]] name = "futures-util" -version = "0.3.26" +version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1" +checksum = "3ef6b17e481503ec85211fed8f39d1970f128935ca1f814cd32ac4a6842e84ab" dependencies = [ "futures-channel", "futures-core", @@ -1101,19 +1101,20 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "io-lifetimes" -version = "1.0.5" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3" +checksum = "76e86b86ae312accbf05ade23ce76b625e0e47a255712b7414037385a1c05380" dependencies = [ + "hermit-abi 0.3.1", "libc", "windows-sys 0.45.0", ] [[package]] name = "is-terminal" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22e18b0a45d56fe973d6db23972bf5bc46f988a4a2385deac9cc29572f09daef" +checksum = "21b6b32576413a8e69b90e952e4a026476040d81017b80445deda5f2d3921857" dependencies = [ "hermit-abi 0.3.1", "io-lifetimes", @@ -1132,15 +1133,15 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" +checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" [[package]] name = "jobserver" -version = "0.1.25" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "068b1ee6743e4d11fb9c6a1e6064b3693a1b600e7f5f5988047d98b3dc9fb90b" +checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2" dependencies = [ "libc", ] @@ -1226,9 +1227,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.139" +version = "0.2.140" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" +checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" [[package]] name = "libm" @@ -1442,9 +1443,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.5.4" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f344e51ec9584d2f51199c0c29c6f73dddd04ade986497875bf8fa2f178caf0" +checksum = "e1ea8f683b4f89a64181393742c041520a1a87e9775e6b4c0dd5a3281af05fc6" dependencies = [ "async-trait", "bytes", @@ -1533,9 +1534,9 @@ dependencies = [ [[package]] name = "paste" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d01a5bd0424d00070b0098dd17ebca6f961a959dead1dbcbbbc1d1cd8d3deeba" +checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" [[package]] name = "percent-encoding" @@ -1575,9 +1576,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.51" +version = "1.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6" +checksum = "1d0e1ae9e836cc3beddd63db0df682593d7e2d3d891ae8c9083d2113e1744224" dependencies = [ "unicode-ident", ] @@ -1644,9 +1645,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.23" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +checksum = "4424af4bf778aae2051a77b60283332f386554255d722233d09fbfc7e30da2fc" dependencies = [ "proc-macro2", ] @@ -1707,15 +1708,6 @@ version = "0.6.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" -[[package]] -name = "remove_dir_all" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" -dependencies = [ - "winapi", -] - [[package]] name = "rustc_version" version = "0.4.0" @@ -1727,9 +1719,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.36.8" +version = "0.36.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43abb88211988493c1abb44a70efa56ff0ce98f233b7b276146f1f3f7ba9644" +checksum = "fd5c6ff11fecd55b40746d1995a02f2eb375bf8c00d192d521ee09f42bef37bc" dependencies = [ "bitflags", "errno", @@ -1741,15 +1733,15 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70" +checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" [[package]] name = "ryu" -version = "1.0.12" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde" +checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" [[package]] name = "same-file" @@ -1768,33 +1760,33 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "scratch" -version = "1.0.3" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" +checksum = "1792db035ce95be60c3f8853017b3999209281c24e2ba5bc8e59bf97a0c590c1" [[package]] name = "semver" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a" +checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" [[package]] name = "seq-macro" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1685deded9b272198423bdbdb907d8519def2f26cf3699040e54e8c4fbd5c5ce" +checksum = "e6b44e8fc93a14e66336d230954dda83d18b4605ccace8fe09bc7514a71ad0bc" [[package]] name = "serde" -version = "1.0.152" +version = "1.0.156" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" +checksum = "314b5b092c0ade17c00142951e50ced110ec27cea304b1037c6969246c2469a4" [[package]] name = "serde_json" -version = "1.0.93" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76" +checksum = "1c533a59c9d8a93a09c6ab31f0fd5e5f4dd1b8fc9434804029839884765d04ea" dependencies = [ "itoa", "ryu", @@ -1909,9 +1901,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.107" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ "proc-macro2", "quote", @@ -1926,16 +1918,15 @@ checksum = "8ae9980cab1db3fceee2f6c6f643d5d8de2997c58ee8d25fb0cc8a9e9e7348e5" [[package]] name = "tempfile" -version = "3.3.0" +version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" +checksum = "af18f7ae1acd354b992402e9ec5864359d693cd8a79dcbef59f76891701c1e95" dependencies = [ "cfg-if", "fastrand", - "libc", "redox_syscall", - "remove_dir_all", - "winapi", + "rustix", + "windows-sys 0.42.0", ] [[package]] @@ -1984,9 +1975,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.25.0" +version = "1.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af" +checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64" dependencies = [ "autocfg", "bytes", @@ -1995,7 +1986,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "tokio-macros", - "windows-sys 0.42.0", + "windows-sys 0.45.0", ] [[package]] @@ -2083,15 +2074,15 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" [[package]] name = "unicode-bidi" -version = "0.3.10" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54675592c1dbefd78cbd98db9bacd89886e1ca50692a0692baefffdeb92dd58" +checksum = "524b68aca1d05e03fdf03fcdce2c6c94b6daf6d16861ddaa7e4f2b6638a9052c" [[package]] name = "unicode-ident" -version = "1.0.6" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc" +checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" [[package]] name = "unicode-normalization" @@ -2148,12 +2139,11 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "walkdir" -version = "2.3.2" +version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +checksum = "36df944cda56c7d8d8b7496af378e6b16de9284591917d307c9b4d313c44e698" dependencies = [ "same-file", - "winapi", "winapi-util", ] @@ -2274,9 +2264,9 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" dependencies = [ "windows_aarch64_gnullvm", "windows_aarch64_msvc", @@ -2289,45 +2279,45 @@ dependencies = [ [[package]] name = "windows_aarch64_gnullvm" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" [[package]] name = "windows_aarch64_msvc" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" [[package]] name = "windows_i686_gnu" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" [[package]] name = "windows_i686_msvc" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" [[package]] name = "windows_x86_64_gnu" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" [[package]] name = "windows_x86_64_gnullvm" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" [[package]] name = "windows_x86_64_msvc" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" [[package]] name = "xz2" diff --git a/dask_planner/src/error.rs b/dask_planner/src/error.rs index d5b0eb39c..d3ac4132b 100644 --- a/dask_planner/src/error.rs +++ b/dask_planner/src/error.rs @@ -12,6 +12,7 @@ pub enum DaskPlannerError { ParserError(ParserError), TokenizerError(TokenizerError), Internal(String), + InvalidIOFilter(String), } impl Display for DaskPlannerError { @@ -21,6 +22,7 @@ impl Display for DaskPlannerError { Self::ParserError(e) => write!(f, "SQL Parser Error: {e}"), Self::TokenizerError(e) => write!(f, "SQL Tokenizer Error: {e}"), Self::Internal(e) => write!(f, "Internal Error: {e}"), + Self::InvalidIOFilter(e) => write!(f, "Invalid pyarrow filter: {e} encountered. Defaulting to Dask CPU/GPU bound task operation"), } } } diff --git a/dask_planner/src/sql/logical/table_scan.rs b/dask_planner/src/sql/logical/table_scan.rs index 537f011cc..44e97da78 100644 --- a/dask_planner/src/sql/logical/table_scan.rs +++ b/dask_planner/src/sql/logical/table_scan.rs @@ -1,10 +1,11 @@ use std::sync::Arc; -use datafusion_common::DFSchema; -use datafusion_expr::{logical_plan::TableScan, LogicalPlan}; +use datafusion_common::{DFSchema, ScalarValue}; +use datafusion_expr::{logical_plan::TableScan, Expr, LogicalPlan}; use pyo3::prelude::*; use crate::{ + error::{DaskPlannerError, Result}, expression::{py_expr_list, PyExpr}, sql::exceptions::py_type_err, }; @@ -16,6 +17,169 @@ pub struct PyTableScan { input: Arc, } +#[pyclass(name = "FilteredResult", module = "dask_planner", subclass)] +#[derive(Debug, Clone)] +pub struct PyFilteredResult { + // Exprs that cannot be successfully passed down to the IO layer for filtering and must still be filtered using Dask operations + #[pyo3(get)] + pub io_unfilterable_exprs: Vec, + #[pyo3(get)] + pub filtered_exprs: Vec<(String, String, String)>, +} + +impl PyTableScan { + /// Transform the singular Expr instance into its DNF form serialized in a Vec instance. Possibly recursively expanding + /// it as well if needed. + /// + /// Ex: BinaryExpr("column_name", Operator::Eq, "something") -> vec!["column_name", "=", "something"] + /// Ex: Expr::Column("column_name") -> vec!["column_name"] + /// Ex: Expr::Literal(Utf-8("something")) -> vec!["something"] + pub fn _expand_dnf_filter(filter: &Expr) -> Result> { + let mut filter_tuple: Vec<(String, String, String)> = Vec::new(); + + match filter { + Expr::BinaryExpr(binary_expr) => { + println!( + "!!!BinaryExpr -> Left: {:?}, Operator: {:?}, Right: {:?}", + binary_expr.left, binary_expr.op, binary_expr.right + ); + + // Since Tuples are immutable in Rust we need a datastructure to temporaily hold the Tuples values until all have been parsed + let mut tmp_vals: Vec = Vec::new(); + + // Push left Expr string value or combo of expanded left values + match &*binary_expr.left { + Expr::BinaryExpr(binary_expr) => { + // if let Ok(tup) = filter_tuple.append( + // &mut PyTableScan::_expand_dnf_filter(&Expr::BinaryExpr( + // binary_expr.clone(), + // )) + // .unwrap(), + // ); + + if let Ok(mut tup) = + PyTableScan::_expand_dnf_filter(&Expr::BinaryExpr(binary_expr.clone())) + { + filter_tuple.append(&mut tup); + } else { + // Error so add to list of expressions that cannot be handled by PyArrow + // SIMPLY DO NOT PUSH THE VALUES HERE AND IT WILL CAUSE THE ERROR TO BE PROPAGATED + // REMOVE LATER BUT LEAVE FOR MORE TO MAKE INTENT MORE EXPLICIT. COMPILER OPTIMIZES THIS AWAY ANYWAY + } + } + _ => { + let str = binary_expr.left.to_string(); + if str.contains('.') { + tmp_vals.push(str.split('.').nth(1).unwrap().to_string()); + } else { + tmp_vals.push(str); + } + } + } + + // Handle the operator here. This controls if the format is conjunctive or disjuntive + tmp_vals.push(binary_expr.op.to_string()); + + match &*binary_expr.right { + Expr::BinaryExpr(binary_expr) => { + filter_tuple.append( + &mut PyTableScan::_expand_dnf_filter(&Expr::BinaryExpr( + binary_expr.clone(), + )) + .unwrap(), + ); + } + _ => match &*binary_expr.right { + Expr::Literal(scalar_value) => match &scalar_value { + ScalarValue::Utf8(value) => { + let val = value.as_ref().unwrap(); + if val.contains('.') { + tmp_vals.push(val.split('.').nth(1).unwrap().to_string()); + } else { + tmp_vals.push(val.clone()); + } + } + ScalarValue::Int8(v) => tmp_vals.push(format!("Int({})", v.unwrap())), + ScalarValue::Int16(v) => tmp_vals.push(format!("Int({})", v.unwrap())), + ScalarValue::Int32(v) => tmp_vals.push(format!("Int({})", v.unwrap())), + ScalarValue::Int64(v) => tmp_vals.push(format!("Int({})", v.unwrap())), + ScalarValue::TimestampNanosecond(_val, _an_option) => { + // // Need to encode the value as a String to return to Python, Python side will then convert + // // value back to a integer + // let mut val_builder = "TimestampNanosecond(".to_string(); + // val_builder.push_str(val.unwrap().to_string().as_str()); + // val_builder.push(')'); + // tmp_vals.push(val_builder); + // let er = + // DaskPlannerError::InvalidIOFilter(scalar_value.to_string()); + // Err::, DaskPlannerError>(er) + + // SIMPLY DO NOT PUSH THE VALUES HERE AND IT WILL CAUSE THE ERROR TO BE PROPAGATED + } + _ => tmp_vals.push(scalar_value.to_string()), + }, + _ => panic!("hit this!"), + }, + } + + if tmp_vals.len() == 3 { + filter_tuple.push(( + tmp_vals[0].clone(), + tmp_vals[1].clone(), + tmp_vals[2].clone(), + )); + + Ok(filter_tuple) + } else { + println!( + "Wonder why tmp_vals doesn't equal 3?? {:?}, Value: {:?}", + tmp_vals.len(), + tmp_vals[0] + ); + let er = DaskPlannerError::InvalidIOFilter(format!( + "Wonder why tmp_vals doesn't equal 3?? {:?}, Value: {:?}", + tmp_vals.len(), + tmp_vals[0] + )); + Err::, DaskPlannerError>(er) + } + } + _ => { + println!( + "Unable to apply filter: `{}` to IO reader, using in Dask instead", + filter + ); + let er = DaskPlannerError::InvalidIOFilter(format!( + "Unable to apply filter: `{}` to IO reader, using in Dask instead", + filter + )); + Err::, DaskPlannerError>(er) + } + } + + // Ok(filter_tuple) + } + + pub fn _expand_dnf_filters(input: &Arc, filters: &Vec) -> PyFilteredResult { + // 1. Loop through all of the TableScan filters (Expr(s)) + let mut filtered_exprs: Vec<(String, String, String)> = Vec::new(); + let mut unfiltered_exprs: Vec = Vec::new(); + for filter in filters { + match PyTableScan::_expand_dnf_filter(filter) { + Ok(mut expanded_dnf_filter) => filtered_exprs.append(&mut expanded_dnf_filter), + Err(_e) => { + unfiltered_exprs.push(PyExpr::from(filter.clone(), Some(vec![input.clone()]))) + } + } + } + + PyFilteredResult { + io_unfilterable_exprs: unfiltered_exprs, + filtered_exprs, + } + } +} + #[pymethods] impl PyTableScan { #[pyo3(name = "getTableScanProjects")] @@ -43,12 +207,18 @@ impl PyTableScan { fn scan_filters(&self) -> PyResult> { py_expr_list(&self.input, &self.table_scan.filters) } + + #[pyo3(name = "getDNFFilters")] + fn dnf_io_filters(&self) -> PyResult { + let results = PyTableScan::_expand_dnf_filters(&self.input, &self.table_scan.filters); + Ok(results) + } } impl TryFrom for PyTableScan { type Error = PyErr; - fn try_from(logical_plan: LogicalPlan) -> Result { + fn try_from(logical_plan: LogicalPlan) -> std::result::Result { match logical_plan { LogicalPlan::TableScan(table_scan) => { // Create an input logical plan that's identical to the table scan with schema from the table source @@ -68,3 +238,40 @@ impl TryFrom for PyTableScan { } } } + +#[cfg(test)] +mod tests { + use datafusion::logical_expr::expr_fn::{binary_expr, col}; + use datafusion_common::ScalarValue; + use datafusion_expr::{Expr, Operator}; + + use crate::sql::logical::table_scan::PyTableScan; + + #[test] + pub fn expand_binary_exprs_dnf_filters() { + let jewlery = binary_expr( + col("item.i_category"), + Operator::Eq, + Expr::Literal(ScalarValue::new_utf8("Jewelry")), + ); + let women = binary_expr( + col("item.i_category"), + Operator::Eq, + Expr::Literal(ScalarValue::new_utf8("Women")), + ); + let music = binary_expr( + col("item.i_category"), + Operator::Eq, + Expr::Literal(ScalarValue::new_utf8("Music")), + ); + + let full_expr = binary_expr(jewlery, Operator::Or, women); + let full_expr = binary_expr(full_expr, Operator::Or, music); + println!("BinaryExpr: {:?}", full_expr); + + let filters = vec![full_expr]; + + let result = PyTableScan::_expand_dnf_filters(&filters); + println!("Result: {:?}", result); + } +} diff --git a/dask_sql/context.py b/dask_sql/context.py index f218c31ed..0d14bb2de 100644 --- a/dask_sql/context.py +++ b/dask_sql/context.py @@ -249,6 +249,14 @@ def create_table( **kwargs, ) + # Temporary for testing + self.schema[schema_name].tables_meta[table_name.lower()] = { + "input_path": input_table, + "table_name": table_name, + "format": format, + "gpu": gpu, + } + if type(input_table) == str: filepath = input_table dc.filepath = input_table diff --git a/dask_sql/datacontainer.py b/dask_sql/datacontainer.py index e4c93a8f5..38ea41ad7 100644 --- a/dask_sql/datacontainer.py +++ b/dask_sql/datacontainer.py @@ -282,6 +282,7 @@ class SchemaContainer: def __init__(self, name: str): self.__name__ = name self.tables: Dict[str, DataContainer] = {} + self.tables_meta: Dict[str, str] = {} self.statistics: Dict[str, Statistics] = {} self.experiments: Dict[str, pd.DataFrame] = {} self.models: Dict[str, Tuple[Any, List[str]]] = {} diff --git a/dask_sql/physical/rel/logical/filter.py b/dask_sql/physical/rel/logical/filter.py index 178121fef..0740c2d82 100644 --- a/dask_sql/physical/rel/logical/filter.py +++ b/dask_sql/physical/rel/logical/filter.py @@ -34,10 +34,12 @@ def filter_or_scalar(df: dd.DataFrame, filter_condition: Union[np.bool_, dd.Seri # In SQL, a NULL in a boolean is False on filtering filter_condition = filter_condition.fillna(False) out = df[filter_condition] - if dask_config.get("sql.predicate_pushdown"): - return attempt_predicate_pushdown(out) - else: - return out + # Commented out for this POC PR + # if dask_config.get("sql.predicate_pushdown"): + # return attempt_predicate_pushdown(out) + # else: + # return out + return out class DaskFilterPlugin(BaseRelPlugin): diff --git a/dask_sql/physical/rel/logical/table_scan.py b/dask_sql/physical/rel/logical/table_scan.py index 8bd7874f2..d8b0a837b 100644 --- a/dask_sql/physical/rel/logical/table_scan.py +++ b/dask_sql/physical/rel/logical/table_scan.py @@ -3,7 +3,12 @@ from functools import reduce from typing import TYPE_CHECKING -from dask_sql.datacontainer import DataContainer +import dask_cudf as ddf +import numpy as np +import pandas as pd +from dask.utils_test import hlg_layer + +from dask_sql.datacontainer import ColumnContainer, DataContainer from dask_sql.physical.rel.base import BaseRelPlugin from dask_sql.physical.rel.logical.filter import filter_or_scalar from dask_sql.physical.rex import RexConverter @@ -74,18 +79,71 @@ def _apply_projections(self, table_scan, dask_table, dc): return DataContainer(df, cc) def _apply_filters(self, table_scan, rel, dc, context): - df = dc.df - cc = dc.column_container + + # Columns that should be projected + cols = table_scan.getTableScanProjects() + filters = table_scan.getFilters() - # All partial filters here are applied in conjunction (&) if filters: - df_condition = reduce( - operator.and_, - [ - RexConverter.convert(rel, rex, dc, context=context) - for rex in filters - ], - ) - df = filter_or_scalar(df, df_condition) + df = dc.df + + # Generate the filters in DNF form for the cudf reader + filtered_result = table_scan.getDNFFilters() + filtered = filtered_result.filtered_exprs + unfiltered = filtered_result.io_unfilterable_exprs + + if len(filtered) > 0: + # Prepare the filters to be in the format expected by Python since they came from Rust + updated_filters = [] + for filter_tup in filtered: + if filter_tup[2].startswith("Int"): + num = filter_tup[2].split("(")[1].split(")")[0] + updated_filters.append((filter_tup[0], filter_tup[1], int(num))) + elif filter_tup[2].startswith("TimestampNanosecond"): + ns_timestamp = filter_tup[2].split("(")[1].split(")")[0] + print(f"Nanosecond Value from Rust: {ns_timestamp}") + val = pd.to_datetime(ns_timestamp, unit="ns") + updated_filters.append((filter_tup[0], filter_tup[1], val)) + elif filter_tup[2] == "np.nan": + updated_filters.append((filter_tup[0], filter_tup[1], np.nan)) + else: + updated_filters.append(filter_tup) + + # Rebuild the read_* operation from the existing Dask task + # TODO: This will currently only work with parquet, need to update + layer = hlg_layer(df.dask, "read-parquet") + creation_path = layer.creation_info["args"][0] + print( + f"Rebuilding Dask Task `read_parquet()` \n \ + Original Dask read-parquet: {layer} \n \ + Original creation_info: {layer.creation_info} \n \ + Path: {creation_path} \n \ + Filters: {updated_filters} \n \ + Columns: {cols} \n \ + split_row_groups: {layer.creation_info['kwargs']['split_row_groups']} \n \ + aggregate_files: {layer.creation_info['kwargs']['aggregate_files']}\n" + ) + + df = ddf.read_parquet( + creation_path, + filters=updated_filters, + columns=cols, + split_row_groups=layer.creation_info["kwargs"]["split_row_groups"], + aggregate_files=layer.creation_info["kwargs"]["aggregate_files"], + ) + dc = DataContainer(df, ColumnContainer(df.columns)) + + # All partial filters here are applied in conjunction (&) + if len(unfiltered) > 0: + df_condition = reduce( + operator.and_, + [ + RexConverter.convert(rel, rex, dc, context=context) + for rex in unfiltered + ], + ) + df = filter_or_scalar(df, df_condition) + + dc = DataContainer(df, ColumnContainer(df.columns)) - return DataContainer(df, cc) + return dc