From 0ce73ee92c9e9b1dd76b84c9d0a4742b9ce40f70 Mon Sep 17 00:00:00 2001
From: zhyass
Date: Wed, 22 May 2024 21:06:34 +0800
Subject: [PATCH 1/2] fix analyze table error

---
 .../interpreters/interpreter_table_analyze.rs | 32 +++++----
 .../storages/fuse/operations/table_analyze.rs | 71 +++++++++++++++++++
 2 files changed, 91 insertions(+), 12 deletions(-)

diff --git a/src/query/service/src/interpreters/interpreter_table_analyze.rs b/src/query/service/src/interpreters/interpreter_table_analyze.rs
index 4f1554e5585c..5cac4bb7614e 100644
--- a/src/query/service/src/interpreters/interpreter_table_analyze.rs
+++ b/src/query/service/src/interpreters/interpreter_table_analyze.rs
@@ -87,8 +87,8 @@ impl Interpreter for AnalyzeTableInterpreter {
             .read_table_snapshot_statistics(Some(&snapshot))
             .await?;

-        let temporal_str = if let Some(table_statistics) = &table_statistics {
-            let is_full = table
+        let (is_full, temporal_str) = if let Some(table_statistics) = &table_statistics {
+            let is_full = match table
                 .navigate_to_point(
                     &NavigationPoint::SnapshotID(
                         table_statistics.snapshot_id.simple().to_string(),
@@ -96,10 +96,16 @@
                     self.ctx.clone().get_abort_checker(),
                 )
                 .await
-                .is_err();
-
-            if is_full {
-                format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple(),)
+            {
+                Ok(t) => !t
+                    .read_table_snapshot()
+                    .await
+                    .is_ok_and(|s| s.is_some_and(|s| s.prev_table_seq.is_some())),
+                Err(_) => true,
+            };
+
+            let temporal_str = if is_full {
+                format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple())
             } else {
                 // analyze only need to collect the added blocks.
                 let table_alias = format!("_change_insert${:08x}", Utc::now().timestamp());
@@ -108,9 +114,13 @@
                     table_statistics.snapshot_id.simple(),
                     snapshot.snapshot_id.simple(),
                 )
-            }
+            };
+            (is_full, temporal_str)
         } else {
-            format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple(),)
+            (
+                true,
+                format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple()),
+            )
         };

         let index_cols: Vec<(u32, String)> = schema
@@ -134,10 +144,8 @@
             .join(", ");

         let sql = format!(
-            "SELECT {select_expr}, {} as is_full from {}.{} {temporal_str}",
-            temporal_str.is_empty(),
-            plan.database,
-            plan.table,
+            "SELECT {select_expr}, {is_full} as is_full from {}.{} {temporal_str}",
+            plan.database, plan.table,
         );

         log::info!("Analyze via sql {:?}", sql);
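For readers skimming the interpreter diff above: the removed code derived `is_full` only from whether the statistics snapshot could still be navigated to, and then interpolated `temporal_str.is_empty()` into the generated SQL — but every branch assigns a non-empty clause, so that flag could only ever be `false`. The new code computes `is_full` explicitly (the previous statistics snapshot must still be reachable and must carry a `prev_table_seq`) and feeds that boolean straight into the SELECT. A minimal sketch of the decision, using hypothetical stand-in types rather than Databend's `FuseTable`/`TableSnapshot` API, and a simplified CHANGES clause:

```rust
/// Hypothetical stand-in for the snapshot the existing statistics were built on.
struct StatsSnapshot {
    prev_table_seq: Option<u64>,
}

/// Decide whether ANALYZE has to rescan the whole table (`is_full == true`)
/// and which temporal clause the generated SELECT should carry.
fn plan_analyze(
    current_id: &str,
    stats_snapshot_id: Option<&str>,
    navigate: impl Fn(&str) -> Result<StatsSnapshot, String>,
) -> (bool, String) {
    match stats_snapshot_id {
        Some(prev_id) => {
            // Incremental collection is only safe when the statistics snapshot
            // is still reachable *and* records a prev_table_seq.
            let is_full = match navigate(prev_id) {
                Ok(s) => s.prev_table_seq.is_none(),
                Err(_) => true,
            };
            let clause = if is_full {
                format!("AT (snapshot => '{current_id}')")
            } else {
                // The real interpreter also aliases the change set; omitted here.
                format!("CHANGES AT (snapshot => '{prev_id}') END (snapshot => '{current_id}')")
            };
            (is_full, clause)
        }
        // No statistics collected before: always a full scan.
        None => (true, format!("AT (snapshot => '{current_id}')")),
    }
}

fn main() {
    // Statistics snapshot exists but has no prev_table_seq -> full collection.
    let (is_full, clause) = plan_analyze("s2", Some("s1"), |_| {
        Ok(StatsSnapshot { prev_table_seq: None })
    });
    assert!(is_full);
    println!("is_full={is_full}, clause={clause}");
}
```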
diff --git a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs
index 51ca787854c5..034b20ef1a20 100644
--- a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs
@@ -17,10 +17,14 @@ use std::sync::Arc;

 use databend_common_base::base::tokio;
 use databend_common_catalog::table::Table;
+use databend_common_catalog::table::TableExt;
 use databend_common_exception::Result;
 use databend_common_expression::types::number::NumberScalar;
+use databend_common_expression::ColumnId;
 use databend_common_expression::Scalar;
+use databend_common_io::prelude::borsh_deserialize_from_slice;
 use databend_common_storages_fuse::io::MetaReaders;
+use databend_common_storages_fuse::io::MetaWriter;
 use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut;
 use databend_common_storages_fuse::FuseTable;
 use databend_query::sessions::QueryContext;
 use databend_query::sql::plans::Plan;
 use databend_query::sql::Planner;
 use databend_query::test_kits::*;
 use databend_storages_common_cache::LoadParams;
+use databend_storages_common_table_meta::meta::MetaHLL;
 use databend_storages_common_table_meta::meta::SegmentInfo;
 use databend_storages_common_table_meta::meta::Statistics;
+use databend_storages_common_table_meta::meta::TableSnapshot;
+use databend_storages_common_table_meta::meta::TableSnapshotStatistics;
+use databend_storages_common_table_meta::meta::Versioned;

 #[tokio::test(flavor = "multi_thread")]
 async fn test_table_modify_column_ndv_statistics() -> Result<()> {
@@ -179,6 +187,69 @@ async fn check_column_ndv_statistics(
     Ok(())
 }

+#[tokio::test(flavor = "multi_thread")]
+async fn test_table_analyze_without_prev_table_seq() -> Result<()> {
+    let fixture = TestFixture::setup().await?;
+    let ctx = fixture.new_query_ctx().await?;
+
+    // setup
+    let create_tbl_command = "create table t(c int)";
+    fixture.execute_command(create_tbl_command).await?;
+
+    append_rows(ctx.clone(), 3).await?;
+    let catalog = ctx.get_catalog("default").await?;
+    let table = catalog.get_table(&ctx.get_tenant(), "default", "t").await?;
+    let fuse_table = FuseTable::try_from_table(table.as_ref())?;
+    let location_gen = fuse_table.meta_location_generator();
+    let operator = fuse_table.get_operator();
+
+    // generate snapshot without prev_table_seq
+    let snapshot_0 = fuse_table.read_table_snapshot().await?.unwrap();
+    let snapshot_1 = TableSnapshot::from_previous(&snapshot_0, None);
+    let snapshot_loc_1 = location_gen
+        .snapshot_location_from_uuid(&snapshot_1.snapshot_id, TableSnapshot::VERSION)?;
+    snapshot_1.write_meta(&operator, &snapshot_loc_1).await?;
+
+    // generate table statistics.
+    let col: Vec<u8> = vec![1, 3, 0, 0, 0, 118, 5, 1, 21, 6, 3, 229, 13, 3];
+    let hll: HashMap<ColumnId, MetaHLL> = HashMap::from([(0, borsh_deserialize_from_slice(&col)?)]);
+    let table_statistics = TableSnapshotStatistics::new(hll, snapshot_1.snapshot_id);
+    let table_statistics_location = location_gen.snapshot_statistics_location_from_uuid(
+        &table_statistics.snapshot_id,
+        table_statistics.format_version(),
+    )?;
+    // generate snapshot without prev_table_seq
+    let mut snapshot_2 = TableSnapshot::from_previous(&snapshot_1, None);
+    snapshot_2.table_statistics_location = Some(table_statistics_location);
+    FuseTable::commit_to_meta_server(
+        fixture.new_query_ctx().await?.as_ref(),
+        fuse_table.get_table_info(),
+        location_gen,
+        snapshot_2,
+        Some(table_statistics),
+        &None,
+        &operator,
+    )
+    .await?;
+
+    // check statistics.
+    let table = table.refresh(ctx.as_ref()).await?;
+    let expected = HashMap::from([(0, 3 as u64)]);
+    check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?;
+
+    let qry = format!("insert into t values(4)");
+    execute_command(ctx.clone(), &qry).await?;
+
+    ctx.evict_table_from_cache("default", "default", "t")?;
+    let statistics_sql = "analyze table default.t";
+    fixture.execute_command(statistics_sql).await?;
+
+    let table = table.refresh(ctx.as_ref()).await?;
+    let expected = HashMap::from([(0, 4 as u64)]);
+    check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?;
+    Ok(())
+}
+
 async fn append_rows(ctx: Arc<QueryContext>, n: usize) -> Result<()> {
     for i in 0..n {
         let qry = format!("insert into t values({})", i);
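The added test drives exactly the fallback the interpreter change introduces: both hand-written snapshots are built with `TableSnapshot::from_previous(&prev, None)`, so neither records a `prev_table_seq`, and the committed `TableSnapshotStatistics` claims 3 distinct values for column 0. After one more row lands, `analyze table default.t` has to fall back to a full collection and report 4. The expected numbers follow directly from the rows the test writes (`append_rows(ctx, 3)` inserts 0, 1, 2, then a single `insert into t values(4)`); below is a toy stand-in for the per-column NDV bookkeeping, using exact counting instead of Databend's `MetaHLL` sketch:

```rust
use std::collections::HashSet;

// Exact number-of-distinct-values counter; MetaHLL approximates this with a
// HyperLogLog sketch, but the expectations in the test are small enough to
// re-derive exactly.
fn ndv(values: &[i64]) -> u64 {
    values.iter().collect::<HashSet<_>>().len() as u64
}

fn main() {
    // append_rows(ctx, 3) writes 0, 1, 2 -> the first expectation is 3.
    assert_eq!(ndv(&[0, 1, 2]), 3);
    // After `insert into t values(4)` and a full ANALYZE, NDV becomes 4.
    assert_eq!(ndv(&[0, 1, 2, 4]), 4);
    println!("ndv checks hold");
}
```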
From 8263ec657bec95e395f556fdca4cb60b940002e3 Mon Sep 17 00:00:00 2001
From: zhyass
Date: Wed, 22 May 2024 21:31:21 +0800
Subject: [PATCH 2/2] make lint

---
 .../tests/it/storages/fuse/operations/table_analyze.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs
index 034b20ef1a20..08d2561742e7 100644
--- a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs
@@ -234,18 +234,18 @@ async fn test_table_analyze_without_prev_table_seq() -> Result<()> {

     // check statistics.
     let table = table.refresh(ctx.as_ref()).await?;
-    let expected = HashMap::from([(0, 3 as u64)]);
+    let expected = HashMap::from([(0, 3_u64)]);
     check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?;

-    let qry = format!("insert into t values(4)");
-    execute_command(ctx.clone(), &qry).await?;
+    let qry = "insert into t values(4)";
+    execute_command(ctx.clone(), qry).await?;

     ctx.evict_table_from_cache("default", "default", "t")?;
     let statistics_sql = "analyze table default.t";
     fixture.execute_command(statistics_sql).await?;

     let table = table.refresh(ctx.as_ref()).await?;
-    let expected = HashMap::from([(0, 4 as u64)]);
+    let expected = HashMap::from([(0, 4_u64)]);
     check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?;
     Ok(())
 }
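The follow-up commit is purely a lint pass over the new test: typed integer literals replace `as` casts, and a constant SQL string no longer goes through `format!`. The two idioms side by side (the exact clippy lint names are not stated in the patch, so treat them as an assumption):

```rust
fn main() {
    // A suffixed literal states the type directly; the cast form compiles too
    // but reads as a conversion and is what the lint pass rewrites.
    let with_cast = 3 as u64;
    let with_suffix = 3_u64;
    assert_eq!(with_cast, with_suffix);

    // format! with nothing to interpolate allocates a String for no reason;
    // a plain &str literal is enough for a constant statement.
    let owned: String = format!("insert into t values(4)");
    let borrowed: &str = "insert into t values(4)";
    assert_eq!(owned, borrowed);
}
```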