From a7da39f148fe3c22b656a86d683967fd92ba2787 Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 23 Dec 2023 11:50:28 +0800 Subject: [PATCH] feat(stream): add kafka backill executor --- src/common/src/array/data_chunk.rs | 5 + src/common/src/array/data_chunk_iter.rs | 18 + src/common/src/array/stream_chunk_iter.rs | 6 + src/common/src/buffer/bitmap.rs | 7 + src/common/src/types/mod.rs | 7 +- src/connector/src/source/base.rs | 1 + src/source/src/connector_source.rs | 1 + .../source/kafka_backfill_executor.rs | 943 ++++++++++++++++++ src/stream/src/executor/source/mod.rs | 1 + 9 files changed, 988 insertions(+), 1 deletion(-) create mode 100644 src/stream/src/executor/source/kafka_backfill_executor.rs diff --git a/src/common/src/array/data_chunk.rs b/src/common/src/array/data_chunk.rs index 90c2560cadcb2..8a14cba3e0509 100644 --- a/src/common/src/array/data_chunk.rs +++ b/src/common/src/array/data_chunk.rs @@ -121,6 +121,11 @@ impl DataChunk { self.visibility.next_set_bit(row_idx) } + /// Return the prev visible row index ~~on or~~ before `row_idx`. + pub fn prev_visible_row_idx(&self, row_idx: usize) -> Option { + self.visibility.next_set_bit(row_idx) + } + pub fn into_parts(self) -> (Vec, Bitmap) { (self.columns.to_vec(), self.visibility) } diff --git a/src/common/src/array/data_chunk_iter.rs b/src/common/src/array/data_chunk_iter.rs index 887a9ab02437e..51264a011e388 100644 --- a/src/common/src/array/data_chunk_iter.rs +++ b/src/common/src/array/data_chunk_iter.rs @@ -85,6 +85,24 @@ impl<'a> Iterator for DataChunkRefIter<'a> { } } +impl<'a> DoubleEndedIterator for DataChunkRefIter<'a> { + fn next_back(&mut self) -> Option { + if self.idx.start == self.idx.end { + return None; + } + match self.chunk.prev_visible_row_idx(self.idx.end) { + Some(idx) if idx >= self.idx.start => { + self.idx.end = idx; + Some(RowRef::new(self.chunk, idx)) + } + _ => { + self.idx.end = self.idx.start; + None + } + } + } +} + impl<'a> FusedIterator for DataChunkRefIter<'a> {} pub struct DataChunkRefIterWithHoles<'a> { diff --git a/src/common/src/array/stream_chunk_iter.rs b/src/common/src/array/stream_chunk_iter.rs index 82a8e9997b661..06dc7bdec8706 100644 --- a/src/common/src/array/stream_chunk_iter.rs +++ b/src/common/src/array/stream_chunk_iter.rs @@ -35,6 +35,12 @@ impl StreamChunk { self.rows_in(0..self.capacity()) } + /// # Panics + /// If the chunk is empty. + pub fn last_row(&self) -> RowRef<'_> { + self.data_chunk().rows().next_back().unwrap() + } + /// Return an iterator on rows of this stream chunk in a range. pub fn rows_in(&self, range: Range) -> impl Iterator)> { self.data_chunk().rows_in(range).map(|row| { diff --git a/src/common/src/buffer/bitmap.rs b/src/common/src/buffer/bitmap.rs index dd76f59ade44f..4dc10886babcd 100644 --- a/src/common/src/buffer/bitmap.rs +++ b/src/common/src/buffer/bitmap.rs @@ -315,6 +315,13 @@ impl Bitmap { (bit_idx..self.len()).find(|&idx| unsafe { self.is_set_unchecked(idx) }) } + /// Return the prev set bit index ~~on or~~ before `bit_idx`. + pub fn prev_set_bit(&self, bit_idx: usize) -> Option { + (0..bit_idx) + .rev() + .find(|&idx| unsafe { self.is_set_unchecked(idx) }) + } + /// Counts the number of bits set to 1. pub fn count_ones(&self) -> usize { self.count_ones diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index 7b6f393725e71..6d04c899c3b1e 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -678,6 +678,8 @@ macro_rules! impl_convert { paste! { impl ScalarImpl { + /// # Panics + /// If the scalar is not of the expected type. pub fn [](&self) -> &$scalar { match self { Self::$variant_name(ref scalar) => scalar, @@ -685,6 +687,8 @@ macro_rules! impl_convert { } } + /// # Panics + /// If the scalar is not of the expected type. pub fn [](self) -> $scalar { match self { Self::$variant_name(scalar) => scalar, @@ -694,7 +698,8 @@ macro_rules! impl_convert { } impl <'scalar> ScalarRefImpl<'scalar> { - // Note that this conversion consume self. + /// # Panics + /// If the scalar is not of the expected type. pub fn [](self) -> $scalar_ref { match self { Self::$variant_name(inner) => inner, diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 8db65bb9681bf..0e0762b0eb4cd 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -330,6 +330,7 @@ pub type BoxTryStream = BoxStream<'static, Result>; #[derive(Clone, Debug, PartialEq)] pub struct StreamChunkWithState { pub chunk: StreamChunk, + /// The latest offsets for each split in this chunk. pub split_offset_mapping: Option>, } diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 00c7e33196abf..8419976df52d9 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -107,6 +107,7 @@ impl ConnectorSource { } } + /// If `state` is `None`, returns a pending stream. pub async fn stream_reader( &self, state: ConnectorState, diff --git a/src/stream/src/executor/source/kafka_backfill_executor.rs b/src/stream/src/executor/source/kafka_backfill_executor.rs new file mode 100644 index 0000000000000..4771473e02a8b --- /dev/null +++ b/src/stream/src/executor/source/kafka_backfill_executor.rs @@ -0,0 +1,943 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::assert_matches::assert_matches; +use std::fmt::Formatter; +use std::pin::pin; +use std::time::Duration; + +use anyhow::anyhow; +use either::Either; +use futures::stream::{select_with_strategy, AbortHandle, Abortable}; +use futures::StreamExt; +use futures_async_stream::try_stream; +use risingwave_common::metrics::GLOBAL_ERROR_METRICS; +use risingwave_common::row::Row; +use risingwave_common::system_param::local_manager::SystemParamsReaderRef; +use risingwave_connector::source::{ + BoxSourceWithStateStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, + SplitMetaData, StreamChunkWithState, +}; +use risingwave_connector::ConnectorParams; +use risingwave_source::source_desc::{SourceDesc, SourceDescBuilder}; +use risingwave_storage::StateStore; +use thiserror_ext::AsReport; +use tokio::sync::mpsc::UnboundedReceiver; +use tokio::time::Instant; + +use super::executor_core::StreamSourceCore; +use crate::executor::monitor::StreamingMetrics; +use crate::executor::stream_reader::StreamReaderWithPause; +use crate::executor::*; + +type ExecutorSplitId = String; + +/// A constant to multiply when calculating the maximum time to wait for a barrier. This is due to +/// some latencies in network and cost in meta. +const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5; + +pub struct KafkaBackfillExecutorWrapper { + inner: KafkaBackfillExecutor, + /// Upstream changelog stream which may contain metadata columns, e.g. `_rw_offset` + input: Box, +} + +pub struct KafkaBackfillExecutor { + actor_ctx: ActorContextRef, + info: ExecutorInfo, + + /// Streaming source for external + stream_source_core: Option>, + + /// Metrics for monitor. + metrics: Arc, + + // /// Receiver of barrier channel. + // barrier_receiver: Option>, + /// System parameter reader to read barrier interval + system_params: SystemParamsReaderRef, + + // control options for connector level + source_ctrl_opts: SourceCtrlOpts, + + // config for the connector node + connector_params: ConnectorParams, +} + +impl KafkaBackfillExecutor { + #[allow(clippy::too_many_arguments)] + pub fn new( + actor_ctx: ActorContextRef, + info: ExecutorInfo, + stream_source_core: Option>, + metrics: Arc, + // barrier_receiver: UnboundedReceiver, + system_params: SystemParamsReaderRef, + source_ctrl_opts: SourceCtrlOpts, + connector_params: ConnectorParams, + ) -> Self { + Self { + actor_ctx, + info, + stream_source_core, + metrics, + // barrier_receiver: Some(barrier_receiver), + system_params, + source_ctrl_opts, + connector_params, + } + } + + async fn build_stream_source_reader( + &self, + source_desc: &SourceDesc, + state: ConnectorState, + ) -> StreamExecutorResult<( + BoxSourceWithStateStream, + HashMap, + )> { + let column_ids = source_desc + .columns + .iter() + .map(|column_desc| column_desc.column_id) + .collect_vec(); + let source_ctx = SourceContext::new_with_suppressor( + self.actor_ctx.id, + self.stream_source_core.as_ref().unwrap().source_id, + self.actor_ctx.fragment_id, + source_desc.metrics.clone(), + self.source_ctrl_opts.clone(), + self.connector_params.connector_client.clone(), + self.actor_ctx.error_suppressor.clone(), + ); + let source_ctx = Arc::new(source_ctx); + // Unlike SourceExecutor, which creates a stream_reader with all splits, + // we create a separate stream_reader for each split here, because we + // want to abort early for each split after the split's backfilling is finished. + match state { + Some(splits) => { + let mut abort_handles = HashMap::new(); + let mut streams = vec![]; + for split in splits { + let split_id = split.id().to_string(); + let reader = source_desc + .source + .stream_reader(Some(vec![split]), column_ids.clone(), source_ctx.clone()) + .await + .map_err(StreamExecutorError::connector_error)?; + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + let stream = Abortable::new(reader, abort_registration); + abort_handles.insert(split_id, abort_handle); + streams.push(stream); + } + return Ok((futures::stream::select_all(streams).boxed(), abort_handles)); + } + None => return Ok((futures::stream::pending().boxed(), HashMap::new())), + } + } + + async fn apply_split_change( + &mut self, + source_desc: &SourceDesc, + stream: &mut StreamReaderWithPause, + split_assignment: &HashMap>, + ) -> StreamExecutorResult>> { + // self.metrics + // .source_split_change_count + // .with_label_values( + // &self + // .get_metric_labels() + // .iter() + // .map(AsRef::as_ref) + // .collect::>(), + // ) + // .inc(); + if let Some(target_splits) = split_assignment.get(&self.actor_ctx.id).cloned() { + if let Some(target_state) = self.update_state_if_changed(Some(target_splits)).await? { + tracing::info!( + actor_id = self.actor_ctx.id, + state = ?target_state, + "apply split change" + ); + + self.replace_stream_reader_with_target_state( + source_desc, + stream, + target_state.clone(), + ) + .await?; + + return Ok(Some(target_state)); + } + } + + Ok(None) + } + + // Note: `update_state_if_changed` will modify `state_cache` + async fn update_state_if_changed( + &mut self, + state: ConnectorState, + ) -> StreamExecutorResult { + let core = self.stream_source_core.as_mut().unwrap(); + + let target_splits: HashMap<_, _> = state + .unwrap() + .into_iter() + .map(|split| (split.id(), split)) + .collect(); + + let mut target_state: Vec = Vec::with_capacity(target_splits.len()); + + let mut split_changed = false; + + for (split_id, split) in &target_splits { + if let Some(s) = core.state_cache.get(split_id) { + // existing split, no change, clone from cache + target_state.push(s.clone()) + } else { + split_changed = true; + // write new assigned split to state cache. snapshot is base on cache. + + let initial_state = if let Some(recover_state) = core + .split_state_store + .try_recover_from_state_store(split) + .await? + { + recover_state + } else { + split.clone() + }; + + core.state_cache + .entry(split.id()) + .or_insert_with(|| initial_state.clone()); + + target_state.push(initial_state); + } + } + + // state cache may be stale + for existing_split_id in core.stream_source_splits.keys() { + if !target_splits.contains_key(existing_split_id) { + tracing::info!("split dropping detected: {}", existing_split_id); + split_changed = true; + } + } + + Ok(split_changed.then_some(target_state)) + } + + /// Rebuild stream if there is a err in stream + async fn rebuild_stream_reader_from_error( + &mut self, + source_desc: &SourceDesc, + stream: &mut StreamReaderWithPause, + split_info: &mut [SplitImpl], + e: StreamExecutorError, + ) -> StreamExecutorResult<()> { + let core = self.stream_source_core.as_mut().unwrap(); + tracing::warn!( + "stream source reader error, actor: {:?}, source: {:?}", + self.actor_ctx.id, + core.source_id, + ); + GLOBAL_ERROR_METRICS.user_source_reader_error.report([ + "SourceReaderError".to_owned(), + e.to_report_string(), + "KafkaBackfillExecutor".to_owned(), + self.actor_ctx.id.to_string(), + core.source_id.to_string(), + ]); + // fetch the newest offset, either it's in cache (before barrier) + // or in state table (just after barrier) + let target_state = if core.state_cache.is_empty() { + for ele in &mut *split_info { + if let Some(recover_state) = core + .split_state_store + .try_recover_from_state_store(ele) + .await? + { + *ele = recover_state; + } + } + split_info.to_owned() + } else { + core.state_cache + .values() + .map(|split_impl| split_impl.to_owned()) + .collect_vec() + }; + + self.replace_stream_reader_with_target_state(source_desc, stream, target_state) + .await + } + + async fn replace_stream_reader_with_target_state( + &mut self, + source_desc: &SourceDesc, + stream: &mut StreamReaderWithPause, + target_state: Vec, + ) -> StreamExecutorResult<()> { + tracing::info!( + "actor {:?} apply source split change to {:?}", + self.actor_ctx.id, + target_state + ); + + // Replace the source reader with a new one of the new state. + + let (reader, abort_handles) = self + .build_stream_source_reader(source_desc, Some(target_state.clone())) + .await?; + + stream.replace_data_stream(reader); + + Ok(()) + } + + async fn take_snapshot_and_clear_cache( + &mut self, + epoch: EpochPair, + target_state: Option>, + should_trim_state: bool, + ) -> StreamExecutorResult<()> { + let core = self.stream_source_core.as_mut().unwrap(); + + let mut cache = core + .state_cache + .values() + .map(|split_impl| split_impl.to_owned()) + .collect_vec(); + + if let Some(target_splits) = target_state { + let target_split_ids: HashSet<_> = + target_splits.iter().map(|split| split.id()).collect(); + + cache.retain(|split| target_split_ids.contains(&split.id())); + + let dropped_splits = core + .stream_source_splits + .extract_if(|split_id, _| !target_split_ids.contains(split_id)) + .map(|(_, split)| split) + .collect_vec(); + + if should_trim_state && !dropped_splits.is_empty() { + // trim dropped splits' state + core.split_state_store.trim_state(&dropped_splits).await?; + } + + core.stream_source_splits = target_splits + .into_iter() + .map(|split| (split.id(), split)) + .collect(); + } + + if !cache.is_empty() { + tracing::debug!(actor_id = self.actor_ctx.id, state = ?cache, "take snapshot"); + core.split_state_store.take_snapshot(cache).await? + } + // commit anyway, even if no message saved + core.split_state_store.state_store.commit(epoch).await?; + + core.state_cache.clear(); + + Ok(()) + } + + async fn try_flush_data(&mut self) -> StreamExecutorResult<()> { + let core = self.stream_source_core.as_mut().unwrap(); + core.split_state_store.state_store.try_flush().await?; + + Ok(()) + } + + /// A source executor with a stream source receives: + /// 1. Barrier messages + /// 2. Data from external source + /// and acts accordingly. + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn execute_with_stream_source(mut self, input: BoxedExecutor) { + // FIXME: + let split_column_idx = 1; + let offset_column_idx = 2; + // let mut barrier_receiver = self.barrier_receiver.take().unwrap(); + // let barrier = barrier_receiver + // .recv() + // .instrument_await("source_recv_first_barrier") + // .await + // .ok_or_else(|| { + // anyhow!( + // "failed to receive the first barrier, actor_id: {:?}, source_id: {:?}", + // self.actor_ctx.id, + // self.stream_source_core.as_ref().unwrap().source_id + // ) + // })?; + let mut input = input.execute(); + + // Poll the upstream to get the first barrier. + let barrier = expect_first_barrier(&mut input).await?; + + let mut core = self.stream_source_core.unwrap(); + + // Build source description from the builder. + let source_desc_builder: SourceDescBuilder = core.source_desc_builder.take().unwrap(); + let source_desc = source_desc_builder + .build() + .map_err(StreamExecutorError::connector_error)?; + + let mut boot_state = Vec::default(); + if let Some(mutation) = barrier.mutation.as_ref() { + match mutation.as_ref() { + Mutation::Add(AddMutation { splits, .. }) + | Mutation::Update(UpdateMutation { + actor_splits: splits, + .. + }) => { + if let Some(splits) = splits.get(&self.actor_ctx.id) { + tracing::info!( + "source exector: actor {:?} boot with splits: {:?}", + self.actor_ctx.id, + splits + ); + boot_state = splits.clone(); + } + } + _ => {} + } + } + let mut latest_split_info = boot_state.clone(); + + core.split_state_store.init_epoch(barrier.epoch); + + for ele in &mut boot_state { + if let Some(recover_state) = core + .split_state_store + .try_recover_from_state_store(ele) + .await? + { + *ele = recover_state; + } + } + + // init in-memory split states with persisted state if any + core.init_split_state(boot_state.clone()); + + // Return the ownership of `stream_source_core` to the source executor. + self.stream_source_core = Some(core); + + let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); + tracing::info!(actor_id = self.actor_ctx.id, state = ?recover_state, "start with state"); + let (source_chunk_reader, abort_handles) = self + .build_stream_source_reader(&source_desc, recover_state) + .instrument_await("source_build_reader") + .await?; + let source_chunk_reader = pin!(source_chunk_reader); + + // // Merge the chunks from source and the barriers into a single stream. We prioritize + // // barriers over source data chunks here. + // let mut stream = + // StreamReaderWithPause::::new(input, source_chunk_reader); + + // If the first barrier requires us to pause on startup, pause the stream. + if barrier.is_pause_on_startup() { + // stream.pause_stream(); + todo!() + } + + yield Message::Barrier(barrier); + + // XXX: + // - What's the best poll strategy? + // - Should we also add a barrier stream for backfill executor? + // - TODO: support pause source chunk + let mut backfill_stream = select_with_strategy( + input.by_ref().map(Either::Left), + source_chunk_reader.map(Either::Right), + |_: &mut ()| futures::stream::PollNext::Left, + ); + + // split -> offset + // XXX: the offset's type is quite a headache... + #[derive(Debug)] + enum BackfillState { + Backfilling(String), + /// Backfill is stopped at this offset. Source needs to filter out messages before this offset. + SourceCachingUp(String), + Finished, + } + + let mut backfill_state = HashMap::::new(); + let mut finished_cnt = 0; + // TODO: + let backfill_finished = true; + + if !backfill_finished { + #[for_await] + for either in &mut backfill_stream { + match either { + // Upstream + Either::Left(msg) => { + match msg? { + Message::Barrier(barrier) => { + todo!() + } + Message::Chunk(chunk) => { + let (_op, first_row) = chunk.rows().next().unwrap(); + // FIXME: a chunk might have multiple splits? If not, we will need to + // iterate over the chunk, which seems to be a bit inefficient. + let split = + first_row.datum_at(split_column_idx).unwrap().into_int64(); + // XXX: Is the offset in a chunk always ordered? + let offset = + first_row.datum_at(offset_column_idx).unwrap().into_int64(); + let backfill_state = + backfill_state.get_mut(&split.to_string()).unwrap(); + match backfill_state { + BackfillState::Backfilling(backfill_offset) => { + match compare_kafka_offset(backfill_offset, offset) { + std::cmp::Ordering::Less => { + // continue backfilling. Ignore this chunk + } + _ => { + // backfilling for this split is finished! + *backfill_state = BackfillState::SourceCachingUp( + offset.to_string(), + ); + abort_handles + .get(&split.to_string()) + .unwrap() + .abort(); + } + } + } + BackfillState::SourceCachingUp(backfill_offset) => { + todo!() + } + BackfillState::Finished => { + // simply forward the chunk + // This split's backfilling is finisehd, we are waiting for other splits + yield Message::Chunk(chunk) + } + } + } + Message::Watermark(_) => { + // Ignore watermark during backfill. (?) + todo!() + } + } + } + // backfill + Either::Right(msg) => { + let StreamChunkWithState { + chunk, + split_offset_mapping, + } = msg.map_err(StreamExecutorError::connector_error)?; + let split_offset_mapping = + split_offset_mapping.expect("kafka source should have offsets"); + + let state: HashMap<_, _> = split_offset_mapping + .iter() + .flat_map(|(split_id, offset)| { + let origin_split_impl = self + .stream_source_core + .as_mut() + .unwrap() + .stream_source_splits + .get_mut(split_id); + + // update backfill progress + let prev_state = backfill_state.insert( + split_id.to_string(), + BackfillState::Backfilling(offset.to_string()), + ); + // abort_handles should prevents other cases happening + assert_matches!( + prev_state, + Some(BackfillState::Backfilling(_)), + "Unexpected backfilling state, split_id: {split_id}" + ); + + origin_split_impl.map(|split_impl| { + split_impl.update_in_place(offset.clone())?; + Ok::<_, anyhow::Error>((split_id.clone(), split_impl.clone())) + }) + }) + .try_collect()?; + + self.stream_source_core + .as_mut() + .unwrap() + .state_cache + .extend(state); + + yield Message::Chunk(chunk); + self.try_flush_data().await?; + } + } + } + } + + // All splits finished backfilling. Now we only forward the source data. + #[for_await] + for msg in input { + match msg? { + Message::Barrier(barrier) => { + todo!() + } + Message::Chunk(chunk) => { + yield Message::Chunk(chunk); + } + Message::Watermark(watermark) => { + yield Message::Watermark(watermark); + } + } + } + } +} + +fn compare_kafka_offset(a: &str, b: i64) -> std::cmp::Ordering { + let a = a.parse::().unwrap(); + a.cmp(&b) +} + +impl Executor for KafkaBackfillExecutorWrapper { + fn execute(self: Box) -> BoxedMessageStream { + self.inner.execute_with_stream_source(self.input).boxed() + } + + fn schema(&self) -> &Schema { + &self.inner.info.schema + } + + fn pk_indices(&self) -> PkIndicesRef<'_> { + &self.inner.info.pk_indices + } + + fn identity(&self) -> &str { + &self.inner.info.identity + } +} + +impl Debug for KafkaBackfillExecutor { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some(core) = &self.stream_source_core { + f.debug_struct("KafkaBackfillExecutor") + .field("source_id", &core.source_id) + .field("column_ids", &core.column_ids) + .field("pk_indices", &self.info.pk_indices) + .finish() + } else { + f.debug_struct("KafkaBackfillExecutor").finish() + } + } +} + +// #[cfg(test)] +// mod tests { +// use std::time::Duration; + +// use futures::StreamExt; +// use maplit::{convert_args, hashmap}; +// use risingwave_common::array::StreamChunk; +// use risingwave_common::catalog::{ColumnId, Field, Schema, TableId}; +// use risingwave_common::system_param::local_manager::LocalSystemParamsManager; +// use risingwave_common::test_prelude::StreamChunkTestExt; +// use risingwave_common::types::DataType; +// use risingwave_connector::source::datagen::DatagenSplit; +// use risingwave_pb::catalog::StreamSourceInfo; +// use risingwave_pb::plan_common::PbRowFormatType; +// use risingwave_source::connector_test_utils::create_source_desc_builder; +// use risingwave_storage::memory::MemoryStateStore; +// use tokio::sync::mpsc::unbounded_channel; +// use tracing_test::traced_test; + +// use super::*; +// use crate::executor::ActorContext; + +// const MOCK_SOURCE_NAME: &str = "mock_source"; + +// #[tokio::test] +// async fn test_source_executor() { +// let table_id = TableId::default(); +// let schema = Schema { +// fields: vec![Field::with_name(DataType::Int32, "sequence_int")], +// }; +// let row_id_index = None; +// let pk_indices = vec![0]; +// let source_info = StreamSourceInfo { +// row_format: PbRowFormatType::Native as i32, +// ..Default::default() +// }; +// let (barrier_tx, barrier_rx) = unbounded_channel::(); +// let column_ids = vec![0].into_iter().map(ColumnId::from).collect(); + +// // This datagen will generate 3 rows at one time. +// let properties: HashMap = convert_args!(hashmap!( +// "connector" => "datagen", +// "datagen.rows.per.second" => "3", +// "fields.sequence_int.kind" => "sequence", +// "fields.sequence_int.start" => "11", +// "fields.sequence_int.end" => "11111", +// )); +// let source_desc_builder = +// create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]); +// let split_state_store = SourceStateTableHandler::from_table_catalog( +// &default_source_internal_table(0x2333), +// MemoryStateStore::new(), +// ) +// .await; +// let core = StreamSourceCore:: { +// source_id: table_id, +// column_ids, +// source_desc_builder: Some(source_desc_builder), +// stream_source_splits: HashMap::new(), +// split_state_store, +// state_cache: HashMap::new(), +// source_name: MOCK_SOURCE_NAME.to_string(), +// }; + +// let system_params_manager = LocalSystemParamsManager::for_test(); + +// let executor = KafkaBackfillExecutor::new( +// ActorContext::create(0), +// ExecutorInfo { +// schema, +// pk_indices, +// identity: "KafkaBackfillExecutor".to_string(), +// }, +// Some(core), +// Arc::new(StreamingMetrics::unused()), +// barrier_rx, +// system_params_manager.get_params(), +// SourceCtrlOpts::default(), +// ConnectorParams::default(), +// ); +// let mut executor = Box::new(executor).execute(); + +// let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add(AddMutation { +// adds: HashMap::new(), +// added_actors: HashSet::new(), +// splits: hashmap! { +// ActorId::default() => vec![ +// SplitImpl::Datagen(DatagenSplit { +// split_index: 0, +// split_num: 1, +// start_offset: None, +// }), +// ], +// }, +// pause: false, +// })); +// barrier_tx.send(init_barrier).unwrap(); + +// // Consume barrier. +// executor.next().await.unwrap().unwrap(); + +// // Consume data chunk. +// let msg = executor.next().await.unwrap().unwrap(); + +// // Row id will not be filled here. +// assert_eq!( +// msg.into_chunk().unwrap(), +// StreamChunk::from_pretty( +// " i +// + 11 +// + 12 +// + 13" +// ) +// ); +// } + +// #[traced_test] +// #[tokio::test] +// async fn test_split_change_mutation() { +// let table_id = TableId::default(); +// let schema = Schema { +// fields: vec![Field::with_name(DataType::Int32, "v1")], +// }; +// let row_id_index = None; +// let pk_indices = vec![0_usize]; +// let source_info = StreamSourceInfo { +// row_format: PbRowFormatType::Native as i32, +// ..Default::default() +// }; +// let properties = convert_args!(hashmap!( +// "connector" => "datagen", +// "fields.v1.kind" => "sequence", +// "fields.v1.start" => "11", +// "fields.v1.end" => "11111", +// )); + +// let source_desc_builder = +// create_source_desc_builder(&schema, row_id_index, source_info, properties, vec![]); +// let mem_state_store = MemoryStateStore::new(); + +// let column_ids = vec![ColumnId::from(0)]; +// let (barrier_tx, barrier_rx) = unbounded_channel::(); +// let split_state_store = SourceStateTableHandler::from_table_catalog( +// &default_source_internal_table(0x2333), +// mem_state_store.clone(), +// ) +// .await; + +// let core = StreamSourceCore:: { +// source_id: table_id, +// column_ids: column_ids.clone(), +// source_desc_builder: Some(source_desc_builder), +// stream_source_splits: HashMap::new(), +// split_state_store, +// state_cache: HashMap::new(), +// source_name: MOCK_SOURCE_NAME.to_string(), +// }; + +// let system_params_manager = LocalSystemParamsManager::for_test(); + +// let executor = KafkaBackfillExecutor::new( +// ActorContext::create(0), +// ExecutorInfo { +// schema, +// pk_indices, +// identity: "KafkaBackfillExecutor".to_string(), +// }, +// Some(core), +// Arc::new(StreamingMetrics::unused()), +// barrier_rx, +// system_params_manager.get_params(), +// SourceCtrlOpts::default(), +// ConnectorParams::default(), +// ); +// let mut handler = Box::new(executor).execute(); + +// let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add(AddMutation { +// adds: HashMap::new(), +// added_actors: HashSet::new(), +// splits: hashmap! { +// ActorId::default() => vec![ +// SplitImpl::Datagen(DatagenSplit { +// split_index: 0, +// split_num: 3, +// start_offset: None, +// }), +// ], +// }, +// pause: false, +// })); +// barrier_tx.send(init_barrier).unwrap(); + +// // Consume barrier. +// handler +// .next() +// .await +// .unwrap() +// .unwrap() +// .into_barrier() +// .unwrap(); + +// let mut ready_chunks = handler.ready_chunks(10); + +// let _ = ready_chunks.next().await.unwrap(); + +// let new_assignment = vec![ +// SplitImpl::Datagen(DatagenSplit { +// split_index: 0, +// split_num: 3, +// start_offset: None, +// }), +// SplitImpl::Datagen(DatagenSplit { +// split_index: 1, +// split_num: 3, +// start_offset: None, +// }), +// SplitImpl::Datagen(DatagenSplit { +// split_index: 2, +// split_num: 3, +// start_offset: None, +// }), +// ]; + +// let change_split_mutation = +// Barrier::new_test_barrier(2).with_mutation(Mutation::SourceChangeSplit(hashmap! { +// ActorId::default() => new_assignment.clone() +// })); + +// barrier_tx.send(change_split_mutation).unwrap(); + +// let _ = ready_chunks.next().await.unwrap(); // barrier + +// let mut source_state_handler = SourceStateTableHandler::from_table_catalog( +// &default_source_internal_table(0x2333), +// mem_state_store.clone(), +// ) +// .await; +// // there must exist state for new add partition +// source_state_handler.init_epoch(EpochPair::new_test_epoch(2)); +// source_state_handler +// .get(new_assignment[1].id()) +// .await +// .unwrap() +// .unwrap(); + +// tokio::time::sleep(Duration::from_millis(100)).await; + +// let _ = ready_chunks.next().await.unwrap(); + +// let barrier = Barrier::new_test_barrier(3).with_mutation(Mutation::Pause); +// barrier_tx.send(barrier).unwrap(); + +// let barrier = Barrier::new_test_barrier(4).with_mutation(Mutation::Resume); +// barrier_tx.send(barrier).unwrap(); + +// // receive all +// ready_chunks.next().await.unwrap(); + +// let prev_assignment = new_assignment; +// let new_assignment = vec![prev_assignment[2].clone()]; + +// let drop_split_mutation = +// Barrier::new_test_barrier(5).with_mutation(Mutation::SourceChangeSplit(hashmap! { +// ActorId::default() => new_assignment.clone() +// })); + +// barrier_tx.send(drop_split_mutation).unwrap(); + +// ready_chunks.next().await.unwrap(); // barrier + +// let mut source_state_handler = SourceStateTableHandler::from_table_catalog( +// &default_source_internal_table(0x2333), +// mem_state_store.clone(), +// ) +// .await; + +// source_state_handler.init_epoch(EpochPair::new_test_epoch(5)); + +// assert!(source_state_handler +// .try_recover_from_state_store(&prev_assignment[0]) +// .await +// .unwrap() +// .is_none()); + +// assert!(source_state_handler +// .try_recover_from_state_store(&prev_assignment[1]) +// .await +// .unwrap() +// .is_none()); + +// assert!(source_state_handler +// .try_recover_from_state_store(&prev_assignment[2]) +// .await +// .unwrap() +// .is_some()); +// } +// } diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index 18f7346777d6b..0f16db1c4d435 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -22,6 +22,7 @@ pub use state_table_handler::*; pub mod fetch_executor; pub use fetch_executor::*; +pub mod kafka_backfill_executor; pub mod source_executor; pub mod list_executor;