Skip to content

Commit

Permalink
Merge branch 'main' into feat-bloom-fold-domain
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh committed May 23, 2024
2 parents e4ba210 + ddf4099 commit 8de6763
Show file tree
Hide file tree
Showing 16 changed files with 405 additions and 150 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 20 additions & 12 deletions src/query/service/src/interpreters/interpreter_table_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,25 @@ impl Interpreter for AnalyzeTableInterpreter {
.read_table_snapshot_statistics(Some(&snapshot))
.await?;

let temporal_str = if let Some(table_statistics) = &table_statistics {
let is_full = table
let (is_full, temporal_str) = if let Some(table_statistics) = &table_statistics {
let is_full = match table
.navigate_to_point(
&NavigationPoint::SnapshotID(
table_statistics.snapshot_id.simple().to_string(),
),
self.ctx.clone().get_abort_checker(),
)
.await
.is_err();

if is_full {
format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple(),)
{
Ok(t) => !t
.read_table_snapshot()
.await
.is_ok_and(|s| s.is_some_and(|s| s.prev_table_seq.is_some())),
Err(_) => true,
};

let temporal_str = if is_full {
format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple())
} else {
// analyze only need to collect the added blocks.
let table_alias = format!("_change_insert${:08x}", Utc::now().timestamp());
Expand All @@ -108,9 +114,13 @@ impl Interpreter for AnalyzeTableInterpreter {
table_statistics.snapshot_id.simple(),
snapshot.snapshot_id.simple(),
)
}
};
(is_full, temporal_str)
} else {
format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple(),)
(
true,
format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple()),
)
};

let index_cols: Vec<(u32, String)> = schema
Expand All @@ -134,10 +144,8 @@ impl Interpreter for AnalyzeTableInterpreter {
.join(", ");

let sql = format!(
"SELECT {select_expr}, {} as is_full from {}.{} {temporal_str}",
temporal_str.is_empty(),
plan.database,
plan.table,
"SELECT {select_expr}, {is_full} as is_full from {}.{} {temporal_str}",
plan.database, plan.table,
);

log::info!("Analyze via sql {:?}", sql);
Expand Down
50 changes: 44 additions & 6 deletions src/query/service/src/interpreters/interpreter_txn_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_storages_fuse::TableContext;
use databend_storages_common_txn::TxnManagerRef;
use log::error;
use log::info;

use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -55,15 +57,51 @@ impl Interpreter for CommitInterpreter {
let is_active = self.ctx.txn_mgr().lock().is_active();
if is_active {
let catalog = self.ctx.get_default_catalog()?;

let req = self.ctx.txn_mgr().lock().req();
let mismatched_tids = catalog.update_multi_table_meta(req).await?;

let update_summary = {
let table_descriptions = req
.update_table_metas
.iter()
.map(|req| (req.table_id, req.seq, req.new_table_meta.engine.clone()))
.collect::<Vec<_>>();
let stream_descriptions = req
.update_stream_metas
.iter()
.map(|s| (s.stream_id, s.seq, "stream"))
.collect::<Vec<_>>();
(table_descriptions, stream_descriptions)
};

let mismatched_tids = {
let ret = catalog.update_multi_table_meta(req).await;
if let Err(ref e) = ret {
// other errors may occur, especially the version mismatch of streams,
// let's log it here for the convenience of diagnostics
error!(
"Non-recoverable fault occurred during updating tables. {}",
e
);
}
ret?
};

match &mismatched_tids {
Ok(_) => {}
Ok(_) => {
info!(
"COMMIT: Commit explicit transaction success, targets updated {:?}",
update_summary
);
}
Err(e) => {
return Err(ErrorCode::TableVersionMismatched(format!(
"Table version mismatched in multi statement transaction, tids: {:?}",
e.iter().map(|(tid, _, _)| tid).collect::<Vec<_>>()
)));
let err_msg = format!(
"COMMIT: Table versions mismatched in multi statement transaction, conflict tables: {:?}",
e.iter()
.map(|(tid, seq, meta)| (tid, seq, &meta.engine))
.collect::<Vec<_>>()
);
return Err(ErrorCode::TableVersionMismatched(err_msg));
}
}
let need_purge_files = self.ctx.txn_mgr().lock().need_purge_files();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ use std::sync::Arc;

use databend_common_base::base::tokio;
use databend_common_catalog::table::Table;
use databend_common_catalog::table::TableExt;
use databend_common_exception::Result;
use databend_common_expression::types::number::NumberScalar;
use databend_common_expression::ColumnId;
use databend_common_expression::Scalar;
use databend_common_io::prelude::borsh_deserialize_from_slice;
use databend_common_storages_fuse::io::MetaReaders;
use databend_common_storages_fuse::io::MetaWriter;
use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut;
use databend_common_storages_fuse::FuseTable;
use databend_query::sessions::QueryContext;
Expand All @@ -29,8 +33,12 @@ use databend_query::sql::plans::Plan;
use databend_query::sql::Planner;
use databend_query::test_kits::*;
use databend_storages_common_cache::LoadParams;
use databend_storages_common_table_meta::meta::MetaHLL;
use databend_storages_common_table_meta::meta::SegmentInfo;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::meta::TableSnapshotStatistics;
use databend_storages_common_table_meta::meta::Versioned;

#[tokio::test(flavor = "multi_thread")]
async fn test_table_modify_column_ndv_statistics() -> Result<()> {
Expand Down Expand Up @@ -179,6 +187,69 @@ async fn check_column_ndv_statistics(
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_table_analyze_without_prev_table_seq() -> Result<()> {
let fixture = TestFixture::setup().await?;
let ctx = fixture.new_query_ctx().await?;

// setup
let create_tbl_command = "create table t(c int)";
fixture.execute_command(create_tbl_command).await?;

append_rows(ctx.clone(), 3).await?;
let catalog = ctx.get_catalog("default").await?;
let table = catalog.get_table(&ctx.get_tenant(), "default", "t").await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let location_gen = fuse_table.meta_location_generator();
let operator = fuse_table.get_operator();

// genenrate snapshot without prev_table_seq
let snapshot_0 = fuse_table.read_table_snapshot().await?.unwrap();
let snapshot_1 = TableSnapshot::from_previous(&snapshot_0, None);
let snapshot_loc_1 = location_gen
.snapshot_location_from_uuid(&snapshot_1.snapshot_id, TableSnapshot::VERSION)?;
snapshot_1.write_meta(&operator, &snapshot_loc_1).await?;

// generate table statistics.
let col: Vec<u8> = vec![1, 3, 0, 0, 0, 118, 5, 1, 21, 6, 3, 229, 13, 3];
let hll: HashMap<ColumnId, MetaHLL> = HashMap::from([(0, borsh_deserialize_from_slice(&col)?)]);
let table_statistics = TableSnapshotStatistics::new(hll, snapshot_1.snapshot_id);
let table_statistics_location = location_gen.snapshot_statistics_location_from_uuid(
&table_statistics.snapshot_id,
table_statistics.format_version(),
)?;
// genenrate snapshot without prev_table_seq
let mut snapshot_2 = TableSnapshot::from_previous(&snapshot_1, None);
snapshot_2.table_statistics_location = Some(table_statistics_location);
FuseTable::commit_to_meta_server(
fixture.new_query_ctx().await?.as_ref(),
fuse_table.get_table_info(),
location_gen,
snapshot_2,
Some(table_statistics),
&None,
&operator,
)
.await?;

// check statistics.
let table = table.refresh(ctx.as_ref()).await?;
let expected = HashMap::from([(0, 3_u64)]);
check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?;

let qry = "insert into t values(4)";
execute_command(ctx.clone(), qry).await?;

ctx.evict_table_from_cache("default", "default", "t")?;
let statistics_sql = "analyze table default.t";
fixture.execute_command(statistics_sql).await?;

let table = table.refresh(ctx.as_ref()).await?;
let expected = HashMap::from([(0, 4_u64)]);
check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?;
Ok(())
}

async fn append_rows(ctx: Arc<QueryContext>, n: usize) -> Result<()> {
for i in 0..n {
let qry = format!("insert into t values({})", i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use databend_common_pipeline_sinks::AsyncSink;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::meta::Versioned;
use log::debug;
use log::error;
use log::info;

use crate::operations::set_backoff;
use crate::operations::AppendGenerator;
Expand Down Expand Up @@ -96,6 +98,7 @@ impl AsyncSink for CommitMultiTableInsert {
let is_active = self.ctx.txn_mgr().lock().is_active();
match is_active {
true => {
// inside explicit transaction
if update_table_meta_reqs.is_empty() {
return Err(ErrorCode::Internal(
"No table meta to update in multi table insert commit. It's a bug",
Expand All @@ -107,14 +110,12 @@ impl AsyncSink for CommitMultiTableInsert {
update_table_meta_reqs[0].update_stream_meta =
std::mem::take(&mut self.update_stream_meta);
update_table_meta_reqs[0].deduplicated_label = self.deduplicated_label.clone();
for (req, info) in update_table_meta_reqs
.into_iter()
.zip(table_infos.into_iter())
{
for (req, info) in update_table_meta_reqs.into_iter().zip(table_infos.iter()) {
self.catalog.update_table_meta(info, req).await?;
}
}
false => {
// auto commit
let mut backoff = set_backoff(None, None, None);
let mut retries = 0;

Expand All @@ -125,26 +126,58 @@ impl AsyncSink for CommitMultiTableInsert {
update_stream_metas: self.update_stream_meta.clone(),
deduplicated_labels: self.deduplicated_label.clone().into_iter().collect(),
};
let update_meta_result = self
.catalog
.update_multi_table_meta(update_multi_table_meta_req)
.await?;

let update_meta_result = {
let ret = self
.catalog
.update_multi_table_meta(update_multi_table_meta_req)
.await;
if let Err(ref e) = ret {
// other errors may occur, especially the version mismatch of streams,
// let's log it here for the convenience of diagnostics
error!(
"Non-recoverable fault occurred during updating tables. {}",
e
);
}
ret?
};

let Err(update_failed_tbls) = update_meta_result else {
let table_descriptions = self
.tables
.values()
.map(|tbl| {
let table_info = tbl.get_table_info();
(&table_info.desc, &table_info.ident, &table_info.meta.engine)
})
.collect::<Vec<_>>();
let stream_descriptions = self
.update_stream_meta
.iter()
.map(|s| (s.stream_id, s.seq, "stream"))
.collect::<Vec<_>>();
info!(
"update tables success (auto commit), tables updated {:?}, streams updated {:?}",
table_descriptions, stream_descriptions
);

return Ok(());
};
let update_failed_tbls_name: Vec<String> = update_failed_tbls
let update_failed_tbl_descriptions: Vec<_> = update_failed_tbls
.iter()
.map(|(tid, _, _)| {
self.tables.get(tid).unwrap().get_table_info().name.clone()
.map(|(tid, seq, meta)| {
let tbl_info = self.tables.get(tid).unwrap().get_table_info();
(&tbl_info.desc, (tid, seq), &meta.engine)
})
.collect();
match backoff.next_backoff() {
Some(duration) => {
retries += 1;

debug!(
"Failed to update tables: {:?}, the commit process of multi-table insert will be retried after {} ms, retrying {} times",
update_failed_tbls_name,
"Failed(temporarily) to update tables: {:?}, the commit process of multi-table insert will be retried after {} ms, retrying {} times",
update_failed_tbl_descriptions,
duration.as_millis(),
retries,
);
Expand All @@ -167,14 +200,16 @@ impl AsyncSink for CommitMultiTableInsert {
}
}
None => {
return Err(ErrorCode::OCCRetryFailure(format!(
"Can not fulfill the tx after retries({} times, {} ms), aborted. target table names {:?}",
let err_msg = format!(
"Can not fulfill the tx after retries({} times, {} ms), aborted. updated tables {:?}",
retries,
Instant::now()
.duration_since(backoff.start_time)
.as_millis(),
update_failed_tbls_name,
)));
update_failed_tbl_descriptions,
);
error!("{}", err_msg);
return Err(ErrorCode::OCCRetryFailure(err_msg));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,19 @@ where F: SnapshotGenerator + Send + 'static
for segment_loc in std::mem::take(&mut self.new_segment_locs).into_iter() {
self.ctx.add_segment_location(segment_loc)?;
}

let target_descriptions = {
let table_info = self.table.get_table_info();
let tbl = (&table_info.name, table_info.ident, &table_info.meta.engine);

let stream_descriptions = self
.update_stream_meta
.iter()
.map(|s| (s.stream_id, s.seq, "stream"))
.collect::<Vec<_>>();
(tbl, stream_descriptions)
};
info!("commit mutation success, targets {:?}", target_descriptions);
self.state = State::Finish;
}
Err(e) if self.is_error_recoverable(&e) => {
Expand Down
Loading

0 comments on commit 8de6763

Please sign in to comment.