fix: analyze table error #15614

Merged · 2 commits · May 22, 2024
Changes from 1 commit
src/query/service/src/interpreters/interpreter_table_analyze.rs (32 changes: 20 additions & 12 deletions)
@@ -87,19 +87,25 @@ impl Interpreter for AnalyzeTableInterpreter {
            .read_table_snapshot_statistics(Some(&snapshot))
            .await?;

-        let temporal_str = if let Some(table_statistics) = &table_statistics {
-            let is_full = table
+        let (is_full, temporal_str) = if let Some(table_statistics) = &table_statistics {
+            let is_full = match table
                .navigate_to_point(
                    &NavigationPoint::SnapshotID(
                        table_statistics.snapshot_id.simple().to_string(),
                    ),
                    self.ctx.clone().get_abort_checker(),
                )
                .await
-                .is_err();
-
-            if is_full {
-                format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple(),)
+            {
+                Ok(t) => !t
+                    .read_table_snapshot()
+                    .await
+                    .is_ok_and(|s| s.is_some_and(|s| s.prev_table_seq.is_some())),
+                Err(_) => true,
+            };
+
+            let temporal_str = if is_full {
+                format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple())
            } else {
                // analyze only need to collect the added blocks.
                let table_alias = format!("_change_insert${:08x}", Utc::now().timestamp());
@@ -108,9 +114,13 @@ impl Interpreter for AnalyzeTableInterpreter {
                    table_statistics.snapshot_id.simple(),
                    snapshot.snapshot_id.simple(),
                )
-            }
+            };
+            (is_full, temporal_str)
        } else {
-            format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple(),)
+            (
+                true,
+                format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple()),
+            )
        };

        let index_cols: Vec<(u32, String)> = schema
@@ -134,10 +144,8 @@ impl Interpreter for AnalyzeTableInterpreter {
            .join(", ");

        let sql = format!(
-            "SELECT {select_expr}, {} as is_full from {}.{} {temporal_str}",
-            temporal_str.is_empty(),
-            plan.database,
-            plan.table,
+            "SELECT {select_expr}, {is_full} as is_full from {}.{} {temporal_str}",
+            plan.database, plan.table,
        );

        log::info!("Analyze via sql {:?}", sql);
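Taken together, the two hunks above fix the analyze error in two steps: `is_full` is now computed explicitly (full collection unless the previous statistics snapshot can still be navigated to and its snapshot records a `prev_table_seq`), and the computed flag is interpolated into the generated query. The old code derived the flag from `temporal_str.is_empty()`, which is never true on these paths, so the query always reported `false as is_full`. Below is a minimal, runnable sketch of the new decision, using plain std types in place of Databend's table and snapshot APIs; the snapshot id and the `approx_count_distinct` select expression are hypothetical, not taken from the PR.

// Sketch only: Result<Option<Option<u64>>, String> stands in for
// navigate_to_point(..) -> Result<table>, read_table_snapshot() -> Option<snapshot>,
// and snapshot.prev_table_seq -> Option<u64>.
fn is_full_analyze(navigated: Result<Option<Option<u64>>, String>) -> bool {
    match navigated {
        // Incremental analyze is safe only if the previous statistics snapshot
        // still exists and records a prev_table_seq.
        Ok(snapshot) => !snapshot.is_some_and(|prev_table_seq| prev_table_seq.is_some()),
        // Navigation failed (e.g. the snapshot is gone): re-collect everything.
        Err(_) => true,
    }
}

fn main() {
    assert!(is_full_analyze(Err("snapshot pruned".into()))); // unreachable -> full
    assert!(is_full_analyze(Ok(None)));                      // no snapshot -> full
    assert!(is_full_analyze(Ok(Some(None))));                // no prev_table_seq -> full
    assert!(!is_full_analyze(Ok(Some(Some(42)))));           // otherwise -> incremental

    // The corrected SQL interpolates the flag directly; the id and the
    // select expression here are hypothetical stand-ins.
    let select_expr = "approx_count_distinct(c) as ndv_0";
    let is_full = true;
    let temporal_str = "AT (snapshot => 'abc123')";
    let sql = format!(
        "SELECT {select_expr}, {is_full} as is_full from {}.{} {temporal_str}",
        "default", "t",
    );
    assert_eq!(
        sql,
        "SELECT approx_count_distinct(c) as ndv_0, true as is_full from default.t AT (snapshot => 'abc123')"
    );
}

The new test below constructs exactly the fall-back case: a snapshot chain whose entries carry no prev_table_seq.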
@@ -17,10 +17,14 @@ use std::sync::Arc;

use databend_common_base::base::tokio;
use databend_common_catalog::table::Table;
+use databend_common_catalog::table::TableExt;
use databend_common_exception::Result;
use databend_common_expression::types::number::NumberScalar;
+use databend_common_expression::ColumnId;
use databend_common_expression::Scalar;
+use databend_common_io::prelude::borsh_deserialize_from_slice;
use databend_common_storages_fuse::io::MetaReaders;
+use databend_common_storages_fuse::io::MetaWriter;
use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut;
use databend_common_storages_fuse::FuseTable;
use databend_query::sessions::QueryContext;
@@ -29,8 +33,12 @@ use databend_query::sql::plans::Plan;
use databend_query::sql::Planner;
use databend_query::test_kits::*;
use databend_storages_common_cache::LoadParams;
+use databend_storages_common_table_meta::meta::MetaHLL;
use databend_storages_common_table_meta::meta::SegmentInfo;
use databend_storages_common_table_meta::meta::Statistics;
+use databend_storages_common_table_meta::meta::TableSnapshot;
+use databend_storages_common_table_meta::meta::TableSnapshotStatistics;
+use databend_storages_common_table_meta::meta::Versioned;

#[tokio::test(flavor = "multi_thread")]
async fn test_table_modify_column_ndv_statistics() -> Result<()> {
@@ -179,6 +187,69 @@ async fn check_column_ndv_statistics(
Ok(())
}

+#[tokio::test(flavor = "multi_thread")]
+async fn test_table_analyze_without_prev_table_seq() -> Result<()> {
+    let fixture = TestFixture::setup().await?;
+    let ctx = fixture.new_query_ctx().await?;
+
+    // setup
+    let create_tbl_command = "create table t(c int)";
+    fixture.execute_command(create_tbl_command).await?;
+
+    append_rows(ctx.clone(), 3).await?;
+    let catalog = ctx.get_catalog("default").await?;
+    let table = catalog.get_table(&ctx.get_tenant(), "default", "t").await?;
+    let fuse_table = FuseTable::try_from_table(table.as_ref())?;
+    let location_gen = fuse_table.meta_location_generator();
+    let operator = fuse_table.get_operator();
+
+    // generate a snapshot without prev_table_seq
+    let snapshot_0 = fuse_table.read_table_snapshot().await?.unwrap();
+    let snapshot_1 = TableSnapshot::from_previous(&snapshot_0, None);
+    let snapshot_loc_1 = location_gen
+        .snapshot_location_from_uuid(&snapshot_1.snapshot_id, TableSnapshot::VERSION)?;
+    snapshot_1.write_meta(&operator, &snapshot_loc_1).await?;
+
+    // generate table statistics.
+    let col: Vec<u8> = vec![1, 3, 0, 0, 0, 118, 5, 1, 21, 6, 3, 229, 13, 3];
+    let hll: HashMap<ColumnId, MetaHLL> = HashMap::from([(0, borsh_deserialize_from_slice(&col)?)]);
+    let table_statistics = TableSnapshotStatistics::new(hll, snapshot_1.snapshot_id);
+    let table_statistics_location = location_gen.snapshot_statistics_location_from_uuid(
+        &table_statistics.snapshot_id,
+        table_statistics.format_version(),
+    )?;
+    // generate another snapshot without prev_table_seq
+    let mut snapshot_2 = TableSnapshot::from_previous(&snapshot_1, None);
+    snapshot_2.table_statistics_location = Some(table_statistics_location);
+    FuseTable::commit_to_meta_server(
+        fixture.new_query_ctx().await?.as_ref(),
+        fuse_table.get_table_info(),
+        location_gen,
+        snapshot_2,
+        Some(table_statistics),
+        &None,
+        &operator,
+    )
+    .await?;
+
+    // check statistics.
+    let table = table.refresh(ctx.as_ref()).await?;
+    let expected = HashMap::from([(0, 3u64)]);
+    check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?;
+
+    execute_command(ctx.clone(), "insert into t values(4)").await?;
+
+    ctx.evict_table_from_cache("default", "default", "t")?;
+    let statistics_sql = "analyze table default.t";
+    fixture.execute_command(statistics_sql).await?;
+
+    let table = table.refresh(ctx.as_ref()).await?;
+    let expected = HashMap::from([(0, 4u64)]);
+    check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?;
+    Ok(())
+}
+
async fn append_rows(ctx: Arc<QueryContext>, n: usize) -> Result<()> {
for i in 0..n {
let qry = format!("insert into t values({})", i);
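A closing note on the new test: the hard-coded `col` bytes are a borsh-serialized `MetaHLL` fixture whose NDV is 3, matching the three appended rows. As a hedged sketch of the mechanism only, with a toy struct standing in for `MetaHLL` (whose actual layout is not assumed here) and borsh's `derive` feature enabled, a round-trip looks like this:

use borsh::{BorshDeserialize, BorshSerialize};

// Stand-in for MetaHLL; the real register layout is not assumed here.
#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq)]
struct Hll {
    registers: Vec<u8>,
}

fn main() -> std::io::Result<()> {
    let hll = Hll { registers: vec![1, 3, 5] };
    let bytes = borsh::to_vec(&hll)?;            // how a fixture like `col` could be produced
    let back: Hll = borsh::from_slice(&bytes)?;  // analogous to borsh_deserialize_from_slice above
    assert_eq!(back, hll);
    Ok(())
}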