diff --git a/Cargo.lock b/Cargo.lock index 689b92c80df4..a039cc153d58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2737,6 +2737,9 @@ name = "deranged" version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946" +dependencies = [ + "serde", +] [[package]] name = "derive-new" @@ -4477,6 +4480,7 @@ checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" dependencies = [ "equivalent", "hashbrown 0.14.0", + "serde", ] [[package]] @@ -5497,6 +5501,7 @@ dependencies = [ "regex", "serde", "serde_json", + "serde_with", "smallvec", "snafu", "store-api", @@ -8662,6 +8667,35 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca3b16a3d82c4088f343b7480a93550b3eabe1a358569c2dfe38bbcead07237" +dependencies = [ + "base64 0.21.3", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.0.0", + "serde", + "serde_json", + "serde_with_macros", + "time 0.3.28", +] + +[[package]] +name = "serde_with_macros" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e6be15c453eb305019bfa438b1593c731f36a289a7853f7707ee29e870b3b3c" +dependencies = [ + "darling 0.20.3", + "proc-macro2", + "quote", + "syn 2.0.29", +] + [[package]] name = "serde_yaml" version = "0.9.25" diff --git a/src/cmd/src/cli/bench.rs b/src/cmd/src/cli/bench.rs index fbeb70fef5e6..54373cfa7ac9 100644 --- a/src/cmd/src/cli/bench.rs +++ b/src/cmd/src/cli/bench.rs @@ -120,7 +120,6 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo { created_on: chrono::DateTime::default(), primary_key_indices: vec![], next_column_id: columns as u32 + 1, - engine_options: Default::default(), value_indices: vec![], options: Default::default(), region_numbers: (1..=100).collect(), diff --git a/src/common/meta/src/key/table_info.rs 
b/src/common/meta/src/key/table_info.rs index 9ca861444603..50d7e56d5164 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -275,7 +275,6 @@ mod tests { created_on: chrono::DateTime::default(), primary_key_indices: vec![0, 1], next_column_id: 3, - engine_options: Default::default(), value_indices: vec![2, 3], options: Default::default(), region_numbers: vec![1], diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 965ae2603713..b884e70a0a03 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -100,7 +100,6 @@ pub mod mock { #[cfg(test)] pub mod test_data { - use std::collections::HashMap; use std::sync::Arc; use chrono::DateTime; @@ -178,7 +177,6 @@ pub mod test_data { engine: MITO2_ENGINE.to_string(), next_column_id: 3, region_numbers: vec![1, 2, 3], - engine_options: HashMap::new(), options: TableOptions::default(), created_on: DateTime::default(), partition_key_indices: vec![], diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs index cbdfd12263a9..170082aae5ce 100644 --- a/src/meta-srv/src/table_routes.rs +++ b/src/meta-srv/src/table_routes.rs @@ -70,8 +70,6 @@ pub(crate) async fn fetch_tables( #[cfg(test)] pub(crate) mod tests { - use std::collections::HashMap; - use chrono::DateTime; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use common_meta::key::TableMetadataManagerRef; @@ -103,7 +101,6 @@ pub(crate) mod tests { engine: MITO_ENGINE.to_string(), next_column_id: 1, region_numbers: vec![1, 2, 3, 4], - engine_options: HashMap::new(), options: TableOptions::default(), created_on: DateTime::default(), partition_key_indices: vec![], diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 37c0ec53d444..0a1ff27f9e4c 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -46,6 +46,7 @@ prost.workspace = true regex = "1.5" serde = { version = "1.0", 
features = ["derive"] } serde_json.workspace = true +serde_with = "3" smallvec.workspace = true snafu.workspace = true store-api = { workspace = true } diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 8aff4414875d..23ce3517e0ed 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -20,12 +20,11 @@ mod twcs; use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; use common_telemetry::{debug, error}; pub use picker::CompactionPickerRef; use snafu::ResultExt; -use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions}; +use store_api::storage::RegionId; use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; @@ -33,6 +32,7 @@ use crate::compaction::twcs::TwcsPicker; use crate::error::{ CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, }; +use crate::region::options::CompactionOptions; use crate::region::version::{VersionControlRef, VersionRef}; use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; use crate::schedule::scheduler::SchedulerRef; @@ -42,7 +42,6 @@ use crate::sst::file_purger::FilePurgerRef; pub struct CompactionRequest { pub(crate) current_version: VersionRef, pub(crate) access_layer: AccessLayerRef, - pub(crate) ttl: Option, pub(crate) compaction_time_window: Option, /// Sender to send notification to the region worker. pub(crate) request_sender: mpsc::Sender, @@ -64,13 +63,13 @@ impl CompactionRequest { } } -/// Builds compaction picker according to [CompactionStrategy]. -pub fn compaction_strategy_to_picker(strategy: &CompactionStrategy) -> CompactionPickerRef { +/// Builds compaction picker according to [CompactionOptions]. 
+pub fn compaction_options_to_picker(strategy: &CompactionOptions) -> CompactionPickerRef { match strategy { - CompactionStrategy::Twcs(twcs_opts) => Arc::new(TwcsPicker::new( + CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new( twcs_opts.max_active_window_files, twcs_opts.max_inactive_window_files, - twcs_opts.time_window_seconds, + twcs_opts.time_window_seconds(), )) as Arc<_>, } } @@ -175,9 +174,7 @@ impl CompactionScheduler { /// /// If the region has nothing to compact, it removes the region from the status map. fn schedule_compaction_request(&mut self, request: CompactionRequest) -> Result<()> { - // TODO(hl): build picker according to region options. - let picker = - compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default())); + let picker = compaction_options_to_picker(&request.current_version.options.compaction); let region_id = request.region_id(); debug!( "Pick compaction strategy {:?} for region: {}", @@ -309,8 +306,6 @@ impl CompactionStatus { let mut req = CompactionRequest { current_version, access_layer: self.access_layer.clone(), - // TODO(hl): get TTL info from region metadata - ttl: None, // TODO(hl): get persisted region compaction time window compaction_time_window: None, request_sender: request_sender.clone(), diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 9812e2c00f5d..5f03a3aa5fdb 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -120,7 +120,6 @@ impl Picker for TwcsPicker { let CompactionRequest { current_version, access_layer, - ttl, compaction_time_window, request_sender, waiters, @@ -131,6 +130,7 @@ impl Picker for TwcsPicker { let region_id = region_metadata.region_id; let levels = current_version.ssts.levels(); + let ttl = current_version.options.ttl; let expired_ssts = get_expired_ssts(levels, ttl, Timestamp::current_millis()); if !expired_ssts.is_empty() { info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts); 
@@ -376,7 +376,6 @@ impl CompactionTask for TwcsCompactionTask { notify, }) .await; - // TODO(hl): handle reschedule } } diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 8b343305e220..3ca21a1aec91 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -25,8 +25,6 @@ use serde::{Deserialize, Serialize}; const DEFAULT_NUM_WORKERS: usize = 1; /// Default max running background job. const DEFAULT_MAX_BG_JOB: usize = 4; -/// Default region write buffer size. -pub(crate) const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(32); /// Configuration for [MitoEngine](crate::engine::MitoEngine). #[derive(Debug, Serialize, Deserialize, Clone)] diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs index 2b9c8bae9e84..b5cd6615d0d1 100644 --- a/src/mito2/src/engine/create_test.rs +++ b/src/mito2/src/engine/create_test.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::time::Duration; + use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use store_api::region_engine::RegionEngine; @@ -77,3 +79,25 @@ async fn test_engine_create_existing_region() { "unexpected err: {err}" ); } + +#[tokio::test] +async fn test_engine_create_with_options() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("ttl", "10d") + .build(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + assert!(engine.is_region_exists(region_id)); + let region = engine.get_region(region_id).unwrap(); + assert_eq!( + Duration::from_secs(3600 * 24 * 10), + region.version().options.ttl.unwrap() + ); +} diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index edacb0e067ef..e8254cf71d05 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -13,12 +13,15 @@ // limitations under the License. 
use std::collections::HashMap; +use std::time::Duration; use api::v1::Rows; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use store_api::region_engine::RegionEngine; -use store_api::region_request::{RegionOpenRequest, RegionPutRequest, RegionRequest}; +use store_api::region_request::{ + RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest, +}; use store_api::storage::RegionId; use crate::config::MitoConfig; @@ -125,3 +128,42 @@ async fn test_engine_open_readonly() { engine.set_writable(region_id, true).unwrap(); put_rows(&engine, region_id, rows).await; } + +#[tokio::test] +async fn test_engine_region_open_with_options() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Close the region. + engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + + // Open the region again with options. 
+ engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: HashMap::from([("ttl".to_string(), "4d".to_string())]), + }), + ) + .await + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + assert_eq!( + Duration::from_secs(3600 * 24 * 4), + region.version().options.ttl.unwrap() + ); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 7986db4c8a81..bcc10d175c5c 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -453,6 +453,12 @@ pub enum Error { region_id: RegionId, location: Location, }, + + #[snafu(display("Invalid options, source: {}", source))] + JsonOptions { + source: serde_json::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -522,6 +528,7 @@ impl ErrorExt for Error { CompatReader { .. } => StatusCode::Unexpected, InvalidRegionRequest { source, .. } => source.status_code(), RegionReadonly { .. } => StatusCode::RegionReadonly, + JsonOptions { .. } => StatusCode::InvalidArguments, } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index a2a3252462ea..2feb69676ce8 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -36,7 +36,7 @@ pub mod memtable; mod metrics; #[allow(dead_code)] pub mod read; -mod region; +pub mod region; mod region_write_ctx; #[allow(dead_code)] pub mod request; diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 44f82fd36790..b9b5aea3fa34 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -15,6 +15,7 @@ //! Mito region. pub(crate) mod opener; +pub mod options; pub(crate) mod version; use std::collections::HashMap; diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 157eff1a4a5d..8b2b533c8b3d 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -14,6 +14,7 @@ //! Region opener. 
+use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicI64}; use std::sync::Arc; @@ -32,6 +33,7 @@ use crate::config::MitoConfig; use crate::error::{RegionCorruptedSnafu, RegionNotFoundSnafu, Result}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::memtable::MemtableBuilderRef; +use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; use crate::region::MitoRegion; use crate::region_write_ctx::RegionWriteCtx; @@ -48,6 +50,7 @@ pub(crate) struct RegionOpener { object_store: ObjectStore, region_dir: String, scheduler: SchedulerRef, + options: HashMap, } impl RegionOpener { @@ -65,6 +68,7 @@ impl RegionOpener { object_store, region_dir: String::new(), scheduler, + options: HashMap::new(), } } @@ -80,6 +84,12 @@ impl RegionOpener { self } + /// Sets options for the region. + pub(crate) fn options(mut self, value: HashMap) -> Self { + self.options = value; + self + } + /// Writes region manifest and creates a new region. 
/// /// # Panics @@ -100,7 +110,10 @@ impl RegionOpener { let mutable = self.memtable_builder.build(&metadata); - let version = VersionBuilder::new(metadata, mutable).build(); + let options = RegionOptions::try_from(&self.options)?; + let version = VersionBuilder::new(metadata, mutable) + .options(options) + .build(); let version_control = Arc::new(VersionControl::new(version)); let access_layer = Arc::new(AccessLayer::new(self.region_dir, self.object_store.clone())); @@ -152,11 +165,13 @@ impl RegionOpener { let access_layer = Arc::new(AccessLayer::new(self.region_dir, self.object_store.clone())); let file_purger = Arc::new(LocalFilePurger::new(self.scheduler, access_layer.clone())); let mutable = self.memtable_builder.build(&metadata); + let options = RegionOptions::try_from(&self.options)?; let version = VersionBuilder::new(metadata, mutable) .add_files(file_purger.clone(), manifest.files.values().cloned()) .flushed_entry_id(manifest.flushed_entry_id) .flushed_sequence(manifest.flushed_sequence) .truncated_entry_id(manifest.truncated_entry_id) + .options(options) .build(); let flushed_entry_id = version.flushed_entry_id; let version_control = Arc::new(VersionControl::new(version)); diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs new file mode 100644 index 000000000000..c8ef80ddf513 --- /dev/null +++ b/src/mito2/src/region/options.rs @@ -0,0 +1,237 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +//! Options for a region. + +use std::collections::HashMap; +use std::time::Duration; + +use serde::Deserialize; +use serde_json::Value; +use serde_with::{serde_as, with_prefix, DisplayFromStr}; +use snafu::ResultExt; + +use crate::error::{Error, JsonOptionsSnafu, Result}; + +/// Options that affect the entire region. +/// +/// Users need to specify the options while creating/opening a region. +#[derive(Debug, Default, Clone, PartialEq, Eq, Deserialize)] +#[serde(default)] +pub struct RegionOptions { + /// Region SST files TTL. + #[serde(with = "humantime_serde")] + pub ttl: Option, + /// Compaction options. + pub compaction: CompactionOptions, +} + +impl TryFrom<&HashMap> for RegionOptions { + type Error = Error; + + fn try_from(options_map: &HashMap) -> Result { + let value = options_map_to_value(options_map); + let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?; + + // #[serde(flatten)] doesn't work with #[serde(default)] so we need to parse + // each field manually instead of using #[serde(flatten)] for `compaction`. + // See https://github.com/serde-rs/serde/issues/1626 + let options: RegionOptionsWithoutEnum = + serde_json::from_str(&json).context(JsonOptionsSnafu)?; + let compaction: CompactionOptions = serde_json::from_str(&json).unwrap_or_default(); + + Ok(RegionOptions { + ttl: options.ttl, + compaction, + }) + } +} + +/// Options for compactions +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(tag = "compaction.type")] +#[serde(rename_all = "lowercase")] +pub enum CompactionOptions { + /// Time window compaction strategy. + #[serde(with = "prefix_twcs")] + Twcs(TwcsOptions), +} + +impl Default for CompactionOptions { + fn default() -> Self { + Self::Twcs(TwcsOptions::default()) + } +} + +/// Time window compaction options. 
+#[serde_as] +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(default)] +pub struct TwcsOptions { + /// Max num of files that can be kept in active writing time window. + #[serde_as(as = "DisplayFromStr")] + pub max_active_window_files: usize, + /// Max num of files that can be kept in inactive time window. + #[serde_as(as = "DisplayFromStr")] + pub max_inactive_window_files: usize, + /// Compaction time window defined when creating tables. + #[serde(with = "humantime_serde")] + pub time_window: Option, +} + +with_prefix!(prefix_twcs "compaction.twcs."); + +impl TwcsOptions { + /// Returns time window in second resolution. + pub fn time_window_seconds(&self) -> Option { + self.time_window.and_then(|window| { + let window_secs = window.as_secs(); + if window_secs == 0 { + None + } else { + window_secs.try_into().ok() + } + }) + } +} + +impl Default for TwcsOptions { + fn default() -> Self { + Self { + max_active_window_files: 4, + max_inactive_window_files: 1, + time_window: None, + } + } +} + +/// We need to define a new struct without enum fields as `#[serde(default)]` does not +/// support external tagging. +#[derive(Debug, Deserialize)] +#[serde(default)] +struct RegionOptionsWithoutEnum { + /// Region SST files TTL. + #[serde(with = "humantime_serde")] + ttl: Option, +} + +impl Default for RegionOptionsWithoutEnum { + fn default() -> Self { + let options = RegionOptions::default(); + RegionOptionsWithoutEnum { ttl: options.ttl } + } +} + +/// Converts the `options` map to a json object. +/// +/// Converts all key-values to lowercase and replaces "null" strings by `null` json values. 
+fn options_map_to_value(options: &HashMap) -> Value { + let map = options + .iter() + .map(|(key, value)| { + let (key, value) = (key.to_lowercase(), value.to_lowercase()); + + if value == "null" { + (key, Value::Null) + } else { + (key, Value::from(value)) + } + }) + .collect(); + Value::Object(map) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_map(options: &[(&str, &str)]) -> HashMap { + options + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + #[test] + fn test_empty_region_options() { + let map = make_map(&[]); + let options = RegionOptions::try_from(&map).unwrap(); + assert_eq!(RegionOptions::default(), options); + } + + #[test] + fn test_with_ttl() { + let map = make_map(&[("ttl", "7d")]); + let options = RegionOptions::try_from(&map).unwrap(); + let expect = RegionOptions { + ttl: Some(Duration::from_secs(3600 * 24 * 7)), + ..Default::default() + }; + assert_eq!(expect, options); + } + + #[test] + fn test_without_compaction_type() { + // If `compaction.type` is not provided, we ignore all compaction + // related options. Actually serde does not support deserializing + // an enum without knowing its type.
+ let map = make_map(&[ + ("compaction.twcs.max_active_window_files", "8"), + ("compaction.twcs.time_window", "2h"), + ]); + let options = RegionOptions::try_from(&map).unwrap(); + let expect = RegionOptions::default(); + assert_eq!(expect, options); + } + + #[test] + fn test_with_compaction_type() { + let map = make_map(&[ + ("compaction.twcs.max_active_window_files", "8"), + ("compaction.twcs.time_window", "2h"), + ("compaction.type", "twcs"), + ]); + let options = RegionOptions::try_from(&map).unwrap(); + let expect = RegionOptions { + compaction: CompactionOptions::Twcs(TwcsOptions { + max_active_window_files: 8, + time_window: Some(Duration::from_secs(3600 * 2)), + ..Default::default() + }), + ..Default::default() + }; + assert_eq!(expect, options); + } + + #[test] + fn test_with_all() { + let map = make_map(&[ + ("ttl", "7d"), + ("compaction.twcs.max_active_window_files", "8"), + ("compaction.twcs.max_inactive_window_files", "2"), + ("compaction.twcs.time_window", "2h"), + ("compaction.type", "twcs"), + ]); + let options = RegionOptions::try_from(&map).unwrap(); + let expect = RegionOptions { + ttl: Some(Duration::from_secs(3600 * 24 * 7)), + compaction: CompactionOptions::Twcs(TwcsOptions { + max_active_window_files: 8, + max_inactive_window_files: 2, + time_window: Some(Duration::from_secs(3600 * 2)), + }), + }; + assert_eq!(expect, options); + } +} diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 87cc71bb29be..e88fafedaf08 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -31,6 +31,7 @@ use store_api::storage::SequenceNumber; use crate::manifest::action::RegionEdit; use crate::memtable::version::{MemtableVersion, MemtableVersionRef}; use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef}; +use crate::region::options::RegionOptions; use crate::sst::file::FileMeta; use crate::sst::file_purger::FilePurgerRef; use crate::sst::version::{SstVersion, SstVersionRef}; @@ -204,7 
+205,8 @@ pub(crate) struct Version { /// /// Used to check if it is a flush task during the truncating table. pub(crate) truncated_entry_id: Option, - // TODO(yingwen): RegionOptions. + /// Options of the region. + pub(crate) options: RegionOptions, } pub(crate) type VersionRef = Arc; @@ -217,6 +219,7 @@ pub(crate) struct VersionBuilder { flushed_entry_id: EntryId, flushed_sequence: SequenceNumber, truncated_entry_id: Option, + options: RegionOptions, } impl VersionBuilder { @@ -229,6 +232,7 @@ impl VersionBuilder { flushed_entry_id: 0, flushed_sequence: 0, truncated_entry_id: None, + options: RegionOptions::default(), } } @@ -241,6 +245,7 @@ impl VersionBuilder { flushed_entry_id: version.flushed_entry_id, flushed_sequence: version.flushed_sequence, truncated_entry_id: version.truncated_entry_id, + options: version.options.clone(), } } @@ -274,6 +279,12 @@ impl VersionBuilder { self } + /// Sets options. + pub(crate) fn options(mut self, options: RegionOptions) -> Self { + self.options = options; + self + } + /// Apply edit to the builder. 
pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self { if let Some(entry_id) = edit.flushed_entry_id { @@ -324,6 +335,7 @@ impl VersionBuilder { flushed_entry_id: self.flushed_entry_id, flushed_sequence: self.flushed_sequence, truncated_entry_id: self.truncated_entry_id, + options: self.options, } } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 1fb24446909a..1fbcbc962338 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -16,14 +16,12 @@ use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; use api::helper::{ is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_column_data_type, to_proto_value, }; use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value}; -use common_base::readable_size::ReadableSize; use common_query::Output; use common_query::Output::AffectedRows; use common_telemetry::tracing::log::info; @@ -37,10 +35,9 @@ use store_api::region_request::{ RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest, }; -use store_api::storage::{CompactionStrategy, RegionId, SequenceNumber}; +use store_api::storage::{RegionId, SequenceNumber}; use tokio::sync::oneshot::{self, Receiver, Sender}; -use crate::config::DEFAULT_WRITE_BUFFER_SIZE; use crate::error::{ CompactRegionSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, FlushRegionSnafu, InvalidRequestSnafu, Result, @@ -50,29 +47,6 @@ use crate::sst::file::FileMeta; use crate::sst::file_purger::{FilePurgerRef, PurgeRequest}; use crate::wal::EntryId; -/// Options that affect the entire region. -/// -/// Users need to specify the options while creating/opening a region. -#[derive(Debug)] -pub struct RegionOptions { - /// Region memtable max size in bytes. - pub write_buffer_size: Option, - /// Region SST files TTL. 
- pub ttl: Option, - /// Compaction strategy. - pub compaction_strategy: CompactionStrategy, -} - -impl Default for RegionOptions { - fn default() -> Self { - RegionOptions { - write_buffer_size: Some(DEFAULT_WRITE_BUFFER_SIZE), - ttl: None, - compaction_strategy: CompactionStrategy::default(), - } - } -} - /// Request to write a region. #[derive(Debug)] pub struct WriteRequest { diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 43195da18f82..2d81f0cf63b7 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -209,6 +209,7 @@ pub struct CreateRequestBuilder { tag_num: usize, field_num: usize, create_if_not_exists: bool, + options: HashMap, } impl Default for CreateRequestBuilder { @@ -218,6 +219,7 @@ impl Default for CreateRequestBuilder { tag_num: 1, field_num: 1, create_if_not_exists: false, + options: HashMap::new(), } } } @@ -247,6 +249,11 @@ impl CreateRequestBuilder { self } + pub fn insert_option(mut self, key: &str, value: &str) -> Self { + self.options.insert(key.to_string(), value.to_string()); + self + } + pub fn build(&self) -> RegionCreateRequest { let mut column_id = 0; let mut column_metadatas = Vec::with_capacity(self.tag_num + self.field_num + 1); @@ -292,7 +299,7 @@ impl CreateRequestBuilder { column_metadatas, primary_key, create_if_not_exists: self.create_if_not_exists, - options: HashMap::default(), + options: self.options.clone(), region_dir: self.region_dir.clone(), } } diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 31fee02008ed..6db800a3e604 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -62,6 +62,7 @@ impl RegionWorkerLoop { ) .metadata(metadata) .region_dir(&request.region_dir) + .options(request.options) .create(&self.config) .await?; diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 5614178c0841..e6de10b66933 100644 --- 
a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -62,6 +62,7 @@ impl RegionWorkerLoop { self.scheduler.clone(), ) .region_dir(&request.region_dir) + .options(request.options) .open(&self.config, &self.wal) .await?; diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 460f971df53f..768f03a35878 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -446,7 +446,6 @@ fn create_table_info( engine: create_table.engine.clone(), next_column_id: column_schemas.len() as u32, region_numbers: vec![], - engine_options: HashMap::new(), options: table_options, created_on: DateTime::default(), partition_key_indices, diff --git a/src/query/src/sql/show_create_table.rs b/src/query/src/sql/show_create_table.rs index f67623ce7e8e..97d8aba4fbbf 100644 --- a/src/query/src/sql/show_create_table.rs +++ b/src/query/src/sql/show_create_table.rs @@ -228,7 +228,6 @@ mod tests { .value_indices(vec![2, 3]) .engine("mito".to_string()) .next_column_id(0) - .engine_options(Default::default()) .options(Default::default()) .created_on(Default::default()) .region_numbers(regions) @@ -297,7 +296,6 @@ WITH( .primary_key_indices(vec![]) .engine("file".to_string()) .next_column_id(0) - .engine_options(Default::default()) .options(options) .created_on(Default::default()) .build() diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 0f8880a14ce0..0a0c598c5625 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -107,9 +107,6 @@ pub struct TableMeta { #[builder(default, setter(into))] pub region_numbers: Vec, pub next_column_id: ColumnId, - /// Options for table engine. - #[builder(default)] - pub engine_options: HashMap, /// Table options. 
#[builder(default)] pub options: TableOptions, @@ -229,7 +226,6 @@ impl TableMeta { let mut builder = TableMetaBuilder::default(); let _ = builder .engine(&self.engine) - .engine_options(self.engine_options.clone()) .options(self.options.clone()) .created_on(self.created_on) .region_numbers(self.region_numbers.clone()) @@ -531,7 +527,6 @@ pub struct RawTableMeta { pub engine: String, pub next_column_id: ColumnId, pub region_numbers: Vec, - pub engine_options: HashMap, pub options: TableOptions, pub created_on: DateTime, #[serde(default)] @@ -547,7 +542,6 @@ impl From for RawTableMeta { engine: meta.engine, next_column_id: meta.next_column_id, region_numbers: meta.region_numbers, - engine_options: meta.engine_options, options: meta.options, created_on: meta.created_on, partition_key_indices: meta.partition_key_indices, @@ -566,7 +560,6 @@ impl TryFrom for TableMeta { engine: raw.engine, region_numbers: raw.region_numbers, next_column_id: raw.next_column_id, - engine_options: raw.engine_options, options: raw.options, created_on: raw.created_on, partition_key_indices: raw.partition_key_indices, diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index 35dbdf3d30d8..cb36bac2c77c 100644 --- a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -73,7 +73,6 @@ impl MemTable { .value_indices(vec![]) .engine("mito".to_string()) .next_column_id(0) - .engine_options(Default::default()) .options(Default::default()) .created_on(Default::default()) .region_numbers(regions) diff --git a/src/table/src/test_util/table_info.rs b/src/table/src/test_util/table_info.rs index ae061ccb02ae..7ddf6c019cca 100644 --- a/src/table/src/test_util/table_info.rs +++ b/src/table/src/test_util/table_info.rs @@ -29,7 +29,6 @@ pub fn test_table_info( .value_indices(vec![]) .engine("mito".to_string()) .next_column_id(0) - .engine_options(Default::default()) .options(Default::default()) .created_on(Default::default()) 
.region_numbers(vec![1])