Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): make use of options in RegionCreate/OpenRequest #2436

Merged
merged 14 commits into from
Sep 19, 2023
Merged
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/cmd/src/cli/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo {
created_on: chrono::DateTime::default(),
primary_key_indices: vec![],
next_column_id: columns as u32 + 1,
engine_options: Default::default(),
value_indices: vec![],
options: Default::default(),
region_numbers: (1..=100).collect(),
Expand Down
1 change: 0 additions & 1 deletion src/common/meta/src/key/table_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ mod tests {
created_on: chrono::DateTime::default(),
primary_key_indices: vec![0, 1],
next_column_id: 3,
engine_options: Default::default(),
value_indices: vec![2, 3],
options: Default::default(),
region_numbers: vec![1],
Expand Down
2 changes: 0 additions & 2 deletions src/meta-srv/src/procedure/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ pub mod mock {

#[cfg(test)]
pub mod test_data {
use std::collections::HashMap;
use std::sync::Arc;

use chrono::DateTime;
Expand Down Expand Up @@ -178,7 +177,6 @@ pub mod test_data {
engine: MITO2_ENGINE.to_string(),
next_column_id: 3,
region_numbers: vec![1, 2, 3],
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
partition_key_indices: vec![],
Expand Down
3 changes: 0 additions & 3 deletions src/meta-srv/src/table_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ pub(crate) async fn fetch_tables(

#[cfg(test)]
pub(crate) mod tests {
use std::collections::HashMap;

use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::key::TableMetadataManagerRef;
Expand Down Expand Up @@ -103,7 +101,6 @@ pub(crate) mod tests {
engine: MITO_ENGINE.to_string(),
next_column_id: 1,
region_numbers: vec![1, 2, 3, 4],
engine_options: HashMap::new(),
options: TableOptions::default(),
created_on: DateTime::default(),
partition_key_indices: vec![],
Expand Down
1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ prost.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true
serde_with = "3"
smallvec.workspace = true
snafu.workspace = true
store-api = { workspace = true }
Expand Down
19 changes: 7 additions & 12 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ mod twcs;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use common_telemetry::{debug, error};
pub use picker::CompactionPickerRef;
use snafu::ResultExt;
use store_api::storage::{CompactionStrategy, RegionId, TwcsOptions};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::compaction::twcs::TwcsPicker;
use crate::error::{
CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::region::options::CompactionOptions;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
use crate::schedule::scheduler::SchedulerRef;
Expand All @@ -42,7 +42,6 @@ use crate::sst::file_purger::FilePurgerRef;
pub struct CompactionRequest {
pub(crate) current_version: VersionRef,
pub(crate) access_layer: AccessLayerRef,
pub(crate) ttl: Option<Duration>,
pub(crate) compaction_time_window: Option<i64>,
/// Sender to send notification to the region worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
Expand All @@ -64,13 +63,13 @@ impl CompactionRequest {
}
}

/// Builds compaction picker according to [CompactionStrategy].
pub fn compaction_strategy_to_picker(strategy: &CompactionStrategy) -> CompactionPickerRef {
/// Builds compaction picker according to [CompactionOptions].
pub fn compaction_options_to_picker(strategy: &CompactionOptions) -> CompactionPickerRef {
match strategy {
CompactionStrategy::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
CompactionOptions::Twcs(twcs_opts) => Arc::new(TwcsPicker::new(
twcs_opts.max_active_window_files,
twcs_opts.max_inactive_window_files,
twcs_opts.time_window_seconds,
twcs_opts.time_window_seconds(),
)) as Arc<_>,
}
}
Expand Down Expand Up @@ -175,9 +174,7 @@ impl CompactionScheduler {
///
/// If the region has nothing to compact, it removes the region from the status map.
fn schedule_compaction_request(&mut self, request: CompactionRequest) -> Result<()> {
// TODO(hl): build picker according to region options.
let picker =
compaction_strategy_to_picker(&CompactionStrategy::Twcs(TwcsOptions::default()));
let picker = compaction_options_to_picker(&request.current_version.options.compaction);
let region_id = request.region_id();
debug!(
"Pick compaction strategy {:?} for region: {}",
Expand Down Expand Up @@ -309,8 +306,6 @@ impl CompactionStatus {
let mut req = CompactionRequest {
current_version,
access_layer: self.access_layer.clone(),
// TODO(hl): get TTL info from region metadata
ttl: None,
// TODO(hl): get persisted region compaction time window
compaction_time_window: None,
request_sender: request_sender.clone(),
Expand Down
3 changes: 1 addition & 2 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ impl Picker for TwcsPicker {
let CompactionRequest {
current_version,
access_layer,
ttl,
compaction_time_window,
request_sender,
waiters,
Expand All @@ -131,6 +130,7 @@ impl Picker for TwcsPicker {
let region_id = region_metadata.region_id;

let levels = current_version.ssts.levels();
let ttl = current_version.options.ttl;
let expired_ssts = get_expired_ssts(levels, ttl, Timestamp::current_millis());
if !expired_ssts.is_empty() {
info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts);
Expand Down Expand Up @@ -376,7 +376,6 @@ impl CompactionTask for TwcsCompactionTask {
notify,
})
.await;
// TODO(hl): handle reschedule
}
}

Expand Down
2 changes: 0 additions & 2 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ use serde::{Deserialize, Serialize};
const DEFAULT_NUM_WORKERS: usize = 1;
/// Default max running background job.
const DEFAULT_MAX_BG_JOB: usize = 4;
/// Default region write buffer size.
pub(crate) const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(32);

/// Configuration for [MitoEngine](crate::engine::MitoEngine).
#[derive(Debug, Serialize, Deserialize, Clone)]
Expand Down
24 changes: 24 additions & 0 deletions src/mito2/src/engine/create_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::RegionEngine;
Expand Down Expand Up @@ -77,3 +79,25 @@ async fn test_engine_create_existing_region() {
"unexpected err: {err}"
);
}

#[tokio::test]
async fn test_engine_create_with_options() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("ttl", "10d")
.build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

assert!(engine.is_region_exists(region_id));
let region = engine.get_region(region_id).unwrap();
assert_eq!(
Duration::from_secs(3600 * 24 * 10),
region.version().options.ttl.unwrap()
);
}
44 changes: 43 additions & 1 deletion src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
// limitations under the License.

use std::collections::HashMap;
use std::time::Duration;

use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionOpenRequest, RegionPutRequest, RegionRequest};
use store_api::region_request::{
RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::RegionId;

use crate::config::MitoConfig;
Expand Down Expand Up @@ -125,3 +128,42 @@ async fn test_engine_open_readonly() {
engine.set_writable(region_id, true).unwrap();
put_rows(&engine, region_id, rows).await;
}

#[tokio::test]
async fn test_engine_region_open_with_options() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

// Close the region.
engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();

// Open the region again with options.
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::from([("ttl".to_string(), "4d".to_string())]),
}),
)
.await
.unwrap();

let region = engine.get_region(region_id).unwrap();
assert_eq!(
Duration::from_secs(3600 * 24 * 4),
region.version().options.ttl.unwrap()
);
}
7 changes: 7 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,12 @@ pub enum Error {
region_id: RegionId,
location: Location,
},

#[snafu(display("Invalid options, source: {}", source))]
JsonOptions {
source: serde_json::Error,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -522,6 +528,7 @@ impl ErrorExt for Error {
CompatReader { .. } => StatusCode::Unexpected,
InvalidRegionRequest { source, .. } => source.status_code(),
RegionReadonly { .. } => StatusCode::RegionReadonly,
JsonOptions { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub mod memtable;
mod metrics;
#[allow(dead_code)]
pub mod read;
mod region;
pub mod region;
mod region_write_ctx;
#[allow(dead_code)]
pub mod request;
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Mito region.

pub(crate) mod opener;
pub mod options;
pub(crate) mod version;

use std::collections::HashMap;
Expand Down
Loading