From ab85913027b1de261b451716f8058bb0d6a49a2c Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Wed, 16 Nov 2022 22:30:00 +0800 Subject: [PATCH 1/7] feat(stream): implement now executor (close #6407) --- src/common/src/util/epoch.rs | 8 +- src/stream/src/executor/mod.rs | 1 + src/stream/src/executor/now.rs | 155 +++++++++++++++++++++++++++++++++ 3 files changed, 163 insertions(+), 1 deletion(-) create mode 100644 src/stream/src/executor/now.rs diff --git a/src/common/src/util/epoch.rs b/src/common/src/util/epoch.rs index bece2d7ce8a53..d9f9bb25858af 100644 --- a/src/common/src/util/epoch.rs +++ b/src/common/src/util/epoch.rs @@ -18,10 +18,12 @@ use std::time::{Duration, SystemTime}; use parse_display::Display; +static UNIX_SINGULARITY_DATE_SEC: u64 = 1_617_235_200; + /// `UNIX_SINGULARITY_DATE_EPOCH` represents the singularity date of the UNIX epoch: /// 2021-04-01T00:00:00Z. pub static UNIX_SINGULARITY_DATE_EPOCH: LazyLock = - LazyLock::new(|| SystemTime::UNIX_EPOCH + Duration::from_secs(1_617_235_200)); + LazyLock::new(|| SystemTime::UNIX_EPOCH + Duration::from_secs(UNIX_SINGULARITY_DATE_SEC)); #[derive(Clone, Copy, Debug, Display, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Epoch(pub u64); @@ -72,6 +74,10 @@ impl Epoch { .as_millis() as u64 } + pub fn as_unix_millis(&self) -> u64 { + UNIX_SINGULARITY_DATE_SEC * 1000 + self.physical_time() + } + /// Returns the epoch in real system time. pub fn as_system_time(&self) -> SystemTime { *UNIX_SINGULARITY_DATE_EPOCH + Duration::from_millis(self.physical_time()) diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 91fe35895609c..f2a9781f60758 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -69,6 +69,7 @@ mod lookup_union; mod managed_state; mod merge; mod mview; +mod now; mod project; mod project_set; mod rearranged_chain; diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs new file mode 100644 index 0000000000000..4d32ee83216b3 --- /dev/null +++ b/src/stream/src/executor/now.rs @@ -0,0 +1,155 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::NaiveDateTime; +use futures::{pin_mut, StreamExt}; +use futures_async_stream::try_stream; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::row::Row; +use risingwave_common::types::{DataType, NaiveDateTimeWrapper, ScalarImpl}; +use risingwave_common::util::chunk_coalesce::DataChunkBuilder; +use risingwave_common::util::epoch::Epoch; +use risingwave_storage::table::streaming_table::state_table::StateTable; +use risingwave_storage::StateStore; + +use super::{ + expect_first_barrier, BoxedExecutor, BoxedMessageStream, Executor, Message, PkIndices, + PkIndicesRef, StreamExecutorError, Watermark, +}; + +pub struct NowExecutor { + input: Option, + pk_indices: PkIndices, + identity: String, + schema: Schema, + state_table: StateTable, +} + +impl NowExecutor { + #[allow(dead_code)] + pub fn new(input: BoxedExecutor, executor_id: u64, state_table: StateTable) -> Self { + let schema = Schema::new(vec![Field { + data_type: DataType::Timestamp, + name: String::from("now"), + sub_fields: vec![], + type_name: String::default(), + }]); + Self { + input: Some(input), + pk_indices: vec![0], + identity: format!("NowExecutor {:X}", executor_id), + schema, + state_table, + } + } + + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn into_stream(mut self) { + let mut input = self.input.take().unwrap().execute(); + + // Consume the first barrier message and initialize state table. + let barrier = expect_first_barrier(&mut input).await?; + self.state_table.init_epoch(barrier.epoch); + + // The first barrier message should be propagated. + yield Message::Barrier(barrier); + + let state_row = { + let data_iter = self.state_table.iter().await?; + pin_mut!(data_iter); + if let Some(state_row) = data_iter.next().await { + Some(state_row?) + } else { + None + } + }; + + let mut last_timestamp = state_row.and_then(|row| row[0].clone()); + + #[for_await] + for msg in input { + match msg? { + Message::Barrier(barrier) => { + if !barrier.is_update() { + let time_millis = Epoch::from(barrier.epoch.curr).as_unix_millis(); + let timestamp = Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + NaiveDateTime::from_timestamp( + (time_millis / 1000) as i64, + (time_millis % 1000 * 1_000_000) as u32, + ), + ))); + + let mut data_chunk_builder = DataChunkBuilder::new( + self.schema().data_types(), + if last_timestamp.is_some() { 2 } else { 1 }, + ); + if last_timestamp.is_some() { + let chunk_popped = data_chunk_builder + .append_one_row_from_datums([&last_timestamp].into_iter()); + debug_assert!(chunk_popped.is_none()); + } + let data_chunk = data_chunk_builder + .append_one_row_from_datums([×tamp].into_iter()) + .unwrap(); + let mut ops = if last_timestamp.is_some() { + vec![Op::Delete] + } else { + vec![] + }; + ops.push(Op::Insert); + let stream_chunk = StreamChunk::from_parts(ops, data_chunk); + yield Message::Chunk(stream_chunk); + + yield Message::Watermark(Watermark::new( + 0, + timestamp.as_ref().unwrap().clone(), + )); + + if last_timestamp.is_some() { + self.state_table.delete(Row::new(vec![last_timestamp])); + } + self.state_table.insert(Row::new(vec![timestamp.clone()])); + last_timestamp = timestamp; + + self.state_table.commit(barrier.epoch).await?; + } else { + self.state_table.commit_no_data_expected(barrier.epoch); + } + + yield Message::Barrier(barrier); + } + _ => {} + } + } + } +} + +impl Executor for NowExecutor { + fn execute(self: Box) -> BoxedMessageStream { + self.into_stream().boxed() + } + + fn schema(&self) -> &Schema { + &self.schema + } + + fn pk_indices(&self) -> PkIndicesRef<'_> { + &self.pk_indices + } + + fn identity(&self) -> &str { + self.identity.as_str() + } +} From 3becad81784b5a730808cc2a46ead97bb9371e4e Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Wed, 16 Nov 2022 23:07:55 +0800 Subject: [PATCH 2/7] use if let --- src/stream/src/executor/now.rs | 91 ++++++++++++++++------------------ 1 file changed, 44 insertions(+), 47 deletions(-) diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 4d32ee83216b3..5436c3c334539 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -80,57 +80,54 @@ impl NowExecutor { #[for_await] for msg in input { - match msg? { - Message::Barrier(barrier) => { - if !barrier.is_update() { - let time_millis = Epoch::from(barrier.epoch.curr).as_unix_millis(); - let timestamp = Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( - NaiveDateTime::from_timestamp( - (time_millis / 1000) as i64, - (time_millis % 1000 * 1_000_000) as u32, - ), - ))); - - let mut data_chunk_builder = DataChunkBuilder::new( - self.schema().data_types(), - if last_timestamp.is_some() { 2 } else { 1 }, - ); - if last_timestamp.is_some() { - let chunk_popped = data_chunk_builder - .append_one_row_from_datums([&last_timestamp].into_iter()); - debug_assert!(chunk_popped.is_none()); - } - let data_chunk = data_chunk_builder - .append_one_row_from_datums([×tamp].into_iter()) - .unwrap(); - let mut ops = if last_timestamp.is_some() { - vec![Op::Delete] - } else { - vec![] - }; - ops.push(Op::Insert); - let stream_chunk = StreamChunk::from_parts(ops, data_chunk); - yield Message::Chunk(stream_chunk); - - yield Message::Watermark(Watermark::new( - 0, - timestamp.as_ref().unwrap().clone(), - )); - - if last_timestamp.is_some() { - self.state_table.delete(Row::new(vec![last_timestamp])); - } - self.state_table.insert(Row::new(vec![timestamp.clone()])); - last_timestamp = timestamp; - - self.state_table.commit(barrier.epoch).await?; + if let Message::Barrier(barrier) = msg? { + if !barrier.is_update() { + let time_millis = Epoch::from(barrier.epoch.curr).as_unix_millis(); + let timestamp = Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + NaiveDateTime::from_timestamp( + (time_millis / 1000) as i64, + (time_millis % 1000 * 1_000_000) as u32, + ), + ))); + + let mut data_chunk_builder = DataChunkBuilder::new( + self.schema().data_types(), + if last_timestamp.is_some() { 2 } else { 1 }, + ); + if last_timestamp.is_some() { + let chunk_popped = data_chunk_builder + .append_one_row_from_datums([&last_timestamp].into_iter()); + debug_assert!(chunk_popped.is_none()); + } + let data_chunk = data_chunk_builder + .append_one_row_from_datums([×tamp].into_iter()) + .unwrap(); + let mut ops = if last_timestamp.is_some() { + vec![Op::Delete] } else { - self.state_table.commit_no_data_expected(barrier.epoch); + vec![] + }; + ops.push(Op::Insert); + let stream_chunk = StreamChunk::from_parts(ops, data_chunk); + yield Message::Chunk(stream_chunk); + + yield Message::Watermark(Watermark::new( + 0, + timestamp.as_ref().unwrap().clone(), + )); + + if last_timestamp.is_some() { + self.state_table.delete(Row::new(vec![last_timestamp])); } + self.state_table.insert(Row::new(vec![timestamp.clone()])); + last_timestamp = timestamp; - yield Message::Barrier(barrier); + self.state_table.commit(barrier.epoch).await?; + } else { + self.state_table.commit_no_data_expected(barrier.epoch); } - _ => {} + + yield Message::Barrier(barrier); } } } From 68de9315f656f94f970117ebde8bdae653a7c0f9 Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Wed, 16 Nov 2022 23:33:09 +0800 Subject: [PATCH 3/7] use barrier receiver --- src/stream/src/executor/now.rs | 114 +++++++++++++++++---------------- 1 file changed, 60 insertions(+), 54 deletions(-) diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 5436c3c334539..9633f412a853b 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_stack_trace::StackTrace; use chrono::NaiveDateTime; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; @@ -23,14 +24,17 @@ use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::epoch::Epoch; use risingwave_storage::table::streaming_table::state_table::StateTable; use risingwave_storage::StateStore; +use tokio::sync::mpsc::UnboundedReceiver; use super::{ - expect_first_barrier, BoxedExecutor, BoxedMessageStream, Executor, Message, PkIndices, - PkIndicesRef, StreamExecutorError, Watermark, + Barrier, BoxedMessageStream, Executor, Message, PkIndices, PkIndicesRef, StreamExecutorError, + Watermark, }; pub struct NowExecutor { - input: Option, + /// Receiver of barrier channel. + barrier_receiver: Option>, + pk_indices: PkIndices, identity: String, schema: Schema, @@ -39,7 +43,11 @@ pub struct NowExecutor { impl NowExecutor { #[allow(dead_code)] - pub fn new(input: BoxedExecutor, executor_id: u64, state_table: StateTable) -> Self { + pub fn new( + barrier_receiver: UnboundedReceiver, + executor_id: u64, + state_table: StateTable, + ) -> Self { let schema = Schema::new(vec![Field { data_type: DataType::Timestamp, name: String::from("now"), @@ -47,7 +55,7 @@ impl NowExecutor { type_name: String::default(), }]); Self { - input: Some(input), + barrier_receiver: Some(barrier_receiver), pk_indices: vec![0], identity: format!("NowExecutor {:X}", executor_id), schema, @@ -57,10 +65,14 @@ impl NowExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] async fn into_stream(mut self) { - let mut input = self.input.take().unwrap().execute(); + let mut barrier_receiver = self.barrier_receiver.take().unwrap(); // Consume the first barrier message and initialize state table. - let barrier = expect_first_barrier(&mut input).await?; + let barrier = barrier_receiver + .recv() + .stack_trace("now_executor_recv_first_barrier") + .await + .unwrap(); self.state_table.init_epoch(barrier.epoch); // The first barrier message should be propagated. @@ -78,57 +90,51 @@ impl NowExecutor { let mut last_timestamp = state_row.and_then(|row| row[0].clone()); - #[for_await] - for msg in input { - if let Message::Barrier(barrier) = msg? { - if !barrier.is_update() { - let time_millis = Epoch::from(barrier.epoch.curr).as_unix_millis(); - let timestamp = Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( - NaiveDateTime::from_timestamp( - (time_millis / 1000) as i64, - (time_millis % 1000 * 1_000_000) as u32, - ), - ))); - - let mut data_chunk_builder = DataChunkBuilder::new( - self.schema().data_types(), - if last_timestamp.is_some() { 2 } else { 1 }, - ); - if last_timestamp.is_some() { - let chunk_popped = data_chunk_builder - .append_one_row_from_datums([&last_timestamp].into_iter()); - debug_assert!(chunk_popped.is_none()); - } - let data_chunk = data_chunk_builder - .append_one_row_from_datums([×tamp].into_iter()) - .unwrap(); - let mut ops = if last_timestamp.is_some() { - vec![Op::Delete] - } else { - vec![] - }; - ops.push(Op::Insert); - let stream_chunk = StreamChunk::from_parts(ops, data_chunk); - yield Message::Chunk(stream_chunk); - - yield Message::Watermark(Watermark::new( - 0, - timestamp.as_ref().unwrap().clone(), - )); - - if last_timestamp.is_some() { - self.state_table.delete(Row::new(vec![last_timestamp])); - } - self.state_table.insert(Row::new(vec![timestamp.clone()])); - last_timestamp = timestamp; - - self.state_table.commit(barrier.epoch).await?; + while let Some(barrier) = barrier_receiver.recv().await { + if !barrier.is_update() { + let time_millis = Epoch::from(barrier.epoch.curr).as_unix_millis(); + let timestamp = Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + NaiveDateTime::from_timestamp( + (time_millis / 1000) as i64, + (time_millis % 1000 * 1_000_000) as u32, + ), + ))); + + let mut data_chunk_builder = DataChunkBuilder::new( + self.schema().data_types(), + if last_timestamp.is_some() { 2 } else { 1 }, + ); + if last_timestamp.is_some() { + let chunk_popped = data_chunk_builder + .append_one_row_from_datums([&last_timestamp].into_iter()); + debug_assert!(chunk_popped.is_none()); + } + let data_chunk = data_chunk_builder + .append_one_row_from_datums([×tamp].into_iter()) + .unwrap(); + let mut ops = if last_timestamp.is_some() { + vec![Op::Delete] } else { - self.state_table.commit_no_data_expected(barrier.epoch); + vec![] + }; + ops.push(Op::Insert); + let stream_chunk = StreamChunk::from_parts(ops, data_chunk); + yield Message::Chunk(stream_chunk); + + yield Message::Watermark(Watermark::new(0, timestamp.as_ref().unwrap().clone())); + + if last_timestamp.is_some() { + self.state_table.delete(Row::new(vec![last_timestamp])); } + self.state_table.insert(Row::new(vec![timestamp.clone()])); + last_timestamp = timestamp; - yield Message::Barrier(barrier); + self.state_table.commit(barrier.epoch).await?; + } else { + self.state_table.commit_no_data_expected(barrier.epoch); } + + yield Message::Barrier(barrier); } } } From af41b83db08985c677c16255d80f87c091f64550 Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Wed, 16 Nov 2022 23:40:27 +0800 Subject: [PATCH 4/7] timestamp sanity check --- src/stream/src/executor/now.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 9633f412a853b..2fdbbb100cdd0 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -91,15 +91,23 @@ impl NowExecutor { let mut last_timestamp = state_row.and_then(|row| row[0].clone()); while let Some(barrier) = barrier_receiver.recv().await { - if !barrier.is_update() { + let mut timestamp = None; + let should_update = if barrier.is_update() { + false + } else { let time_millis = Epoch::from(barrier.epoch.curr).as_unix_millis(); - let timestamp = Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + timestamp = Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( NaiveDateTime::from_timestamp( (time_millis / 1000) as i64, (time_millis % 1000 * 1_000_000) as u32, ), ))); + last_timestamp.as_ref().map_or(true, |last_timestamp| { + last_timestamp <= timestamp.as_ref().unwrap() + }) + }; + if should_update { let mut data_chunk_builder = DataChunkBuilder::new( self.schema().data_types(), if last_timestamp.is_some() { 2 } else { 1 }, From e4e4a39bf5fd6eb92c696414a92fab7a87b306de Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Thu, 17 Nov 2022 10:33:22 +0800 Subject: [PATCH 5/7] add now executor builder --- dashboard/proto/gen/stream_plan.ts | 42 ++++++ proto/stream_plan.proto | 6 + src/stream/src/executor/mod.rs | 21 +++ src/stream/src/executor/now.rs | 213 +++++++++++++++++++++++------ src/stream/src/from_proto/mod.rs | 3 + src/stream/src/from_proto/now.rs | 52 +++++++ 6 files changed, 298 insertions(+), 39 deletions(-) create mode 100644 src/stream/src/from_proto/now.rs diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index 22d8287d95968..ab85df349cd3e 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -701,6 +701,11 @@ export interface RowIdGenNode { rowIdIndex: number; } +export interface NowNode { + /** Persists emitted 'now'. */ + stateTable: Table | undefined; +} + export interface StreamNode { nodeBody?: | { $case: "source"; source: SourceNode } @@ -730,8 +735,13 @@ export interface StreamNode { | { $case: "groupTopN"; groupTopN: GroupTopNNode } | { $case: "sort"; sort: SortNode } | { $case: "watermarkFilter"; watermarkFilter: WatermarkFilterNode } +<<<<<<< HEAD | { $case: "dml"; dml: DmlNode } | { $case: "rowIdGen"; rowIdGen: RowIdGenNode }; +======= + | { $case: "rowIdGen"; rowIdGen: RowIdGenNode } + | { $case: "now"; now: NowNode }; +>>>>>>> 5f8266a37 (add now executor builder) /** * The id for the operator. This is local per mview. * TODO: should better be a uint32. @@ -3085,6 +3095,31 @@ export const RowIdGenNode = { }, }; +function createBaseNowNode(): NowNode { + return { stateTable: undefined }; +} + +export const NowNode = { + fromJSON(object: any): NowNode { + return { stateTable: isSet(object.stateTable) ? Table.fromJSON(object.stateTable) : undefined }; + }, + + toJSON(message: NowNode): unknown { + const obj: any = {}; + message.stateTable !== undefined && + (obj.stateTable = message.stateTable ? Table.toJSON(message.stateTable) : undefined); + return obj; + }, + + fromPartial, I>>(object: I): NowNode { + const message = createBaseNowNode(); + message.stateTable = (object.stateTable !== undefined && object.stateTable !== null) + ? Table.fromPartial(object.stateTable) + : undefined; + return message; + }, +}; + function createBaseStreamNode(): StreamNode { return { nodeBody: undefined, operatorId: 0, input: [], streamKey: [], appendOnly: false, identity: "", fields: [] }; } @@ -3150,6 +3185,8 @@ export const StreamNode = { ? { $case: "dml", dml: DmlNode.fromJSON(object.dml) } : isSet(object.rowIdGen) ? { $case: "rowIdGen", rowIdGen: RowIdGenNode.fromJSON(object.rowIdGen) } + : isSet(object.now) + ? { $case: "now", now: NowNode.fromJSON(object.now) } : undefined, operatorId: isSet(object.operatorId) ? Number(object.operatorId) : 0, input: Array.isArray(object?.input) @@ -3230,6 +3267,8 @@ export const StreamNode = { (obj.dml = message.nodeBody?.dml ? DmlNode.toJSON(message.nodeBody?.dml) : undefined); message.nodeBody?.$case === "rowIdGen" && (obj.rowIdGen = message.nodeBody?.rowIdGen ? RowIdGenNode.toJSON(message.nodeBody?.rowIdGen) : undefined); + message.nodeBody?.$case === "now" && + (obj.now = message.nodeBody?.now ? NowNode.toJSON(message.nodeBody?.now) : undefined); message.operatorId !== undefined && (obj.operatorId = Math.round(message.operatorId)); if (message.input) { obj.input = message.input.map((e) => @@ -3448,6 +3487,9 @@ export const StreamNode = { ) { message.nodeBody = { $case: "rowIdGen", rowIdGen: RowIdGenNode.fromPartial(object.nodeBody.rowIdGen) }; } + if (object.nodeBody?.$case === "now" && object.nodeBody?.now !== undefined && object.nodeBody?.now !== null) { + message.nodeBody = { $case: "now", now: NowNode.fromPartial(object.nodeBody.now) }; + } message.operatorId = object.operatorId ?? 0; message.input = object.input?.map((e) => StreamNode.fromPartial(e)) || []; message.streamKey = object.streamKey?.map((e) => e) || []; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index e9103359d7c83..518afc9115dd8 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -440,6 +440,11 @@ message RowIdGenNode { uint64 row_id_index = 1; } +message NowNode { + // Persists emitted 'now'. + catalog.Table state_table = 1; +} + message StreamNode { oneof node_body { SourceNode source = 100; @@ -471,6 +476,7 @@ message StreamNode { WatermarkFilterNode watermark_filter = 126; DmlNode dml = 127; RowIdGenNode row_id_gen = 128; + NowNode now = 129; } // The id for the operator. This is local per mview. // TODO: should better be a uint32. diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index f2a9781f60758..aabdd63c06956 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -110,6 +110,7 @@ pub use lookup::*; pub use lookup_union::LookupUnionExecutor; pub use merge::MergeExecutor; pub use mview::*; +pub use now::NowExecutor; pub use project::ProjectExecutor; pub use project_set::*; pub use rearranged_chain::RearrangedChainExecutor; @@ -236,6 +237,15 @@ impl Barrier { } } + pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self { + Self { + epoch: EpochPair::new(epoch, prev_epoch), + checkpoint: true, + mutation: Default::default(), + passed_actors: Default::default(), + } + } + #[must_use] pub fn with_mutation(self, mutation: Mutation) -> Self { Self { @@ -280,11 +290,22 @@ impl Barrier { ) } + /// Whether this barrier is for pause. + pub fn is_pause(&self) -> bool { + matches!(self.mutation.as_deref(), Some(Mutation::Pause)) + } + /// Whether this barrier is for configuration change. Used for source executor initialization. pub fn is_update(&self) -> bool { matches!(self.mutation.as_deref(), Some(Mutation::Update { .. })) } + /// Whether this barrier is for resume. Used for now executor to determine whether to yield a + /// chunk and a watermark before this barrier. + pub fn is_resume(&self) -> bool { + matches!(self.mutation.as_deref(), Some(Mutation::Resume)) + } + /// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor /// with `actor_id`. pub fn as_update_merge( diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 2fdbbb100cdd0..f0fe527957d0f 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -16,24 +16,23 @@ use async_stack_trace::StackTrace; use chrono::NaiveDateTime; use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::{DataChunk, Op, StreamChunk}; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::row::Row; use risingwave_common::types::{DataType, NaiveDateTimeWrapper, ScalarImpl}; -use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::epoch::Epoch; -use risingwave_storage::table::streaming_table::state_table::StateTable; use risingwave_storage::StateStore; use tokio::sync::mpsc::UnboundedReceiver; use super::{ - Barrier, BoxedMessageStream, Executor, Message, PkIndices, PkIndicesRef, StreamExecutorError, - Watermark, + Barrier, BoxedMessageStream, Executor, Message, Mutation, PkIndices, PkIndicesRef, + StreamExecutorError, Watermark, }; +use crate::common::table::state_table::StateTable; pub struct NowExecutor { /// Receiver of barrier channel. - barrier_receiver: Option>, + barrier_receiver: UnboundedReceiver, pk_indices: PkIndices, identity: String, @@ -42,7 +41,6 @@ pub struct NowExecutor { } impl NowExecutor { - #[allow(dead_code)] pub fn new( barrier_receiver: UnboundedReceiver, executor_id: u64, @@ -55,8 +53,8 @@ impl NowExecutor { type_name: String::default(), }]); Self { - barrier_receiver: Some(barrier_receiver), - pk_indices: vec![0], + barrier_receiver, + pk_indices: vec![], identity: format!("NowExecutor {:X}", executor_id), schema, state_table, @@ -64,8 +62,13 @@ impl NowExecutor { } #[try_stream(ok = Message, error = StreamExecutorError)] - async fn into_stream(mut self) { - let mut barrier_receiver = self.barrier_receiver.take().unwrap(); + async fn into_stream(self) { + let Self { + mut barrier_receiver, + mut state_table, + schema, + .. + } = self; // Consume the first barrier message and initialize state table. let barrier = barrier_receiver @@ -73,13 +76,15 @@ impl NowExecutor { .stack_trace("now_executor_recv_first_barrier") .await .unwrap(); - self.state_table.init_epoch(barrier.epoch); + let mut is_pausing = barrier.is_pause() || barrier.is_update(); + + state_table.init_epoch(barrier.epoch); // The first barrier message should be propagated. yield Message::Barrier(barrier); let state_row = { - let data_iter = self.state_table.iter().await?; + let data_iter = state_table.iter().await?; pin_mut!(data_iter); if let Some(state_row) = data_iter.next().await { Some(state_row?) @@ -91,35 +96,26 @@ impl NowExecutor { let mut last_timestamp = state_row.and_then(|row| row[0].clone()); while let Some(barrier) = barrier_receiver.recv().await { - let mut timestamp = None; - let should_update = if barrier.is_update() { - false - } else { + if !is_pausing { let time_millis = Epoch::from(barrier.epoch.curr).as_unix_millis(); - timestamp = Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + let timestamp = Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( NaiveDateTime::from_timestamp( (time_millis / 1000) as i64, (time_millis % 1000 * 1_000_000) as u32, ), ))); - last_timestamp.as_ref().map_or(true, |last_timestamp| { - last_timestamp <= timestamp.as_ref().unwrap() - }) - }; - if should_update { - let mut data_chunk_builder = DataChunkBuilder::new( - self.schema().data_types(), - if last_timestamp.is_some() { 2 } else { 1 }, + let data_chunk = DataChunk::from_rows( + &if last_timestamp.is_some() { + vec![ + Row::new(vec![last_timestamp.clone()]), + Row::new(vec![timestamp.clone()]), + ] + } else { + vec![Row::new(vec![timestamp.clone()])] + }, + &schema.data_types(), ); - if last_timestamp.is_some() { - let chunk_popped = data_chunk_builder - .append_one_row_from_datums([&last_timestamp].into_iter()); - debug_assert!(chunk_popped.is_none()); - } - let data_chunk = data_chunk_builder - .append_one_row_from_datums([×tamp].into_iter()) - .unwrap(); let mut ops = if last_timestamp.is_some() { vec![Op::Delete] } else { @@ -129,17 +125,28 @@ impl NowExecutor { let stream_chunk = StreamChunk::from_parts(ops, data_chunk); yield Message::Chunk(stream_chunk); - yield Message::Watermark(Watermark::new(0, timestamp.as_ref().unwrap().clone())); + yield Message::Watermark(Watermark::new( + 0, + DataType::TIMESTAMP, + timestamp.as_ref().unwrap().clone(), + )); if last_timestamp.is_some() { - self.state_table.delete(Row::new(vec![last_timestamp])); + state_table.delete(Row::new(vec![last_timestamp])); } - self.state_table.insert(Row::new(vec![timestamp.clone()])); + state_table.insert(Row::new(vec![timestamp.clone()])); last_timestamp = timestamp; - self.state_table.commit(barrier.epoch).await?; + state_table.commit(barrier.epoch).await?; } else { - self.state_table.commit_no_data_expected(barrier.epoch); + state_table.commit_no_data_expected(barrier.epoch); + } + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause | Mutation::Update { .. } => is_pausing = true, + Mutation::Resume => is_pausing = false, + _ => {} + } } yield Message::Barrier(barrier); @@ -164,3 +171,131 @@ impl Executor for NowExecutor { self.identity.as_str() } } + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use chrono::NaiveDateTime; + use futures::StreamExt; + use risingwave_common::array::StreamChunk; + use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; + use risingwave_common::test_prelude::StreamChunkTestExt; + use risingwave_common::types::{DataType, NaiveDateTimeWrapper, ScalarImpl}; + use risingwave_common::util::sort_util::OrderType; + use risingwave_storage::memory::MemoryStateStore; + use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; + + use super::NowExecutor; + use crate::common::table::state_table::StateTable; + use crate::executor::{Barrier, BoxedMessageStream, Executor, Message, PkIndices, Watermark}; + + #[tokio::test] + async fn test_now() { + let state_table = create_state_table().await; + let (tx, mut now_executor) = create_executor(state_table); + + // Init barrier + tx.send(Barrier::new_test_barrier(1)).unwrap(); + + // Consume the barrier + now_executor.next().await.unwrap().unwrap(); + + tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1)) + .unwrap(); + + // Consume the data chunk + let chunk_msg = now_executor.next().await.unwrap().unwrap(); + + assert_eq!( + chunk_msg.into_chunk().unwrap().compact(), + StreamChunk::from_pretty( + " TS + + 2021-04-01T00:00:00.001" + ) + ); + + // Consume the watermark + let watermark = now_executor.next().await.unwrap().unwrap(); + + assert_eq!( + watermark, + Message::Watermark(Watermark::new( + 0, + DataType::TIMESTAMP, + ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + NaiveDateTime::from_str("2021-04-01T00:00:00.001").unwrap() + )) + )) + ); + + // Consume the barrier + now_executor.next().await.unwrap().unwrap(); + + tx.send(Barrier::with_prev_epoch_for_test(2 << 16, 1 << 16)) + .unwrap(); + + // Consume the data chunk + let chunk_msg = now_executor.next().await.unwrap().unwrap(); + + assert_eq!( + chunk_msg.into_chunk().unwrap().compact(), + StreamChunk::from_pretty( + " TS + - 2021-04-01T00:00:00.001 + + 2021-04-01T00:00:00.002" + ) + ); + + // Consume the watermark + let watermark = now_executor.next().await.unwrap().unwrap(); + + assert_eq!( + watermark, + Message::Watermark(Watermark::new( + 0, + DataType::TIMESTAMP, + ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( + NaiveDateTime::from_str("2021-04-01T00:00:00.002").unwrap() + )) + )) + ); + + // Consume the barrier + now_executor.next().await.unwrap().unwrap(); + } + + #[inline] + fn create_pk_indices() -> PkIndices { + vec![] + } + + #[inline] + fn create_order_types() -> Vec { + vec![] + } + + async fn create_state_table() -> StateTable { + let memory_state_store = MemoryStateStore::new(); + let table_id = TableId::new(1); + let column_descs = vec![ColumnDesc::unnamed(ColumnId::new(0), DataType::Timestamp)]; + let order_types = create_order_types(); + let pk_indices = create_pk_indices(); + StateTable::new_without_distribution( + memory_state_store, + table_id, + column_descs, + order_types, + pk_indices, + ) + .await + } + + fn create_executor( + state_table: StateTable, + ) -> (UnboundedSender, BoxedMessageStream) { + let (sender, barrier_receiver) = unbounded_channel(); + let now_executor = NowExecutor::new(barrier_receiver, 1, state_table); + (sender, Box::new(now_executor).execute()) + } +} diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs index f5107b7db17e0..6ca89e32551d9 100644 --- a/src/stream/src/from_proto/mod.rs +++ b/src/stream/src/from_proto/mod.rs @@ -31,6 +31,7 @@ mod lookup; mod lookup_union; mod merge; mod mview; +mod now; mod project; mod project_set; mod row_id_gen; @@ -64,6 +65,7 @@ use self::lookup::*; use self::lookup_union::*; use self::merge::*; use self::mview::*; +use self::now::NowExecutorBuilder; use self::project::*; use self::project_set::*; use self::row_id_gen::RowIdGenExecutorBuilder; @@ -143,5 +145,6 @@ pub async fn create_executor( NodeBody::WatermarkFilter => WatermarkFilterBuilder, NodeBody::Dml => DmlExecutorBuilder, NodeBody::RowIdGen => RowIdGenExecutorBuilder, + NodeBody::Now => NowExecutorBuilder, } } diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs new file mode 100644 index 0000000000000..b52d5fed2bae4 --- /dev/null +++ b/src/stream/src/from_proto/now.rs @@ -0,0 +1,52 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::stream_plan::NowNode; +use risingwave_storage::StateStore; +use tokio::sync::mpsc::unbounded_channel; + +use super::ExecutorBuilder; +use crate::common::table::state_table::StateTable; +use crate::error::StreamResult; +use crate::executor::{BoxedExecutor, NowExecutor}; +use crate::task::{ExecutorParams, LocalStreamManagerCore}; + +pub struct NowExecutorBuilder; + +#[async_trait::async_trait] +impl ExecutorBuilder for NowExecutorBuilder { + type Node = NowNode; + + async fn new_boxed_executor( + params: ExecutorParams, + node: &NowNode, + store: impl StateStore, + stream: &mut LocalStreamManagerCore, + ) -> StreamResult { + let (sender, barrier_receiver) = unbounded_channel(); + stream + .context + .lock_barrier_manager() + .register_sender(params.actor_context.id, sender); + + let state_table = + StateTable::from_table_catalog(node.get_state_table()?, store, None).await; + + Ok(Box::new(NowExecutor::new( + barrier_receiver, + params.executor_id, + state_table, + ))) + } +} From cde35d65e394fd4a4cb70acef02014dfe7b395d5 Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Thu, 24 Nov 2022 16:45:43 +0800 Subject: [PATCH 6/7] fix gen proto --- dashboard/proto/gen/stream_plan.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dashboard/proto/gen/stream_plan.ts b/dashboard/proto/gen/stream_plan.ts index ab85df349cd3e..e988c9702ea70 100644 --- a/dashboard/proto/gen/stream_plan.ts +++ b/dashboard/proto/gen/stream_plan.ts @@ -735,13 +735,9 @@ export interface StreamNode { | { $case: "groupTopN"; groupTopN: GroupTopNNode } | { $case: "sort"; sort: SortNode } | { $case: "watermarkFilter"; watermarkFilter: WatermarkFilterNode } -<<<<<<< HEAD | { $case: "dml"; dml: DmlNode } - | { $case: "rowIdGen"; rowIdGen: RowIdGenNode }; -======= | { $case: "rowIdGen"; rowIdGen: RowIdGenNode } | { $case: "now"; now: NowNode }; ->>>>>>> 5f8266a37 (add now executor builder) /** * The id for the operator. This is local per mview. * TODO: should better be a uint32. From 26d572f608f92b60893a037bdbeaa6f03a6df7a0 Mon Sep 17 00:00:00 2001 From: Liang Zhao Date: Thu, 24 Nov 2022 17:02:18 +0800 Subject: [PATCH 7/7] rebase main --- src/stream/src/executor/now.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index f0fe527957d0f..612589946b3c5 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -99,10 +99,11 @@ impl NowExecutor { if !is_pausing { let time_millis = Epoch::from(barrier.epoch.curr).as_unix_millis(); let timestamp = Some(ScalarImpl::NaiveDateTime(NaiveDateTimeWrapper::new( - NaiveDateTime::from_timestamp( + NaiveDateTime::from_timestamp_opt( (time_millis / 1000) as i64, (time_millis % 1000 * 1_000_000) as u32, - ), + ) + .unwrap(), ))); let data_chunk = DataChunk::from_rows(