Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
Liang Zhao committed Nov 17, 2022
1 parent 729f62c commit 42a69db
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,15 @@ impl Barrier {
}
}

pub fn new_test_barrier_with_prev_epoch(epoch: u64, prev_epoch: u64) -> Self {
Self {
epoch: EpochPair::new(epoch, prev_epoch),
checkpoint: true,
mutation: Default::default(),
passed_actors: Default::default(),
}
}

#[must_use]
pub fn with_mutation(self, mutation: Mutation) -> Self {
Self {
Expand Down
121 changes: 121 additions & 0 deletions src/stream/src/executor/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,124 @@ impl<S: StateStore> Executor for NowExecutor<S> {
self.identity.as_str()
}
}

#[cfg(test)]
mod tests {
use std::str::FromStr;

use chrono::NaiveDateTime;
use futures::StreamExt;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
use risingwave_common::test_prelude::StreamChunkTestExt;
use risingwave_common::types::{DataType, NaiveDateTimeWrapper, ScalarImpl};
use risingwave_common::util::sort_util::OrderType;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::streaming_table::state_table::StateTable;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

use super::NowExecutor;
use crate::executor::{Barrier, BoxedMessageStream, Executor, Message, PkIndices, Watermark};

#[tokio::test]
async fn test_now() {
let state_table = create_state_table().await;
let (tx, mut now_executor) = create_executor(state_table);

// Init barrier
tx.send(Barrier::new_test_barrier(1)).unwrap();

// Consume the barrier
now_executor.next().await.unwrap().unwrap();

tx.send(Barrier::new_test_barrier_with_prev_epoch(1 << 16, 1))
.unwrap();

// Consume the data chunk
let chunk_msg = now_executor.next().await.unwrap().unwrap();

assert_eq!(
chunk_msg.into_chunk().unwrap().compact(),
StreamChunk::from_pretty(
" TS
+ 2021-04-01T00:00:00.001"
)
);

// Consume the watermark
let watermark = now_executor.next().await.unwrap().unwrap();

assert_eq!(
watermark,
Message::Watermark(Watermark::new(
0,
ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new(
NaiveDateTime::from_str("2021-04-01T00:00:00.001").unwrap()
))
))
);

// Consume the barrier
now_executor.next().await.unwrap().unwrap();

tx.send(Barrier::new_test_barrier_with_prev_epoch(2 << 16, 1 << 16))
.unwrap();

// Consume the data chunk
let chunk_msg = now_executor.next().await.unwrap().unwrap();

assert_eq!(
chunk_msg.into_chunk().unwrap().compact(),
StreamChunk::from_pretty(
" TS
- 2021-04-01T00:00:00.001
+ 2021-04-01T00:00:00.002"
)
);

// Consume the watermark
let watermark = now_executor.next().await.unwrap().unwrap();

assert_eq!(
watermark,
Message::Watermark(Watermark::new(
0,
ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new(
NaiveDateTime::from_str("2021-04-01T00:00:00.002").unwrap()
))
))
);

// Consume the barrier
now_executor.next().await.unwrap().unwrap();
}

#[inline]
fn create_pk_indices() -> PkIndices {
vec![0]
}

async fn create_state_table() -> StateTable<MemoryStateStore> {
let memory_state_store = MemoryStateStore::new();
let table_id = TableId::new(1);
let column_descs = vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamp)];
let order_types = vec![OrderType::Ascending];
let pk_indices = create_pk_indices();
StateTable::new_without_distribution(
memory_state_store,
table_id,
column_descs,
order_types,
pk_indices,
)
.await
}

fn create_executor(
state_table: StateTable<MemoryStateStore>,
) -> (UnboundedSender<Barrier>, BoxedMessageStream) {
let (sender, barrier_receiver) = unbounded_channel();
let now_executor = NowExecutor::new(barrier_receiver, 1, state_table);
(sender, Box::new(now_executor).execute())
}
}

0 comments on commit 42a69db

Please sign in to comment.