Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): implement streaming part of now executor #6408

Merged
merged 8 commits into from
Dec 2, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,11 @@ message RowIdGenNode {
uint64 row_id_index = 1;
}

message NowNode {
// Persists emitted 'now'.
catalog.Table state_table = 1;
}

message StreamNode {
oneof node_body {
SourceNode source = 100;
Expand Down Expand Up @@ -471,6 +476,7 @@ message StreamNode {
WatermarkFilterNode watermark_filter = 126;
DmlNode dml = 127;
RowIdGenNode row_id_gen = 128;
NowNode now = 129;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
8 changes: 7 additions & 1 deletion src/common/src/util/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use std::time::{Duration, SystemTime};

use parse_display::Display;

static UNIX_SINGULARITY_DATE_SEC: u64 = 1_617_235_200;

/// `UNIX_SINGULARITY_DATE_EPOCH` represents the singularity date of the UNIX epoch:
/// 2021-04-01T00:00:00Z.
pub static UNIX_SINGULARITY_DATE_EPOCH: LazyLock<SystemTime> =
LazyLock::new(|| SystemTime::UNIX_EPOCH + Duration::from_secs(1_617_235_200));
LazyLock::new(|| SystemTime::UNIX_EPOCH + Duration::from_secs(UNIX_SINGULARITY_DATE_SEC));

#[derive(Clone, Copy, Debug, Display, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Epoch(pub u64);
Expand Down Expand Up @@ -72,6 +74,10 @@ impl Epoch {
.as_millis() as u64
}

pub fn as_unix_millis(&self) -> u64 {
UNIX_SINGULARITY_DATE_SEC * 1000 + self.physical_time()
}

/// Returns the epoch in real system time.
pub fn as_system_time(&self) -> SystemTime {
*UNIX_SINGULARITY_DATE_EPOCH + Duration::from_millis(self.physical_time())
Expand Down
22 changes: 22 additions & 0 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ mod lookup_union;
mod managed_state;
mod merge;
mod mview;
mod now;
mod project;
mod project_set;
mod rearranged_chain;
Expand Down Expand Up @@ -109,6 +110,7 @@ pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub use mview::*;
pub use now::NowExecutor;
pub use project::ProjectExecutor;
pub use project_set::*;
pub use rearranged_chain::RearrangedChainExecutor;
Expand Down Expand Up @@ -235,6 +237,15 @@ impl Barrier {
}
}

pub fn with_prev_epoch_for_test(epoch: u64, prev_epoch: u64) -> Self {
Self {
epoch: EpochPair::new(epoch, prev_epoch),
checkpoint: true,
mutation: Default::default(),
passed_actors: Default::default(),
}
}

#[must_use]
pub fn with_mutation(self, mutation: Mutation) -> Self {
Self {
Expand Down Expand Up @@ -279,11 +290,22 @@ impl Barrier {
)
}

/// Whether this barrier is for pause.
pub fn is_pause(&self) -> bool {
matches!(self.mutation.as_deref(), Some(Mutation::Pause))
}

/// Whether this barrier is for configuration change. Used for source executor initialization.
pub fn is_update(&self) -> bool {
matches!(self.mutation.as_deref(), Some(Mutation::Update { .. }))
}

/// Whether this barrier is for resume. Used for now executor to determine whether to yield a
/// chunk and a watermark before this barrier.
pub fn is_resume(&self) -> bool {
matches!(self.mutation.as_deref(), Some(Mutation::Resume))
}

/// Returns the [`MergeUpdate`] if this barrier is to update the merge executors for the actor
/// with `actor_id`.
pub fn as_update_merge(
Expand Down
Loading