diff --git a/Cargo.lock b/Cargo.lock index 9e2374df8e25..a5d1a503432b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -749,7 +749,7 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "syn 2.0.28", @@ -1456,7 +1456,7 @@ version = "3.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae6371b8bdc8b7d3959e9cf7b22d4435ef3e79e138688421ec654acf8c81b008" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro-error", "proc-macro2", "quote", @@ -1469,7 +1469,7 @@ version = "4.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "syn 2.0.28", @@ -1668,6 +1668,7 @@ dependencies = [ "paste", "regex", "snafu", + "strum 0.21.0", "tokio", "tokio-util", "url", @@ -4245,6 +4246,15 @@ dependencies = [ "http", ] +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "heck" version = "0.4.1" @@ -5522,6 +5532,7 @@ dependencies = [ "snafu", "storage", "store-api", + "strum 0.21.0", "table", "tokio", "uuid", @@ -5600,7 +5611,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56b0d8a0db9bf6d2213e11f2c701cb91387b0614361625ab7b9743b41aa4938f" dependencies = [ "darling 0.20.3", - "heck", + "heck 0.4.1", "num-bigint", "proc-macro-crate 1.3.1", "proc-macro-error", @@ -7035,7 +7046,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" dependencies = [ "bytes", - "heck", + "heck 0.4.1", "itertools 0.10.5", 
"lazy_static", "log", @@ -9102,7 +9113,7 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "syn 1.0.109", @@ -9354,7 +9365,7 @@ checksum = "9966e64ae989e7e575b19d7265cb79d7fc3cbbdf179835cb0d716f294c2049c9" dependencies = [ "dotenvy", "either", - "heck", + "heck 0.4.1", "once_cell", "proc-macro2", "quote", @@ -9556,6 +9567,15 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strum" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaf86bbcfd1fa9670b7a129f64fc0c9fcbbfe4f1bc4210e9e98fe71ffc12cde2" +dependencies = [ + "strum_macros 0.21.1", +] + [[package]] name = "strum" version = "0.24.1" @@ -9574,13 +9594,25 @@ dependencies = [ "strum_macros 0.25.1", ] +[[package]] +name = "strum_macros" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d06aaeeee809dbc59eb4556183dd927df67db1540de5be8d3ec0b6636358a5ec" +dependencies = [ + "heck 0.3.3", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "strum_macros" version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "rustversion", @@ -9593,7 +9625,7 @@ version = "0.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6069ca09d878a33f883cc06aaa9718ede171841d3832450354410b718b097232" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "rustversion", @@ -9643,7 +9675,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"e3ae64fb7ad0670c7d6d53d57b1b91beb2212afc30e164cc8edb02d6b2cff32a" dependencies = [ "gix", - "heck", + "heck 0.4.1", "prettyplease 0.2.12", "prost", "prost-build", @@ -9665,7 +9697,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ac1ce8315086b127ca0abf162c62279550942bb26ebf7946fe17fe114446472" dependencies = [ "git2", - "heck", + "heck 0.4.1", "prettyplease 0.2.12", "prost", "prost-build", @@ -10704,7 +10736,7 @@ version = "0.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95d27d749378ceab6ec22188ed7ad102205c89ddb92ab662371c850ffc71aa1a" dependencies = [ - "heck", + "heck 0.4.1", "log", "proc-macro2", "quote", @@ -10722,7 +10754,7 @@ version = "0.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8d9ecedde2fd77e975c38eeb9ca40b34ad0247b2259c6e6bbd2a8d6cc2444f" dependencies = [ - "heck", + "heck 0.4.1", "log", "proc-macro2", "quote", diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index cfdedd8db3be..f67ca0a7cd06 100644 --- a/src/common/datasource/Cargo.toml +++ b/src/common/datasource/Cargo.toml @@ -27,6 +27,7 @@ orc-rust = "0.2" paste = "1.0" regex = "1.7" snafu.workspace = true +strum = { version = "0.21", features = ["derive"] } tokio-util.workspace = true tokio.workspace = true url = "2.3" diff --git a/src/common/datasource/src/compression.rs b/src/common/datasource/src/compression.rs index bc840cd6a816..23c28786a5ef 100644 --- a/src/common/datasource/src/compression.rs +++ b/src/common/datasource/src/compression.rs @@ -20,11 +20,12 @@ use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdD use async_compression::tokio::write; use bytes::Bytes; use futures::Stream; +use strum::EnumIter; use tokio::io::{AsyncRead, AsyncWriteExt, BufReader}; use tokio_util::io::{ReaderStream, StreamReader}; use crate::error::{self, Error, Result}; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, 
Copy, PartialEq, Eq, Hash, EnumIter)] pub enum CompressionType { /// Gzip-ed file Gzip, diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 09dc9c0db89e..2ee9dfb93174 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -46,6 +46,7 @@ serde_json = "1.0" snafu.workspace = true storage = { workspace = true } store-api = { workspace = true } +strum = "0.21" table = { workspace = true } tokio.workspace = true uuid.workspace = true diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 5462c9bac655..f1016f33ff95 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -37,7 +37,7 @@ pub struct MitoConfig { // Manifest configs: /// Number of meta action updated to trigger a new checkpoint /// for the manifest (default 10). - pub manifest_checkpoint_interval: u64, + pub manifest_checkpoint_distance: u64, /// Manifest compression type (default uncompressed). pub manifest_compress_type: CompressionType, } @@ -48,7 +48,7 @@ impl Default for MitoConfig { num_workers: DEFAULT_NUM_WORKERS, worker_channel_size: 128, worker_request_batch_size: 64, - manifest_checkpoint_interval: 10, + manifest_checkpoint_distance: 10, manifest_compress_type: CompressionType::Uncompressed, } } diff --git a/src/mito2/src/manifest.rs b/src/mito2/src/manifest.rs index 1b2ec86e5d3a..604401665aee 100644 --- a/src/mito2/src/manifest.rs +++ b/src/mito2/src/manifest.rs @@ -15,9 +15,8 @@ //! manifest storage pub mod action; -pub mod gc_task; -pub mod helper; #[allow(unused_variables)] pub mod manager; -pub mod options; pub mod storage; +#[cfg(test)] +mod tests; diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index c7ac51436198..0db4882e8a0f 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -12,18 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Defines [RegionMetaAction] related structs and [RegionCheckpoint]. 
+ use std::collections::HashMap; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; -use storage::metadata::VersionNumber; -use storage::sst::{FileId, FileMeta}; use store_api::manifest::action::{ProtocolAction, ProtocolVersion}; use store_api::manifest::ManifestVersion; use store_api::storage::{RegionId, SequenceNumber}; use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu}; use crate::metadata::RegionMetadataRef; +use crate::sst::file::{FileId, FileMeta}; /// Actions that can be applied to region manifest. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -46,7 +47,6 @@ pub struct RegionChange { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionEdit { - pub region_version: VersionNumber, pub files_to_add: Vec, pub files_to_remove: Vec, pub compaction_time_window: Option, @@ -123,8 +123,6 @@ impl RegionManifestBuilder { // The checkpoint of region manifest, generated by checkpointer. #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] pub struct RegionCheckpoint { - /// The snasphot protocol - pub protocol: ProtocolAction, /// The last manifest version that this checkpoint compacts(inclusive). pub last_version: ManifestVersion, // The number of manifest actions that this checkpoint compacts. 
@@ -134,21 +132,20 @@ pub struct RegionCheckpoint { } impl RegionCheckpoint { - pub fn set_protocol(&mut self, action: ProtocolAction) { - self.protocol = action; - } - pub fn last_version(&self) -> ManifestVersion { self.last_version } pub fn encode(&self) -> Result> { - todo!() + let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?; + + Ok(json.into_bytes()) } - pub fn decode(bs: &[u8]) -> Result { - // helper::decode_checkpoint(bs, reader_version) - todo!() + pub fn decode(bytes: &[u8]) -> Result { + let data = std::str::from_utf8(bytes).context(Utf8Snafu)?; + + serde_json::from_str(data).context(SerdeJsonSnafu) } } @@ -207,7 +204,6 @@ impl RegionMetaActionIter { #[cfg(test)] mod tests { - use storage::sst::FileId; use super::*; @@ -238,7 +234,7 @@ mod tests { FileMeta { region_id: 0.into(), file_id: FileId::random(), - time_range: None, + time_range: (0.into(), 10000.into()), level: 0, file_size: 1024, } diff --git a/src/mito2/src/manifest/helper.rs b/src/mito2/src/manifest/helper.rs deleted file mode 100644 index 398eb2bb2fe5..000000000000 --- a/src/mito2/src/manifest/helper.rs +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use serde::Serialize; -use store_api::manifest::action::ProtocolVersion; -use store_api::manifest::ManifestVersion; - -use crate::error::Result; -use crate::manifest::action::RegionCheckpoint; -pub const NEWLINE: &[u8] = b"\n"; - -pub fn encode_actions( - prev_version: ManifestVersion, - actions: &[T], -) -> Result> { - todo!() -} - -pub fn encode_checkpoint(snasphot: &RegionCheckpoint) -> Result> { - todo!() -} - -pub fn decode_checkpoint(bs: &[u8], reader_version: ProtocolVersion) -> Result { - todo!() -} diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 25f9c4c7bef7..a9a675252b9b 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -14,20 +14,32 @@ use std::sync::Arc; +use common_datasource::compression::CompressionType; use common_telemetry::{debug, info}; -use store_api::manifest::action::{ProtocolAction, ProtocolVersion}; +use object_store::ObjectStore; use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION}; use tokio::sync::RwLock; use crate::error::Result; use crate::manifest::action::{ RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction, - RegionMetaActionIter, RegionMetaActionList, + RegionMetaActionList, }; -use crate::manifest::options::RegionManifestOptions; use crate::manifest::storage::ManifestObjectStore; use crate::metadata::RegionMetadataRef; +/// Options for [RegionManifestManager]. +#[derive(Debug, Clone)] +pub struct RegionManifestOptions { + /// Directory to store manifest. + pub manifest_dir: String, + pub object_store: ObjectStore, + pub compress_type: CompressionType, + /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints. + /// Set to 0 to disable checkpoint. 
+ pub checkpoint_distance: u64, +} + // rewrite note: // trait Checkpoint -> struct RegionCheckpoint // trait MetaAction -> struct RegionMetaActionList @@ -136,6 +148,12 @@ impl RegionManifestManager { let inner = self.inner.read().await; inner.manifest.clone() } + + #[cfg(test)] + pub async fn store(&self) -> ManifestObjectStore { + let inner = self.inner.read().await; + inner.store.clone() + } } #[cfg(test)] @@ -155,10 +173,12 @@ impl RegionManifestManager { } #[derive(Debug)] -struct RegionManifestManagerInner { +pub(crate) struct RegionManifestManagerInner { store: ManifestObjectStore, options: RegionManifestOptions, last_version: ManifestVersion, + /// The last version included in checkpoint file. + last_checkpoint_version: ManifestVersion, manifest: Arc, } @@ -198,12 +218,11 @@ impl RegionManifestManagerInner { RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata })); store.save(version, &action_list.encode()?).await?; - // todo: start gc task - Ok(Self { store, options, last_version: version, + last_checkpoint_version: MIN_VERSION, manifest: Arc::new(manifest), }) } @@ -222,10 +241,11 @@ impl RegionManifestManagerInner { // recover from storage // construct manifest builder let mut version = MIN_VERSION; - let last_checkpoint = store.load_last_checkpoint().await?; - let checkpoint = last_checkpoint - .map(|(_, raw_checkpoint)| RegionCheckpoint::decode(&raw_checkpoint)) - .transpose()?; + let checkpoint = Self::last_checkpoint(&store).await?; + let last_checkpoint_version = checkpoint + .as_ref() + .map(|checkpoint| checkpoint.last_version) + .unwrap_or(MIN_VERSION); let mut manifest_builder = if let Some(checkpoint) = checkpoint { info!( "Recover region manifest {} from checkpoint version {}", @@ -276,18 +296,16 @@ impl RegionManifestManagerInner { ); let version = manifest.manifest_version; - // todo: start gc task - Ok(Some(Self { store, options, last_version: version, + last_checkpoint_version, manifest: Arc::new(manifest), 
})) } async fn stop(&mut self) -> Result<()> { - // todo: stop gc task Ok(()) } @@ -315,6 +333,7 @@ impl RegionManifestManagerInner { } let new_manifest = manifest_builder.try_build()?; self.manifest = Arc::new(new_manifest); + self.may_do_checkpoint(version).await?; Ok(version) } @@ -327,68 +346,102 @@ impl RegionManifestManagerInner { self.last_version } - // pub (crate) fn checkpointer(&self) -> Checkpointer { - // todo!() - // } - - pub(crate) fn set_last_checkpoint_version(&self, _version: ManifestVersion) { - todo!() - } - - /// Update inner state. - pub fn update_state(&self, _version: ManifestVersion, _protocol: Option) { - todo!() - } - - pub(crate) async fn save_checkpoint(&self, checkpoint: &RegionCheckpoint) -> Result<()> { - todo!() - } + pub(crate) async fn may_do_checkpoint(&mut self, version: ManifestVersion) -> Result<()> { + if version - self.last_checkpoint_version >= self.options.checkpoint_distance + && self.options.checkpoint_distance != 0 + { + debug!( + "Going to do checkpoint for version [{} ~ {}]", + self.last_checkpoint_version, version + ); + if let Some(checkpoint) = self.do_checkpoint().await? { + self.last_checkpoint_version = checkpoint.last_version(); + } + } - pub(crate) async fn may_do_checkpoint(&self, version: ManifestVersion) -> Result<()> { - todo!() + Ok(()) } - // pub(crate) fn manifest_store(&self) -> &Arc { - // todo!() - // } - - // from Manifest + /// Make a new checkpoint. Return the fresh one if there are some actions to compact. 
+ async fn do_checkpoint(&self) -> Result> { + let last_checkpoint = Self::last_checkpoint(&self.store).await?; + let current_version = self.last_version; + + let (start_version, mut manifest_builder) = if let Some(checkpoint) = last_checkpoint { + ( + checkpoint.last_version + 1, + RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint), + ) + } else { + (MIN_VERSION, RegionManifestBuilder::default()) + }; + let end_version = current_version; - async fn scan( - &self, - start: ManifestVersion, - end: ManifestVersion, - ) -> Result { - todo!() - } + if start_version >= end_version { + return Ok(None); + } - async fn do_checkpoint(&self) -> Result> { - todo!() - } + let mut iter = self.store.scan(start_version, end_version).await?; + let mut last_version = start_version; + let mut compacted_actions = 0; + while let Some((version, raw_action_list)) = iter.next_log().await? { + let action_list = RegionMetaActionList::decode(&raw_action_list)?; + for action in action_list.actions { + match action { + RegionMetaAction::Change(action) => { + manifest_builder.apply_change(version, action); + } + RegionMetaAction::Edit(action) => { + manifest_builder.apply_edit(version, action); + } + RegionMetaAction::Remove(_) | RegionMetaAction::Protocol(_) => { + debug!( + "Unhandled action for region {}, action: {:?}", + self.manifest.metadata.region_id, action + ); + } + } + } + last_version = version; + compacted_actions += 1; + } - async fn last_checkpoint(&self) -> Result> { - todo!() - } + if compacted_actions == 0 { + return Ok(None); + } - // from Checkpoint + let region_manifest = manifest_builder.try_build()?; + let checkpoint = RegionCheckpoint { + last_version, + compacted_actions, + checkpoint: Some(region_manifest), + }; - /// Set a protocol action into checkpoint - pub fn set_protocol(&mut self, _action: ProtocolAction) { - todo!() - } + self.store + .save_checkpoint(last_version, &checkpoint.encode()?) 
+ .await?; + // TODO(ruihang): this task can be detached + self.store.delete_until(last_version, true).await?; - /// The last compacted action's version of checkpoint - pub fn last_version(&self) -> ManifestVersion { - todo!() + info!( + "Done manifest checkpoint for region {}, version: [{}, {}], current latest version: {}, compacted {} actions.", + self.manifest.metadata.region_id, start_version, end_version, last_version, compacted_actions + ); + Ok(Some(checkpoint)) } - /// Encode this checkpoint into a byte vector - pub fn encode(&self) -> Result> { - todo!() - } + /// Fetch the last [RegionCheckpoint] from storage. + pub(crate) async fn last_checkpoint( + store: &ManifestObjectStore, + ) -> Result> { + let last_checkpoint = store.load_last_checkpoint().await?; - pub fn decode(_bytes: &[u8], _reader_version: ProtocolVersion) -> Result { - todo!() + if let Some((version, bytes)) = last_checkpoint { + let checkpoint = RegionCheckpoint::decode(&bytes)?; + Ok(Some(checkpoint)) + } else { + Ok(None) + } } } @@ -397,43 +450,13 @@ mod test { use common_datasource::compression::CompressionType; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; - use store_api::storage::RegionId; use super::*; use crate::manifest::action::RegionChange; - use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder, SemanticType}; + use crate::manifest::tests::utils::basic_region_metadata; + use crate::metadata::{ColumnMetadata, RegionMetadataBuilder, SemanticType}; use crate::test_util::TestEnv; - fn basic_region_metadata() -> RegionMetadata { - let mut builder = RegionMetadataBuilder::new(RegionId::new(23, 33), 0); - builder - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - semantic_type: SemanticType::Timestamp, - column_id: 45, - }) - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new("pk", 
ConcreteDataType::string_datatype(), false), - semantic_type: SemanticType::Tag, - column_id: 36, - }) - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new( - "val", - ConcreteDataType::float64_datatype(), - false, - ), - semantic_type: SemanticType::Field, - column_id: 251, - }) - .primary_key(vec![36]); - builder.build().unwrap() - } - #[tokio::test] async fn create_manifest_manager() { let metadata = Arc::new(basic_region_metadata()); diff --git a/src/mito2/src/manifest/options.rs b/src/mito2/src/manifest/options.rs deleted file mode 100644 index d72ea5ff2333..000000000000 --- a/src/mito2/src/manifest/options.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Options for [RegionManifestManager](crate::manifest::manager::RegionManifestManager). - -use common_datasource::compression::CompressionType; -use object_store::ObjectStore; - -/// Options for manifest. -#[derive(Debug, Clone)] -pub struct RegionManifestOptions { - /// Directory to store manifest. - pub manifest_dir: String, - pub object_store: ObjectStore, - pub compress_type: CompressionType, - /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints. 
- pub checkpoint_interval: u64, -} diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index db270056d138..b136faac3862 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -157,13 +157,13 @@ impl ManifestObjectStore { /// Returns the last checkpoint path, because the last checkpoint is not compressed, /// so its path name has nothing to do with the compression algorithm used by `ManifestObjectStore` - fn last_checkpoint_path(&self) -> String { + pub(crate) fn last_checkpoint_path(&self) -> String { format!("{}{}", self.path, LAST_CHECKPOINT_FILE) } /// Return all `R`s in the root directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`), /// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`) - async fn get_paths(&self, filter: F) -> Result> + pub async fn get_paths(&self, filter: F) -> Result> where F: Fn(Entry) -> Option, { @@ -211,7 +211,7 @@ impl ManifestObjectStore { }) } - async fn delete_until( + pub async fn delete_until( &self, end: ManifestVersion, keep_last_checkpoint: bool, @@ -370,7 +370,7 @@ impl ManifestObjectStore { Ok(()) } - async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { + pub async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { let path = self.checkpoint_file_path(version); let data = self .compress_type @@ -409,7 +409,7 @@ impl ManifestObjectStore { Ok(()) } - async fn load_checkpoint( + pub async fn load_checkpoint( &self, version: ManifestVersion, ) -> Result)>> { @@ -510,6 +510,11 @@ impl ManifestObjectStore { self.load_checkpoint(checkpoint_metadata.version).await } + + #[cfg(test)] + pub async fn read_file(&self, path: &str) -> Result> { + self.object_store.read(path).await.context(OpenDalSnafu) + } } #[derive(Serialize, Deserialize, Debug)] diff --git a/src/mito2/src/manifest/gc_task.rs 
b/src/mito2/src/manifest/tests.rs similarity index 64% rename from src/mito2/src/manifest/gc_task.rs rename to src/mito2/src/manifest/tests.rs index 47e459eff689..01ab6e16a0c6 100644 --- a/src/mito2/src/manifest/gc_task.rs +++ b/src/mito2/src/manifest/tests.rs @@ -12,19 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_runtime::TaskFunction; - -struct ManifestGcTask {} - -#[async_trait::async_trait] -impl TaskFunction<()> for ManifestGcTask { - /// Invoke the task. - async fn call(&mut self) -> std::result::Result<(), ()> { - todo!() - } - - /// Name of the task. - fn name(&self) -> &str { - todo!() - } -} +mod checkpoint; +pub mod utils; diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs new file mode 100644 index 000000000000..fa0f87616039 --- /dev/null +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -0,0 +1,207 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use common_datasource::compression::CompressionType; +use store_api::storage::RegionId; +use strum::IntoEnumIterator; + +use crate::manifest::action::{ + RegionCheckpoint, RegionEdit, RegionMetaAction, RegionMetaActionList, +}; +use crate::manifest::manager::{RegionManifestManager, RegionManifestManagerInner}; +use crate::manifest::tests::utils::basic_region_metadata; +use crate::sst::file::{FileId, FileMeta}; +use crate::test_util::TestEnv; + +async fn build_manager( + checkpoint_distance: u64, + compress_type: CompressionType, +) -> (TestEnv, RegionManifestManager) { + let metadata = Arc::new(basic_region_metadata()); + let env = TestEnv::new(); + let manager = env + .create_manifest_manager(compress_type, checkpoint_distance, Some(metadata.clone())) + .await + .unwrap() + .unwrap(); + + (env, manager) +} + +async fn reopen_manager( + env: &TestEnv, + checkpoint_distance: u64, + compress_type: CompressionType, +) -> RegionManifestManager { + env.create_manifest_manager(compress_type, checkpoint_distance, None) + .await + .unwrap() + .unwrap() +} + +fn nop_action() -> RegionMetaActionList { + RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { + files_to_add: vec![], + files_to_remove: vec![], + compaction_time_window: None, + flushed_sequence: None, + })]) +} + +#[tokio::test] +async fn manager_without_checkpoint() { + let (_env, manager) = build_manager(0, CompressionType::Uncompressed).await; + + // apply 10 actions + for i in 0..10 { + manager.update(nop_action()).await.unwrap(); + } + + // no checkpoint + assert!(manager + .store() + .await + .load_last_checkpoint() + .await + .unwrap() + .is_none()); + + // check files + let mut expected = vec![ + "00000000000000000010.json", + "00000000000000000009.json", + "00000000000000000008.json", + "00000000000000000007.json", + "00000000000000000006.json", + "00000000000000000005.json", + "00000000000000000004.json", + "00000000000000000003.json", + 
"00000000000000000002.json", + "00000000000000000001.json", + "00000000000000000000.json", + ]; + expected.sort_unstable(); + let mut paths = manager + .store() + .await + .get_paths(|e| Some(e.name().to_string())) + .await + .unwrap(); + paths.sort_unstable(); + assert_eq!(expected, paths); +} + +#[tokio::test] +async fn manager_with_checkpoint_distance_1() { + common_telemetry::init_default_ut_logging(); + let (env, manager) = build_manager(1, CompressionType::Uncompressed).await; + + // apply 10 actions + for i in 0..10 { + manager.update(nop_action()).await.unwrap(); + } + + // has checkpoint + assert!(manager + .store() + .await + .load_last_checkpoint() + .await + .unwrap() + .is_some()); + + // check files + let mut expected = vec![ + "00000000000000000009.checkpoint", + "00000000000000000010.json", + "00000000000000000008.checkpoint", + "00000000000000000009.json", + "_last_checkpoint", + ]; + expected.sort_unstable(); + let mut paths = manager + .store() + .await + .get_paths(|e| Some(e.name().to_string())) + .await + .unwrap(); + paths.sort_unstable(); + assert_eq!(expected, paths); + + // check content in `_last_checkpoint` + let raw_bytes = manager + .store() + .await + .read_file(&manager.store().await.last_checkpoint_path()) + .await + .unwrap(); + let raw_json = std::str::from_utf8(&raw_bytes).unwrap(); + let expected_json = "{\"size\":741,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}"; + assert_eq!(expected_json, raw_json); + + // reopen the manager + manager.stop().await.unwrap(); + let manager = reopen_manager(&env, 1, CompressionType::Uncompressed).await; + assert_eq!(10, manager.manifest().await.manifest_version); +} + +#[tokio::test] +async fn checkpoint_with_different_compression_types() { + common_telemetry::init_default_ut_logging(); + + let mut actions = vec![]; + for _ in 0..10 { + let file_meta = FileMeta { + region_id: RegionId::new(123, 456), + file_id: FileId::random(), + time_range: (0.into(), 10000000.into()), + level: 0, + 
file_size: 1024000, + }; + let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { + files_to_add: vec![file_meta], + files_to_remove: vec![], + compaction_time_window: None, + flushed_sequence: None, + })]); + actions.push(action); + } + + // collect and check all compression types + let mut checkpoints = vec![]; + for compress_type in CompressionType::iter() { + checkpoints + .push(generate_checkpoint_with_compression_types(compress_type, actions.clone()).await); + } + let last = checkpoints.last().unwrap().clone(); + assert!(checkpoints.into_iter().all(|ckpt| last.eq(&ckpt))); +} + +async fn generate_checkpoint_with_compression_types( + compress_type: CompressionType, + actions: Vec, +) -> RegionCheckpoint { + let (_env, manager) = build_manager(1, compress_type).await; // build with the codec under test + + for action in actions { + manager.update(action).await.unwrap(); + } + + RegionManifestManagerInner::last_checkpoint(&manager.store().await) + .await + .unwrap() + .unwrap() +} diff --git a/src/mito2/src/manifest/tests/utils.rs b/src/mito2/src/manifest/tests/utils.rs new file mode 100644 index 000000000000..ae6d7df78e84 --- /dev/null +++ b/src/mito2/src/manifest/tests/utils.rs @@ -0,0 +1,50 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use store_api::storage::RegionId; + +use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder, SemanticType}; + +/// Build a basic region metadata for testing. +/// It contains three columns: +/// - ts: timestamp millisecond, semantic type: `Timestamp`, column id: 45 +/// - pk: string, semantic type: `Tag`, column id: 36 +/// - val: float64, semantic type: `Field`, column id: 251 +pub fn basic_region_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(23, 33), 0); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 45, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("pk", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 36, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("val", ConcreteDataType::float64_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 251, + }) + .primary_key(vec![36]); + builder.build().unwrap() +} diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index a4e477952e9d..4071018e74af 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -23,8 +23,7 @@ use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::error::{RegionCorruptedSnafu, RegionNotFoundSnafu, Result}; -use crate::manifest::manager::RegionManifestManager; -use crate::manifest::options::RegionManifestOptions; +use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::memtable::MemtableBuilderRef; use crate::metadata::RegionMetadata; use crate::region::version::{VersionBuilder, VersionControl}; @@ -80,7 +79,7 @@ impl RegionOpener { manifest_dir: 
new_manifest_dir(&self.region_dir), object_store: self.object_store, compress_type: config.manifest_compress_type, - checkpoint_interval: config.manifest_checkpoint_interval, + checkpoint_distance: config.manifest_checkpoint_distance, }; // Writes regions to the manifest file. let manifest_manager = RegionManifestManager::new(metadata.clone(), options).await?; @@ -105,7 +104,7 @@ impl RegionOpener { manifest_dir: new_manifest_dir(&self.region_dir), object_store: self.object_store, compress_type: config.manifest_compress_type, - checkpoint_interval: config.manifest_checkpoint_interval, + checkpoint_distance: config.manifest_checkpoint_distance, }; let manifest_manager = RegionManifestManager::open(options) diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 68b411c7ae74..9dc1d5b6fe8d 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -29,8 +29,7 @@ use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::engine::MitoEngine; use crate::error::Result; -use crate::manifest::manager::RegionManifestManager; -use crate::manifest::options::RegionManifestOptions; +use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::metadata::{ColumnMetadata, RegionMetadataRef, SemanticType}; use crate::request::{CreateRequest, RegionOptions}; use crate::worker::WorkerGroup; @@ -95,7 +94,7 @@ impl TestEnv { pub async fn create_manifest_manager( &self, compress_type: CompressionType, - checkpoint_interval: u64, + checkpoint_distance: u64, initial_metadata: Option, ) -> Result> { let data_home = self.data_home.path(); @@ -116,7 +115,7 @@ impl TestEnv { manifest_dir, object_store, compress_type, - checkpoint_interval, + checkpoint_distance, }; if let Some(metadata) = initial_metadata {