From c8297de74995a82df68c07393e0232f1ced42ac6 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 20 Sep 2023 14:36:46 +0800 Subject: [PATCH] feat(mito): Allow to retry create request and alter request (#2447) * feat: RegionMetadataBuilder allow adding/dropping columns multiple times * test: test add if not exists/drop if exists * feat: change validator and add need_alter * test: fix tests and test need_alter * test: test alter retry * feat: open before create * style: fix clippy --- src/mito2/src/engine/alter_test.rs | 51 +++++++- src/mito2/src/engine/create_test.rs | 60 +++++++-- src/mito2/src/engine/flush_test.rs | 1 - src/mito2/src/error.rs | 12 ++ src/mito2/src/region/opener.rs | 178 ++++++++++++++++++++------ src/mito2/src/test_util.rs | 19 ++- src/mito2/src/worker/handle_alter.rs | 26 +++- src/mito2/src/worker/handle_create.rs | 22 ++-- src/store-api/src/metadata.rs | 101 ++++++++++++++- src/store-api/src/region_request.rs | 108 +++++++++------- 10 files changed, 451 insertions(+), 127 deletions(-) diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index a1fddab9c7b7..c3d4045df334 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -106,7 +106,6 @@ async fn test_alter_region() { assert_eq!(1, version_data.last_entry_id); assert_eq!(3, version_data.committed_sequence); assert_eq!(1, version_data.version.flushed_entry_id); - assert_eq!(1, version_data.version.flushed_entry_id); assert_eq!(3, version_data.version.flushed_sequence); }; check_region(&engine); @@ -245,3 +244,53 @@ async fn test_put_after_alter() { let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected, batches.pretty_print().unwrap()); } + +#[tokio::test] +async fn test_alter_region_retry() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: build_rows_for_key("a", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + + let request = add_tag1(); + engine + .handle_request(region_id, RegionRequest::Alter(request)) + .await + .unwrap(); + // Retries request. + let request = add_tag1(); + engine + .handle_request(region_id, RegionRequest::Alter(request)) + .await + .unwrap(); + + let expected = "\ ++-------+-------+---------+---------------------+ +| tag_1 | tag_0 | field_0 | ts | ++-------+-------+---------+---------------------+ +| | a | 0.0 | 1970-01-01T00:00:00 | +| | a | 1.0 | 1970-01-01T00:00:01 | ++-------+-------+---------+---------------------+"; + scan_check_after_alter(&engine, region_id, expected).await; + let region = engine.get_region(region_id).unwrap(); + let version_data = region.version_control.current(); + assert_eq!(1, version_data.last_entry_id); + assert_eq!(2, version_data.committed_sequence); + assert_eq!(1, version_data.version.flushed_entry_id); + assert_eq!(2, version_data.version.flushed_sequence); +} diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs index b5cd6615d0d1..31cb4fd031cd 100644 --- a/src/mito2/src/engine/create_test.rs +++ b/src/mito2/src/engine/create_test.rs @@ -14,8 +14,6 @@ use std::time::Duration; -use common_error::ext::ErrorExt; -use common_error::status_code::StatusCode; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; use store_api::storage::RegionId; @@ -39,12 +37,12 @@ async fn test_engine_create_new_region() { } #[tokio::test] -async fn test_engine_create_region_if_not_exists() { - let mut env = TestEnv::with_prefix("create-not-exists"); +async fn test_engine_create_existing_region() { + let mut env = TestEnv::with_prefix("create-existing"); let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); - let builder = CreateRequestBuilder::new().create_if_not_exists(true); + let builder = CreateRequestBuilder::new(); engine .handle_request(region_id, RegionRequest::Create(builder.build())) .await @@ -58,8 +56,8 @@ async fn test_engine_create_region_if_not_exists() { } #[tokio::test] -async fn test_engine_create_existing_region() { - let mut env = TestEnv::with_prefix("create-existing"); +async fn test_engine_create_with_different_id() { + let mut env = TestEnv::new(); let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -69,15 +67,51 @@ async fn test_engine_create_existing_region() { .await .unwrap(); - // Create the same region again. - let err = engine + // Creates with different id. + engine + .handle_request(RegionId::new(2, 1), RegionRequest::Create(builder.build())) + .await + .unwrap_err(); +} + +#[tokio::test] +async fn test_engine_create_with_different_schema() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let builder = CreateRequestBuilder::new(); + engine + .handle_request(region_id, RegionRequest::Create(builder.build())) + .await + .unwrap(); + + // Creates with different schema. + let builder = builder.tag_num(2); + engine + .handle_request(region_id, RegionRequest::Create(builder.build())) + .await + .unwrap_err(); +} + +#[tokio::test] +async fn test_engine_create_with_different_primary_key() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let builder = CreateRequestBuilder::new().tag_num(2); + engine + .handle_request(region_id, RegionRequest::Create(builder.build())) + .await + .unwrap(); + + // Creates with different schema. + let builder = builder.primary_key(vec![1]); + engine .handle_request(region_id, RegionRequest::Create(builder.build())) .await .unwrap_err(); - assert!( - matches!(err.status_code(), StatusCode::RegionAlreadyExists), - "unexpected err: {err}" - ); } #[tokio::test] diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index 12f0bd4e06b7..636ce7df2ddd 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -250,7 +250,6 @@ async fn test_flush_reopen_region() { assert_eq!(1, version_data.last_entry_id); assert_eq!(3, version_data.committed_sequence); assert_eq!(1, version_data.version.flushed_entry_id); - assert_eq!(1, version_data.version.flushed_entry_id); assert_eq!(3, version_data.version.flushed_sequence); }; check_region(); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index bcc10d175c5c..278c7e782832 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -459,6 +459,17 @@ pub enum Error { source: serde_json::Error, location: Location, }, + + #[snafu(display( + "Empty region directory, region_id: {}, region_dir: {}", + region_id, + region_dir, + ))] + EmptyRegionDir { + region_id: RegionId, + region_dir: String, + location: Location, + }, } pub type Result = std::result::Result; @@ -529,6 +540,7 @@ impl ErrorExt for Error { InvalidRegionRequest { source, .. } => source.status_code(), RegionReadonly { .. } => StatusCode::RegionReadonly, JsonOptions { .. } => StatusCode::InvalidArguments, + EmptyRegionDir { .. } => StatusCode::RegionNotFound, } } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 8b2b533c8b3d..fb6e5f89896b 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -18,19 +18,19 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicI64}; use std::sync::Arc; -use common_telemetry::info; +use common_telemetry::{debug, error, info, warn}; use common_time::util::current_time_millis; use futures::StreamExt; use object_store::util::join_dir; use object_store::ObjectStore; use snafu::{ensure, OptionExt}; use store_api::logstore::LogStore; -use store_api::metadata::RegionMetadata; -use store_api::storage::RegionId; +use store_api::metadata::{ColumnMetadata, RegionMetadata}; +use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::AccessLayer; use crate::config::MitoConfig; -use crate::error::{RegionCorruptedSnafu, RegionNotFoundSnafu, Result}; +use crate::error::{EmptyRegionDirSnafu, RegionCorruptedSnafu, Result}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::memtable::MemtableBuilderRef; use crate::region::options::RegionOptions; @@ -90,22 +90,50 @@ impl RegionOpener { self } - /// Writes region manifest and creates a new region. + /// Writes region manifest and creates a new region if it does not exist. + /// Opens the region if it already exists. /// /// # Panics /// Panics if metadata is not set. - pub(crate) async fn create(self, config: &MitoConfig) -> Result { + pub(crate) async fn create_or_open( + self, + config: &MitoConfig, + wal: &Wal, + ) -> Result { let region_id = self.region_id; - let metadata = Arc::new(self.metadata.unwrap()); + let options = self.manifest_options(config); - // Create a manifest manager for this region. - let options = RegionManifestOptions { - manifest_dir: new_manifest_dir(&self.region_dir), - object_store: self.object_store.clone(), - compress_type: config.manifest_compress_type, - checkpoint_distance: config.manifest_checkpoint_distance, - }; - // Writes regions to the manifest file. + // Tries to open the region. + match self.maybe_open(config, wal).await { + Ok(Some(region)) => { + let recovered = region.metadata(); + // Checks the schema of the region. + let expect = self.metadata.as_ref().unwrap(); + check_recovered_region( + &recovered, + expect.region_id, + &expect.column_metadatas, + &expect.primary_key, + )?; + + return Ok(region); + } + Ok(None) => { + debug!( + "No data under directory {}, region_id: {}", + self.region_dir, self.region_id + ); + } + Err(e) => { + warn!( + "Failed to open region {} before creating it, region_dir: {}, err: {}", + self.region_id, self.region_dir, e + ); + } + } + + let metadata = Arc::new(self.metadata.unwrap()); + // Create a manifest manager for this region and writes regions to the manifest file. let manifest_manager = RegionManifestManager::new(metadata.clone(), options).await?; let mutable = self.memtable_builder.build(&metadata); @@ -137,33 +165,52 @@ impl RegionOpener { config: &MitoConfig, wal: &Wal, ) -> Result { - let options = RegionManifestOptions { - manifest_dir: new_manifest_dir(&self.region_dir), - object_store: self.object_store.clone(), - compress_type: config.manifest_compress_type, - checkpoint_distance: config.manifest_checkpoint_distance, - }; - let manifest_manager = - RegionManifestManager::open(options) - .await? - .context(RegionNotFoundSnafu { - region_id: self.region_id, - })?; - - let manifest = manifest_manager.manifest().await; - let metadata = manifest.metadata.clone(); + let region_id = self.region_id; + let region = self + .maybe_open(config, wal) + .await? + .context(EmptyRegionDirSnafu { + region_id, + region_dir: self.region_dir, + })?; ensure!( - metadata.region_id == self.region_id, + region.region_id == self.region_id, RegionCorruptedSnafu { region_id: self.region_id, - reason: format!("region id in metadata is {}", metadata.region_id), + reason: format!( + "recovered region has different region id {}", + region.region_id + ), } ); - let region_id = metadata.region_id; - let access_layer = Arc::new(AccessLayer::new(self.region_dir, self.object_store.clone())); - let file_purger = Arc::new(LocalFilePurger::new(self.scheduler, access_layer.clone())); + Ok(region) + } + + /// Tries to open the region and returns `None` if the region directory is empty. + async fn maybe_open( + &self, + config: &MitoConfig, + wal: &Wal, + ) -> Result> { + let options = self.manifest_options(config); + let Some(manifest_manager) = RegionManifestManager::open(options).await? else { + return Ok(None); + }; + + let manifest = manifest_manager.manifest().await; + let metadata = manifest.metadata.clone(); + + let region_id = self.region_id; + let access_layer = Arc::new(AccessLayer::new( + self.region_dir.clone(), + self.object_store.clone(), + )); + let file_purger = Arc::new(LocalFilePurger::new( + self.scheduler.clone(), + access_layer.clone(), + )); let mutable = self.memtable_builder.build(&metadata); let options = RegionOptions::try_from(&self.options)?; let version = VersionBuilder::new(metadata, mutable) @@ -187,8 +234,67 @@ impl RegionOpener { // Region is always opened in read only mode. writable: AtomicBool::new(false), }; - Ok(region) + Ok(Some(region)) } + + /// Returns a new manifest options. + fn manifest_options(&self, config: &MitoConfig) -> RegionManifestOptions { + RegionManifestOptions { + manifest_dir: new_manifest_dir(&self.region_dir), + object_store: self.object_store.clone(), + compress_type: config.manifest_compress_type, + checkpoint_distance: config.manifest_checkpoint_distance, + } + } +} + +/// Checks whether the recovered region has the same schema as region to create. +pub(crate) fn check_recovered_region( + recovered: &RegionMetadata, + region_id: RegionId, + column_metadatas: &[ColumnMetadata], + primary_key: &[ColumnId], +) -> Result<()> { + if recovered.region_id != region_id { + error!( + "Recovered region {}, expect region {}", + recovered.region_id, region_id + ); + return RegionCorruptedSnafu { + region_id, + reason: format!( + "recovered metadata has different region id {}", + recovered.region_id + ), + } + .fail(); + } + if recovered.column_metadatas != column_metadatas { + error!( + "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}", + recovered.region_id, recovered.column_metadatas, column_metadatas + ); + + return RegionCorruptedSnafu { + region_id, + reason: "recovered metadata has different schema", + } + .fail(); + } + if recovered.primary_key != primary_key { + error!( + "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}", + recovered.region_id, recovered.primary_key, primary_key + ); + + return RegionCorruptedSnafu { + region_id, + reason: "recovered metadata has different primary key", + } + .fail(); + } + + Ok(()) } /// Replays the mutations from WAL and inserts mutations to memtable of given region. diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 2d81f0cf63b7..887466838e43 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -42,7 +42,7 @@ use store_api::region_request::{ RegionCloseRequest, RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest, RegionOpenRequest, RegionPutRequest, RegionRequest, }; -use store_api::storage::RegionId; +use store_api::storage::{ColumnId, RegionId}; use crate::config::MitoConfig; use crate::engine::listener::EventListenerRef; @@ -208,8 +208,8 @@ pub struct CreateRequestBuilder { region_dir: String, tag_num: usize, field_num: usize, - create_if_not_exists: bool, options: HashMap, + primary_key: Option>, } impl Default for CreateRequestBuilder { @@ -218,34 +218,39 @@ impl Default for CreateRequestBuilder { region_dir: "test".to_string(), tag_num: 1, field_num: 1, - create_if_not_exists: false, options: HashMap::new(), + primary_key: None, } } } impl CreateRequestBuilder { + #[must_use] pub fn new() -> CreateRequestBuilder { CreateRequestBuilder::default() } + #[must_use] pub fn region_dir(mut self, value: &str) -> Self { self.region_dir = value.to_string(); self } + #[must_use] pub fn tag_num(mut self, value: usize) -> Self { self.tag_num = value; self } + #[must_use] pub fn field_num(mut self, value: usize) -> Self { self.field_num = value; self } - pub fn create_if_not_exists(mut self, value: bool) -> Self { - self.create_if_not_exists = value; + #[must_use] + pub fn primary_key(mut self, primary_key: Vec) -> Self { + self.primary_key = Some(primary_key); self } @@ -297,9 +302,9 @@ impl CreateRequestBuilder { // We use empty engine name as we already locates the engine. engine: String::new(), column_metadatas, - primary_key, - create_if_not_exists: self.create_if_not_exists, + primary_key: self.primary_key.clone().unwrap_or(primary_key), options: self.options.clone(), + create_if_not_exists: false, region_dir: self.region_dir.clone(), } } diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 468bfa6b404f..ceb4ad4586ee 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use common_query::Output; -use common_telemetry::{error, info, warn}; +use common_telemetry::{debug, error, info, warn}; use snafu::ResultExt; use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef}; use store_api::region_request::RegionAlterRequest; @@ -48,14 +48,31 @@ impl RegionWorkerLoop { // Get the version before alter. let version = region.version(); if version.metadata.schema_version > request.schema_version { + // This is possible if we retry the request. warn!( - "Ignored alert request, region id:{}, region schema version {} is greater than request schema version {}", + "Ignores alter request, region id:{}, region schema version {} is greater than request schema version {}", region_id, version.metadata.schema_version, request.schema_version ); // Returns if it altered. sender.send(Ok(Output::AffectedRows(0))); return; } + // Validate request. + if let Err(e) = request.validate(&version.metadata) { + // Invalid request. + sender.send(Err(e).context(InvalidRegionRequestSnafu)); + return; + } + // Checks whether we need to alter the region. + if !request.need_alter(&version.metadata) { + debug!( + "Ignores alter request as it alters nothing, region_id: {}, request: {:?}", + region_id, request + ); + sender.send(Ok(Output::AffectedRows(0))); + return; + } + // Checks whether we can alter the region directly. if !version.memtables.is_empty() { // If memtable is not empty, we can't alter it directly and need to flush @@ -132,11 +149,6 @@ fn metadata_after_alteration( metadata: &RegionMetadata, request: RegionAlterRequest, ) -> Result { - // Validates request. - request - .validate(metadata) - .context(InvalidRegionRequestSnafu)?; - let mut builder = RegionMetadataBuilder::from_existing(metadata.clone()); builder .alter(request.kind) diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 6db800a3e604..38154022a77b 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -18,14 +18,14 @@ use std::sync::Arc; use common_query::Output; use common_telemetry::info; -use snafu::{ensure, ResultExt}; +use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataBuilder; use store_api::region_request::RegionCreateRequest; use store_api::storage::RegionId; -use crate::error::{InvalidMetadataSnafu, RegionExistsSnafu, Result}; -use crate::region::opener::RegionOpener; +use crate::error::{InvalidMetadataSnafu, Result}; +use crate::region::opener::{check_recovered_region, RegionOpener}; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { @@ -35,13 +35,15 @@ impl RegionWorkerLoop { request: RegionCreateRequest, ) -> Result { // Checks whether the table exists. - if self.regions.is_region_exists(region_id) { - ensure!( - request.create_if_not_exists, - RegionExistsSnafu { region_id } - ); - + if let Some(region) = self.regions.get_region(region_id) { // Region already exists. + check_recovered_region( + ®ion.metadata(), + region_id, + &request.column_metadatas, + &request.primary_key, + )?; + return Ok(Output::AffectedRows(0)); } @@ -63,7 +65,7 @@ impl RegionWorkerLoop { .metadata(metadata) .region_dir(&request.region_dir) .options(request.options) - .create(&self.config) + .create_or_open(&self.config, &self.wal) .await?; // TODO(yingwen): Custom the Debug format for the metadata and also print it. diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 0798478a1ef9..2480d950847f 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -353,7 +353,6 @@ impl RegionMetadata { /// Checks whether it is a valid column. fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> { - // TODO(yingwen): Ensure column name is not internal columns. if column_metadata.semantic_type == SemanticType::Timestamp { ensure!( column_metadata @@ -460,11 +459,23 @@ impl RegionMetadataBuilder { Ok(meta) } - /// Adds columns to the metadata. + /// Adds columns to the metadata if not exist. fn add_columns(&mut self, columns: Vec) -> Result<()> { + let mut names: HashSet<_> = self + .column_metadatas + .iter() + .map(|col| col.column_schema.name.clone()) + .collect(); + for add_column in columns { + if names.contains(&add_column.column_metadata.column_schema.name) { + // Column already exists. + continue; + } + let column_id = add_column.column_metadata.column_id; let semantic_type = add_column.column_metadata.semantic_type; + let column_name = add_column.column_metadata.column_schema.name.clone(); match add_column.location { None => { self.column_metadatas.push(add_column.column_metadata); @@ -489,6 +500,7 @@ impl RegionMetadataBuilder { .insert(pos + 1, add_column.column_metadata); } } + names.insert(column_name); if semantic_type == SemanticType::Tag { // For a new tag, we extend the primary key. self.primary_key.push(column_id); @@ -498,7 +510,7 @@ impl RegionMetadataBuilder { Ok(()) } - /// Drops columns from the metadata. + /// Drops columns from the metadata if exist. fn drop_columns(&mut self, names: &[String]) { let name_set: HashSet<_> = names.iter().collect(); self.column_metadatas @@ -938,6 +950,7 @@ mod test { // a (tag), b (field), c (ts) let metadata = build_test_region_metadata(); let mut builder = RegionMetadataBuilder::from_existing(metadata); + // tag d builder .alter(AlterKind::AddColumns { columns: vec![AddColumn { @@ -1005,11 +1018,91 @@ mod test { names: vec!["a".to_string()], }) .unwrap(); - // Build returns error as the primary key has more columns. + // Build returns error as the primary key contains a. let err = builder.build().unwrap_err(); assert_eq!(StatusCode::InvalidArguments, err.status_code()); } + #[test] + fn test_add_if_not_exists() { + // a (tag), b (field), c (ts) + let metadata = build_test_region_metadata(); + let mut builder = RegionMetadataBuilder::from_existing(metadata); + // tag d + builder + .alter(AlterKind::AddColumns { + columns: vec![ + AddColumn { + column_metadata: new_column_metadata("d", true, 4), + location: None, + }, + AddColumn { + column_metadata: new_column_metadata("d", true, 4), + location: None, + }, + ], + }) + .unwrap(); + let metadata = builder.build().unwrap(); + check_columns(&metadata, &["a", "b", "c", "d"]); + assert_eq!([1, 4], &metadata.primary_key[..]); + + let mut builder = RegionMetadataBuilder::from_existing(metadata); + // field b. + builder + .alter(AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: new_column_metadata("b", false, 2), + location: None, + }], + }) + .unwrap(); + let metadata = builder.build().unwrap(); + check_columns(&metadata, &["a", "b", "c", "d"]); + } + + #[test] + fn test_drop_if_exists() { + // a (tag), b (field), c (ts) + let metadata = build_test_region_metadata(); + let mut builder = RegionMetadataBuilder::from_existing(metadata); + // field d, e + builder + .alter(AlterKind::AddColumns { + columns: vec![ + AddColumn { + column_metadata: new_column_metadata("d", false, 4), + location: None, + }, + AddColumn { + column_metadata: new_column_metadata("e", false, 5), + location: None, + }, + ], + }) + .unwrap(); + let metadata = builder.build().unwrap(); + check_columns(&metadata, &["a", "b", "c", "d", "e"]); + + let mut builder = RegionMetadataBuilder::from_existing(metadata); + builder + .alter(AlterKind::DropColumns { + names: vec!["b".to_string(), "b".to_string()], + }) + .unwrap(); + let metadata = builder.build().unwrap(); + check_columns(&metadata, &["a", "c", "d", "e"]); + + let mut builder = RegionMetadataBuilder::from_existing(metadata); + builder + .alter(AlterKind::DropColumns { + names: vec!["b".to_string(), "e".to_string()], + }) + .unwrap(); + let metadata = builder.build().unwrap(); + check_columns(&metadata, &["a", "c", "d"]); + } + #[test] fn test_invalid_column_name() { let mut builder = create_builder(); diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 25c268b0872a..693e1a55c5c4 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::fmt; use api::v1::add_column_location::LocationType; @@ -151,6 +151,7 @@ pub struct RegionCreateRequest { /// Columns in the primary key. pub primary_key: Vec, /// Create region if not exists. + // TODO(yingwen): Remove this. pub create_if_not_exists: bool, /// Options of the created region. pub options: HashMap, @@ -203,6 +204,14 @@ impl RegionAlterRequest { Ok(()) } + + /// Returns true if we need to apply the request to the region. + /// + /// The `request` should be valid. + pub fn need_alter(&self, metadata: &RegionMetadata) -> bool { + debug_assert!(self.validate(metadata).is_ok()); + self.kind.need_alter(metadata) + } } impl TryFrom for RegionAlterRequest { @@ -238,23 +247,13 @@ pub enum AlterKind { impl AlterKind { /// Returns an error if the the alter kind is invalid. + /// + /// It allows adding column if not exists and dropping column if exists. pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> { match self { AlterKind::AddColumns { columns } => { - let mut names = HashSet::with_capacity(columns.len()); for col_to_add in columns { - ensure!( - !names.contains(&col_to_add.column_metadata.column_schema.name), - InvalidRegionRequestSnafu { - region_id: metadata.region_id, - err: format!( - "add column {} more than once", - col_to_add.column_metadata.column_schema.name - ), - } - ); col_to_add.validate(metadata)?; - names.insert(&col_to_add.column_metadata.column_schema.name); } } AlterKind::DropColumns { names } => { @@ -266,14 +265,24 @@ impl AlterKind { Ok(()) } + /// Returns true if we need to apply the alteration to the region. + pub fn need_alter(&self, metadata: &RegionMetadata) -> bool { + debug_assert!(self.validate(metadata).is_ok()); + match self { + AlterKind::AddColumns { columns } => columns + .iter() + .any(|col_to_add| col_to_add.need_alter(metadata)), + AlterKind::DropColumns { names } => names + .iter() + .any(|name| metadata.column_by_name(name).is_some()), + } + } + /// Returns an error if the column to drop is invalid. fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> { - let column = metadata - .column_by_name(name) - .with_context(|| InvalidRegionRequestSnafu { - region_id: metadata.region_id, - err: format!("column {} does not exist", name), - })?; + let Some(column) = metadata.column_by_name(name) else { + return Ok(()); + }; ensure!( column.semantic_type == SemanticType::Field, InvalidRegionRequestSnafu { @@ -320,6 +329,8 @@ pub struct AddColumn { impl AddColumn { /// Returns an error if the column to add is invalid. + /// + /// It allows adding existing columns. pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> { ensure!( self.column_metadata.column_schema.is_nullable() @@ -336,21 +347,17 @@ impl AddColumn { ), } ); - ensure!( - metadata - .column_by_name(&self.column_metadata.column_schema.name) - .is_none(), - InvalidRegionRequestSnafu { - region_id: metadata.region_id, - err: format!( - "column {} already exists", - self.column_metadata.column_schema.name - ), - } - ); Ok(()) } + + /// Returns true if no column to add to the region. + pub fn need_alter(&self, metadata: &RegionMetadata) -> bool { + debug_assert!(self.validate(metadata).is_ok()); + metadata + .column_by_name(&self.column_metadata.column_schema.name) + .is_none() + } } impl TryFrom for AddColumn { @@ -574,7 +581,7 @@ mod tests { #[test] fn test_add_column_validate() { let metadata = new_metadata(); - AddColumn { + let add_column = AddColumn { column_metadata: ColumnMetadata { column_schema: ColumnSchema::new( "tag_1", @@ -585,10 +592,11 @@ mod tests { column_id: 4, }, location: None, - } - .validate(&metadata) - .unwrap(); + }; + add_column.validate(&metadata).unwrap(); + assert!(add_column.need_alter(&metadata)); + // Add not null column. AddColumn { column_metadata: ColumnMetadata { column_schema: ColumnSchema::new( @@ -604,7 +612,8 @@ mod tests { .validate(&metadata) .unwrap_err(); - AddColumn { + // Add existing column. + let add_column = AddColumn { column_metadata: ColumnMetadata { column_schema: ColumnSchema::new( "tag_0", @@ -615,9 +624,9 @@ mod tests { column_id: 4, }, location: None, - } - .validate(&metadata) - .unwrap_err(); + }; + add_column.validate(&metadata).unwrap(); + assert!(!add_column.need_alter(&metadata)); } #[test] @@ -651,27 +660,30 @@ mod tests { ], }; let metadata = new_metadata(); - kind.validate(&metadata).unwrap_err(); + kind.validate(&metadata).unwrap(); + assert!(kind.need_alter(&metadata)); } #[test] fn test_validate_drop_column() { let metadata = new_metadata(); - AlterKind::DropColumns { + let kind = AlterKind::DropColumns { names: vec!["xxxx".to_string()], - } - .validate(&metadata) - .unwrap_err(); + }; + kind.validate(&metadata).unwrap(); + assert!(!kind.need_alter(&metadata)); + AlterKind::DropColumns { names: vec!["tag_0".to_string()], } .validate(&metadata) .unwrap_err(); - AlterKind::DropColumns { + + let kind = AlterKind::DropColumns { names: vec!["field_0".to_string()], - } - .validate(&metadata) - .unwrap(); + }; + kind.validate(&metadata).unwrap(); + assert!(kind.need_alter(&metadata)); } #[test]