Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(state_table): state table try flush for non-checkpoint barrier #12418

Closed
wants to merge 43 commits into from
Closed
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
faeb44c
save work
wcy-fdu Aug 31, 2023
5f393af
add unit tests
wcy-fdu Sep 1, 2023
ea15a02
minor
wcy-fdu Sep 1, 2023
7319bbd
minor refactor
wcy-fdu Sep 4, 2023
97576bd
save work
wcy-fdu Sep 4, 2023
c442aa0
refactor interface
wcy-fdu Sep 4, 2023
d71293f
miss delete range
wcy-fdu Sep 4, 2023
e04eb82
minor
wcy-fdu Sep 4, 2023
53f186e
Merge branch 'main' into wcy/write_anytime
wcy-fdu Sep 4, 2023
24a1f88
impl try flush for traced store
wcy-fdu Sep 4, 2023
597bf79
Merge branch 'wcy/write_anytime' of https://github.com/risingwavelabs…
wcy-fdu Sep 4, 2023
02e7dc1
add epoch gap limit
wcy-fdu Sep 5, 2023
059203a
init prev epoch
wcy-fdu Sep 6, 2023
cbe8404
todo: make threshold configable
wcy-fdu Sep 6, 2023
cfe8615
only try flush, no gap epoch
wcy-fdu Sep 19, 2023
0df0fb7
only try flush, add it to hash join
wcy-fdu Sep 19, 2023
e82ac05
minor
wcy-fdu Sep 19, 2023
f6e73df
resolve conflict
wcy-fdu Sep 19, 2023
0ee8eef
mem table can not be empty when sealing epoch
wcy-fdu Sep 19, 2023
51c896d
apply is_checkpoint judgement for all executors
wcy-fdu Sep 20, 2023
5967702
remove is_dirty_check
wcy-fdu Sep 20, 2023
e438ec6
generate-example-config
wcy-fdu Sep 20, 2023
92d576f
Merge branch 'main' into wcy/try_flush_only
wcy-fdu Sep 20, 2023
9cdd3d5
barrier 100 ms, ckpt 1s for test
wcy-fdu Sep 22, 2023
706fda8
Merge branch 'wcy/try_flush_only' of https://github.com/risingwavelab…
wcy-fdu Sep 22, 2023
c31ffa1
Merge remote-tracking branch 'origin/main' into wcy/try_flush_only
fuyufjh Oct 3, 2023
134af8a
resolve conflict
wcy-fdu Oct 11, 2023
e7b3698
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
wcy-fdu Oct 11, 2023
55892f3
resolve conflict
wcy-fdu Oct 11, 2023
f99ea57
Merge branch 'wcy/try_flush_only' of https://github.com/risingwavelab…
wcy-fdu Oct 11, 2023
6d30dc0
disable decouple
wcy-fdu Oct 11, 2023
d020c84
minor
wcy-fdu Oct 11, 2023
d790782
resolve some comments
wcy-fdu Oct 12, 2023
32653dc
rebase main
wcy-fdu Oct 18, 2023
f535583
Merge branch 'main' into wcy/try_flush_only
wcy-fdu Oct 18, 2023
1fa5fa6
minor
wcy-fdu Oct 19, 2023
6328bee
Merge branch 'wcy/try_flush_only' of https://github.com/risingwavelab…
wcy-fdu Oct 19, 2023
db4235c
Merge branch 'main' into wcy/try_flush_only
wcy-fdu Oct 20, 2023
7faff93
rebase main
wcy-fdu Nov 8, 2023
331aef8
Merge branch 'wcy/try_flush_only' of https://github.com/risingwavelab…
wcy-fdu Nov 8, 2023
792fc22
merge main
wcy-fdu Nov 15, 2023
e18d87e
rebase main
wcy-fdu Nov 28, 2023
32d56db
fix
wcy-fdu Nov 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/storage/hummock_trace/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ impl TraceSpan {
Self::new_global_op(Operation::Flush(delete_range), storage_type)
}

/// Creates a trace span for a `try_flush` call on the given storage type.
///
/// The span is recorded as an `Operation::Flush` carrying an empty list of
/// delete ranges.
pub fn new_try_flush_span(storage_type: StorageType) -> MayTraceSpan {
    let flush_op = Operation::Flush(Vec::new());
    Self::new_global_op(flush_op, storage_type)
}

pub fn new_meta_message_span(resp: SubscribeResponse) -> MayTraceSpan {
Self::new_global_op(
Operation::MetaMessage(Box::new(TracedSubResp::from(resp))),
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_trace/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ pub enum OperationResult {
Get(TraceResult<Option<TracedBytes>>),
Insert(TraceResult<()>),
Delete(TraceResult<()>),
TryFlush(TraceResult<()>),
Flush(TraceResult<usize>),
Iter(TraceResult<()>),
IterNext(TraceResult<Option<(TracedBytes, TracedBytes)>>),
Expand Down
15 changes: 14 additions & 1 deletion src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::storage_value::StorageValue;
use crate::store::*;
use crate::StateStoreIter;

const MEM_TABLE_SPILL_THRESHOLD: usize = 64 * 1024 * 1024;
/// `LocalHummockStorage` is a handle for a state table shard to access data from and write data to
/// the hummock state backend. It is created via `HummockStorage::new_local`.
pub struct LocalHummockStorage {
Expand Down Expand Up @@ -321,6 +322,19 @@ impl LocalStateStore for LocalHummockStorage {
.await
}

/// Flushes the mem table if its buffered size exceeds
/// `MEM_TABLE_SPILL_THRESHOLD`.
///
/// When the mem table is at or below the threshold this is a no-op that
/// returns `Ok(())`. The flush is issued with no delete ranges, and the size
/// it reports is discarded.
///
/// # Errors
///
/// Propagates any error returned by `flush`.
async fn try_flush(&mut self) -> StorageResult<()> {
    let mem_table_size = self.mem_table.kv_size.size();
    if mem_table_size > MEM_TABLE_SPILL_THRESHOLD {
        // Report the actual numbers instead of a hard-coded "64 Mb" so the
        // message stays correct if the threshold constant changes.
        tracing::info!(
            "The size of mem table ({} bytes) exceeds the spill threshold ({} bytes) and spill occurs. table_id {}",
            mem_table_size,
            MEM_TABLE_SPILL_THRESHOLD,
            self.table_id.table_id()
        );

        self.flush(vec![]).await?;
    }

    Ok(())
}

fn epoch(&self) -> u64 {
self.epoch.expect("should have set the epoch")
}
Expand All @@ -343,7 +357,6 @@ impl LocalStateStore for LocalHummockStorage {
}

fn seal_current_epoch(&mut self, next_epoch: u64) {
assert!(!self.is_dirty());
let prev_epoch = self
.epoch
.replace(next_epoch)
Expand Down
13 changes: 13 additions & 0 deletions src/storage/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,19 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
prev_epoch
);
}

/// Flushes the mem table if its buffered size exceeds the spill threshold.
///
/// When the mem table is at or below the threshold this is a no-op that
/// returns `Ok(())`. The flush is issued with no delete ranges, and the size
/// it reports is discarded.
///
/// # Errors
///
/// Propagates any error returned by `flush`.
async fn try_flush(&mut self) -> StorageResult<()> {
    // Named locally instead of inlining the magic number at the comparison.
    const MEM_TABLE_SPILL_THRESHOLD: usize = 64 * 1024 * 1024;

    // This runs on every `try_flush` call, so keep it at trace level to
    // avoid flooding info-level logs.
    tracing::trace!("In memory state store");

    let mem_table_size = self.mem_table.kv_size.size();
    if mem_table_size > MEM_TABLE_SPILL_THRESHOLD {
        // Report the actual numbers instead of a hard-coded "64 Mb".
        tracing::info!(
            "The size of mem table ({} bytes) exceeds the spill threshold ({} bytes) and spill occurs. table_id {}",
            mem_table_size,
            MEM_TABLE_SPILL_THRESHOLD,
            self.table_id.table_id()
        );
        self.flush(vec![]).await?;
    }

    Ok(())
}
}

#[cfg(test)]
Expand Down
7 changes: 6 additions & 1 deletion src/storage/src/monitor/monitored_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ impl<S: LocalStateStore> LocalStateStore for MonitoredStateStore<S> {
&mut self,
delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
) -> impl Future<Output = StorageResult<usize>> + Send + '_ {
// TODO: collect metrics
self.inner
.flush(delete_ranges)
.verbose_instrument_await("store_flush")
Expand All @@ -285,6 +284,12 @@ impl<S: LocalStateStore> LocalStateStore for MonitoredStateStore<S> {
// TODO: may collect metrics
self.inner.seal_current_epoch(next_epoch)
}

/// Delegates `try_flush` to the wrapped store, instrumenting the await
/// point under the name `"store_try_flush"`.
fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
    let inner_try_flush = self.inner.try_flush();
    inner_try_flush.verbose_instrument_await("store_try_flush")
}
}

impl<S: StateStore> StateStore for MonitoredStateStore<S> {
Expand Down
7 changes: 7 additions & 0 deletions src/storage/src/monitor/traced_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,13 @@ impl<S: LocalStateStore> LocalStateStore for TracedStateStore<S> {
let _span = TraceSpan::new_seal_current_epoch_span(next_epoch, self.storage_type);
self.inner.seal_current_epoch(next_epoch)
}

/// Forwards `try_flush` to the inner store and reports the outcome on a
/// trace span.
///
/// The span is created before the inner call. Once the call completes, its
/// result (success or error) is sent as `OperationResult::TryFlush`, and the
/// inner result is returned to the caller unchanged.
async fn try_flush(&mut self) -> StorageResult<()> {
    let span = TraceSpan::new_try_flush_span(self.storage_type);
    let res = self.inner.try_flush().await;
    // `copied` turns `Result<&(), &E>` into `Result<(), &E>` without a
    // hand-written dereferencing closure.
    span.may_send_result(OperationResult::TryFlush(res.as_ref().copied().into()));
    res
}
}

impl<S: StateStore> StateStore for TracedStateStore<S> {
Expand Down
5 changes: 5 additions & 0 deletions src/storage/src/panic_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ impl LocalStateStore for PanicStateStore {
fn seal_current_epoch(&mut self, _next_epoch: u64) {
panic!("should not operate on the panic state store!")
}

/// Always panics: the panic state store must never be operated on.
// The body never awaits, but the signature has to stay `async` to satisfy
// the `LocalStateStore` trait, hence the lint allowance.
#[allow(clippy::unused_async)]
async fn try_flush(&mut self) -> StorageResult<()> {
    panic!("should not operate on the panic state store!")
}
}

impl StateStore for PanicStateStore {
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ pub trait LocalStateStore: StaticSendSync {
delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
) -> impl Future<Output = StorageResult<usize>> + Send + '_;

/// Asks the store to flush its buffered writes if the implementation decides
/// it is necessary.
///
/// Unlike `flush`, this takes no delete ranges and does not report a flushed
/// size; the future resolves to `Ok(())` or a storage error.
// NOTE(review): the exact condition that triggers a flush is left to each
// implementor — confirm whether the trait should document a common contract.
fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_;
fn epoch(&self) -> u64;

fn is_dirty(&self) -> bool;
Expand Down
17 changes: 17 additions & 0 deletions src/storage/src/store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,13 @@ pub mod verify {
self.actual.flush(delete_ranges).await
}

/// Runs `try_flush` on the expected store (when one is configured) and then
/// on the actual store.
///
/// An error from the expected store is returned immediately, in which case
/// the actual store is not touched. Otherwise the actual store's result is
/// returned.
async fn try_flush(&mut self) -> StorageResult<()> {
    if let Some(expected_store) = self.expected.as_mut() {
        expected_store.try_flush().await?;
    }
    self.actual.try_flush().await
}

async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
self.actual.init(options.clone()).await?;
if let Some(expected) = &mut self.expected {
Expand Down Expand Up @@ -828,6 +835,8 @@ pub mod boxed_state_store {
delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
) -> StorageResult<usize>;

/// Flushes buffered writes if the underlying store decides it is necessary.
// NOTE(review): presumably the dynamically-dispatched counterpart of
// `LocalStateStore::try_flush` — confirm against the enclosing trait.
async fn try_flush(&mut self) -> StorageResult<()>;

fn epoch(&self) -> u64;

fn is_dirty(&self) -> bool;
Expand Down Expand Up @@ -883,6 +892,10 @@ pub mod boxed_state_store {
self.flush(delete_ranges).await
}

/// Forwards to the `try_flush` of the concrete store and awaits it.
// NOTE(review): like the neighbouring `flush`/`epoch` forwarders, this relies
// on `self.try_flush()` resolving to the underlying store's method rather
// than to this one — confirm against the impl header.
async fn try_flush(&mut self) -> StorageResult<()> {
self.try_flush().await
}

fn epoch(&self) -> u64 {
self.epoch()
}
Expand Down Expand Up @@ -949,6 +962,10 @@ pub mod boxed_state_store {
self.deref_mut().flush(delete_ranges)
}

/// Delegates `try_flush` to the store behind the pointer.
fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_ {
    let inner_store = self.deref_mut();
    inner_store.try_flush()
}

fn epoch(&self) -> u64 {
self.deref().epoch()
}
Expand Down
11 changes: 10 additions & 1 deletion src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,10 @@ where
}

pub async fn commit(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
assert_eq!(self.epoch(), new_epoch.prev);
assert!(self.epoch() >= new_epoch.prev);
if self.epoch() >= new_epoch.curr {
panic!("Fail to commit, the epoch gap runs out");
}
trace!(
table_id = %self.table_id,
epoch = ?self.epoch(),
Expand Down Expand Up @@ -1078,6 +1081,12 @@ where
self.local_store.seal_current_epoch(next_epoch);
Ok(())
}

/// Gives the local store a chance to flush, then seals the current epoch and
/// advances to `next_epoch`.
///
/// If the local store's `try_flush` fails, the error is returned and the
/// epoch is not sealed.
pub async fn try_flush(&mut self, next_epoch: u64) -> StreamExecutorResult<()> {
    let store = &mut self.local_store;
    store.try_flush().await?;
    store.seal_current_epoch(next_epoch);
    Ok(())
}
}

// Iterator functions
Expand Down
15 changes: 14 additions & 1 deletion src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_expr::expr::BoxedExpression;
use risingwave_expr::ExprError;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_storage::StateStore;
use tokio::time::Instant;

Expand Down Expand Up @@ -795,7 +796,11 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
}
AlignedMessage::Barrier(barrier) => {
let barrier_start_time = Instant::now();
self.flush_data(barrier.epoch).await?;
if barrier.kind == BarrierKind::Barrier {
self.try_flush_data(barrier.epoch).await?;
} else {
self.flush_data(barrier.epoch).await?;
}

// Update the vnode bitmap for state tables of both sides if asked.
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
Expand Down Expand Up @@ -847,6 +852,14 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
Ok(())
}

async fn try_flush_data(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
// All changes to the state has been buffered in the mem-table of the state table. Just
// `commit` them here.
self.side_l.ht.try_flush(epoch).await?;
self.side_r.ht.try_flush(epoch).await?;
Ok(())
}

// We need to manually evict the cache.
fn evict_cache(
side_update: &mut JoinSide<K, S>,
Expand Down
6 changes: 6 additions & 0 deletions src/stream/src/executor/managed_state/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,12 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
Ok(())
}

/// Calls `try_flush` on the state table and then on the degree state table,
/// passing `epoch.curr` as the next epoch to both.
///
/// The first error aborts the sequence.
pub async fn try_flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
    let next_epoch = epoch.curr;
    self.state.table.try_flush(next_epoch).await?;
    self.degree_state.table.try_flush(next_epoch).await?;
    Ok(())
}

/// Insert a join row
#[allow(clippy::unused_async)]
pub async fn insert(&mut self, key: &K, value: JoinRow<impl Row>) -> StreamExecutorResult<()> {
Expand Down