Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue28のための新しいパッチ #34

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/storage/allocator/data_portion_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl DataPortionAllocator {
// 現在の実装では `nth(0)` を用いているため、
// フリーリスト内の相異なる部分領域が互いに素であるという前提が必要である。
// ただしこの前提は通常のCannyLSの使用であれば成立する。
fn is_allocated_portion(&self, portion: &DataPortion) -> bool {
pub(crate) fn is_allocated_portion(&self, portion: &DataPortion) -> bool {
let key = EndBasedFreePortion(FreePortion::new(portion.start, 0));
if let Some(next) = self.end_to_free.range((Excluded(&key), Unbounded)).nth(0) {
// 終端位置が `portion.start` を超えるfree portionのうち最小のもの `next` については
Expand Down
13 changes: 12 additions & 1 deletion src/storage/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct StorageBuilder {
instance_uuid: Option<Uuid>,
journal: JournalRegionOptions,
metrics: MetricBuilder,
safe_release_mode: bool,
}
impl StorageBuilder {
/// 新しい`StorageBuilder`インスタンスを生成する.
Expand All @@ -32,9 +33,18 @@ impl StorageBuilder {
instance_uuid: None,
journal: JournalRegionOptions::default(),
metrics: MetricBuilder::new(),
safe_release_mode: false,
}
}

/// 安全にリソースを解放する状態でStorageを作成する。
///
/// 安全な解放については [wiki](https://github.com/frugalos/cannyls/wiki/Safe-Release-Mode) を参考のこと。
pub fn enable_safe_release_mode(&mut self) -> &mut Self {
self.safe_release_mode = true;
self
}

/// ストレージインスタンスを識別するためのUUIDを設定する.
///
/// ストレージの作成時とオープン時で、指定した値の使われ方が異なる:
Expand Down Expand Up @@ -199,7 +209,8 @@ impl StorageBuilder {
))?;

// データ領域を準備
let data_region = DataRegion::new(&self.metrics, allocator, data_nvm);
let mut data_region = DataRegion::new(&self.metrics, allocator, data_nvm);
data_region.enable_safe_release_mode(self.safe_release_mode);

let metrics = StorageMetrics::new(
&self.metrics,
Expand Down
134 changes: 127 additions & 7 deletions src/storage/data_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ pub struct DataRegion<N> {
nvm: N,
block_size: BlockSize,
metrics: DataRegionMetrics,
safe_release_mode: bool,

// 削除処理によってlump indexから外されたが
// まだアロケータに通知して解放することができない
// portionたちのバッファ
pending_portions: Vec<DataPortion>,
}
impl<N> DataRegion<N>
where
Expand All @@ -34,7 +40,31 @@ where
nvm,
block_size,
metrics: DataRegionMetrics::new(metric_builder, capacity, allocator_metrics),
safe_release_mode: false,
pending_portions: Vec::new(),
}
}

/// 安全にリソースを解放するモードに移行するためのメソッド。
/// * `true`を渡すと、安全な解放モードに入る。
/// * 安全な解放については [wiki](https://github.com/frugalos/cannyls/wiki/Safe-Release-Mode) を参考のこと。
/// * `false`を渡すと、従来の解放モードに入る。
/// * このモードでは、削除されたデータポーションが即座にアロケータから解放される。
/// * [issue28](https://github.com/frugalos/cannyls/issues28)があるため安全ではない。
pub fn enable_safe_release_mode(&mut self, enabling: bool) {
if !enabling && !self.pending_portions.is_empty() {
// 削除対象ポーションが存在する状況で即時削除モードに切り替える場合は
// この段階で削除を行う
self.release_pending_portions();
}
self.safe_release_mode = enabling;
}

/// 安全にリソースを解放するモードに入っているかどうかを返す。
/// * `true`なら安全な解放モード
/// * `false`なら従来の解放モード
pub fn is_in_safe_release_mode(&self) -> bool {
self.safe_release_mode
}

/// データ領域のメトリクスを返す.
Expand Down Expand Up @@ -89,9 +119,39 @@ where
/// `portion`で未割当の領域が指定された場合には、
/// 現在の実行スレッドがパニックする.
pub fn delete(&mut self, portion: DataPortion) {
if self.safe_release_mode {
self.pending_portions.push(portion);
} else {
self.allocator.release(portion);
}
}

/// 指定された領域に格納されているデータを削除する.
///
/// deleteと似ているが、こちらは安全解放モードでもpendingせず解放する。
/// # パニック
///
/// `portion`で未割当の領域が指定された場合には、
/// 現在の実行スレッドがパニックする.
pub fn release_portion(&mut self, portion: DataPortion) {
self.allocator.release(portion);
}

/// 解放を遅延させているデータポーションをアロケータに送ることで全て解放する。
///
/// # 安全解放モードで呼び出す前提条件
/// 解放されるデータポーションに対応する削除レコードが、全て永続化されていること。
///
/// # メモ
/// 通常の解放モードで呼び出しても効果はない。
pub fn release_pending_portions(&mut self) {
if self.safe_release_mode {
for p in ::std::mem::replace(&mut self.pending_portions, Vec::new()) {
self.allocator.release(p);
}
}
}

/// 部分領域の単位をブロックからバイトに変換する.
fn real_portion(&self, portion: &DataPortion) -> (u64, usize) {
let offset = portion.start.as_u64() * u64::from(self.block_size.as_u16());
Expand All @@ -103,6 +163,11 @@ where
fn block_count(&self, size: u32) -> u32 {
(size + u32::from(self.block_size.as_u16()) - 1) / u32::from(self.block_size.as_u16())
}

#[cfg(test)]
pub fn is_allocated_portion(&self, portion: &DataPortion) -> bool {
self.allocator.is_allocated_portion(portion)
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -173,17 +238,23 @@ mod tests {
use metrics::DataAllocatorMetrics;
use nvm::MemoryNvm;

macro_rules! make_data_region_on_memory {
($capacity:expr, $block_size:expr) => {{
let metrics = MetricBuilder::new();
let allocator = track!(DataPortionAllocator::build(
DataAllocatorMetrics::new(&metrics, $capacity, $block_size),
iter::empty(),
))?;
let nvm = MemoryNvm::new(vec![0; $capacity as usize]);
DataRegion::new(&metrics, allocator, nvm)
}};
}

#[test]
fn data_region_works() -> TestResult {
let capacity = 10 * 1024;
let block_size = BlockSize::min();
let metrics = MetricBuilder::new();
let allocator = track!(DataPortionAllocator::build(
DataAllocatorMetrics::new(&metrics, capacity, block_size),
iter::empty(),
))?;
let nvm = MemoryNvm::new(vec![0; capacity as usize]);
let mut region = DataRegion::new(&metrics, allocator, nvm);
let mut region = make_data_region_on_memory!(capacity, block_size);

// put
let mut data = DataRegionLumpData::new(3, block_size);
Expand All @@ -197,4 +268,53 @@ mod tests {
);
Ok(())
}

#[test]
fn enabling_and_confirming_safe_release_mode_work() -> TestResult {
let capacity = 10 * 1024;
let block_size = BlockSize::min();
let mut region = make_data_region_on_memory!(capacity, block_size);

// デフォルトでは安全解放モードを使わない。
assert_eq!(region.is_in_safe_release_mode(), false);

region.enable_safe_release_mode(true);
assert_eq!(region.is_in_safe_release_mode(), true);

Ok(())
}

#[test]
fn delayed_releasing_works() -> TestResult {
let capacity = 10 * 1024;
let block_size = BlockSize::min();
let mut region = make_data_region_on_memory!(capacity, block_size);

region.enable_safe_release_mode(true);

// put
let mut data = DataRegionLumpData::new(3, block_size);
data.as_bytes_mut().copy_from_slice("foo".as_bytes());
let portion = track!(region.put(&data))?;

// get
assert_eq!(
region.get(portion).ok().map(|d| d.as_bytes().to_owned()),
Some("foo".as_bytes().to_owned())
);

region.delete(portion.clone());

// まだ解放されていない。
assert_eq!(region.allocator.is_allocated_portion(&portion), true);

assert_eq!(region.pending_portions.len(), 1);

region.release_pending_portions();

// 解放された。
assert_eq!(region.allocator.is_allocated_portion(&portion), false);

Ok(())
}
}
2 changes: 1 addition & 1 deletion src/storage/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl LumpIndex {
///
/// 結果は昇順にソートされている.
pub fn remove(&mut self, lump_id: &LumpId) -> Option<Portion> {
self.map.remove(lump_id).map(|p| p.into())
self.map.remove(lump_id).map(std::convert::Into::into)
}

/// 登録されているlumpのID一覧を返す.
Expand Down
28 changes: 25 additions & 3 deletions src/storage/journal/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ impl<N> JournalRegion<N>
where
N: NonVolatileMemory,
{
#[cfg(test)]
pub fn options(&self) -> &JournalRegionOptions {
&self.options
}

pub fn journal_entries(&mut self) -> Result<(u64, u64, u64, Vec<JournalEntry>)> {
self.ring_buffer.journal_entries()
}
Expand Down Expand Up @@ -167,6 +172,20 @@ where
&self.metrics
}

/// ジャーナルバッファが同期され永続化された直後の状態かどうかを返す。
///
/// この状態は、厳密には次を意味する。
/// 1. 現時点までに以下四種類の操作でバッファに書き込まれたジャーナルエントリは
/// 全てDiskに同期されている。
/// (a) records_put
/// (b) records_embed
/// (c) records_delete
/// (d) records_delete_range
/// 2. ジャーナルバッファは空である。
pub fn is_just_synced(&self) -> bool {
self.sync_countdown == self.options.sync_interval
}

/// GC処理を一単位実行する.
fn gc_once(&mut self, index: &mut LumpIndex) -> Result<()> {
if self.gc_queue.is_empty() && self.ring_buffer.capacity() < self.ring_buffer.usage() * 2 {
Expand Down Expand Up @@ -248,10 +267,11 @@ where
if self.gc_after_append {
track!(self.gc_once(index))?; // レコード追記に合わせてGCを一単位行うことでコストを償却する
}
track!(self.try_sync())?;
Ok(())
}

// ジャーナルキュー(のバッファ)にエントリを追加する。
// 十分なエントリが溜まっていれば同期を行う。
fn append_record<B>(&mut self, index: &mut LumpIndex, record: &JournalRecord<B>) -> Result<()>
where
B: AsRef<[u8]>,
Expand All @@ -260,14 +280,16 @@ where
if let Some((lump_id, portion)) = embedded {
index.insert(lump_id, Portion::Journal(portion));
}
track!(self.try_sync())?;
Ok(())
}

fn try_sync(&mut self) -> Result<()> {
assert!(self.sync_countdown >= 1);
self.sync_countdown -= 1;

if self.sync_countdown == 0 {
track!(self.sync())?;
} else {
self.sync_countdown -= 1;
}
Ok(())
}
Expand Down
Loading