Skip to content

Commit e60d79c

Browse files
soundOfDestinymergify[bot]
authored andcommitted
feat(stream): implement streaming part of now executor (#6408)
* feat(stream): implement now executor (close #6407) * use if let * use barrier receiver * timestamp sanity check * add now executor builder * fix gen proto * rebase main Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent 5a0fad1 commit e60d79c

File tree

7 files changed

+431
-2
lines changed

7 files changed

+431
-2
lines changed

dashboard/proto/gen/stream_plan.ts

+39-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/stream_plan.proto

+6
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,11 @@ message RowIdGenNode {
442442
uint64 row_id_index = 1;
443443
}
444444

445+
message NowNode {
446+
// Persists emitted 'now'.
447+
catalog.Table state_table = 1;
448+
}
449+
445450
message StreamNode {
446451
oneof node_body {
447452
SourceNode source = 100;
@@ -473,6 +478,7 @@ message StreamNode {
473478
WatermarkFilterNode watermark_filter = 126;
474479
DmlNode dml = 127;
475480
RowIdGenNode row_id_gen = 128;
481+
NowNode now = 129;
476482
}
477483
// The id for the operator. This is local per mview.
478484
// TODO: should better be a uint32.

src/common/src/util/epoch.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ use std::time::{Duration, SystemTime};
1818

1919
use parse_display::Display;
2020

21+
static UNIX_SINGULARITY_DATE_SEC: u64 = 1_617_235_200;
22+
2123
/// `UNIX_SINGULARITY_DATE_EPOCH` represents the singularity date of the UNIX epoch:
2224
/// 2021-04-01T00:00:00Z.
2325
pub static UNIX_SINGULARITY_DATE_EPOCH: LazyLock<SystemTime> =
24-
LazyLock::new(|| SystemTime::UNIX_EPOCH + Duration::from_secs(1_617_235_200));
26+
LazyLock::new(|| SystemTime::UNIX_EPOCH + Duration::from_secs(UNIX_SINGULARITY_DATE_SEC));
2527

2628
#[derive(Clone, Copy, Debug, Display, PartialEq, Eq, PartialOrd, Ord, Hash)]
2729
pub struct Epoch(pub u64);
@@ -72,6 +74,10 @@ impl Epoch {
7274
.as_millis() as u64
7375
}
7476

77+
pub fn as_unix_millis(&self) -> u64 {
78+
UNIX_SINGULARITY_DATE_SEC * 1000 + self.physical_time()
79+
}
80+
7581
/// Returns the epoch in real system time.
7682
pub fn as_system_time(&self) -> SystemTime {
7783
*UNIX_SINGULARITY_DATE_EPOCH + Duration::from_millis(self.physical_time())

src/stream/src/executor/mod.rs

+22
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ mod lookup_union;
6969
mod managed_state;
7070
mod merge;
7171
mod mview;
72+
mod now;
7273
mod project;
7374
mod project_set;
7475
mod rearranged_chain;
@@ -109,6 +110,7 @@ pub use lookup::*;
109110
pub use lookup_union::LookupUnionExecutor;
110111
pub use merge::MergeExecutor;
111112
pub use mview::*;
113+
pub use now::NowExecutor;
112114
pub use project::ProjectExecutor;
113115
pub use project_set::*;
114116
pub use rearranged_chain::RearrangedChainExecutor;
@@ -235,6 +237,15 @@ impl Barrier {
235237
}
236238
}
237239

240+
pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
241+
Self {
242+
epoch: EpochPair::new(epoch, prev_epoch),
243+
checkpoint: true,
244+
mutation: Default::default(),
245+
passed_actors: Default::default(),
246+
}
247+
}
248+
238249
#[must_use]
239250
pub fn with_mutation(self, mutation: Mutation) -> Self {
240251
Self {
@@ -279,11 +290,22 @@ impl Barrier {
279290
)
280291
}
281292

293+
/// Whether this barrier is for pause.
294+
pub fn is_pause(&self) -> bool {
295+
matches!(self.mutation.as_deref(), Some(Mutation::Pause))
296+
}
297+
282298
/// Whether this barrier is for configuration change. Used for source executor initialization.
283299
pub fn is_update(&self) -> bool {
284300
matches!(self.mutation.as_deref(), Some(Mutation::Update { .. }))
285301
}
286302

303+
/// Whether this barrier is for resume. Used for now executor to determine whether to yield a
304+
/// chunk and a watermark before this barrier.
305+
pub fn is_resume(&self) -> bool {
306+
matches!(self.mutation.as_deref(), Some(Mutation::Resume))
307+
}
308+
287309
/// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
288310
/// with `actor_id`.
289311
pub fn as_update_merge(

0 commit comments

Comments
 (0)