diff --git a/src/storage/allocator/data_portion_allocator.rs b/src/storage/allocator/data_portion_allocator.rs index 756343b..948c617 100644 --- a/src/storage/allocator/data_portion_allocator.rs +++ b/src/storage/allocator/data_portion_allocator.rs @@ -195,7 +195,7 @@ impl DataPortionAllocator { // 現在の実装では `nth(0)` を用いているため、 // フリーリスト内の相異なる部分領域が互いに素であるという前提が必要である。 // ただしこの前提は通常のCannyLSの使用であれば成立する。 - fn is_allocated_portion(&self, portion: &DataPortion) -> bool { + pub(crate) fn is_allocated_portion(&self, portion: &DataPortion) -> bool { let key = EndBasedFreePortion(FreePortion::new(portion.start, 0)); if let Some(next) = self.end_to_free.range((Excluded(&key), Unbounded)).nth(0) { // 終端位置が `portion.start` を超えるfree portionのうち最小のもの `next` については diff --git a/src/storage/builder.rs b/src/storage/builder.rs index 25746e3..11b6e06 100644 --- a/src/storage/builder.rs +++ b/src/storage/builder.rs @@ -23,6 +23,7 @@ pub struct StorageBuilder { instance_uuid: Option, journal: JournalRegionOptions, metrics: MetricBuilder, + safe_release_mode: bool, } impl StorageBuilder { /// 新しい`StorageBuilder`インスタンスを生成する. @@ -32,9 +33,18 @@ impl StorageBuilder { instance_uuid: None, journal: JournalRegionOptions::default(), metrics: MetricBuilder::new(), + safe_release_mode: false, } } + /// 安全にリソースを解放する状態でStorageを作成する。 + /// + /// 安全な解放については [wiki](https://github.com/frugalos/cannyls/wiki/Safe-Release-Mode) を参考のこと。 + pub fn enable_safe_release_mode(&mut self) -> &mut Self { + self.safe_release_mode = true; + self + } + /// ストレージインスタンスを識別するためのUUIDを設定する. /// /// ストレージの作成時とオープン時で、指定した値の使われ方が異なる: @@ -199,7 +209,8 @@ impl StorageBuilder { ))?; // データ領域を準備 - let data_region = DataRegion::new(&self.metrics, allocator, data_nvm); + let mut data_region = DataRegion::new(&self.metrics, allocator, data_nvm); + data_region.enable_safe_release_mode(self.safe_release_mode); let metrics = StorageMetrics::new( &self.metrics, diff --git a/src/storage/data_region.rs b/src/storage/data_region.rs index 8cf3752..e8f5f72 100644 --- a/src/storage/data_region.rs +++ b/src/storage/data_region.rs @@ -19,6 +19,12 @@ pub struct DataRegion { nvm: N, block_size: BlockSize, metrics: DataRegionMetrics, + safe_release_mode: bool, + + // 削除処理によってlump indexから外されたが + // まだアロケータに通知して解放することができない + // portionたちのバッファ + pending_portions: Vec, } impl DataRegion where @@ -34,7 +40,31 @@ where nvm, block_size, metrics: DataRegionMetrics::new(metric_builder, capacity, allocator_metrics), + safe_release_mode: false, + pending_portions: Vec::new(), + } + } + + /// 安全にリソースを解放するモードに移行するためのメソッド。 + /// * `true`を渡すと、安全な解放モードに入る。 + /// * 安全な解放については [wiki](https://github.com/frugalos/cannyls/wiki/Safe-Release-Mode) を参考のこと。 + /// * `false`を渡すと、従来の解放モードに入る。 + /// * このモードでは、削除されたデータポーションが即座にアロケータから解放される。 + /// * [issue28](https://github.com/frugalos/cannyls/issues28)があるため安全ではない。 + pub fn enable_safe_release_mode(&mut self, enabling: bool) { + if !enabling && !self.pending_portions.is_empty() { + // 削除対象ポーションが存在する状況で即時削除モードに切り替える場合は + // この段階で削除を行う + self.release_pending_portions(); } + self.safe_release_mode = enabling; + } + + /// 安全にリソースを解放するモードに入っているかどうかを返す。 + /// * `true`なら安全な解放モード + /// * `false`なら従来の解放モード + pub fn is_in_safe_release_mode(&self) -> bool { + self.safe_release_mode } /// データ領域のメトリクスを返す. @@ -89,9 +119,39 @@ where /// `portion`で未割当の領域が指定された場合には、 /// 現在の実行スレッドがパニックする. pub fn delete(&mut self, portion: DataPortion) { + if self.safe_release_mode { + self.pending_portions.push(portion); + } else { + self.allocator.release(portion); + } + } + + /// 指定された領域に格納されているデータを削除する. + /// + /// deleteと似ているが、こちらは安全解放モードでもpendingせず解放する。 + /// # パニック + /// + /// `portion`で未割当の領域が指定された場合には、 + /// 現在の実行スレッドがパニックする. + pub fn release_portion(&mut self, portion: DataPortion) { self.allocator.release(portion); } + /// 解放を遅延させているデータポーションをアロケータに送ることで全て解放する。 + /// + /// # 安全解放モードで呼び出す前提条件 + /// 解放されるデータポーションに対応する削除レコードが、全て永続化されていること。 + /// + /// # メモ + /// 通常の解放モードで呼び出しても効果はない。 + pub fn release_pending_portions(&mut self) { + if self.safe_release_mode { + for p in ::std::mem::replace(&mut self.pending_portions, Vec::new()) { + self.allocator.release(p); + } + } + } + /// 部分領域の単位をブロックからバイトに変換する. fn real_portion(&self, portion: &DataPortion) -> (u64, usize) { let offset = portion.start.as_u64() * u64::from(self.block_size.as_u16()); @@ -103,6 +163,11 @@ where fn block_count(&self, size: u32) -> u32 { (size + u32::from(self.block_size.as_u16()) - 1) / u32::from(self.block_size.as_u16()) } + + #[cfg(test)] + pub fn is_allocated_portion(&self, portion: &DataPortion) -> bool { + self.allocator.is_allocated_portion(portion) + } } #[derive(Debug, Clone)] @@ -173,17 +238,23 @@ mod tests { use metrics::DataAllocatorMetrics; use nvm::MemoryNvm; + macro_rules! make_data_region_on_memory { + ($capacity:expr, $block_size:expr) => {{ + let metrics = MetricBuilder::new(); + let allocator = track!(DataPortionAllocator::build( + DataAllocatorMetrics::new(&metrics, $capacity, $block_size), + iter::empty(), + ))?; + let nvm = MemoryNvm::new(vec![0; $capacity as usize]); + DataRegion::new(&metrics, allocator, nvm) + }}; + } + #[test] fn data_region_works() -> TestResult { let capacity = 10 * 1024; let block_size = BlockSize::min(); - let metrics = MetricBuilder::new(); - let allocator = track!(DataPortionAllocator::build( - DataAllocatorMetrics::new(&metrics, capacity, block_size), - iter::empty(), - ))?; - let nvm = MemoryNvm::new(vec![0; capacity as usize]); - let mut region = DataRegion::new(&metrics, allocator, nvm); + let mut region = make_data_region_on_memory!(capacity, block_size); // put let mut data = DataRegionLumpData::new(3, block_size); @@ -197,4 +268,53 @@ mod tests { ); Ok(()) } + + #[test] + fn enabling_and_confirming_safe_release_mode_work() -> TestResult { + let capacity = 10 * 1024; + let block_size = BlockSize::min(); + let mut region = make_data_region_on_memory!(capacity, block_size); + + // デフォルトでは安全解放モードを使わない。 + assert_eq!(region.is_in_safe_release_mode(), false); + + region.enable_safe_release_mode(true); + assert_eq!(region.is_in_safe_release_mode(), true); + + Ok(()) + } + + #[test] + fn delayed_releasing_works() -> TestResult { + let capacity = 10 * 1024; + let block_size = BlockSize::min(); + let mut region = make_data_region_on_memory!(capacity, block_size); + + region.enable_safe_release_mode(true); + + // put + let mut data = DataRegionLumpData::new(3, block_size); + data.as_bytes_mut().copy_from_slice("foo".as_bytes()); + let portion = track!(region.put(&data))?; + + // get + assert_eq!( + region.get(portion).ok().map(|d| d.as_bytes().to_owned()), + Some("foo".as_bytes().to_owned()) + ); + + region.delete(portion.clone()); + + // まだ解放されていない。 + assert_eq!(region.allocator.is_allocated_portion(&portion), true); + + assert_eq!(region.pending_portions.len(), 1); + + region.release_pending_portions(); + + // 解放された。 + assert_eq!(region.allocator.is_allocated_portion(&portion), false); + + Ok(()) + } } diff --git a/src/storage/index.rs b/src/storage/index.rs index 05cbae9..8e77e31 100644 --- a/src/storage/index.rs +++ b/src/storage/index.rs @@ -38,7 +38,7 @@ impl LumpIndex { /// /// 結果は昇順にソートされている. pub fn remove(&mut self, lump_id: &LumpId) -> Option { - self.map.remove(lump_id).map(|p| p.into()) + self.map.remove(lump_id).map(std::convert::Into::into) } /// 登録されているlumpのID一覧を返す. diff --git a/src/storage/journal/region.rs b/src/storage/journal/region.rs index fd179e3..e75f6ac 100644 --- a/src/storage/journal/region.rs +++ b/src/storage/journal/region.rs @@ -44,6 +44,11 @@ impl JournalRegion where N: NonVolatileMemory, { + #[cfg(test)] + pub fn options(&self) -> &JournalRegionOptions { + &self.options + } + pub fn journal_entries(&mut self) -> Result<(u64, u64, u64, Vec)> { self.ring_buffer.journal_entries() } @@ -167,6 +172,20 @@ where &self.metrics } + /// ジャーナルバッファが同期され永続化された直後の状態かどうかを返す。 + /// + /// この状態は、厳密には次を意味する。 + /// 1. 現時点までに以下四種類の操作でバッファに書き込まれたジャーナルエントリは + /// 全てDiskに同期されている。 + /// (a) records_put + /// (b) records_embed + /// (c) records_delete + /// (d) records_delete_range + /// 2. ジャーナルバッファは空である。 + pub fn is_just_synced(&self) -> bool { + self.sync_countdown == self.options.sync_interval + } + /// GC処理を一単位実行する. fn gc_once(&mut self, index: &mut LumpIndex) -> Result<()> { if self.gc_queue.is_empty() && self.ring_buffer.capacity() < self.ring_buffer.usage() * 2 { @@ -248,10 +267,11 @@ where if self.gc_after_append { track!(self.gc_once(index))?; // レコード追記に合わせてGCを一単位行うことでコストを償却する } - track!(self.try_sync())?; Ok(()) } + // ジャーナルキュー(のバッファ)にエントリを追加する。 + // 十分なエントリが溜まっていれば同期を行う。 fn append_record(&mut self, index: &mut LumpIndex, record: &JournalRecord) -> Result<()> where B: AsRef<[u8]>, @@ -260,14 +280,16 @@ where if let Some((lump_id, portion)) = embedded { index.insert(lump_id, Portion::Journal(portion)); } + track!(self.try_sync())?; Ok(()) } fn try_sync(&mut self) -> Result<()> { + assert!(self.sync_countdown >= 1); + self.sync_countdown -= 1; + if self.sync_countdown == 0 { track!(self.sync())?; - } else { - self.sync_countdown -= 1; } Ok(()) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index eb71e3f..0501232 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -88,6 +88,20 @@ impl Storage where N: NonVolatileMemory, { + /// 安全にリソースを解放する状態でStorageを作成する。 + /// + /// 安全な解放については [wiki](https://github.com/frugalos/cannyls/wiki/Safe-Release-Mode) を参考のこと。 + pub fn enable_safe_release_mode(&mut self, enabling: bool) { + self.data_region.enable_safe_release_mode(enabling); + } + + /// 安全にリソースを解放するモードに入っているかどうかを返す。 + /// * `true`なら安全な解放モード + /// * `false`なら従来の解放モード + pub fn is_in_safe_release_mode(&self) -> bool { + self.data_region.is_in_safe_release_mode() + } + pub(crate) fn new( header: StorageHeader, journal_region: JournalRegion, @@ -176,6 +190,18 @@ where self.lump_index.list_range(range) } + /// ジャーナル領域が直前の操作に伴って同期済みであるならば + /// 遅延させている削除済みポーションを全て解放する。 + /// + /// syncは実際にはバッファのflushとディスク同期の両方を行うため + /// is_just_synced() == trueならば + /// この時点までの全てのジャーナルレコードが同期済みである。 + fn release_pending_portions(&mut self) { + if self.journal_region.is_just_synced() { + self.data_region.release_pending_portions(); + } + } + /// lumpを保存する. /// /// 既に同じIDのlumpが存在する場合にはデータが上書きされる. @@ -211,6 +237,10 @@ where } } self.metrics.put_lumps_at_running.increment(); + + if self.is_in_safe_release_mode() { + self.release_pending_portions(); + } Ok(!updated) } @@ -224,7 +254,11 @@ where /// 不整合ないしI/O周りで致命的な問題が発生している可能性があるので、 /// 以後はこのインスタンスの使用を中止するのが望ましい. pub fn delete(&mut self, lump_id: &LumpId) -> Result { - track!(self.delete_if_exists(lump_id, true)) + let result = track!(self.delete_if_exists(lump_id, true))?; + if self.is_in_safe_release_mode() { + self.release_pending_portions(); + } + Ok(result) } /// LumpIdのrange [start..end) を用いて、これに含まれるLumpIdを全て削除する。 @@ -248,9 +282,9 @@ where // ジャーナル領域に範囲削除レコードを一つ書き込むため、一度のディスクアクセスが起こる。 // 削除レコードを範囲分書き込むわけ *ではない* ため、複数回のディスクアクセスは発生しない。 track!(self - .journal_region - .records_delete_range(&mut self.lump_index, range))?; - + .journal_region + .records_delete_range(&mut self.lump_index, range))?; + for lump_id in &targets { if let Some(portion) = self.lump_index.remove(lump_id) { self.metrics.delete_lumps.increment(); @@ -259,10 +293,15 @@ where // DataRegion::deleteはメモリアロケータに対する解放要求をするのみで // ディスクにアクセスすることはない。 // (管理領域から外すだけで、例えばディスク上の値を0クリアするようなことはない) - self.data_region.delete(portion); + self.data_region.release_portion(portion); } } - } + } + + if self.is_in_safe_release_mode() { + track!(self.journal_sync())?; + self.release_pending_portions(); + } Ok(targets) } @@ -846,4 +885,64 @@ mod tests { Ok(()) } + + #[test] + fn safe_releasing_works() -> TestResult { + let dir = track_io!(TempDir::new("cannyls_test"))?; + + let nvm = track!(FileNvm::create( + dir.path().join("test.lusf"), + BlockSize::min().ceil_align(512 * 0x1000 * 10) + ))?; + let mut storage = track!(StorageBuilder::new() + .journal_region_ratio(0.5) + .enable_safe_release_mode() + .create(nvm))?; + assert!(storage.is_in_safe_release_mode()); + + assert_eq!(storage.journal_region.options().sync_interval, 0x1000); + + let mut portions = Vec::new(); + + for i in 0..(0x1000 / 2 - 1) { + track!(storage.put(&LumpId::new(i), &zeroed_data(42)))?; + let portion = storage + .lump_index + .get(&LumpId::new(i)) + .expect("should succeed"); + portions.push(portion.clone()); + track!(storage.delete(&LumpId::new(i)))?; + } + track!(storage.put(&LumpId::new(10000), &zeroed_data(42)))?; + let portion = storage + .lump_index + .get(&LumpId::new(10000)) + .expect("should succeed"); + portions.push(portion.clone()); + + for p in &portions { + if let Portion::Data(portion) = p { + assert!(storage.data_region.is_allocated_portion(portion)); + } else { + unreachable!("since we've put unembedded lump data"); + } + } + + assert_eq!(storage.journal_region.is_just_synced(), false); + + // このDeleteによって0x1000個目のエントリが書き込まれるため + // ジャーナル領域の同期が行われる。 + track!(storage.delete(&LumpId::new(10000)))?; + assert_eq!(storage.journal_region.is_just_synced(), true); + + for p in &portions { + if let Portion::Data(portion) = p { + assert!(!storage.data_region.is_allocated_portion(portion)); + } else { + unreachable!("since we've put unembedded lump data"); + } + } + + Ok(()) + } }