feat(state_table): state table try flush for non-checkpoint barrier #12418

Closed
wants to merge 43 commits into from

Changes from all commits (43 commits)
faeb44c save work (wcy-fdu, Aug 31, 2023)
5f393af add unit tests (wcy-fdu, Sep 1, 2023)
ea15a02 minor (wcy-fdu, Sep 1, 2023)
7319bbd minor refactor (wcy-fdu, Sep 4, 2023)
97576bd save work (wcy-fdu, Sep 4, 2023)
c442aa0 refactor interface (wcy-fdu, Sep 4, 2023)
d71293f miss delete range (wcy-fdu, Sep 4, 2023)
e04eb82 minor (wcy-fdu, Sep 4, 2023)
53f186e Merge branch 'main' into wcy/write_anytime (wcy-fdu, Sep 4, 2023)
24a1f88 impl try flush for traced store (wcy-fdu, Sep 4, 2023)
597bf79 Merge branch 'wcy/write_anytime' of https://github.com/risingwavelabs… (wcy-fdu, Sep 4, 2023)
02e7dc1 add epoch gap limit (wcy-fdu, Sep 5, 2023)
059203a init prev epoch (wcy-fdu, Sep 6, 2023)
cbe8404 todo: make threshold configable (wcy-fdu, Sep 6, 2023)
cfe8615 only try flush, no gap epoch (wcy-fdu, Sep 19, 2023)
0df0fb7 only try flush, add it to hash join (wcy-fdu, Sep 19, 2023)
e82ac05 minor (wcy-fdu, Sep 19, 2023)
f6e73df resolve conflict (wcy-fdu, Sep 19, 2023)
0ee8eef mem table can not be empty when sealing epoch (wcy-fdu, Sep 19, 2023)
51c896d apply is_checkpoint judgement for all executors (wcy-fdu, Sep 20, 2023)
5967702 remove is_dirty_check (wcy-fdu, Sep 20, 2023)
e438ec6 generate-example-config (wcy-fdu, Sep 20, 2023)
92d576f Merge branch 'main' into wcy/try_flush_only (wcy-fdu, Sep 20, 2023)
9cdd3d5 barrier 100 ms, ckpt 1s for test (wcy-fdu, Sep 22, 2023)
706fda8 Merge branch 'wcy/try_flush_only' of https://github.com/risingwavelab… (wcy-fdu, Sep 22, 2023)
c31ffa1 Merge remote-tracking branch 'origin/main' into wcy/try_flush_only (fuyufjh, Oct 3, 2023)
134af8a resolve conflict (wcy-fdu, Oct 11, 2023)
e7b3698 Merge branch 'main' of https://github.com/risingwavelabs/risingwave i… (wcy-fdu, Oct 11, 2023)
55892f3 resolve conflict (wcy-fdu, Oct 11, 2023)
f99ea57 Merge branch 'wcy/try_flush_only' of https://github.com/risingwavelab… (wcy-fdu, Oct 11, 2023)
6d30dc0 disable decouple (wcy-fdu, Oct 11, 2023)
d020c84 minor (wcy-fdu, Oct 11, 2023)
d790782 resolve some comments (wcy-fdu, Oct 12, 2023)
32653dc rebase main (wcy-fdu, Oct 18, 2023)
f535583 Merge branch 'main' into wcy/try_flush_only (wcy-fdu, Oct 18, 2023)
1fa5fa6 minor (wcy-fdu, Oct 19, 2023)
6328bee Merge branch 'wcy/try_flush_only' of https://github.com/risingwavelab… (wcy-fdu, Oct 19, 2023)
db4235c Merge branch 'main' into wcy/try_flush_only (wcy-fdu, Oct 20, 2023)
7faff93 rebase main (wcy-fdu, Nov 8, 2023)
331aef8 Merge branch 'wcy/try_flush_only' of https://github.com/risingwavelab… (wcy-fdu, Nov 8, 2023)
792fc22 merge main (wcy-fdu, Nov 15, 2023)
e18d87e rebase main (wcy-fdu, Nov 28, 2023)
32d56db fix (wcy-fdu, Nov 28, 2023)
2 changes: 1 addition & 1 deletion src/compute/tests/integration_tests.rs
@@ -472,7 +472,7 @@ async fn test_row_seq_scan() -> Result<()> {
]));

epoch.inc();
-state.commit(epoch).await.unwrap();
+state.commit(epoch, true).await.unwrap();

let executor = Box::new(RowSeqScanExecutor::new(
table,
4 changes: 2 additions & 2 deletions src/stream/benches/bench_state_table.rs
@@ -119,7 +119,7 @@ async fn run_bench_state_table_inserts<const USE_WATERMARK_CACHE: bool>(
state_table.insert(row);
}
epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();
}

fn bench_state_table_inserts(c: &mut Criterion) {
@@ -179,7 +179,7 @@ async fn run_bench_state_table_chunks<const USE_WATERMARK_CACHE: bool>(
state_table.write_chunk(chunk);
}
epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();
}

fn bench_state_table_write_chunk(c: &mut Criterion) {
133 changes: 74 additions & 59 deletions src/stream/src/common/table/state_table.rs
@@ -898,71 +898,87 @@ where
}
}

pub async fn commit(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
pub async fn commit(
&mut self,
new_epoch: EpochPair,
is_checkpoint: bool,
) -> StreamExecutorResult<()> {
assert_eq!(self.epoch(), new_epoch.prev);
trace!(
table_id = %self.table_id,
epoch = ?self.epoch(),
"commit state table"
);
// Tick the watermark buffer here because state table is expected to be committed once
// per epoch.
self.watermark_buffer_strategy.tick();
if !self.is_dirty() {
// If the state table is not modified, go fast path.
self.local_store
.seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::new());
return Ok(());
} else {
self.seal_current_epoch(new_epoch.curr)
.instrument(tracing::info_span!("state_table_commit"))
.await?;
}
match is_checkpoint {
false => {
self.local_store.try_flush().await?;
if !self.is_dirty() {
// If the state table is not modified, go fast path.
self.local_store
.seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::new());
return Ok(());
} else {
self.seal_current_epoch(new_epoch.curr)
.instrument(tracing::info_span!("state_table_commit"))
.await?;
}
}
true => {
self.watermark_buffer_strategy.tick();
if !self.is_dirty() {
// If the state table is not modified, go fast path.
self.local_store
.seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::new());
return Ok(());
} else {
self.seal_current_epoch(new_epoch.curr)
.instrument(tracing::info_span!("state_table_commit"))
.await?;
}

// Refresh watermark cache if it is out of sync.
if USE_WATERMARK_CACHE && !self.watermark_cache.is_synced() {
if let Some(ref watermark) = self.prev_cleaned_watermark {
let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
(Included(once(Some(watermark.clone()))), Unbounded);
// NOTE(kwannoel): We buffer `pks` before inserting into watermark cache
// because we can't hold an immutable ref (via `iter_key_and_val_with_pk_range`)
// and a mutable ref (via `self.watermark_cache.insert`) at the same time.
// TODO(kwannoel): We can optimize it with:
// 1. Either use `RefCell`.
// 2. Or pass in a direct reference to LocalStateStore,
// instead of referencing it indirectly from `self`.
// Similar to how we do for pk_indices.
let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
{
let mut streams = vec![];
for vnode in self.vnodes().iter_vnodes() {
let stream = self
.iter_with_vnode(vnode, &range, PrefetchOptions::default())
.await?;
streams.push(Box::pin(stream));
}
let merged_stream = merge_sort(streams);
pin_mut!(merged_stream);

#[for_await]
for entry in merged_stream.take(self.watermark_cache.capacity()) {
let keyed_row = entry?;
let pk = self.pk_serde.deserialize(keyed_row.key())?;
if !pk.is_null_at(0) {
pks.push(pk);
// Refresh watermark cache if it is out of sync.
if USE_WATERMARK_CACHE && !self.watermark_cache.is_synced() {
if let Some(ref watermark) = self.prev_cleaned_watermark {
let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
(Included(once(Some(watermark.clone()))), Unbounded);
// NOTE(kwannoel): We buffer `pks` before inserting into watermark cache
// because we can't hold an immutable ref (via `iter_key_and_val_with_pk_range`)
// and a mutable ref (via `self.watermark_cache.insert`) at the same time.
// TODO(kwannoel): We can optimize it with:
// 1. Either use `RefCell`.
// 2. Or pass in a direct reference to LocalStateStore,
// instead of referencing it indirectly from `self`.
// Similar to how we do for pk_indices.
let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
{
let mut streams = vec![];
for vnode in self.vnodes().iter_vnodes() {
let stream = self
.iter_with_vnode(vnode, &range, PrefetchOptions::default())
.await?;
streams.push(Box::pin(stream));
}
let merged_stream = merge_sort(streams);
pin_mut!(merged_stream);

#[for_await]
for entry in merged_stream.take(self.watermark_cache.capacity()) {
let keyed_row = entry?;
let pk = self.pk_serde.deserialize(keyed_row.key())?;
if !pk.is_null_at(0) {
pks.push(pk);
}
}
}
}
}

let mut filler = self.watermark_cache.begin_syncing();
for pk in pks {
filler.insert_unchecked(DefaultOrdered(pk), ());
}
filler.finish();
let mut filler = self.watermark_cache.begin_syncing();
for pk in pks {
filler.insert_unchecked(DefaultOrdered(pk), ());
}
filler.finish();

let n_cache_entries = self.watermark_cache.len();
if n_cache_entries < self.watermark_cache.capacity() {
self.watermark_cache.set_table_row_count(n_cache_entries);
let n_cache_entries = self.watermark_cache.len();
if n_cache_entries < self.watermark_cache.capacity() {
self.watermark_cache.set_table_row_count(n_cache_entries);
}
}
}
}
}
@@ -975,7 +991,6 @@ where
/// in the epoch will be visible
pub fn commit_no_data_expected(&mut self, new_epoch: EpochPair) {
assert_eq!(self.epoch(), new_epoch.prev);
-assert!(!self.is_dirty());
// Tick the watermark buffer here because state table is expected to be committed once
// per epoch.
self.watermark_buffer_strategy.tick();
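Condensed, the new `commit` follows the control flow sketched below (a sketch, not the verbatim patch: the tracing instrumentation and the body of the watermark-cache refresh are elided; see the hunk above for the exact code).

```rust
// Condensed sketch of the new StateTable::commit; error paths and the
// watermark-cache refresh body are elided.
pub async fn commit(
    &mut self,
    new_epoch: EpochPair,
    is_checkpoint: bool,
) -> StreamExecutorResult<()> {
    assert_eq!(self.epoch(), new_epoch.prev);
    if !is_checkpoint {
        // Non-checkpoint barrier: opportunistically flush the mem-table into
        // the shared buffer; no watermark bookkeeping.
        self.local_store.try_flush().await?;
    } else {
        // Checkpoint barrier: tick the watermark buffer as before.
        self.watermark_buffer_strategy.tick();
    }
    if !self.is_dirty() {
        // Fast path: nothing was written in this epoch.
        self.local_store
            .seal_current_epoch(new_epoch.curr, SealCurrentEpochOptions::new());
        return Ok(());
    }
    self.seal_current_epoch(new_epoch.curr).await?;
    if is_checkpoint {
        // Refresh the watermark cache if it is out of sync (body as in the
        // hunk above).
    }
    Ok(())
}
```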
34 changes: 17 additions & 17 deletions src/stream/src/common/table/test_state_table.rs
@@ -76,7 +76,7 @@ async fn test_state_table_update_insert() {
]));

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();

state_table.delete(OwnedRow::new(vec![
Some(6_i32.into()),
@@ -132,7 +132,7 @@ async fn test_state_table_update_insert() {
);

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();

let row6_commit = state_table
.get_row(&OwnedRow::new(vec![Some(6_i32.into())]))
@@ -169,7 +169,7 @@ async fn test_state_table_update_insert() {
]));

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();

// one epoch: delete (1, 2, 3, 4), insert (5, 6, 7, None), delete(5, 6, 7, None)
state_table.delete(OwnedRow::new(vec![
@@ -198,7 +198,7 @@ async fn test_state_table_update_insert() {
assert_eq!(row1, None);

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();

let row1_commit = state_table
.get_row(&OwnedRow::new(vec![Some(1_i32.into())]))
@@ -263,7 +263,7 @@ async fn test_state_table_iter_with_prefix() {
]));

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();

state_table.insert(OwnedRow::new(vec![
Some(1_i32.into()),
@@ -392,7 +392,7 @@ async fn test_state_table_iter_with_pk_range() {
]));

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();

state_table.insert(OwnedRow::new(vec![
Some(1_i32.into()),
@@ -597,7 +597,7 @@ async fn test_state_table_iter_with_value_indices() {
}

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();

// write [3, 33, 333], [4, 44, 444], [5, 55, 555], [7, 77, 777], [8, 88, 888]into mem_table,
// [3, 33, 3333], [6, 66, 666], [9, 99, 999] exists in
@@ -785,7 +785,7 @@ async fn test_state_table_iter_with_shuffle_value_indices() {
}

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();

// write [3, 33, 333], [4, 44, 444], [5, 55, 555], [7, 77, 777], [8, 88, 888]into mem_table,
// [3, 33, 3333], [6, 66, 666], [9, 99, 999] exists in
@@ -1341,7 +1341,7 @@ async fn test_state_table_may_exist() {
check_may_exist(&state_table, vec![1, 4], vec![2, 3, 6, 12]).await;

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();
let e1 = epoch.prev;

// test may_exist with data only in immutable memtable (e1)
@@ -1382,7 +1382,7 @@ async fn test_state_table_may_exist() {
check_may_exist(&state_table, vec![1, 4, 6], vec![2, 3, 12]).await;

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();
let e2 = epoch.prev;

// test may_exist with data in immutable memtable (e2), committed ssts (e1)
@@ -1408,7 +1408,7 @@ async fn test_state_table_may_exist() {
check_may_exist(&state_table, vec![1, 3, 4, 6], vec![2, 12]).await;

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();
let e3 = epoch.prev;

// test may_exist with data in immutable memtable (e3), uncommitted ssts (e2), committed
@@ -1438,7 +1438,7 @@ async fn test_state_table_may_exist() {
test_env.storage.try_wait_epoch_for_test(e2).await;

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();
let e4 = epoch.prev;

let e3_res = test_env.storage.seal_and_sync_epoch(e3).await.unwrap();
@@ -1545,7 +1545,7 @@ async fn test_state_table_watermark_cache_ignore_null() {
state_table.update_watermark(watermark, true);

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();

let cache = state_table.get_watermark_cache();
assert_eq!(cache.len(), 1);
@@ -1626,7 +1626,7 @@ async fn test_state_table_watermark_cache_write_chunk() {
state_table.update_watermark(watermark, true);

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();

let inserts_1 = vec![
(
@@ -1735,7 +1735,7 @@ async fn test_state_table_watermark_cache_write_chunk() {
state_table.update_watermark(watermark, true);

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();

// After sync, we should scan all rows into watermark cache.
let cache = state_table.get_watermark_cache();
@@ -1832,7 +1832,7 @@ async fn test_state_table_watermark_cache_refill() {
state_table.update_watermark(watermark, true);

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();

// After the first barrier, watermark cache won't be filled.
let cache = state_table.get_watermark_cache();
@@ -1898,7 +1898,7 @@ async fn test_state_table_iter_prefix_and_sub_range() {
]));

epoch.inc();
-state_table.commit(epoch).await.unwrap();
+state_table.commit(epoch, true).await.unwrap();

let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]);

8 changes: 4 additions & 4 deletions src/stream/src/common/table/test_storage_table.rs
@@ -109,7 +109,7 @@ async fn test_storage_table_value_indices() {
]));

epoch.inc();
-state.commit(epoch).await.unwrap();
+state.commit(epoch, true).await.unwrap();
test_env.commit_epoch(epoch.prev).await;

let get_row1_res = table
@@ -222,7 +222,7 @@ async fn test_shuffled_column_id_for_storage_table_get_row() {
]));

epoch.inc();
-state.commit(epoch).await.unwrap();
+state.commit(epoch, true).await.unwrap();
test_env.commit_epoch(epoch.prev).await;

let get_row1_res = table
@@ -329,7 +329,7 @@ async fn test_row_based_storage_table_point_get_in_batch_mode() {
Some(222_i32.into()),
]));
epoch.inc();
-state.commit(epoch).await.unwrap();
+state.commit(epoch, true).await.unwrap();
test_env.commit_epoch(epoch.prev).await;

let get_row1_res = table
@@ -444,7 +444,7 @@ async fn test_batch_scan_with_value_indices() {
]));

epoch.inc();
-state.commit(epoch).await.unwrap();
+state.commit(epoch, true).await.unwrap();
test_env.commit_epoch(epoch.prev).await;

let iter = table
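All updated call sites above pass `true`, so the existing tests and benchmarks keep their checkpoint semantics; the non-checkpoint path is not exercised directly in this excerpt. A hypothetical snippet in the same style, assuming a `state_table` and `epoch` set up exactly as in the tests above (the inserted row is only illustrative):

```rust
// Hypothetical, not part of this diff: committing on a non-checkpoint barrier.
state_table.insert(OwnedRow::new(vec![
    Some(1_i32.into()),
    Some(11_i32.into()),
    Some(111_i32.into()),
]));

epoch.inc();
// Passing `false` marks a non-checkpoint barrier: the mem-table is try-flushed
// and the epoch is sealed, but the watermark buffer and cache are not touched.
state_table.commit(epoch, false).await.unwrap();
```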