diff --git a/Cargo.lock b/Cargo.lock index 8a4424dc82e5..4f9b96a51c63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -688,7 +688,7 @@ dependencies = [ "async-lock 3.3.0", "async-task", "concurrent-queue", - "fastrand 2.0.1", + "fastrand 2.1.0", "futures-lite 2.2.0", "slab", ] @@ -975,7 +975,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d67782c3f868daa71d3533538e98a8e13713231969def7536e8039606fc46bf0" dependencies = [ - "fastrand 2.0.1", + "fastrand 2.1.0", "futures-core", "pin-project", "tokio", @@ -1266,7 +1266,7 @@ dependencies = [ "async-channel 2.2.0", "async-lock 3.3.0", "async-task", - "fastrand 2.0.1", + "fastrand 2.1.0", "futures-io", "futures-lite 2.2.0", "piper", @@ -2827,7 +2827,7 @@ dependencies = [ "anyhow", "cargo-license", "cargo_metadata 0.18.1", - "gix 0.62.0", + "gix 0.63.0", "log", "vergen", ] @@ -4170,6 +4170,7 @@ dependencies = [ name = "databend-common-storages-stage" version = "0.1.0" dependencies = [ + "arrow-schema", "async-backtrace", "async-trait-fn", "bstr 1.9.1", @@ -4190,10 +4191,12 @@ dependencies = [ "databend-common-settings", "databend-common-storage", "databend-common-storages-parquet", + "databend-storages-common-table-meta", "enum-as-inner 0.6.0", "futures", "log", "opendal", + "parquet", "serde", "serde_json", "typetag", @@ -5749,9 +5752,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" [[package]] name = "faststr" @@ -6171,7 +6174,7 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "445ba825b27408685aaecefd65178908c36c6e96aaf6d8599419d46e624192ba" dependencies = [ - "fastrand 2.0.1", + "fastrand 2.1.0", "futures-core", "futures-io", "parking", @@ -6504,49 +6507,49 @@ dependencies = [ [[package]] name = "gix" -version = "0.62.0" +version = "0.63.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5631c64fb4cd48eee767bf98a3cbc5c9318ef3bb71074d4c099a2371510282b6" +checksum = "984c5018adfa7a4536ade67990b3ebc6e11ab57b3d6cd9968de0947ca99b4b06" dependencies = [ - "gix-actor 0.31.1", + "gix-actor 0.31.2", "gix-archive", "gix-attributes", "gix-command", "gix-commitgraph 0.24.2", - "gix-config 0.36.1", + "gix-config 0.37.0", "gix-credentials", "gix-date", - "gix-diff 0.43.0", + "gix-diff 0.44.0", "gix-dir", - "gix-discover 0.31.0", - "gix-features 0.38.1", + "gix-discover 0.32.0", + "gix-features 0.38.2", "gix-filter", - "gix-fs 0.10.2", + "gix-fs 0.11.0", "gix-glob 0.16.2", "gix-hash", "gix-hashtable", "gix-ignore", - "gix-index 0.32.1", - "gix-lock 13.1.1", + "gix-index 0.33.0", + "gix-lock 14.0.0", "gix-macros", "gix-mailmap", "gix-negotiate", - "gix-object 0.42.1", - "gix-odb 0.60.0", - "gix-pack 0.50.0", + "gix-object 0.42.2", + "gix-odb 0.61.0", + "gix-pack 0.51.0", "gix-path", "gix-pathspec", "gix-prompt", - "gix-ref 0.43.0", + "gix-ref 0.44.0", "gix-refspec 0.23.0", - "gix-revision 0.27.0", - "gix-revwalk 0.13.0", + "gix-revision 0.27.1", + "gix-revwalk 0.13.1", "gix-sec", "gix-status", "gix-submodule", - "gix-tempfile 13.1.1", + "gix-tempfile 14.0.0", "gix-trace", - "gix-traverse 0.39.0", + "gix-traverse 0.39.1", "gix-url 0.27.3", "gix-utils", "gix-validate", @@ -6577,9 +6580,9 @@ dependencies = [ [[package]] name = "gix-actor" -version = "0.31.1" +version = "0.31.2" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45c3a3bde455ad2ee8ba8a195745241ce0b770a8a26faae59fcf409d01b28c46" +checksum = "d69c59d392c7e6c94385b6fd6089d6df0fe945f32b4357687989f3aee253cd7f" dependencies = [ "bstr 1.9.1", "gix-date", @@ -6591,13 +6594,13 @@ dependencies = [ [[package]] name = "gix-archive" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62f28b5481bbe35de9f2eacbd8dbc61da7b8d763eaecd667018602aa805e2e2e" +checksum = "ef585600612f8d75689ab0cb236e6c26620b40819e0e062c0578ea84c22a59ad" dependencies = [ "bstr 1.9.1", "gix-date", - "gix-object 0.42.1", + "gix-object 0.42.2", "gix-worktree-stream", "thiserror", ] @@ -6639,9 +6642,9 @@ dependencies = [ [[package]] name = "gix-command" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f90009020dc4b3de47beed28e1334706e0a330ddd17f5cfeb097df3b15a54b77" +checksum = "6c22e086314095c43ffe5cdc5c0922d5439da4fd726f3b0438c56147c34dc225" dependencies = [ "bstr 1.9.1", "gix-path", @@ -6671,7 +6674,7 @@ checksum = "f7b102311085da4af18823413b5176d7c500fb2272eaf391cfa8635d8bcb12c4" dependencies = [ "bstr 1.9.1", "gix-chunk", - "gix-features 0.38.1", + "gix-features 0.38.2", "gix-hash", "memmap2 0.9.4", "thiserror", @@ -6700,16 +6703,16 @@ dependencies = [ [[package]] name = "gix-config" -version = "0.36.1" +version = "0.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7580e05996e893347ad04e1eaceb92e1c0e6a3ffe517171af99bf6b6df0ca6e5" +checksum = "53fafe42957e11d98e354a66b6bd70aeea00faf2f62dd11164188224a507c840" dependencies = [ "bstr 1.9.1", "gix-config-value", - "gix-features 0.38.1", + "gix-features 0.38.2", "gix-glob 0.16.2", "gix-path", - "gix-ref 0.43.0", + "gix-ref 0.44.0", "gix-sec", "memchr", "once_cell", @@ -6775,18 +6778,18 @@ dependencies = [ [[package]] name = "gix-diff" -version = "0.43.0" +version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5fbc24115b957346cd23fb0f47d830eb799c46c89cdcf2f5acc9bf2938c2d01" +checksum = "40b9bd8b2d07b6675a840b56a6c177d322d45fa082672b0dad8f063b25baf0a4" dependencies = [ "bstr 1.9.1", "gix-command", "gix-filter", - "gix-fs 0.10.2", + "gix-fs 0.11.0", "gix-hash", - "gix-object 0.42.1", + "gix-object 0.42.2", "gix-path", - "gix-tempfile 13.1.1", + "gix-tempfile 14.0.0", "gix-trace", "gix-worktree", "imara-diff", @@ -6795,16 +6798,16 @@ dependencies = [ [[package]] name = "gix-dir" -version = "0.4.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6fcd56ffa1133f35525af890226ad0d3b2e607b4490360c94b1869e278eba3" +checksum = "60c99f8c545abd63abe541d20ab6cda347de406c0a3f1c80aadc12d9b0e94974" dependencies = [ "bstr 1.9.1", - "gix-discover 0.31.0", - "gix-fs 0.10.2", + "gix-discover 0.32.0", + "gix-fs 0.11.0", "gix-ignore", - "gix-index 0.32.1", - "gix-object 0.42.1", + "gix-index 0.33.0", + "gix-object 0.42.2", "gix-path", "gix-pathspec", "gix-trace", @@ -6830,16 +6833,16 @@ dependencies = [ [[package]] name = "gix-discover" -version = "0.31.0" +version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64bab49087ed3710caf77e473dc0efc54ca33d8ccc6441359725f121211482b1" +checksum = "fc27c699b63da66b50d50c00668bc0b7e90c3a382ef302865e891559935f3dbf" dependencies = [ "bstr 1.9.1", "dunce", - "gix-fs 0.10.2", + "gix-fs 0.11.0", "gix-hash", "gix-path", - "gix-ref 0.43.0", + "gix-ref 0.44.0", 
"gix-sec", "thiserror", ] @@ -6864,9 +6867,9 @@ dependencies = [ [[package]] name = "gix-features" -version = "0.38.1" +version = "0.38.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db4254037d20a247a0367aa79333750146a369719f0c6617fec4f5752cc62b37" +checksum = "ac7045ac9fe5f9c727f38799d002a7ed3583cd777e3322a7c4b43e3cf437dc69" dependencies = [ "bytes", "bytesize", @@ -6888,16 +6891,16 @@ dependencies = [ [[package]] name = "gix-filter" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c0d1f01af62bfd2fb3dd291acc2b29d4ab3e96ad52a679174626508ce98ef12" +checksum = "00ce6ea5ac8fca7adbc63c48a1b9e0492c222c386aa15f513405f1003f2f4ab2" dependencies = [ "bstr 1.9.1", "encoding_rs", "gix-attributes", "gix-command", "gix-hash", - "gix-object 0.42.1", + "gix-object 0.42.2", "gix-packetline-blocking", "gix-path", "gix-quote", @@ -6918,11 +6921,12 @@ dependencies = [ [[package]] name = "gix-fs" -version = "0.10.2" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2184c40e7910529677831c8b481acf788ffd92427ed21fad65b6aa637e631b8" +checksum = "3f78f7d6dcda7a5809efd73a33b145e3dce7421c460df21f32126f9732736b0c" dependencies = [ - "gix-features 0.38.1", + "fastrand 2.1.0", + "gix-features 0.38.2", "gix-utils", ] @@ -6946,7 +6950,7 @@ checksum = "682bdc43cb3c00dbedfcc366de2a849b582efd8d886215dbad2ea662ec156bb5" dependencies = [ "bitflags 2.4.2", "bstr 1.9.1", - "gix-features 0.38.1", + "gix-features 0.38.2", "gix-path", ] @@ -7011,22 +7015,23 @@ dependencies = [ [[package]] name = "gix-index" -version = "0.32.1" +version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "881ab3b1fa57f497601a5add8289e72a7ae09471fc0b9bbe483b628ae8e418a1" +checksum = "2d8c5a5f1c58edcbc5692b174cda2703aba82ed17d7176ff4c1752eb48b1b167" dependencies = [ "bitflags 2.4.2", "bstr 1.9.1", "filetime", "fnv", "gix-bitmap", - "gix-features 0.38.1", - "gix-fs 0.10.2", + "gix-features 0.38.2", + "gix-fs 0.11.0", "gix-hash", - "gix-lock 13.1.1", - "gix-object 0.42.1", - "gix-traverse 0.39.0", + "gix-lock 14.0.0", + "gix-object 0.42.2", + "gix-traverse 0.39.1", "gix-utils", + "gix-validate", "hashbrown 0.14.3", "itoa", "libc", @@ -7049,20 +7054,20 @@ dependencies = [ [[package]] name = "gix-lock" -version = "13.1.1" +version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7c359f81f01b8352063319bcb39789b7ea0887b406406381106e38c4a34d049" +checksum = "e3bc7fe297f1f4614774989c00ec8b1add59571dc9b024b4c00acb7dedd4e19d" dependencies = [ - "gix-tempfile 13.1.1", + "gix-tempfile 14.0.0", "gix-utils", "thiserror", ] [[package]] name = "gix-macros" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dff438f14e67e7713ab9332f5fd18c8f20eb7eb249494f6c2bf170522224032" +checksum = "999ce923619f88194171a67fb3e6d613653b8d4d6078b529b15a765da0edcc17" dependencies = [ "proc-macro2", "quote", @@ -7071,28 +7076,28 @@ dependencies = [ [[package]] name = "gix-mailmap" -version = "0.23.0" +version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28a62c86c08a65f99002013d58dd3312b2987705547436bdb90c507dcd9a41b1" +checksum = "cf3082fad1058fb25a5317f5a31f293bc054670aec76c0e3724dae059f6c32bf" dependencies = [ "bstr 1.9.1", - "gix-actor 0.31.1", + "gix-actor 0.31.2", "gix-date", "thiserror", ] [[package]] name = "gix-negotiate" -version = "0.13.0" 
+version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54ba98f8c8c06870dfc167d192ca38a38261867b836cb89ac80bc9176dba975e" +checksum = "d57dec54544d155a495e01de947da024471e1825d7d3f2724301c07a310d6184" dependencies = [ "bitflags 2.4.2", "gix-commitgraph 0.24.2", "gix-date", "gix-hash", - "gix-object 0.42.1", - "gix-revwalk 0.13.0", + "gix-object 0.42.2", + "gix-revwalk 0.13.1", "smallvec", "thiserror", ] @@ -7118,14 +7123,14 @@ dependencies = [ [[package]] name = "gix-object" -version = "0.42.1" +version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d4f8efae72030df1c4a81d02dbe2348e748d9b9a11e108ed6efbd846326e051" +checksum = "1fe2dc4a41191c680c942e6ebd630c8107005983c4679214fdb1007dcf5ae1df" dependencies = [ "bstr 1.9.1", - "gix-actor 0.31.1", + "gix-actor 0.31.2", "gix-date", - "gix-features 0.38.1", + "gix-features 0.38.2", "gix-hash", "gix-utils", "gix-validate", @@ -7156,17 +7161,17 @@ dependencies = [ [[package]] name = "gix-odb" -version = "0.60.0" +version = "0.61.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8bbb43d2fefdc4701ffdf9224844d05b136ae1b9a73c2f90710c8dd27a93503" +checksum = "e92b9790e2c919166865d0825b26cc440a387c175bed1b43a2fa99c0e9d45e98" dependencies = [ "arc-swap", "gix-date", - "gix-features 0.38.1", - "gix-fs 0.10.2", + "gix-features 0.38.2", + "gix-fs 0.11.0", "gix-hash", - "gix-object 0.42.1", - "gix-pack 0.50.0", + "gix-object 0.42.2", + "gix-pack 0.51.0", "gix-path", "gix-quote", "parking_lot 0.12.1", @@ -7196,18 +7201,18 @@ dependencies = [ [[package]] name = "gix-pack" -version = "0.50.0" +version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b58bad27c7677fa6b587aab3a1aca0b6c97373bd371a0a4290677c838c9bcaf1" +checksum = "7a8da51212dbff944713edb2141ed7e002eea326b8992070374ce13a6cb610b3" dependencies = [ "clru", "gix-chunk", - "gix-features 0.38.1", + "gix-features 0.38.2", "gix-hash", "gix-hashtable", - "gix-object 0.42.1", + "gix-object 0.42.2", "gix-path", - "gix-tempfile 13.1.1", + "gix-tempfile 14.0.0", "memmap2 0.9.4", "parking_lot 0.12.1", "smallvec", @@ -7242,9 +7247,9 @@ dependencies = [ [[package]] name = "gix-pathspec" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea9f934a111e0efdf93ae06e3648427e60e783099fbebd6a53a7a2ffb10a1e65" +checksum = "a76cab098dc10ba2d89f634f66bf196dea4d7db4bf10b75c7a9c201c55a2ee19" dependencies = [ "bitflags 2.4.2", "bstr 1.9.1", @@ -7257,9 +7262,9 @@ dependencies = [ [[package]] name = "gix-prompt" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5325eb17ce7b5e5d25dec5c2315d642a09d55b9888b3bf46b7d72e1621a55d8" +checksum = "fddabbc7c51c241600ab3c4623b19fa53bde7c1a2f637f61043ed5fcadf000cc" dependencies = [ "gix-command", "gix-config-value", @@ -7302,19 +7307,19 @@ dependencies = [ [[package]] name = "gix-ref" -version = "0.43.0" +version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd4aba68b925101cb45d6df328979af0681364579db889098a0de75b36c77b65" +checksum = "0b36752b448647acd59c9668fdd830b16d07db1e6d9c3b3af105c1605a6e23d9" dependencies = [ - "gix-actor 0.31.1", + "gix-actor 0.31.2", "gix-date", - "gix-features 0.38.1", - "gix-fs 0.10.2", + "gix-features 0.38.2", + "gix-fs 0.11.0", "gix-hash", - "gix-lock 13.1.1", - "gix-object 0.42.1", + "gix-lock 14.0.0", + "gix-object 0.42.2", "gix-path", 
- "gix-tempfile 13.1.1", + "gix-tempfile 14.0.0", "gix-utils", "gix-validate", "memmap2 0.9.4", @@ -7344,7 +7349,7 @@ checksum = "dde848865834a54fe4d9b4573f15d0e9a68eaf3d061b42d3ed52b4b8acf880b2" dependencies = [ "bstr 1.9.1", "gix-hash", - "gix-revision 0.27.0", + "gix-revision 0.27.1", "gix-validate", "smallvec", "thiserror", @@ -7368,16 +7373,16 @@ dependencies = [ [[package]] name = "gix-revision" -version = "0.27.0" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e34196e1969bd5d36e2fbc4467d893999132219d503e23474a8ad2b221cb1e8" +checksum = "63e08f8107ed1f93a83bcfbb4c38084c7cb3f6cd849793f1d5eec235f9b13b2b" dependencies = [ "bstr 1.9.1", "gix-date", "gix-hash", "gix-hashtable", - "gix-object 0.42.1", - "gix-revwalk 0.13.0", + "gix-object 0.42.2", + "gix-revwalk 0.13.1", "gix-trace", "thiserror", ] @@ -7399,15 +7404,15 @@ dependencies = [ [[package]] name = "gix-revwalk" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7d393ae814eeaae41a333c0ff684b243121cc61ccdc5bbe9897094588047d" +checksum = "4181db9cfcd6d1d0fd258e91569dbb61f94cb788b441b5294dd7f1167a3e788f" dependencies = [ "gix-commitgraph 0.24.2", "gix-date", "gix-hash", "gix-hashtable", - "gix-object 0.42.1", + "gix-object 0.42.2", "smallvec", "thiserror", ] @@ -7426,20 +7431,20 @@ dependencies = [ [[package]] name = "gix-status" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50c413bfd2952e4ee92e48438dac3c696f3555e586a34d184a427f6bedd1e4f9" +checksum = "2f4373d989713809554d136f51bc7da565adf45c91aa4d86ef6a79801621bfc8" dependencies = [ "bstr 1.9.1", "filetime", - "gix-diff 0.43.0", + "gix-diff 0.44.0", "gix-dir", - "gix-features 0.38.1", + "gix-features 0.38.2", "gix-filter", - "gix-fs 0.10.2", + "gix-fs 0.11.0", "gix-hash", - "gix-index 0.32.1", - "gix-object 0.42.1", + "gix-index 0.33.0", + "gix-object 0.42.2", "gix-path", "gix-pathspec", "gix-worktree", @@ -7448,12 +7453,12 @@ dependencies = [ [[package]] name = "gix-submodule" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb7ea05666362472fecd44c1fc35fe48a5b9b841b431cc4f85b95e6f20c23ec" +checksum = "921cd49924ac14b6611b22e5fb7bbba74d8780dc7ad26153304b64d1272460ac" dependencies = [ "bstr 1.9.1", - "gix-config 0.36.1", + "gix-config 0.37.0", "gix-path", "gix-pathspec", "gix-refspec 0.23.0", @@ -7478,12 +7483,12 @@ dependencies = [ [[package]] name = "gix-tempfile" -version = "13.1.1" +version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a761d76594f4443b675e85928e4902dec333273836bd386906f01e7e346a0d11" +checksum = "d3b0e276cd08eb2a22e9f286a4f13a222a01be2defafa8621367515375644b99" dependencies = [ "dashmap", - "gix-fs 0.10.2", + "gix-fs 0.11.0", "libc", "once_cell", "parking_lot 0.12.1", @@ -7516,17 +7521,17 @@ dependencies = [ [[package]] name = "gix-traverse" -version = "0.39.0" +version = "0.39.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4029ec209b0cc480d209da3837a42c63801dd8548f09c1f4502c60accb62aeb" +checksum = "f20cb69b63eb3e4827939f42c05b7756e3488ef49c25c412a876691d568ee2a0" dependencies = [ "bitflags 2.4.2", "gix-commitgraph 0.24.2", "gix-date", "gix-hash", "gix-hashtable", - "gix-object 0.42.1", - "gix-revwalk 0.13.0", + "gix-object 0.42.2", + "gix-revwalk 0.13.1", "smallvec", "thiserror", ] @@ -7552,7 +7557,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "0db829ebdca6180fbe32be7aed393591df6db4a72dbbc0b8369162390954d1cf" dependencies = [ "bstr 1.9.1", - "gix-features 0.38.1", + "gix-features 0.38.2", "gix-path", "home", "thiserror", @@ -7566,15 +7571,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35192df7fd0fa112263bad8021e2df7167df4cc2a6e6d15892e1e55621d3d4dc" dependencies = [ "bstr 1.9.1", - "fastrand 2.0.1", + "fastrand 2.1.0", "unicode-normalization", ] [[package]] name = "gix-validate" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e39fc6e06044985eac19dd34d474909e517307582e462b2eb4c8fa51b6241545" +checksum = "82c27dd34a49b1addf193c92070bcbf3beaf6e10f16a78544de6372e146a0acf" dependencies = [ "bstr 1.9.1", "thiserror", @@ -7582,36 +7587,37 @@ dependencies = [ [[package]] name = "gix-worktree" -version = "0.33.1" +version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f06ca5dd164678914fc9280ba9d1ffeb66499ccc16ab1278c513828beee88401" +checksum = "53f6b7de83839274022aff92157d7505f23debf739d257984a300a35972ca94e" dependencies = [ "bstr 1.9.1", "gix-attributes", - "gix-features 0.38.1", - "gix-fs 0.10.2", + "gix-features 0.38.2", + "gix-fs 0.11.0", "gix-glob 0.16.2", "gix-hash", "gix-ignore", - "gix-index 0.32.1", - "gix-object 0.42.1", + "gix-index 0.33.0", + "gix-object 0.42.2", "gix-path", + "gix-validate", ] [[package]] name = "gix-worktree-state" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70b4bcac42d5b3197d38e3f15f6eb277c5e6d6a1669c7beabed8f666dba1c9b8" +checksum = "e64b2835892ce553b15aef7f6f7bb1e39e146fdf71eb99609b86710a7786cf34" dependencies = [ "bstr 1.9.1", - "gix-features 0.38.1", + "gix-features 0.38.2", "gix-filter", - "gix-fs 0.10.2", + "gix-fs 0.11.0", "gix-glob 0.16.2", "gix-hash", - "gix-index 0.32.1", - "gix-object 0.42.1", + "gix-index 0.33.0", + "gix-object 0.42.2", "gix-path", "gix-worktree", "io-close", @@ -7620,18 +7626,18 @@ dependencies = [ [[package]] name = "gix-worktree-stream" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e73f7f8c1354516339a244f61f0fe3080b4cf25ddcbcfefd2d56661d51e88d4" +checksum = "4c5a4d58fa1375cd40a24c9d1a501520fcba17eea109c58c7e208b309635b46a" dependencies = [ "gix-attributes", - "gix-features 0.38.1", + "gix-features 0.38.2", "gix-filter", - "gix-fs 0.10.2", + "gix-fs 0.11.0", "gix-hash", - "gix-object 0.42.1", + "gix-object 0.42.2", "gix-path", - "gix-traverse 0.39.0", + "gix-traverse 0.39.1", "parking_lot 0.12.1", "thiserror", ] @@ -10675,7 +10681,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" dependencies = [ "atomic-waker", - "fastrand 2.0.1", + "fastrand 2.1.0", "futures-io", ] @@ -13705,7 +13711,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", - "fastrand 2.0.1", + "fastrand 2.1.0", "rustix 0.38.31", "windows-sys 0.52.0", ] diff --git a/src/common/building/Cargo.toml b/src/common/building/Cargo.toml index 10ded652ef50..441ac7528168 100644 --- a/src/common/building/Cargo.toml +++ b/src/common/building/Cargo.toml @@ -14,6 +14,6 @@ test = true anyhow = { workspace = true } cargo-license = "0.6.1" cargo_metadata = 
"0.18" -gix = "0.62.0" +gix = "0.63.0" log = { workspace = true } vergen = { version = "8.3.1", default-features = false, features = ["build", "cargo", "git", "gix", "rustc"] } diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/arc_level_impl.rs b/src/meta/raft-store/src/sm_v002/leveled_store/immutable.rs similarity index 75% rename from src/meta/raft-store/src/sm_v002/leveled_store/arc_level_impl.rs rename to src/meta/raft-store/src/sm_v002/leveled_store/immutable.rs index f9db1cb60b6d..905fd81d968c 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/arc_level_impl.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/immutable.rs @@ -14,6 +14,7 @@ use std::borrow::Borrow; use std::io; +use std::ops::Deref; use std::ops::RangeBounds; use std::sync::Arc; @@ -27,10 +28,50 @@ use crate::sm_v002::leveled_store::map_api::MarkedOf; use crate::sm_v002::marked::Marked; use crate::state_machine::ExpireKey; -impl Level { +/// A single **immutable** level of state machine data. +/// +/// Immutable level implement only [`MapApiRO`], but not [`MapApi`]. +/// +/// [`MapApi`]: crate::sm_v002::leveled_store::map_api::MapApi +#[derive(Debug, Clone)] +pub struct Immutable { + level: Arc, +} + +impl Immutable { + pub fn new(level: Arc) -> Self { + Self { level } + } + + pub fn new_from_level(level: Level) -> Self { + Self { + level: Arc::new(level), + } + } + + pub fn inner(&self) -> &Arc { + &self.level + } +} + +impl AsRef for Immutable { + fn as_ref(&self) -> &Level { + self.level.as_ref() + } +} + +impl Deref for Immutable { + type Target = Level; + + fn deref(&self) -> &Self::Target { + self.level.as_ref() + } +} + +impl Immutable { /// Build a static stream that yields key values for primary index #[futures_async_stream::try_stream(boxed, ok = MapKV, error = io::Error)] - async fn str_range(self: Arc, range: R) + async fn str_range(self: Immutable, range: R) where String: Borrow, Q: Ord + Send + Sync + ?Sized, @@ -45,7 +86,7 @@ impl Level { /// Build a static stream that yields expire key and key for the secondary expiration index #[futures_async_stream::try_stream(boxed, ok = MapKV, error = io::Error)] - async fn expire_range(self: Arc, range: R) + async fn expire_range(self: Immutable, range: R) where ExpireKey: Borrow, Q: Ord + Send + Sync + ?Sized, @@ -60,7 +101,7 @@ impl Level { } #[async_trait::async_trait] -impl MapApiRO for Arc { +impl MapApiRO for Immutable { async fn get(&self, key: &Q) -> Result::V>, io::Error> where String: Borrow, @@ -78,7 +119,7 @@ impl MapApiRO for Arc { } #[async_trait::async_trait] -impl MapApiRO for Arc { +impl MapApiRO for Immutable { async fn get(&self, key: &Q) -> Result, io::Error> where ExpireKey: Borrow, diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/static_levels.rs b/src/meta/raft-store/src/sm_v002/leveled_store/immutable_levels.rs similarity index 81% rename from src/meta/raft-store/src/sm_v002/leveled_store/static_levels.rs rename to src/meta/raft-store/src/sm_v002/leveled_store/immutable_levels.rs index 95578f380402..011cfd0cad1f 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/static_levels.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/immutable_levels.rs @@ -15,8 +15,8 @@ use std::borrow::Borrow; use std::io; use std::ops::RangeBounds; -use std::sync::Arc; +use crate::sm_v002::leveled_store::immutable::Immutable; use crate::sm_v002::leveled_store::level::Level; use crate::sm_v002::leveled_store::map_api::compacted_get; use crate::sm_v002::leveled_store::map_api::compacted_range; @@ -28,20 +28,20 @@ use 
crate::sm_v002::marked::Marked; /// A readonly leveled map that owns the data. #[derive(Debug, Default, Clone)] -pub struct StaticLevels { +pub struct ImmutableLevels { /// From oldest to newest, i.e., levels[0] is the oldest - levels: Vec>, + levels: Vec, } -impl StaticLevels { - pub(in crate::sm_v002) fn new(levels: impl IntoIterator>) -> Self { +impl ImmutableLevels { + pub(in crate::sm_v002) fn new(levels: impl IntoIterator) -> Self { Self { levels: levels.into_iter().collect(), } } /// Return an iterator of all Arc of levels from newest to oldest. - pub(in crate::sm_v002) fn iter_arc_levels(&self) -> impl Iterator> { + pub(in crate::sm_v002) fn iter_immutable_levels(&self) -> impl Iterator { self.levels.iter().rev() } @@ -50,11 +50,11 @@ impl StaticLevels { self.levels.iter().map(|x| x.as_ref()).rev() } - pub(in crate::sm_v002) fn newest(&self) -> Option<&Arc> { + pub(in crate::sm_v002) fn newest(&self) -> Option<&Immutable> { self.levels.last() } - pub(in crate::sm_v002) fn push(&mut self, level: Arc) { + pub(in crate::sm_v002) fn push(&mut self, level: Immutable) { self.levels.push(level); } @@ -69,24 +69,24 @@ impl StaticLevels { } #[async_trait::async_trait] -impl MapApiRO for StaticLevels +impl MapApiRO for ImmutableLevels where K: MapKey, Level: MapApiRO, - Arc: MapApiRO, + Immutable: MapApiRO, { async fn get(&self, key: &Q) -> Result, io::Error> where K: Borrow, Q: Ord + Send + Sync + ?Sized, { - let levels = self.iter_arc_levels(); + let levels = self.iter_immutable_levels(); compacted_get(key, levels).await } async fn range(&self, range: R) -> Result, io::Error> where R: RangeBounds + Clone + Send + Sync + 'static { - let levels = self.iter_arc_levels(); + let levels = self.iter_immutable_levels(); compacted_range::<_, _, _, Level>(range, None, levels).await } } diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs b/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs index 5db4a2f4d584..3cdad7a47a5e 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs @@ -16,10 +16,11 @@ use std::borrow::Borrow; use std::fmt; use std::io; use std::ops::RangeBounds; -use std::sync::Arc; use databend_common_meta_types::KVMeta; +use crate::sm_v002::leveled_store::immutable::Immutable; +use crate::sm_v002::leveled_store::immutable_levels::ImmutableLevels; use crate::sm_v002::leveled_store::level::Level; use crate::sm_v002::leveled_store::map_api::compacted_get; use crate::sm_v002::leveled_store::map_api::compacted_range; @@ -31,7 +32,6 @@ use crate::sm_v002::leveled_store::map_api::MarkedOf; use crate::sm_v002::leveled_store::map_api::Transition; use crate::sm_v002::leveled_store::ref_::Ref; use crate::sm_v002::leveled_store::ref_mut::RefMut; -use crate::sm_v002::leveled_store::static_levels::StaticLevels; use crate::sm_v002::marked::Marked; /// State machine data organized in multiple levels. @@ -46,14 +46,14 @@ pub struct LeveledMap { writable: Level, /// The immutable levels. 
- frozen: StaticLevels, + immutable_levels: ImmutableLevels, } impl LeveledMap { pub(crate) fn new(writable: Level) -> Self { Self { writable, - frozen: Default::default(), + immutable_levels: Default::default(), } } @@ -61,24 +61,28 @@ impl LeveledMap { pub(in crate::sm_v002) fn iter_levels(&self) -> impl Iterator { [&self.writable] .into_iter() - .chain(self.frozen.iter_levels()) + .chain(self.immutable_levels.iter_levels()) } - /// Return the top level and an iterator of all frozen levels, in newest to oldest order. + /// Return the top level and an iterator of all immutable levels, in newest to oldest order. pub(in crate::sm_v002) fn iter_shared_levels( &self, - ) -> (Option<&Level>, impl Iterator>) { - (Some(&self.writable), self.frozen.iter_arc_levels()) + ) -> (Option<&Level>, impl Iterator) { + ( + Some(&self.writable), + self.immutable_levels.iter_immutable_levels(), + ) } /// Freeze the current writable level and create a new empty writable level. - pub fn freeze_writable(&mut self) -> &StaticLevels { + pub fn freeze_writable(&mut self) -> &ImmutableLevels { let new_writable = self.writable.new_level(); - let frozen = std::mem::replace(&mut self.writable, new_writable); - self.frozen.push(Arc::new(frozen)); + let immutable = std::mem::replace(&mut self.writable, new_writable); + self.immutable_levels + .push(Immutable::new_from_level(immutable)); - &self.frozen + &self.immutable_levels } /// Return an immutable reference to the top level i.e., the writable level. @@ -92,22 +96,22 @@ impl LeveledMap { } /// Return a reference to the immutable levels. - pub fn frozen_ref(&self) -> &StaticLevels { - &self.frozen + pub fn immutable_levels_ref(&self) -> &ImmutableLevels { + &self.immutable_levels } /// Replace all immutable levels with the given one. - pub(crate) fn replace_frozen(&mut self, b: StaticLevels) { - self.frozen = b; + pub(crate) fn replace_immutable_levels(&mut self, b: ImmutableLevels) { + self.immutable_levels = b; } pub(crate) fn to_ref_mut(&mut self) -> RefMut { - RefMut::new(&mut self.writable, &self.frozen) + RefMut::new(&mut self.writable, &self.immutable_levels) } #[allow(dead_code)] pub(crate) fn to_ref(&self) -> Ref { - Ref::new(Some(&self.writable), &self.frozen) + Ref::new(Some(&self.writable), &self.immutable_levels) } } @@ -116,7 +120,7 @@ impl MapApiRO for LeveledMap where K: MapKey + fmt::Debug, Level: MapApiRO, - Arc: MapApiRO, + Immutable: MapApiRO, { async fn get(&self, key: &Q) -> Result, io::Error> where @@ -139,7 +143,7 @@ impl MapApi for LeveledMap where K: MapKey, Level: MapApi, - Arc: MapApiRO, + Immutable: MapApiRO, { async fn set( &mut self, diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map_test.rs b/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map_test.rs index 024631aa6e38..62bbfd794d35 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map_test.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map_test.rs @@ -51,9 +51,9 @@ async fn test_freeze() -> anyhow::Result<()> { ]); // Listing from the base level sees the old value. - let frozen = l.frozen_ref(); + let immutables = l.immutable_levels_ref(); - let got = frozen + let got = immutables .str_map() .range(s("")..) .await? 
@@ -193,9 +193,9 @@ async fn test_two_levels() -> anyhow::Result<()> { // Check base level - let frozen = l.frozen_ref(); + let immutables = l.immutable_levels_ref(); - let strm = frozen.str_map().range(s("")..).await?; + let strm = immutables.str_map().range(s("")..).await?; let got = strm.try_collect::>().await?; assert_eq!(got, vec![ // diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/map_api.rs b/src/meta/raft-store/src/sm_v002/leveled_store/map_api.rs index 08561af6aaf6..718c002d82b9 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/map_api.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/map_api.rs @@ -223,9 +223,9 @@ where /// There could be tombstone entries: [`Marked::TombStone`]. /// /// The `TOP` is the type of the top level. -/// The `L` is the type of frozen levels. +/// The `L` is the type of immutable levels. /// -/// Because the top level is very likely to be a different type from the frozen levels, i.e., it is writable. +/// Because the top level is very likely to be a different type from the immutable levels, i.e., it is writable. pub(in crate::sm_v002) async fn compacted_range<'d, K, R, L, TOP>( range: R, top: Option<&'d TOP>, @@ -257,10 +257,10 @@ where #[cfg(test)] mod tests { - use std::sync::Arc; use futures_util::TryStreamExt; + use crate::sm_v002::leveled_store::immutable::Immutable; use crate::sm_v002::leveled_store::level::Level; use crate::sm_v002::leveled_store::map_api::compacted_get; use crate::sm_v002::leveled_store::map_api::compacted_range; @@ -301,12 +301,12 @@ mod tests { let mut l0 = Level::default(); l0.set(s("a"), Some((b("a"), None))).await?; l0.set(s("b"), Some((b("b"), None))).await?; - let l0 = Arc::new(l0); + let l0 = Immutable::new_from_level(l0); let mut l1 = l0.new_level(); l1.set(s("a"), None).await?; l1.set(s("c"), None).await?; - let l1 = Arc::new(l1); + let l1 = Immutable::new_from_level(l1); let mut l2 = l1.new_level(); l2.set(s("b"), Some((b("b2"), None))).await?; diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/mod.rs b/src/meta/raft-store/src/sm_v002/leveled_store/mod.rs index 099e58919143..12d8f7b58b21 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/mod.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/mod.rs @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-mod arc_level_impl; +pub mod immutable; +pub mod immutable_levels; pub mod level; pub mod leveled_map; pub mod map_api; pub mod ref_; pub mod ref_mut; -pub mod static_levels; pub mod sys_data; pub mod sys_data_api; pub mod util; diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/ref_.rs b/src/meta/raft-store/src/sm_v002/leveled_store/ref_.rs index dbaadaaa2ebd..14769f6aaabd 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/ref_.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/ref_.rs @@ -16,15 +16,15 @@ use std::borrow::Borrow; use std::fmt; use std::io; use std::ops::RangeBounds; -use std::sync::Arc; +use crate::sm_v002::leveled_store::immutable::Immutable; +use crate::sm_v002::leveled_store::immutable_levels::ImmutableLevels; use crate::sm_v002::leveled_store::level::Level; use crate::sm_v002::leveled_store::map_api::compacted_get; use crate::sm_v002::leveled_store::map_api::compacted_range; use crate::sm_v002::leveled_store::map_api::KVResultStream; use crate::sm_v002::leveled_store::map_api::MapApiRO; use crate::sm_v002::leveled_store::map_api::MapKey; -use crate::sm_v002::leveled_store::static_levels::StaticLevels; use crate::sm_v002::marked::Marked; /// A readonly leveled map that does not own the data. @@ -34,26 +34,31 @@ pub struct Ref<'d> { writable: Option<&'d Level>, /// The immutable levels. - frozen: &'d StaticLevels, + immutable_levels: &'d ImmutableLevels, } impl<'d> Ref<'d> { pub(in crate::sm_v002) fn new( writable: Option<&'d Level>, - frozen: &'d StaticLevels, + immutable_levels: &'d ImmutableLevels, ) -> Ref<'d> { - Self { writable, frozen } + Self { + writable, + immutable_levels, + } } /// Return an iterator of all levels in reverse order. pub(in crate::sm_v002) fn iter_levels(&self) -> impl Iterator + 'd { - self.writable.into_iter().chain(self.frozen.iter_levels()) + self.writable + .into_iter() + .chain(self.immutable_levels.iter_levels()) } pub(in crate::sm_v002) fn iter_shared_levels( &self, - ) -> (Option<&Level>, impl Iterator>) { - (self.writable, self.frozen.iter_arc_levels()) + ) -> (Option<&Level>, impl Iterator) { + (self.writable, self.immutable_levels.iter_immutable_levels()) } } @@ -62,7 +67,7 @@ impl<'d, K> MapApiRO for Ref<'d> where K: MapKey + fmt::Debug, Level: MapApiRO, - Arc: MapApiRO, + Immutable: MapApiRO, { async fn get(&self, key: &Q) -> Result, io::Error> where diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/ref_mut.rs b/src/meta/raft-store/src/sm_v002/leveled_store/ref_mut.rs index d2cd5c69a422..1ecb619ec627 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/ref_mut.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/ref_mut.rs @@ -15,10 +15,11 @@ use std::borrow::Borrow; use std::io; use std::ops::RangeBounds; -use std::sync::Arc; use databend_common_meta_types::KVMeta; +use crate::sm_v002::leveled_store::immutable::Immutable; +use crate::sm_v002::leveled_store::immutable_levels::ImmutableLevels; use crate::sm_v002::leveled_store::level::Level; use crate::sm_v002::leveled_store::map_api::compacted_get; use crate::sm_v002::leveled_store::map_api::compacted_range; @@ -29,7 +30,6 @@ use crate::sm_v002::leveled_store::map_api::MapKey; use crate::sm_v002::leveled_store::map_api::MarkedOf; use crate::sm_v002::leveled_store::map_api::Transition; use crate::sm_v002::leveled_store::ref_::Ref; -use crate::sm_v002::leveled_store::static_levels::StaticLevels; use crate::sm_v002::marked::Marked; /// A writable leveled map that does not own the data. 
@@ -39,30 +39,39 @@ pub struct RefMut<'d> { writable: &'d mut Level, /// The immutable levels. - frozen: &'d StaticLevels, + immutable_levels: &'d ImmutableLevels, } impl<'d> RefMut<'d> { - pub(in crate::sm_v002) fn new(writable: &'d mut Level, frozen: &'d StaticLevels) -> Self { - Self { writable, frozen } + pub(in crate::sm_v002) fn new( + writable: &'d mut Level, + immutable_levels: &'d ImmutableLevels, + ) -> Self { + Self { + writable, + immutable_levels, + } } #[allow(dead_code)] pub(in crate::sm_v002) fn to_ref(&self) -> Ref { - Ref::new(Some(&*self.writable), self.frozen) + Ref::new(Some(&*self.writable), self.immutable_levels) } /// Return an iterator of all levels in new-to-old order. pub(in crate::sm_v002) fn iter_levels(&self) -> impl Iterator + '_ { [&*self.writable] .into_iter() - .chain(self.frozen.iter_levels()) + .chain(self.immutable_levels.iter_levels()) } pub(in crate::sm_v002) fn iter_shared_levels( &self, - ) -> (Option<&Level>, impl Iterator>) { - (Some(self.writable), self.frozen.iter_arc_levels()) + ) -> (Option<&Level>, impl Iterator) { + ( + Some(self.writable), + self.immutable_levels.iter_immutable_levels(), + ) } } @@ -73,7 +82,7 @@ impl<'d, K> MapApiRO for RefMut<'d> where K: MapKey, Level: MapApiRO, - Arc: MapApiRO, + Immutable: MapApiRO, { async fn get(&self, key: &Q) -> Result, io::Error> where @@ -96,7 +105,7 @@ impl<'d, K> MapApi for RefMut<'d> where K: MapKey, Level: MapApi, - Arc: MapApiRO, + Immutable: MapApiRO, { async fn set( &mut self, diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/util.rs b/src/meta/raft-store/src/sm_v002/leveled_store/util.rs index 2165684d8f2b..115df316974b 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/util.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/util.rs @@ -15,19 +15,16 @@ use std::fmt; use std::io; -use crate::sm_v002::leveled_store::map_api::MapKV; use crate::sm_v002::leveled_store::map_api::MapKey; use crate::sm_v002::marked::Marked; +/// Result type of a key-value pair and io Error used in a map. +type KVResult = Result<(K, Marked<::V>), io::Error>; + /// Sort by key and internal_seq. /// Return `true` if `a` should be placed before `b`, e.g., `a` is smaller. -pub(in crate::sm_v002) fn by_key_seq( - r1: &Result, io::Error>, - r2: &Result, io::Error>, -) -> bool -where - K: MapKey + Ord + fmt::Debug, -{ +pub(in crate::sm_v002) fn by_key_seq(r1: &KVResult, r2: &KVResult) -> bool +where K: MapKey + Ord + fmt::Debug { match (r1, r2) { (Ok((k1, v1)), Ok((k2, v2))) => { assert_ne!((k1, v1.internal_seq()), (k2, v2.internal_seq())); @@ -42,9 +39,6 @@ where } } -/// Result type of a key-value pair and io Error used in a map. -type KVResult = Result, io::Error>; - /// Return a Ok(combined) to merge two consecutive values, /// otherwise return Err((x,y)) to not to merge. 
#[allow(clippy::type_complexity)] diff --git a/src/meta/raft-store/src/sm_v002/sm_v002.rs b/src/meta/raft-store/src/sm_v002/sm_v002.rs index c2d25b270108..fa50e9db178f 100644 --- a/src/meta/raft-store/src/sm_v002/sm_v002.rs +++ b/src/meta/raft-store/src/sm_v002/sm_v002.rs @@ -424,13 +424,13 @@ impl SMV002 { pub fn replace_frozen(&mut self, snapshot: &SnapshotViewV002) { assert!( Arc::ptr_eq( - self.levels.frozen_ref().newest().unwrap(), - snapshot.original_ref().newest().unwrap() + self.levels.immutable_levels_ref().newest().unwrap().inner(), + snapshot.original_ref().newest().unwrap().inner() ), "the frozen must not change" ); - self.levels.replace_frozen(snapshot.compacted()); + self.levels.replace_immutable_levels(snapshot.compacted()); } /// It returns 2 entries: the previous one and the new one after upsert. diff --git a/src/meta/raft-store/src/sm_v002/snapshot_view_v002.rs b/src/meta/raft-store/src/sm_v002/snapshot_view_v002.rs index d1ca760ce4fe..43b6b1b40a47 100644 --- a/src/meta/raft-store/src/sm_v002/snapshot_view_v002.rs +++ b/src/meta/raft-store/src/sm_v002/snapshot_view_v002.rs @@ -14,7 +14,6 @@ use std::future; use std::io; -use std::sync::Arc; use databend_common_meta_types::SeqNum; use databend_common_meta_types::SeqV; @@ -25,10 +24,11 @@ use futures_util::TryStreamExt; use crate::key_spaces::SMEntry; use crate::ondisk::Header; use crate::ondisk::OnDisk; +use crate::sm_v002::leveled_store::immutable::Immutable; +use crate::sm_v002::leveled_store::immutable_levels::ImmutableLevels; use crate::sm_v002::leveled_store::map_api::AsMap; use crate::sm_v002::leveled_store::map_api::MapApiRO; use crate::sm_v002::leveled_store::map_api::ResultStream; -use crate::sm_v002::leveled_store::static_levels::StaticLevels; use crate::sm_v002::leveled_store::sys_data_api::SysDataApiRO; use crate::state_machine::ExpireValue; use crate::state_machine::MetaSnapshotId; @@ -38,16 +38,16 @@ use crate::state_machine::StateMachineMetaValue; /// A snapshot view of a state machine, which is static and not affected by further writing to the state machine. pub struct SnapshotViewV002 { /// The compacted snapshot data. - compacted: StaticLevels, + compacted: ImmutableLevels, /// Original non compacted snapshot data. /// /// This is kept just for debug. - original: StaticLevels, + original: ImmutableLevels, } impl SnapshotViewV002 { - pub fn new(top: StaticLevels) -> Self { + pub fn new(top: ImmutableLevels) -> Self { Self { compacted: top.clone(), original: top, @@ -55,12 +55,12 @@ impl SnapshotViewV002 { } /// Return the data level of this snapshot - pub fn compacted(&self) -> StaticLevels { + pub fn compacted(&self) -> ImmutableLevels { self.compacted.clone() } /// The original, non compacted snapshot data. - pub fn original_ref(&self) -> &StaticLevels { + pub fn original_ref(&self) -> &ImmutableLevels { &self.original } @@ -110,7 +110,7 @@ impl SnapshotViewV002 { data.replace_expire(bt); - self.compacted = StaticLevels::new([Arc::new(data)]); + self.compacted = ImmutableLevels::new([Immutable::new_from_level(data)]); Ok(()) } diff --git a/src/meta/raft-store/src/sm_v002/snapshot_view_v002_test.rs b/src/meta/raft-store/src/sm_v002/snapshot_view_v002_test.rs index 5bb6a352aa82..d2289cdd1bd3 100644 --- a/src/meta/raft-store/src/sm_v002/snapshot_view_v002_test.rs +++ b/src/meta/raft-store/src/sm_v002/snapshot_view_v002_test.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::sync::Arc; - use databend_common_meta_types::Endpoint; use databend_common_meta_types::KVMeta; use databend_common_meta_types::Membership; @@ -26,11 +24,12 @@ use openraft::testing::log_id; use pretty_assertions::assert_eq; use crate::key_spaces::RaftStoreEntry; +use crate::sm_v002::leveled_store::immutable::Immutable; +use crate::sm_v002::leveled_store::immutable_levels::ImmutableLevels; use crate::sm_v002::leveled_store::leveled_map::LeveledMap; use crate::sm_v002::leveled_store::map_api::AsMap; use crate::sm_v002::leveled_store::map_api::MapApi; use crate::sm_v002::leveled_store::map_api::MapApiRO; -use crate::sm_v002::leveled_store::static_levels::StaticLevels; use crate::sm_v002::leveled_store::sys_data_api::SysDataApiRO; use crate::sm_v002::marked::Marked; use crate::sm_v002::sm_v002::SMV002; @@ -51,7 +50,7 @@ async fn test_compact_copied_value_and_kv() -> anyhow::Result<()> { let d = top_level.newest().unwrap().as_ref(); - assert_eq!(top_level.iter_arc_levels().count(), 1); + assert_eq!(top_level.iter_immutable_levels().count(), 1); assert_eq!( d.last_membership_ref(), &StoredMembership::new(Some(log_id(3, 3, 3)), Membership::new(vec![], ())) @@ -214,7 +213,7 @@ async fn test_import() -> anyhow::Result<()> { let d = SMV002::import(data)?; - let snapshot = SnapshotViewV002::new(StaticLevels::new([Arc::new(d)])); + let snapshot = SnapshotViewV002::new(ImmutableLevels::new([Immutable::new_from_level(d)])); let got = snapshot .export() diff --git a/src/query/service/src/interpreters/interpreter_table_analyze.rs b/src/query/service/src/interpreters/interpreter_table_analyze.rs index 4f1554e5585c..5cac4bb7614e 100644 --- a/src/query/service/src/interpreters/interpreter_table_analyze.rs +++ b/src/query/service/src/interpreters/interpreter_table_analyze.rs @@ -87,8 +87,8 @@ impl Interpreter for AnalyzeTableInterpreter { .read_table_snapshot_statistics(Some(&snapshot)) .await?; - let temporal_str = if let Some(table_statistics) = &table_statistics { - let is_full = table + let (is_full, temporal_str) = if let Some(table_statistics) = &table_statistics { + let is_full = match table .navigate_to_point( &NavigationPoint::SnapshotID( table_statistics.snapshot_id.simple().to_string(), @@ -96,10 +96,16 @@ impl Interpreter for AnalyzeTableInterpreter { self.ctx.clone().get_abort_checker(), ) .await - .is_err(); - - if is_full { - format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple(),) + { + Ok(t) => !t + .read_table_snapshot() + .await + .is_ok_and(|s| s.is_some_and(|s| s.prev_table_seq.is_some())), + Err(_) => true, + }; + + let temporal_str = if is_full { + format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple()) } else { // analyze only need to collect the added blocks. 
let table_alias = format!("_change_insert${:08x}", Utc::now().timestamp()); @@ -108,9 +114,13 @@ impl Interpreter for AnalyzeTableInterpreter { table_statistics.snapshot_id.simple(), snapshot.snapshot_id.simple(), ) - } + }; + (is_full, temporal_str) } else { - format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple(),) + ( + true, + format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple()), + ) }; let index_cols: Vec<(u32, String)> = schema @@ -134,10 +144,8 @@ impl Interpreter for AnalyzeTableInterpreter { .join(", "); let sql = format!( - "SELECT {select_expr}, {} as is_full from {}.{} {temporal_str}", - temporal_str.is_empty(), - plan.database, - plan.table, + "SELECT {select_expr}, {is_full} as is_full from {}.{} {temporal_str}", + plan.database, plan.table, ); log::info!("Analyze via sql {:?}", sql); diff --git a/src/query/service/src/interpreters/interpreter_txn_commit.rs b/src/query/service/src/interpreters/interpreter_txn_commit.rs index 73bc73c6df7b..7c2545c4ee7a 100644 --- a/src/query/service/src/interpreters/interpreter_txn_commit.rs +++ b/src/query/service/src/interpreters/interpreter_txn_commit.rs @@ -18,6 +18,8 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_storages_fuse::TableContext; use databend_storages_common_txn::TxnManagerRef; +use log::error; +use log::info; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; @@ -55,15 +57,51 @@ impl Interpreter for CommitInterpreter { let is_active = self.ctx.txn_mgr().lock().is_active(); if is_active { let catalog = self.ctx.get_default_catalog()?; + let req = self.ctx.txn_mgr().lock().req(); - let mismatched_tids = catalog.update_multi_table_meta(req).await?; + + let update_summary = { + let table_descriptions = req + .update_table_metas + .iter() + .map(|req| (req.table_id, req.seq, req.new_table_meta.engine.clone())) + .collect::>(); + let stream_descriptions = req + .update_stream_metas + .iter() + .map(|s| (s.stream_id, s.seq, "stream")) + .collect::>(); + (table_descriptions, stream_descriptions) + }; + + let mismatched_tids = { + let ret = catalog.update_multi_table_meta(req).await; + if let Err(ref e) = ret { + // other errors may occur, especially the version mismatch of streams, + // let's log it here for the convenience of diagnostics + error!( + "Non-recoverable fault occurred during updating tables. {}", + e + ); + } + ret? 
+ }; + match &mismatched_tids { - Ok(_) => {} + Ok(_) => { + info!( + "COMMIT: Commit explicit transaction success, targets updated {:?}", + update_summary + ); + } Err(e) => { - return Err(ErrorCode::TableVersionMismatched(format!( - "Table version mismatched in multi statement transaction, tids: {:?}", - e.iter().map(|(tid, _, _)| tid).collect::>() - ))); + let err_msg = format!( + "COMMIT: Table versions mismatched in multi statement transaction, conflict tables: {:?}", + e.iter() + .map(|(tid, seq, meta)| (tid, seq, &meta.engine)) + .collect::>() + ); + return Err(ErrorCode::TableVersionMismatched(err_msg)); } } let need_purge_files = self.ctx.txn_mgr().lock().need_purge_files(); diff --git a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs index 51ca787854c5..08d2561742e7 100644 --- a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs +++ b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs @@ -17,10 +17,14 @@ use std::sync::Arc; use databend_common_base::base::tokio; use databend_common_catalog::table::Table; +use databend_common_catalog::table::TableExt; use databend_common_exception::Result; use databend_common_expression::types::number::NumberScalar; +use databend_common_expression::ColumnId; use databend_common_expression::Scalar; +use databend_common_io::prelude::borsh_deserialize_from_slice; use databend_common_storages_fuse::io::MetaReaders; +use databend_common_storages_fuse::io::MetaWriter; use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut; use databend_common_storages_fuse::FuseTable; use databend_query::sessions::QueryContext; @@ -29,8 +33,12 @@ use databend_query::sql::plans::Plan; use databend_query::sql::Planner; use databend_query::test_kits::*; use databend_storages_common_cache::LoadParams; +use databend_storages_common_table_meta::meta::MetaHLL; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::Statistics; +use databend_storages_common_table_meta::meta::TableSnapshot; +use databend_storages_common_table_meta::meta::TableSnapshotStatistics; +use databend_storages_common_table_meta::meta::Versioned; #[tokio::test(flavor = "multi_thread")] async fn test_table_modify_column_ndv_statistics() -> Result<()> { @@ -179,6 +187,69 @@ async fn check_column_ndv_statistics( Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn test_table_analyze_without_prev_table_seq() -> Result<()> { + let fixture = TestFixture::setup().await?; + let ctx = fixture.new_query_ctx().await?; + + // setup + let create_tbl_command = "create table t(c int)"; + fixture.execute_command(create_tbl_command).await?; + + append_rows(ctx.clone(), 3).await?; + let catalog = ctx.get_catalog("default").await?; + let table = catalog.get_table(&ctx.get_tenant(), "default", "t").await?; + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + let location_gen = fuse_table.meta_location_generator(); + let operator = fuse_table.get_operator(); + + // generate snapshot without prev_table_seq + let snapshot_0 = fuse_table.read_table_snapshot().await?.unwrap(); + let snapshot_1 = TableSnapshot::from_previous(&snapshot_0, None); + let snapshot_loc_1 = location_gen + .snapshot_location_from_uuid(&snapshot_1.snapshot_id, TableSnapshot::VERSION)?; + snapshot_1.write_meta(&operator, &snapshot_loc_1).await?; + + // generate table statistics. 
+ let col: Vec = vec![1, 3, 0, 0, 0, 118, 5, 1, 21, 6, 3, 229, 13, 3]; + let hll: HashMap = HashMap::from([(0, borsh_deserialize_from_slice(&col)?)]); + let table_statistics = TableSnapshotStatistics::new(hll, snapshot_1.snapshot_id); + let table_statistics_location = location_gen.snapshot_statistics_location_from_uuid( + &table_statistics.snapshot_id, + table_statistics.format_version(), + )?; + // generate snapshot without prev_table_seq + let mut snapshot_2 = TableSnapshot::from_previous(&snapshot_1, None); + snapshot_2.table_statistics_location = Some(table_statistics_location); + FuseTable::commit_to_meta_server( + fixture.new_query_ctx().await?.as_ref(), + fuse_table.get_table_info(), + location_gen, + snapshot_2, + Some(table_statistics), + &None, + &operator, + ) + .await?; + + // check statistics. + let table = table.refresh(ctx.as_ref()).await?; + let expected = HashMap::from([(0, 3_u64)]); + check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?; + + let qry = "insert into t values(4)"; + execute_command(ctx.clone(), qry).await?; + + ctx.evict_table_from_cache("default", "default", "t")?; + let statistics_sql = "analyze table default.t"; + fixture.execute_command(statistics_sql).await?; + + let table = table.refresh(ctx.as_ref()).await?; + let expected = HashMap::from([(0, 4_u64)]); + check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?; + Ok(()) +} + async fn append_rows(ctx: Arc, n: usize) -> Result<()> { for i in 0..n { let qry = format!("insert into t values({})", i); diff --git a/src/query/sql/src/evaluator/cse.rs b/src/query/sql/src/evaluator/cse.rs index dacc67e5ebce..e5a8d3b6d8a0 100644 --- a/src/query/sql/src/evaluator/cse.rs +++ b/src/query/sql/src/evaluator/cse.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use databend_common_expression::Expr; +use databend_common_functions::BUILTIN_FUNCTIONS; use log::debug; use super::BlockOperator; @@ -117,6 +118,9 @@ pub fn apply_cse( /// `count_expressions` recursively counts the occurrences of expressions in an expression tree /// and stores the count in a HashMap. fn count_expressions(expr: &Expr, counter: &mut HashMap) { + if !expr.is_deterministic(&BUILTIN_FUNCTIONS) { + return; + } match expr { Expr::FunctionCall { function, .. } if function.signature.name == "if" => {} Expr::FunctionCall { function, .. 
} if function.signature.name == "is_not_error" => {} diff --git a/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs b/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs index 9e99b86f2696..003101bfb5e3 100644 --- a/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs +++ b/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs @@ -168,7 +168,10 @@ impl SubqueryRewriter { )) } - RelOperator::Limit(_) | RelOperator::Sort(_) => Ok(SExpr::create_unary( + RelOperator::Limit(_) + | RelOperator::Sort(_) + | RelOperator::Udf(_) + | RelOperator::AsyncFunction(_) => Ok(SExpr::create_unary( Arc::new(s_expr.plan().clone()), Arc::new(self.rewrite(s_expr.child(0)?)?), )), @@ -176,9 +179,9 @@ impl SubqueryRewriter { RelOperator::DummyTableScan(_) | RelOperator::Scan(_) | RelOperator::CteScan(_) - | RelOperator::ConstantTableScan(_) => Ok(s_expr.clone()), - - _ => Err(ErrorCode::Internal("Invalid plan type")), + | RelOperator::ConstantTableScan(_) + | RelOperator::AddRowNumber(_) + | RelOperator::Exchange(_) => Ok(s_expr.clone()), } } diff --git a/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs b/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs index 06b514d4bd1b..eb2cd4ffe0e0 100644 --- a/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs @@ -35,6 +35,8 @@ use databend_common_pipeline_sinks::AsyncSink; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::meta::Versioned; use log::debug; +use log::error; +use log::info; use crate::operations::set_backoff; use crate::operations::AppendGenerator; @@ -96,6 +98,7 @@ impl AsyncSink for CommitMultiTableInsert { let is_active = self.ctx.txn_mgr().lock().is_active(); match is_active { true => { + // inside explicit transaction if update_table_meta_reqs.is_empty() { return Err(ErrorCode::Internal( "No table meta to update in multi table insert commit. It's a bug", @@ -107,14 +110,12 @@ impl AsyncSink for CommitMultiTableInsert { update_table_meta_reqs[0].update_stream_meta = std::mem::take(&mut self.update_stream_meta); update_table_meta_reqs[0].deduplicated_label = self.deduplicated_label.clone(); - for (req, info) in update_table_meta_reqs - .into_iter() - .zip(table_infos.into_iter()) - { + for (req, info) in update_table_meta_reqs.into_iter().zip(table_infos.iter()) { self.catalog.update_table_meta(info, req).await?; } } false => { + // auto commit let mut backoff = set_backoff(None, None, None); let mut retries = 0; @@ -125,17 +126,49 @@ impl AsyncSink for CommitMultiTableInsert { update_stream_metas: self.update_stream_meta.clone(), deduplicated_labels: self.deduplicated_label.clone().into_iter().collect(), }; - let update_meta_result = self - .catalog - .update_multi_table_meta(update_multi_table_meta_req) - .await?; + + let update_meta_result = { + let ret = self + .catalog + .update_multi_table_meta(update_multi_table_meta_req) + .await; + if let Err(ref e) = ret { + // other errors may occur, especially the version mismatch of streams, + // let's log it here for the convenience of diagnostics + error!( + "Non-recoverable fault occurred during updating tables. {}", + e + ); + } + ret? 
diff --git a/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs b/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs
index 06b514d4bd1b..eb2cd4ffe0e0 100644
--- a/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs
+++ b/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs
@@ -35,6 +35,8 @@ use databend_common_pipeline_sinks::AsyncSink;
 use databend_storages_common_table_meta::meta::TableSnapshot;
 use databend_storages_common_table_meta::meta::Versioned;
 use log::debug;
+use log::error;
+use log::info;
 
 use crate::operations::set_backoff;
 use crate::operations::AppendGenerator;
@@ -96,6 +98,7 @@ impl AsyncSink for CommitMultiTableInsert {
         let is_active = self.ctx.txn_mgr().lock().is_active();
         match is_active {
             true => {
+                // inside an explicit transaction
                 if update_table_meta_reqs.is_empty() {
                     return Err(ErrorCode::Internal(
                         "No table meta to update in multi table insert commit. It's a bug",
@@ -107,14 +110,12 @@ impl AsyncSink for CommitMultiTableInsert {
                 update_table_meta_reqs[0].update_stream_meta =
                     std::mem::take(&mut self.update_stream_meta);
                 update_table_meta_reqs[0].deduplicated_label = self.deduplicated_label.clone();
-                for (req, info) in update_table_meta_reqs
-                    .into_iter()
-                    .zip(table_infos.into_iter())
-                {
+                for (req, info) in update_table_meta_reqs.into_iter().zip(table_infos.iter()) {
                     self.catalog.update_table_meta(info, req).await?;
                 }
             }
             false => {
+                // auto commit
                 let mut backoff = set_backoff(None, None, None);
                 let mut retries = 0;
@@ -125,17 +126,49 @@ impl AsyncSink for CommitMultiTableInsert {
                         update_stream_metas: self.update_stream_meta.clone(),
                         deduplicated_labels: self.deduplicated_label.clone().into_iter().collect(),
                     };
-                    let update_meta_result = self
-                        .catalog
-                        .update_multi_table_meta(update_multi_table_meta_req)
-                        .await?;
+
+                    let update_meta_result = {
+                        let ret = self
+                            .catalog
+                            .update_multi_table_meta(update_multi_table_meta_req)
+                            .await;
+                        if let Err(ref e) = ret {
+                            // other errors may occur here, especially version mismatches of streams;
+                            // log them to make diagnostics easier
+                            error!(
+                                "Non-recoverable fault occurred during updating tables. {}",
+                                e
+                            );
+                        }
+                        ret?
+                    };
+                    let Err(update_failed_tbls) = update_meta_result else {
+                        let table_descriptions = self
+                            .tables
+                            .values()
+                            .map(|tbl| {
+                                let table_info = tbl.get_table_info();
+                                (&table_info.desc, &table_info.ident, &table_info.meta.engine)
+                            })
+                            .collect::<Vec<_>>();
+                        let stream_descriptions = self
+                            .update_stream_meta
+                            .iter()
+                            .map(|s| (s.stream_id, s.seq, "stream"))
+                            .collect::<Vec<_>>();
+                        info!(
+                            "update tables success (auto commit), tables updated {:?}, streams updated {:?}",
+                            table_descriptions, stream_descriptions
+                        );
+                        return Ok(());
                     };
-                    let update_failed_tbls_name: Vec<String> = update_failed_tbls
+                    let update_failed_tbl_descriptions: Vec<_> = update_failed_tbls
                         .iter()
-                        .map(|(tid, _, _)| {
-                            self.tables.get(tid).unwrap().get_table_info().name.clone()
+                        .map(|(tid, seq, meta)| {
+                            let tbl_info = self.tables.get(tid).unwrap().get_table_info();
+                            (&tbl_info.desc, (tid, seq), &meta.engine)
                         })
                         .collect();
                     match backoff.next_backoff() {
@@ -143,8 +176,8 @@ impl AsyncSink for CommitMultiTableInsert {
                             retries += 1;
 
                             debug!(
-                                "Failed to update tables: {:?}, the commit process of multi-table insert will be retried after {} ms, retrying {} times",
-                                update_failed_tbls_name,
+                                "Failed (temporarily) to update tables: {:?}, the commit process of multi-table insert will be retried after {} ms, retrying {} times",
+                                update_failed_tbl_descriptions,
                                 duration.as_millis(),
                                 retries,
                             );
@@ -167,14 +200,16 @@ impl AsyncSink for CommitMultiTableInsert {
                 }
             }
             None => {
-                return Err(ErrorCode::OCCRetryFailure(format!(
-                    "Can not fulfill the tx after retries({} times, {} ms), aborted. target table names {:?}",
+                let err_msg = format!(
+                    "Cannot fulfill the tx after retries ({} times, {} ms), aborted. updated tables {:?}",
                     retries,
                     Instant::now()
                        .duration_since(backoff.start_time)
                        .as_millis(),
-                    update_failed_tbls_name,
-                )));
+                    update_failed_tbl_descriptions,
+                );
+                error!("{}", err_msg);
+                return Err(ErrorCode::OCCRetryFailure(err_msg));
             }
         }
     }
diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs
index 313ae9f3eb06..ad26a0aef5e0 100644
--- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs
+++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs
@@ -455,6 +455,19 @@ where F: SnapshotGenerator + Send + 'static
                     for segment_loc in std::mem::take(&mut self.new_segment_locs).into_iter() {
                         self.ctx.add_segment_location(segment_loc)?;
                     }
+
+                    let target_descriptions = {
+                        let table_info = self.table.get_table_info();
+                        let tbl = (&table_info.name, table_info.ident, &table_info.meta.engine);
+
+                        let stream_descriptions = self
+                            .update_stream_meta
+                            .iter()
+                            .map(|s| (s.stream_id, s.seq, "stream"))
+                            .collect::<Vec<_>>();
+                        (tbl, stream_descriptions)
+                    };
+                    info!("commit mutation success, targets {:?}", target_descriptions);
                     self.state = State::Finish;
                 }
                 Err(e) if self.is_error_recoverable(&e) => {
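The auto-commit path above retries only the recoverable case (a table-version conflict reported back by `update_multi_table_meta`) and gives up once the backoff budget is exhausted, mapping that to `OCCRetryFailure`. Here is a self-contained sketch of that control flow, with a hand-rolled backoff standing in for the `set_backoff`/`next_backoff` pair; all names in it are hypothetical:

```rust
use std::time::{Duration, Instant};

// Hand-rolled stand-in for the backoff policy returned by `set_backoff`; it just
// doubles the delay until a deadline passes, then yields None like `next_backoff`.
struct Backoff {
    delay: Duration,
    start_time: Instant,
    max_elapsed: Duration,
}

impl Backoff {
    fn next_backoff(&mut self) -> Option<Duration> {
        if self.start_time.elapsed() >= self.max_elapsed {
            return None; // budget exhausted: caller maps this to OCCRetryFailure
        }
        let d = self.delay;
        self.delay *= 2;
        Some(d)
    }
}

// `try_commit` stands in for update_multi_table_meta: the outer Err is fatal and
// aborts immediately; the inner Err is a recoverable table-version conflict.
fn commit_with_retry(
    mut try_commit: impl FnMut() -> Result<Result<(), String>, String>,
) -> Result<(), String> {
    let mut backoff = Backoff {
        delay: Duration::from_millis(5),
        start_time: Instant::now(),
        max_elapsed: Duration::from_secs(10),
    };
    let mut retries = 0;
    loop {
        match try_commit()? {
            Ok(()) => return Ok(()),
            Err(_conflict) => match backoff.next_backoff() {
                Some(d) => {
                    retries += 1;
                    std::thread::sleep(d); // the real sink awaits instead of blocking
                }
                None => return Err(format!("retry budget exhausted after {retries} retries")),
            },
        }
    }
}

fn main() {
    let mut conflicts_left = 2;
    let result = commit_with_retry(|| {
        if conflicts_left > 0 {
            conflicts_left -= 1;
            Ok(Err("table version conflict".to_string()))
        } else {
            Ok(Ok(()))
        }
    });
    assert!(result.is_ok());
}
```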
diff --git a/src/query/storages/stage/Cargo.toml b/src/query/storages/stage/Cargo.toml
index 499ffa061e79..d6adf36c8307 100644
--- a/src/query/storages/stage/Cargo.toml
+++ b/src/query/storages/stage/Cargo.toml
@@ -26,7 +26,9 @@ databend-common-pipeline-transforms = { path = "../../pipeline/transforms" }
 databend-common-settings = { path = "../../settings" }
 databend-common-storage = { path = "../../../common/storage" }
 databend-common-storages-parquet = { path = "../parquet" }
+databend-storages-common-table-meta = { path = "../common/table_meta" }
 
+arrow-schema = { workspace = true }
 async-backtrace = { workspace = true }
 async-trait = { workspace = true }
 bstr = "1.9.1"
@@ -36,6 +38,7 @@
 enum-as-inner = "0.6.0"
 futures = { workspace = true }
 log = { workspace = true }
 opendal = { workspace = true }
+parquet_rs = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
diff --git a/src/query/storages/stage/src/append/do_append.rs b/src/query/storages/stage/src/append/do_append.rs
index 6ef111badf3d..9380b5f8ea1d 100644
--- a/src/query/storages/stage/src/append/do_append.rs
+++ b/src/query/storages/stage/src/append/do_append.rs
@@ -34,45 +34,32 @@ impl StageTable {
     ) -> databend_common_exception::Result<()> {
         let settings = ctx.get_settings();
 
-        let single = self.table_info.stage_info.copy_options.single;
-        let max_file_size = if single {
-            usize::MAX
-        } else {
-            let max_file_size = self.table_info.stage_info.copy_options.max_file_size;
-            if max_file_size == 0 {
-                // 256M per file by default.
-                256 * 1024 * 1024
-            } else {
-                let mem_limit = (settings.get_max_memory_usage()? / 2) as usize;
-                max_file_size.min(mem_limit)
-            }
-        };
+        let fmt = self.table_info.stage_info.file_format_params.clone();
+        let mem_limit = settings.get_max_memory_usage()? as usize;
         let max_threads = settings.get_max_threads()? as usize;
 
         let op = StageTable::get_op(&self.table_info.stage_info)?;
-        let fmt = self.table_info.stage_info.file_format_params.clone();
         let uuid = uuid::Uuid::new_v4().to_string();
         let group_id = AtomicUsize::new(0);
         match fmt {
             FileFormatParams::Parquet(_) => append_data_to_parquet_files(
                 pipeline,
-                ctx.clone(),
                 self.table_info.clone(),
                 op,
-                max_file_size,
-                max_threads,
                 uuid,
                 &group_id,
+                mem_limit,
+                max_threads,
             )?,
             _ => append_data_to_row_based_files(
                 pipeline,
                 ctx.clone(),
                 self.table_info.clone(),
                 op,
-                max_file_size,
-                max_threads,
                 uuid,
                 &group_id,
+                mem_limit,
+                max_threads,
             )?,
         };
         if !self.table_info.stage_info.copy_options.detailed_output {
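Worked example of the sizing rule the two append pipelines apply below: the memory budget is halved (serialization can roughly double peak memory), the file size is capped by that budget, and the writer count is clamped so that `threads * max_file_size` stays within it. The numbers mirror the new `00_0017_copy_into_parquet` test cases further down.

```rust
// Sizing rule shared by both pipelines: halve the memory budget, cap the file
// size by it, then clamp the writer count so the writers fit in the budget.
fn writer_threads(max_threads: usize, max_memory_usage: usize, max_file_size: usize) -> usize {
    let mem_limit = max_memory_usage / 2;
    let max_file_size = max_file_size.min(mem_limit);
    max_threads.min(mem_limit / max_file_size).max(1)
}

fn main() {
    // max_memory_usage=128000000 with max_file_size=64000000 leaves room for one writer
    assert_eq!(writer_threads(4, 128_000_000, 64_000_000), 1);
    // doubling the memory budget to 256000000 allows two writers => two output files
    assert_eq!(writer_threads(4, 256_000_000, 64_000_000), 2);
}
```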
diff --git a/src/query/storages/stage/src/append/parquet_file/pipeline.rs b/src/query/storages/stage/src/append/parquet_file/pipeline.rs
index 5fcb87c8c9fe..c432e495ea42 100644
--- a/src/query/storages/stage/src/append/parquet_file/pipeline.rs
+++ b/src/query/storages/stage/src/append/parquet_file/pipeline.rs
@@ -12,54 +12,57 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::sync::Arc;
-
 use databend_common_catalog::plan::StageTableInfo;
-use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
-use databend_common_formats::FileFormatOptionsExt;
 use databend_common_pipeline_core::Pipeline;
 use opendal::Operator;
 
 use super::limit_file_size_processor::LimitFileSizeProcessor;
 use super::writer_processor::ParquetFileWriter;
 
-// LimitFileSizeProcessor * 1: slice/group block to batches (as a block meta) that are suitable as a file.
-// ParquetFileSink * N: simply serialize blocks in each meta to a whole file and write out.
+/// - LimitFileSizeProcessor * 1: slice/group blocks into batches (as a block meta) to avoid files being too small when there are many threads.
+/// - ParquetFileWriter * N: serialize incoming blocks into a Vec<u8> to reduce memory usage, and flush once it is large enough.
 #[allow(clippy::too_many_arguments)]
 pub(crate) fn append_data_to_parquet_files(
     pipeline: &mut Pipeline,
-    ctx: Arc<dyn TableContext>,
     table_info: StageTableInfo,
     op: Operator,
-    max_file_size: usize,
-    max_threads: usize,
     uuid: String,
     group_id: &std::sync::atomic::AtomicUsize,
+    mem_limit: usize,
+    max_threads: usize,
 ) -> Result<()> {
+    let is_single = table_info.stage_info.copy_options.single;
+    let max_file_size = table_info.stage_info.copy_options.max_file_size;
+    // when serializing a block to parquet, the memory may be doubled
+    let mem_limit = mem_limit / 2;
     pipeline.try_resize(1)?;
-    pipeline.add_transform(|input, output| {
-        LimitFileSizeProcessor::try_create(input, output, max_file_size)
-    })?;
-    if max_file_size != usize::MAX {
+    let max_file_size = if is_single {
+        None
+    } else {
+        let max_file_size = if max_file_size == 0 {
+            64 * 1024 * 1024
+        } else {
+            max_file_size.min(mem_limit)
+        };
+        pipeline.add_transform(|input, output| {
+            LimitFileSizeProcessor::try_create(input, output, max_file_size)
+        })?;
+
+        let max_threads = max_threads.min(mem_limit / max_file_size).max(1);
         pipeline.try_resize(max_threads)?;
-    }
+        Some(max_file_size)
+    };
     pipeline.add_transform(|input, output| {
         let gid = group_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-        let mut options_ext =
-            FileFormatOptionsExt::create_from_settings(&ctx.get_settings(), false)?;
-        let output_format = options_ext.get_output_format(
-            table_info.schema(),
-            table_info.stage_info.file_format_params.clone(),
-        )?;
         ParquetFileWriter::try_create(
             input,
             output,
             table_info.clone(),
-            output_format,
             op.clone(),
             uuid.clone(),
             gid,
+            max_file_size,
        )
    })?;
    Ok(())
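For reference, a minimal standalone sketch of the writer configuration used by `ParquetFileWriter` below, calling the `parquet` and `arrow-array`/`arrow-schema` crates directly; `Compression::ZSTD` stands in for Databend's `TableCompression::Zstd.into()`, and the one-column schema is made up for the example:

```rust
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{EnabledStatistics, WriterProperties};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int64, false)]));
    let props = WriterProperties::builder()
        .set_compression(Compression::ZSTD(ZstdLevel::default()))
        .set_max_row_group_size(1024 * 1024) // rows per row group, not bytes
        .set_encoding(Encoding::PLAIN)
        .set_dictionary_enabled(false)
        .set_statistics_enabled(EnabledStatistics::None)
        .set_bloom_filter_enabled(false)
        .build();

    // Write into an in-memory Vec<u8>, the same way the processor buffers one file.
    let mut writer = ArrowWriter::try_new(Vec::new(), schema.clone(), Some(props))?;
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))])?;
    writer.write(&batch)?;

    // bytes_written() covers flushed row groups; in_progress_size() estimates the open one.
    let estimate = writer.bytes_written() + writer.in_progress_size();
    assert!(estimate > 0);

    let buf = writer.into_inner()?; // finalize and take the finished parquet file
    assert!(!buf.is_empty());
    Ok(())
}
```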
diff --git a/src/query/storages/stage/src/append/parquet_file/writer_processor.rs b/src/query/storages/stage/src/append/parquet_file/writer_processor.rs
index 0b54c7461931..753e0553c023 100644
--- a/src/query/storages/stage/src/append/parquet_file/writer_processor.rs
+++ b/src/query/storages/stage/src/append/parquet_file/writer_processor.rs
@@ -17,18 +17,24 @@
 use std::collections::VecDeque;
 use std::mem;
 use std::sync::Arc;
 
+use arrow_schema::Schema as ArrowSchema;
 use async_trait::async_trait;
 use databend_common_catalog::plan::StageTableInfo;
 use databend_common_exception::Result;
+use databend_common_expression::converts::arrow::table_schema_to_arrow_schema;
 use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::DataBlock;
-use databend_common_formats::output_format::OutputFormat;
 use databend_common_pipeline_core::processors::Event;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::Processor;
 use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_storages_common_table_meta::table::TableCompression;
 use opendal::Operator;
+use parquet_rs::arrow::ArrowWriter;
+use parquet_rs::basic::Encoding;
+use parquet_rs::file::properties::EnabledStatistics;
+use parquet_rs::file::properties::WriterProperties;
 
 use super::block_batch::BlockBatch;
 use crate::append::output::DataSummary;
@@ -40,18 +46,50 @@ pub struct ParquetFileWriter {
     output: Arc<OutputPort>,
 
     table_info: StageTableInfo,
-    output_format: Box<dyn OutputFormat>,
+    arrow_schema: Arc<ArrowSchema>,
 
-    unload_output: UnloadOutput,
-    unload_output_blocks: Option<VecDeque<DataBlock>>,
-    input_data: Option<DataBlock>,
-    file_to_write: Option<(Vec<u8>, DataSummary)>,
+    input_data: Vec<DataBlock>,
+
+    input_bytes: usize,
+    row_counts: usize,
+    writer: ArrowWriter<Vec<u8>>,
+
+    file_to_write: Option<(Vec<u8>, DataSummary)>,
     data_accessor: Operator,
+    // the result of the COPY INTO statement
+    unload_output: UnloadOutput,
+    unload_output_blocks: Option<VecDeque<DataBlock>>,
+
     uuid: String,
     group_id: usize,
     batch_id: usize,
+
+    target_file_size: Option<usize>,
+}
+
+const MAX_BUFFER_SIZE: usize = 64 * 1024 * 1024;
+// this is a number of rows, not a size in bytes
+const MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
+
+fn create_writer(
+    arrow_schema: Arc<ArrowSchema>,
+    target_file_size: Option<usize>,
+) -> Result<ArrowWriter<Vec<u8>>> {
+    let props = WriterProperties::builder()
+        .set_compression(TableCompression::Zstd.into())
+        .set_max_row_group_size(MAX_ROW_GROUP_SIZE)
+        .set_encoding(Encoding::PLAIN)
+        .set_dictionary_enabled(false)
+        .set_statistics_enabled(EnabledStatistics::None)
+        .set_bloom_filter_enabled(false)
+        .build();
+    let buf_size = match target_file_size {
+        Some(n) if n < MAX_BUFFER_SIZE => n,
+        _ => MAX_BUFFER_SIZE,
+    };
+    let writer = ArrowWriter::try_new(Vec::with_capacity(buf_size), arrow_schema, Some(props))?;
+    Ok(writer)
 }
 
 impl ParquetFileWriter {
@@ -59,34 +97,61 @@ impl ParquetFileWriter {
         input: Arc<InputPort>,
         output: Arc<OutputPort>,
         table_info: StageTableInfo,
-        output_format: Box<dyn OutputFormat>,
         data_accessor: Operator,
         uuid: String,
         group_id: usize,
+        target_file_size: Option<usize>,
     ) -> Result<ProcessorPtr> {
         let unload_output =
             UnloadOutput::create(table_info.stage_info.copy_options.detailed_output);
+
+        let arrow_schema = Arc::new(table_schema_to_arrow_schema(&table_info.schema));
+        let writer = create_writer(arrow_schema.clone(), target_file_size)?;
+
         Ok(ProcessorPtr::create(Box::new(ParquetFileWriter {
             input,
             output,
             table_info,
-            output_format,
+            arrow_schema,
             unload_output,
             unload_output_blocks: None,
-            input_data: None,
+            writer,
+            input_data: Vec::new(),
+            input_bytes: 0,
             file_to_write: None,
             data_accessor,
             uuid,
             group_id,
             batch_id: 0,
+            target_file_size,
+            row_counts: 0,
         })))
     }
+
+    pub fn reinit_writer(&mut self) -> Result<()> {
+        self.writer = create_writer(self.arrow_schema.clone(), self.target_file_size)?;
+        self.row_counts = 0;
+        self.input_bytes = 0;
+        Ok(())
+    }
+
+    fn flush(&mut self) -> Result<()> {
+        _ = self.writer.finish();
+        let buf = mem::take(self.writer.inner_mut());
+        let output_bytes = buf.len();
+        self.file_to_write = Some((buf, DataSummary {
+            row_counts: self.row_counts,
+            input_bytes: self.input_bytes,
+            output_bytes,
+        }));
+        self.reinit_writer()?;
+        Ok(())
+    }
 }
 
 #[async_trait]
 impl Processor for ParquetFileWriter {
     fn name(&self) -> String {
-        "ParquetFileSink".to_string()
+        "ParquetFileWriter".to_string()
     }
 
     fn as_any(&mut self) -> &mut dyn Any {
@@ -100,10 +165,13 @@
         } else if self.file_to_write.is_some() {
             self.input.set_not_need_data();
             Ok(Event::Async)
-        } else if self.input_data.is_some() {
+        } else if !self.input_data.is_empty() {
             self.input.set_not_need_data();
             Ok(Event::Sync)
         } else if self.input.is_finished() {
+            if self.row_counts > 0 {
+                return Ok(Event::Sync);
+            }
             if self.unload_output.is_empty() {
                 self.output.finish();
                 return Ok(Event::Finished);
@@ -123,7 +191,15 @@
                 Ok(Event::NeedConsume)
             }
         } else if self.input.has_data() {
-            self.input_data = Some(self.input.pull_data().unwrap()?);
+            let block = self.input.pull_data().unwrap()?;
+            if self.target_file_size.is_none() {
+                self.input_data.push(block);
+            } else {
+                let block_meta = block.get_owned_meta().unwrap();
+                let blocks = BlockBatch::downcast_from(block_meta).unwrap();
+                self.input_data.extend_from_slice(&blocks.blocks);
+            }
+
             self.input.set_not_need_data();
             Ok(Event::Sync)
         } else {
@@ -133,23 +209,29 @@ impl Processor for ParquetFileWriter {
     }
     fn process(&mut self) -> Result<()> {
-        let block = self.input_data.take().unwrap();
-        let block_meta = block.get_owned_meta().unwrap();
-        let blocks = BlockBatch::downcast_from(block_meta).unwrap();
-        let mut input_bytes = 0;
-        let mut row_counts = 0;
-        for b in blocks.blocks {
-            input_bytes += b.memory_size();
-            row_counts += b.num_rows();
-            self.output_format.serialize_block(&b)?;
+        while let Some(b) = self.input_data.pop() {
+            self.input_bytes += b.memory_size();
+            self.row_counts += b.num_rows();
+            let batch = b.to_record_batch(&self.table_info.schema)?;
+            self.writer.write(&batch)?;
+
+            if let Some(target) = self.target_file_size {
+                if self.row_counts > 0 {
+                    // written row groups: compressed, bounded by MAX_ROW_GROUP_SIZE
+                    let file_size = self.writer.bytes_written();
+                    // the in-progress row group keeps at most a 1MB uncompressed buffer plus
+                    // multiple compressed pages per column leaf, so schemas with many columns
+                    // may produce small files
+                    let in_progress = self.writer.in_progress_size();
+                    if file_size + in_progress >= target {
+                        self.flush()?;
+                        return Ok(());
+                    }
+                }
+            }
+        }
+        if self.input.is_finished() && self.row_counts > 0 {
+            self.flush()?;
         }
-        let data = self.output_format.finalize()?;
-        let output_bytes = data.len();
-        self.file_to_write = Some((data, DataSummary {
-            row_counts,
-            input_bytes,
-            output_bytes,
-        }));
         Ok(())
     }
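The flush rule above estimates the current file as flushed bytes plus the open row group, and cuts the file once the estimate crosses the target. A small sketch of that decision, including why wide schemas can still yield undersized files:

```rust
// Sketch of the flush decision in `process()` above. With many columns, the
// in-progress estimate alone can exceed the target, which is why small output
// files are possible for wide schemas.
fn should_flush(bytes_written: usize, in_progress_size: usize, target: Option<usize>) -> bool {
    match target {
        Some(t) => bytes_written + in_progress_size >= t,
        None => false, // single=true: only flush when the input is finished
    }
}

fn main() {
    // 40MB flushed + 30MB open row group >= 64MB target => cut the file here
    assert!(should_flush(40 << 20, 30 << 20, Some(64 << 20)));
    // wide schema: nothing flushed yet, but e.g. ~1MB per column over 100 columns
    // already passes a 64MB target, producing an early (small) file
    assert!(should_flush(0, 100 << 20, Some(64 << 20)));
    assert!(!should_flush(40 << 20, 10 << 20, Some(64 << 20)));
}
```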
diff --git a/src/query/storages/stage/src/append/row_based_file/pipeline.rs b/src/query/storages/stage/src/append/row_based_file/pipeline.rs
index cbbe10a5469b..41a34bc2c364 100644
--- a/src/query/storages/stage/src/append/row_based_file/pipeline.rs
+++ b/src/query/storages/stage/src/append/row_based_file/pipeline.rs
@@ -18,6 +18,7 @@
 use databend_common_catalog::plan::StageTableInfo;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_formats::FileFormatOptionsExt;
+use databend_common_meta_app::principal::StageFileCompression;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_pipeline_sources::input_formats::InputContext;
 use opendal::Operator;
@@ -26,20 +27,39 @@
 use super::limit_file_size_processor::LimitFileSizeProcessor;
 use super::serialize_processor::SerializeProcessor;
 use super::writer_processor::RowBasedFileWriter;
 
-// SerializeProcessor * N: serialize each data block to many small byte buffers.
-// LimitFileSizeProcessor * 1: group small byte buffers to batches (as a block meta) that are large enough as a file.
-// RowBasedFileSink * N: simply concat small byte buffers to a whole and write out.
+/// SerializeProcessor * N: serialize each data block to many small byte buffers.
+/// LimitFileSizeProcessor * 1: group small byte buffers into batches (as a block meta) that are large enough for a file.
+/// RowBasedFileSink * N: simply concatenate the small byte buffers into a whole file and write it out.
 #[allow(clippy::too_many_arguments)]
 pub(crate) fn append_data_to_row_based_files(
     pipeline: &mut Pipeline,
     ctx: Arc<dyn TableContext>,
     table_info: StageTableInfo,
     op: Operator,
-    max_file_size: usize,
-    max_threads: usize,
     uuid: String,
     group_id: &std::sync::atomic::AtomicUsize,
+    mem_limit: usize,
+    max_threads: usize,
 ) -> Result<()> {
+    let is_single = table_info.stage_info.copy_options.single;
+    let max_file_size = table_info.stage_info.copy_options.max_file_size;
+    let compression = table_info.stage_info.file_format_params.compression();
+    // when serializing a block, the memory may be doubled
+    let mem_limit = mem_limit / 2;
+    let max_file_size = if is_single {
+        usize::MAX
+    } else if max_file_size == 0 {
+        if compression == StageFileCompression::None {
+            16 * 1024 * 1024
+        } else {
+            64 * 1024 * 1024
+        }
+    } else {
+        max_file_size.min(mem_limit)
+    };
+
+    let max_threads = max_threads.min(mem_limit / max_file_size).max(1);
+
     let mut options_ext = FileFormatOptionsExt::create_from_settings(&ctx.get_settings(), false)?;
     let output_format = options_ext.get_output_format(
         table_info.schema(),
diff --git a/tests/sqllogictests/suites/udf_server/udf_server_test.test b/tests/sqllogictests/suites/udf_server/udf_server_test.test
index 5649306a6795..46757934a18e 100644
--- a/tests/sqllogictests/suites/udf_server/udf_server_test.test
+++ b/tests/sqllogictests/suites/udf_server/udf_server_test.test
@@ -170,6 +170,12 @@
 NULL
 6
 NULL
 
+query I
+SELECT gcd(a,b) d from (select number + 1 a, a * 2 b from numbers(3) where number in (select * from numbers(300))) order by d;
+----
+1
+2
+3
 
 statement ok
diff --git a/tests/suites/1_stateful/00_stage/00_0001_copy_into_stage.sh b/tests/suites/1_stateful/00_stage/00_0001_copy_into_stage.sh
index c849cb772dc9..2bdcece326be 100755
--- a/tests/suites/1_stateful/00_stage/00_0001_copy_into_stage.sh
+++ b/tests/suites/1_stateful/00_stage/00_0001_copy_into_stage.sh
@@ -33,5 +33,3 @@ fi
 
 echo "drop STAGE s2;" | $BENDSQL_CLIENT_CONNECT
 echo "drop table test_table;" | $BENDSQL_CLIENT_CONNECT
-aws --endpoint-url http://127.0.0.1:9900/ s3 rm s3://testbucket/admin/stage/s2 --recursive > /dev/null 2>&1
-
diff --git a/tests/suites/1_stateful/00_stage/00_0007_copy_into_stage2.result b/tests/suites/1_stateful/00_stage/00_0007_copy_into_stage2.result
index e7ecd758a226..6c164cc0e466 100644
--- a/tests/suites/1_stateful/00_stage/00_0007_copy_into_stage2.result
+++ b/tests/suites/1_stateful/00_stage/00_0007_copy_into_stage2.result
@@ -14,28 +14,8 @@
 ---csv_20
 5
 20
-20 450 800
----parq
-1
-20
-20 450 800
----parq_single
-1
-20
-20 450 7700
----parq_40
-10
-20
-20 450 3980
----parq_80
-5
-20
 big.csv 2000 0 NULL NULL
 2000 8893 8893
 ---csv_big_20
 2
 2000
-2000 8250 4465
----parq_big_80
-1
-2000
diff --git a/tests/suites/1_stateful/00_stage/00_0007_copy_into_stage2.sh b/tests/suites/1_stateful/00_stage/00_0007_copy_into_stage2.sh
index c33c8fe38f1b..f63cf2860276 100755
--- a/tests/suites/1_stateful/00_stage/00_0007_copy_into_stage2.sh
+++ b/tests/suites/1_stateful/00_stage/00_0007_copy_into_stage2.sh
@@ -41,23 +41,6 @@ check_csv "csv_10"
 echo "copy into @s1/csv_20 from test_table FILE_FORMAT = (type = CSV) MAX_FILE_SIZE = 20;" | $BENDSQL_CLIENT_CONNECT
 check_csv "csv_20"
 
-check_parq() {
-  echo "---${1}"
-  ls "$STAGE_DIR"/${1} | wc -l | sed 's/ //g'
-  echo "select count(*) from @s1/${1}/ (FILE_FORMAT => 'PARQUET');" | $BENDSQL_CLIENT_CONNECT
-}
-
-# each block memory size is 42
-echo "copy into @s1/parq from test_table FILE_FORMAT = (type = PARQUET);" | $BENDSQL_CLIENT_CONNECT
-check_parq "parq"
-echo "copy into @s1/parq_single from test_table FILE_FORMAT = (type = PARQUET) single=true;" | $BENDSQL_CLIENT_CONNECT -check_parq "parq_single" -echo "copy into @s1/parq_40 from test_table FILE_FORMAT = (type = PARQUET) MAX_FILE_SIZE = 40;" | $BENDSQL_CLIENT_CONNECT -check_parq "parq_40" -echo "copy into @s1/parq_80 from test_table FILE_FORMAT = (type = PARQUET) MAX_FILE_SIZE = 80;" | $BENDSQL_CLIENT_CONNECT -check_parq "parq_80" - -# test big block echo "drop table if exists t1;" | $BENDSQL_CLIENT_CONNECT echo "create table t1 (a int);" | $BENDSQL_CLIENT_CONNECT @@ -67,6 +50,4 @@ for i in $(seq 1 2000);do done echo "copy into t1 from @s1/big.csv FILE_FORMAT = (type = CSV);" | $BENDSQL_CLIENT_CONNECT echo "copy into @s1/csv_big_20 from t1 FILE_FORMAT = (type = CSV) MAX_FILE_SIZE = 20;" | $BENDSQL_CLIENT_CONNECT -check_csv "csv_big_20" -echo "copy into @s1/parq_big_80 from t1 FILE_FORMAT = (type = PARQUET) MAX_FILE_SIZE = 80;" | $BENDSQL_CLIENT_CONNECT -check_parq "parq_big_80" \ No newline at end of file +check_csv "csv_big_20" \ No newline at end of file diff --git a/tests/suites/1_stateful/00_stage/00_0017_copy_into_parquet.result b/tests/suites/1_stateful/00_stage/00_0017_copy_into_parquet.result new file mode 100755 index 000000000000..0bbad3668ea7 --- /dev/null +++ b/tests/suites/1_stateful/00_stage/00_0017_copy_into_parquet.result @@ -0,0 +1,8 @@ +>>>> drop stage if exists s1; +>>>> create stage s1; +1 +2 +1 +1 +2 +>>>> drop stage if exists s1; diff --git a/tests/suites/1_stateful/00_stage/00_0017_copy_into_parquet.sh b/tests/suites/1_stateful/00_stage/00_0017_copy_into_parquet.sh new file mode 100755 index 000000000000..3d032ab8b90d --- /dev/null +++ b/tests/suites/1_stateful/00_stage/00_0017_copy_into_parquet.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../../../shell_env.sh + +stmt "drop stage if exists s1;" +stmt "create stage s1;" + +# one file when #row is small even though multi-threads +echo "copy into @s1/ from (select * from numbers(6000000)) max_file_size=64000000 detailed_output=true" | $BENDSQL_CLIENT_CONNECT | wc -l | sed 's/ //g' + +# two files, the larger is about 63569025 +echo "copy /*+ set_var(max_threads=1) */ into @s1/ from (select * from numbers(70000000)) max_file_size=64000000 detailed_output=true;" | $BENDSQL_CLIENT_CONNECT | wc -l | sed 's/ //g' + +# one file +echo "copy /*+ set_var(max_threads=1) */ into @s1/ from (select * from numbers(60000000)) max_file_size=64000000 detailed_output=true;" | $BENDSQL_CLIENT_CONNECT | wc -l | sed 's/ //g' + +# one files, limit threads by memory +echo "copy /*+ set_var(max_threads=4) set_var(max_memory_usage=128000000) */ into @s1/ from (select * from numbers(60000000)) max_file_size=64000000 detailed_output=true;" | $BENDSQL_CLIENT_CONNECT | wc -l | sed 's/ //g' + +# two files, limit threads by memory +# copy /*+ set_var(max_threads=4) set_var(max_memory_usage=256000000) */ not working in cluster mode +echo "set max_threads=4; set max_memory_usage=256000000; copy /*+ set_var(max_threads=4) set_var(max_memory_usage=256000000) */ into @s1/ from (select * from numbers(60000000)) max_file_size=64000000 detailed_output=true;" | $BENDSQL_CLIENT_CONNECT | wc -l | sed 's/ //g' + +stmt "drop stage if exists s1;"