Skip to content

Commit

Permalink
feat(stream): add kafka backill executor
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Dec 24, 2023
1 parent 96e7ea8 commit a7da39f
Show file tree
Hide file tree
Showing 9 changed files with 988 additions and 1 deletion.
5 changes: 5 additions & 0 deletions src/common/src/array/data_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ impl DataChunk {
self.visibility.next_set_bit(row_idx)
}

/// Return the prev visible row index ~~on or~~ before `row_idx`.
pub fn prev_visible_row_idx(&self, row_idx: usize) -> Option<usize> {
self.visibility.next_set_bit(row_idx)
}

pub fn into_parts(self) -> (Vec<ArrayRef>, Bitmap) {
(self.columns.to_vec(), self.visibility)
}
Expand Down
18 changes: 18 additions & 0 deletions src/common/src/array/data_chunk_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,24 @@ impl<'a> Iterator for DataChunkRefIter<'a> {
}
}

impl<'a> DoubleEndedIterator for DataChunkRefIter<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
if self.idx.start == self.idx.end {
return None;
}
match self.chunk.prev_visible_row_idx(self.idx.end) {
Some(idx) if idx >= self.idx.start => {
self.idx.end = idx;
Some(RowRef::new(self.chunk, idx))
}
_ => {
self.idx.end = self.idx.start;
None
}
}
}
}

impl<'a> FusedIterator for DataChunkRefIter<'a> {}

pub struct DataChunkRefIterWithHoles<'a> {
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/array/stream_chunk_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ impl StreamChunk {
self.rows_in(0..self.capacity())
}

/// # Panics
/// If the chunk is empty.
pub fn last_row(&self) -> RowRef<'_> {
self.data_chunk().rows().next_back().unwrap()
}

/// Return an iterator on rows of this stream chunk in a range.
pub fn rows_in(&self, range: Range<usize>) -> impl Iterator<Item = (Op, RowRef<'_>)> {
self.data_chunk().rows_in(range).map(|row| {
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/buffer/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ impl Bitmap {
(bit_idx..self.len()).find(|&idx| unsafe { self.is_set_unchecked(idx) })
}

/// Return the prev set bit index ~~on or~~ before `bit_idx`.
pub fn prev_set_bit(&self, bit_idx: usize) -> Option<usize> {
(0..bit_idx)
.rev()
.find(|&idx| unsafe { self.is_set_unchecked(idx) })
}

/// Counts the number of bits set to 1.
pub fn count_ones(&self) -> usize {
self.count_ones
Expand Down
7 changes: 6 additions & 1 deletion src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,13 +678,17 @@ macro_rules! impl_convert {

paste! {
impl ScalarImpl {
/// # Panics
/// If the scalar is not of the expected type.
pub fn [<as_ $suffix_name>](&self) -> &$scalar {
match self {
Self::$variant_name(ref scalar) => scalar,
other_scalar => panic!("cannot convert ScalarImpl::{} to concrete type {}", other_scalar.get_ident(), stringify!($variant_name))
}
}

/// # Panics
/// If the scalar is not of the expected type.
pub fn [<into_ $suffix_name>](self) -> $scalar {
match self {
Self::$variant_name(scalar) => scalar,
Expand All @@ -694,7 +698,8 @@ macro_rules! impl_convert {
}

impl <'scalar> ScalarRefImpl<'scalar> {
// Note that this conversion consume self.
/// # Panics
/// If the scalar is not of the expected type.
pub fn [<into_ $suffix_name>](self) -> $scalar_ref {
match self {
Self::$variant_name(inner) => inner,
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ pub type BoxTryStream<M> = BoxStream<'static, Result<M, RwError>>;
#[derive(Clone, Debug, PartialEq)]
pub struct StreamChunkWithState {
pub chunk: StreamChunk,
/// The latest offsets for each split in this chunk.
pub split_offset_mapping: Option<HashMap<SplitId, String>>,
}

Expand Down
1 change: 1 addition & 0 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl ConnectorSource {
}
}

/// If `state` is `None`, returns a pending stream.
pub async fn stream_reader(
&self,
state: ConnectorState,
Expand Down
Loading

0 comments on commit a7da39f

Please sign in to comment.