Skip to content

Commit

Permalink
Clean up for version bump, add all window
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed Jan 24, 2025
1 parent 7fa24c3 commit 58c5aa4
Show file tree
Hide file tree
Showing 12 changed files with 339 additions and 201 deletions.
359 changes: 182 additions & 177 deletions Cargo.lock

Large diffs are not rendered by default.

28 changes: 14 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "renoir"
description = "Reactive Network of Operators In Rust"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
authors = [
"Luca De Martini <[email protected]>",
Expand All @@ -26,7 +26,7 @@ rdkafka = ["dep:rdkafka", "tokio"]

[dependencies]
# for logging to the console
log = { version = "0.4.22", features = ["release_max_level_info"] }
log = { version = "0.4.25", features = ["release_max_level_info"] }

# used by the network for storing type-generic structures
typemap_rev = "0.3.0"
Expand All @@ -38,12 +38,12 @@ nanorand = "0.7.0"
derivative = "2.2.0"

# serialization
serde = { version = "1.0.215", features = ["derive"] }
serde_json = "1.0.133"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.137"
bincode = "1.3.3"
toml = "0.8.19"

thiserror = "2.0.3"
thiserror = "2.0.11"

# handy iterators functions

Expand All @@ -54,7 +54,7 @@ once_cell = "1.20.2"
ssh2 = { version = "0.9.4", optional = true }
whoami = { version = "1.5.2", optional = true }
shell-escape = { version = "0.1.5", optional = true }
clap = { version = "4.5.21", features = ["derive"], optional = true }
clap = { version = "4.5.27", features = ["derive"], optional = true }
sha2 = { version = "0.10.8", optional = true }
base64 = { version = "0.22.1", optional = true }

Expand All @@ -68,34 +68,34 @@ csv = "1.3.1"
lazy-init = "0.5.1"

# Faster monotonic clock using libc's CLOCK_MONOTONIC_COARSE
coarsetime = "0.1.34"
coarsetime = "0.1.35"

tokio = { version = "1.41.1", features = ["rt"], default-features = false, optional = true }
tokio = { version = "1.43.0", features = ["rt"], default-features = false, optional = true }
futures = { version = "0.3.31", optional = true }

parking_lot = "0.12.3"

wyhash = "0.5.0"
fxhash = "0.2.1"
glidesort = "0.1.2"
indexmap = "2.7.0"
indexmap = "2.7.1"
tracing = { version = "0.1.41", features = ["log"] }
quick_cache = "0.6.9"
dashmap = "6.1.0"
dyn-clone = "1.0.17"

apache-avro = { version = "0.17.0", features = ["derive"], optional = true }
parquet = { version = "53.3.0", optional = true }
arrow = { version = "53.3.0", optional = true }
parquet = { version = "53.4.0", optional = true }
arrow = { version = "53.4.0", optional = true }
rdkafka = { version = "0.37.0", optional = true }

[dev-dependencies]
# for the tests
env_logger = "0.11.5"
env_logger = "0.11.6"
rand = { version = "0.8.5", features = ["small_rng"] }
tempfile = "3.14.0"
tempfile = "3.15.0"
criterion = { version = "0.5.1", features = ["html_reports"] }
fake = "3.0.1"
fake = "3.1.0"
mimalloc = { version = "0.1.43", default-features = false }
tracing-subscriber = "0.3.19"
itertools = "0.13.0"
Expand Down
2 changes: 2 additions & 0 deletions examples/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// WARNING: KAFKA API IS EXPERIMENTAL

use clap::Parser;
use rdkafka::{config::RDKafkaLogLevel, ClientConfig, Message};
use renoir::prelude::*;
Expand Down
2 changes: 2 additions & 0 deletions examples/kafka_wordcount.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// WARNING: KAFKA API IS EXPERIMENTAL

use std::collections::HashMap;
use std::hash::BuildHasherDefault;
use std::str::FromStr;
Expand Down
1 change: 0 additions & 1 deletion src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2820,7 +2820,6 @@ where
/// let s = env.stream_iter(0..5);
/// let res = s.shuffle();
/// ```
pub fn shuffle(self) -> Stream<impl Operator<Out = Op::Out>> {
// NOTE(review): presumably this ends the current block and lets
// `NextStrategy::random()` choose the destination replica for each element —
// confirm against `split_block`.
self.0.split_block(End::new, NextStrategy::random())
}
Expand Down
1 change: 1 addition & 0 deletions src/operator/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl<Op> Stream<Op>
where
Op: Operator<Out: AsRef<[u8]>> + 'static,
{
/// # WARNING: KAFKA API IS EXPERIMENTAL
pub fn write_kafka(self, producer_config: ClientConfig, topic: &str) {
let producer = producer_config
.create::<FutureProducer>()
Expand Down
21 changes: 17 additions & 4 deletions src/operator/source/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ impl Clone for KafkaSourceInner {
}
}

/// Source that consumes an iterator and emits all its elements into the stream.
/// # WARNING: KAFKA API IS EXPERIMENTAL
///
/// The iterator will be consumed **only from one replica**, therefore this source is not parallel.
/// If replication is greater than `Replication::One` and timestamping logic
/// is being used, ensure that the number of Kafka partitions receiving events
/// is greater than the number of replicas. Otherwise, watermarks may not be
/// generated, stalling the computation. To solve this, reduce the replication.
///
/// TODO: address this
#[derive(Derivative)]
#[derivative(Debug)]
pub struct KafkaSource {
Expand Down Expand Up @@ -139,8 +144,7 @@ impl Operator for KafkaSource {
Err(flume::RecvTimeoutError::Disconnected) => {
tracing::warn!("kafka background task disconnected.");
StreamElement::Terminate
}
// StreamElement::Terminate,
} // StreamElement::Terminate,
}
}
}
Expand Down Expand Up @@ -186,6 +190,15 @@ impl crate::StreamContext {
/// Convenience method, creates a `KafkaSource` and makes a stream using `StreamContext::stream`
///
/// See Examples
///
/// # WARNING: KAFKA API IS EXPERIMENTAL
///
/// If replication is greater than `Replication::One` and timestamping logic
/// is being used, ensure that the number of Kafka partitions receiving events
/// is greater than the number of replicas. Otherwise, watermarks may not be
/// generated, stalling the computation. To solve this, reduce the replication.
///
/// TODO: address this
pub fn stream_kafka(
&self,
client_config: ClientConfig,
Expand Down
113 changes: 113 additions & 0 deletions src/operator/window/descr/all.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
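//! Window description that collects every element of a stream into a single window,
//! closed when the stream is flushed and restarted or terminated.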
use crate::operator::{Data, StreamElement, Timestamp};

use super::super::*;

/// Window manager that accumulates every element it receives into a single window.
///
/// The result of the window is produced when a `FlushAndRestart` or `Terminate`
/// element is processed.
#[derive(Clone)]
pub struct AllWindowManager<A> {
/// Template accumulator, cloned to start a window when its first item arrives.
init: A,
/// Accumulator of the current window; `None` until an item has been processed
/// and again after the window result has been taken.
accumulator: Option<A>,
/// Smallest timestamp reported by the elements processed since the last
/// emitted result, if any of them carried one.
tsmin: Option<Timestamp>,
}

/// [`WindowManager`] implementation that keeps a single window open until the
/// stream is flushed and restarted or terminated.
impl<A: WindowAccumulator> WindowManager for AllWindowManager<A>
where
    A::In: Data,
    A::Out: Data,
{
    type In = A::In;
    type Out = A::Out;
    type Output = Option<WindowResult<A::Out>>;

    /// Feeds one stream element to the window.
    ///
    /// * `Item` / `Timestamped`: the item is added to the current accumulator,
    ///   which is created from a clone of `init` on the first item. Returns `None`.
    /// * `FlushAndRestart` / `Terminate`: closes the window and returns its
    ///   result together with the smallest timestamp seen, or `None` when no
    ///   item was accumulated since the previous result.
    /// * Any other element: returns `None`.
    #[inline]
    fn process(&mut self, el: StreamElement<A::In>) -> Self::Output {
        // Keep track of the smallest timestamp carried by the elements of the
        // current window; it is attached to the window result.
        let ts = el.timestamp().cloned();
        self.tsmin = match (ts, self.tsmin.take()) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (Some(a), None) | (None, Some(a)) => Some(a),
            (None, None) => None,
        };

        match el {
            StreamElement::Item(item) | StreamElement::Timestamped(item, _) => {
                self.accumulator
                    .get_or_insert_with(|| self.init.clone())
                    .process(item);
                None
            }
            StreamElement::FlushAndRestart | StreamElement::Terminate => {
                // Always reset the timestamp so that it cannot leak into the
                // next window, even when there is nothing to emit.
                let window_ts = self.tsmin.take();
                // An empty window (no item received) yields no result instead
                // of panicking on a missing accumulator.
                self.accumulator
                    .take()
                    .map(|acc| WindowResult::new(acc.output(), window_ts))
            }
            _ => None,
        }
    }
}

/// Window description that groups every element of the stream into a single window.
///
/// The window stays open until the stream is flushed and restarted or terminated.
#[derive(Clone, Debug, Default)]
pub struct AllWindow;

impl AllWindow {
    /// Creates a window that contains all elements until the stream is flushed
    /// and restarted or terminated.
    #[inline]
    pub fn new() -> Self {
        Self
    }
}

impl<T: Data> WindowDescription<T> for AllWindow {
    type Manager<A: WindowAccumulator<In = T>> = AllWindowManager<A>;

    /// Builds the manager for this window description.
    ///
    /// The given `accumulator` is stored as the template from which the
    /// accumulator of each window is cloned; the manager starts with no open
    /// window and no recorded timestamp.
    #[inline]
    fn build<A: WindowAccumulator<In = T>>(&self, accumulator: A) -> Self::Manager<A> {
        let init = accumulator;
        AllWindowManager {
            tsmin: None,
            accumulator: None,
            init,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::operator::window::aggr::Fold;

    // Compares two iterables element by element, requiring that they yield
    // equal items and finish at the same time.
    macro_rules! check_return {
        ($ret:expr, $v:expr) => {{
            let mut actual = $ret.into_iter();
            let mut expected = $v.into_iter();
            loop {
                let (got, want) = (actual.next(), expected.next());
                assert_eq!(got, want);

                // The two sides just compared equal, so one being exhausted
                // means both are.
                if got.is_none() {
                    break;
                }
            }
        }};
    }

    /// Items produce no output; the flush emits everything accumulated so far
    /// and a following terminate has nothing left to emit.
    #[test]
    fn all_window() {
        let fold: Fold<isize, Vec<isize>, _> = Fold::new(Vec::new(), |v, el| v.push(el));
        let mut manager = AllWindow::new().build(fold);

        for item in 1..100 {
            check_return!(manager.process(StreamElement::Item(item)), None);
        }

        let flushed = manager.process(StreamElement::FlushAndRestart);
        check_return!(flushed, Some(WindowResult::Item((1..100).collect())));

        let terminated = manager.process(StreamElement::Terminate);
        check_return!(terminated, None);
    }
}
2 changes: 1 addition & 1 deletion src/operator/window/descr/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ where
let ts = el.timestamp().cloned();
match el {
StreamElement::Item(item) | StreamElement::Timestamped(item, _) => {
while self.ws.len() < (self.size + self.slide - 1) / self.slide {
while self.ws.len() < self.size.div_ceil(self.slide) {
self.ws.push_back(Slot::new(self.init.clone()))
}
let k = self.ws.front().unwrap().count / self.slide + 1; // TODO: Check
Expand Down
3 changes: 3 additions & 0 deletions src/operator/window/descr/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
mod count;
pub use count::CountWindow;

mod all;
pub use all::AllWindow;

#[cfg(feature = "timestamp")]
mod event_time;
#[cfg(feature = "timestamp")]
Expand Down
4 changes: 2 additions & 2 deletions src/profiler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ mod without_profiler {
use crate::profiler::*;

/// The fake profiler for when the `profiler` feature is disabled.
// static PROFILER: UnsafeCell<NoOpProfiler> = UnsafeCell::new(NoOpProfiler);

/// static PROFILER: UnsafeCell<NoOpProfiler> = UnsafeCell::new(NoOpProfiler);
///
/// Fake profiler. This is used when the `profiler` feature is not enabled.
///
/// This struct MUST NOT contain any field and must do absolutely nothing since it is accessed
Expand Down
4 changes: 2 additions & 2 deletions tests/parallel_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ mod utils;
#[test]
fn parallel_iterator() {
TestHelper::local_remote_env(|env| {
let n = 100;
let n = 100u64;
let source = ParallelIteratorSource::new(move |id, instances| {
let chunk_size = (n + instances - 1) / instances;
let chunk_size = n.div_ceil(instances);
let remaining = n - n.min(chunk_size * id);
let range = remaining.min(chunk_size);

Expand Down

0 comments on commit 58c5aa4

Please sign in to comment.