diff --git a/Cargo.lock b/Cargo.lock index 863f0f8..ad29808 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,6 +24,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "const-random", "getrandom", "once_cell", "version_check", @@ -39,6 +40,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anes" version = "0.1.6" @@ -93,6 +109,230 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "arrow" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "219d05930b81663fd3b32e3bde8ce5bff3c4d23052a99f11a8fa50a3b47b2658" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-csv", + "arrow-data", + "arrow-ipc", + "arrow-json", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0272150200c07a86a390be651abdd320a2d12e84535f0837566ca87ecd8f95e0" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "num", +] + +[[package]] +name = "arrow-array" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8010572cf8c745e242d1b632bd97bd6d4f40fefed5ed1290a8f433abaa686fea" +dependencies = [ + "ahash", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "hashbrown", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d0a2432f0cba5692bf4cb757469c66791394bac9ec7ce63c1afe74744c37b27" +dependencies = [ + "bytes", + "half", + "num", +] + +[[package]] +name = "arrow-cast" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9abc10cd7995e83505cc290df9384d6e5412b207b79ce6bdff89a10505ed2cba" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "atoi", + "base64", + "chrono", + "half", + "lexical-core", + "num", + "ryu", +] + +[[package]] +name = "arrow-csv" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95cbcba196b862270bf2a5edb75927380a7f3a163622c61d40cbba416a6305f2" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "csv", + "csv-core", + "lazy_static", + "lexical-core", + "regex", +] + +[[package]] +name = "arrow-data" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2742ac1f6650696ab08c88f6dd3f0eb68ce10f8c253958a18c943a68cd04aec5" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num", +] + +[[package]] +name = "arrow-ipc" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a42ea853130f7e78b9b9d178cb4cd01dee0f78e64d96c2949dc0a915d6d9e19d" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "flatbuffers", +] + +[[package]] +name = "arrow-json" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaafb5714d4e59feae964714d724f880511500e3569cc2a94d02456b403a2a49" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "indexmap", + "lexical-core", + "num", + "serde", + "serde_json", +] + +[[package]] +name = "arrow-ord" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3e6b61e3dc468f503181dccc2fc705bdcc5f2f146755fa5b56d0a6c5943f412" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "half", + "num", +] + +[[package]] +name = "arrow-row" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "848ee52bb92eb459b811fb471175ea3afcf620157674c8794f539838920f9228" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half", + "hashbrown", +] + +[[package]] +name = "arrow-schema" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02d9483aaabe910c4781153ae1b6ae0393f72d9ef757d38d09d450070cf2e528" + +[[package]] +name = "arrow-select" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "849524fa70e0e3c5ab58394c770cb8f514d0122d20de08475f7b472ed8075830" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + +[[package]] +name = "arrow-string" +version = "51.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9373cb5a021aee58863498c37eb484998ef13377f69989c6c5ccfbd258236cdb" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "memchr", + "num", + "regex", + "regex-syntax", +] + +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "autocfg" version = "1.2.0" @@ -186,6 +426,18 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "windows-targets 0.52.5", +] + [[package]] name = "ciborium" version = "0.2.2" @@ -270,6 +522,32 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom", + "once_cell", + "tiny-keccak", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + [[package]] name = "cpufeatures" version = "0.2.12" @@ -478,6 +756,16 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" +[[package]] +name = "flatbuffers" +version = "23.5.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640" +dependencies = [ + "bitflags 1.3.2", + "rustc_version", +] + [[package]] name = "flume" version = "0.11.0" @@ -631,6 +919,7 @@ checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" dependencies = [ "cfg-if", "crunchy", + "num-traits", ] [[package]] @@ -657,6 +946,29 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "indexmap" version = "2.2.6" @@ -742,12 +1054,82 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lexical-core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" +dependencies = [ + "lexical-parse-integer", + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-util" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc" +dependencies = [ + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" +dependencies = [ + "lexical-util", + "lexical-write-integer", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" +dependencies = [ + "lexical-util", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +[[package]] +name = "libm" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" + [[package]] name = "libmimalloc-sys" version = "0.1.35" @@ -882,13 +1264,78 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c165a9ab64cf766f73521c0dd2cfdff64f488b8f0b3e621face3462d3db536d7" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-iter" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" -version = "0.2.18" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -1171,6 +1618,7 @@ checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" name = "renoir" version = "0.2.0" dependencies = [ + "arrow", "base64", "bincode", "clap", @@ -1221,6 +1669,15 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.38.32" @@ -1255,6 +1712,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + [[package]] name = "serde" version = "1.0.197" @@ -1443,6 +1906,15 @@ dependencies = [ "once_cell", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -1752,6 +2224,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.5", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 9656dd1..7e1364c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ indexmap = "2.2.6" tracing = { version = "0.1.40", features = ["log"] } quick_cache = "0.5.1" dashmap = "5.5.3" +arrow = "51.0.0" [dev-dependencies] # for the tests diff --git a/src/operator/window/aggr/collect.rs b/src/operator/window/aggr/collect.rs index ed04068..1e1aa9a 100644 --- a/src/operator/window/aggr/collect.rs +++ b/src/operator/window/aggr/collect.rs @@ -1,25 +1,33 @@ /// TODO - - use super::super::*; use crate::operator::{Data, DataKey, Operator}; use crate::stream::{KeyedStream, WindowedStream}; -#[derive(Clone)] -struct CollectVec +struct CollectTo where - F: Fn(Vec) -> O, + O: Extend + Default, { - vec: Vec, - f: F, - _o: PhantomData, + collection: O, + _i: PhantomData, +} + +impl Clone for CollectTo +where + O: Extend + Default, +{ + fn clone(&self) -> Self { + Self { + collection: Default::default(), + _i: PhantomData, + } + } } -impl WindowAccumulator for CollectVec +impl WindowAccumulator for CollectTo where - F: Fn(Vec) -> O + Send + Clone + 'static, + O:, I: Clone + Send + 'static, - O: Clone + Send + 'static, + O: Extend + Default + Clone+ Send + 'static, { type In = I; @@ -27,32 +35,29 @@ where #[inline] fn process(&mut self, el: Self::In) { - self.vec.push(el); + self.collection.extend(std::iter::once(el)); } #[inline] fn output(self) -> Self::Out { - (self.f)(self.vec) + self.collection } } -impl WindowedStream +impl WindowedStream where - WindowDescr: WindowDescription, - OperatorChain: Operator> + 'static, + WindowDescr: WindowDescription, + OperatorChain: Operator + 'static, Key: DataKey, Out: Data + Ord, { - /// Prefer other aggregators if possible as they don't save all elements - pub fn map) -> NewOut + Send + Clone + 'static>( + pub fn collect_to + Default + Clone + Send + 'static>( self, - f: F, - ) -> KeyedStream>> { - let acc = CollectVec:: { - vec: Default::default(), - f, - _o: PhantomData, + ) -> KeyedStream> { + let acc = CollectTo:: { + collection: Default::default(), + _i: PhantomData, }; - self.add_window_operator("WindowMap", acc) + self.add_window_operator("WindowCollect", acc) } } diff --git a/src/operator/window/aggr/collect_vec.rs b/src/operator/window/aggr/collect_vec.rs index 274aabf..ab4e518 100644 --- a/src/operator/window/aggr/collect_vec.rs +++ b/src/operator/window/aggr/collect_vec.rs @@ -21,6 +21,7 @@ where type In = I; type Out = O; + #[inline] fn process(&mut self, el: Self::In) { @@ -31,6 +32,11 @@ where fn output(self) -> Self::Out { (self.f)(self.vec) } + + #[inline] + fn size_hint(&mut self, size: usize) { + self.vec.reserve(size); + } } impl WindowedStream @@ -59,6 +65,6 @@ where f: |v| v, _o: PhantomData, }; - self.add_window_operator("WindowMap", acc) + self.add_window_operator("WindowCollectVec", acc) } } diff --git a/src/operator/window/aggr/mod.rs b/src/operator/window/aggr/mod.rs index 1793745..f4c6693 100644 --- a/src/operator/window/aggr/mod.rs +++ b/src/operator/window/aggr/mod.rs @@ -2,6 +2,7 @@ mod fold; pub(super) use fold::{Fold, FoldFirst}; mod collect_vec; +mod collect; mod count; mod join; mod max; diff --git a/src/operator/window/descr/count.rs b/src/operator/window/descr/count.rs index f40b80d..f8138a3 100644 --- a/src/operator/window/descr/count.rs +++ b/src/operator/window/descr/count.rs @@ -142,8 +142,10 @@ impl WindowDescription for CountWindow { #[inline] fn build>(&self, accumulator: A) -> Self::Manager { + let mut init = accumulator; + init.size_hint(self.size); CountWindowManager { - init: accumulator, + init, size: self.size, slide: self.slide, exact: self.exact, diff --git a/src/operator/window/descr/event_time.rs b/src/operator/window/descr/event_time.rs index e85b32d..5702055 100644 --- a/src/operator/window/descr/event_time.rs +++ b/src/operator/window/descr/event_time.rs @@ -3,7 +3,7 @@ use std::collections::VecDeque; use super::super::*; use crate::operator::{Data, StreamElement, Timestamp}; -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct EventTimeWindowManager where A: WindowAccumulator, @@ -14,6 +14,22 @@ where last_watermark: Option, ws: VecDeque>, } + +impl Clone for EventTimeWindowManager +where + A: WindowAccumulator, +{ + fn clone(&self) -> Self { + Self { + init: self.init.clone(), + size: self.size.clone(), + slide: self.slide.clone(), + last_watermark: self.last_watermark.clone(), + ws: self.ws.clone(), + } + } +} + impl EventTimeWindowManager { fn alloc_windows(&mut self, ts: Timestamp) { assert!(self.last_watermark.map(|w| ts >= w).unwrap_or(true)); diff --git a/src/operator/window/mod.rs b/src/operator/window/mod.rs index f102734..7bc38c2 100644 --- a/src/operator/window/mod.rs +++ b/src/operator/window/mod.rs @@ -34,9 +34,11 @@ pub trait WindowDescription { /// /// Convention: output will always be called after at least one element has been processed pub trait WindowAccumulator: Clone + Send + 'static { - type In: Data; - type Out: Data; + type In: Clone + Send + 'static; + type Out: Clone + Send + 'static; + #[allow(unused)] + fn size_hint(&mut self, size: usize) {} /// Process a single input element updating the state of the accumulator fn process(&mut self, el: Self::In); /// Finalize the accumulator and produce a result