Skip to content

Commit

Permalink
issue27のパッチ
Browse files Browse the repository at this point in the history
  • Loading branch information
yuezato committed Sep 30, 2019
1 parent 8759940 commit 83556e3
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/nvm/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ mod tests {
let mut parent = dir.as_ref();
while let Some(p) = parent.parent() {
parent = p;
};
}
assert!(create_parent_directories(parent).is_ok());
Ok(())
}
Expand Down
25 changes: 24 additions & 1 deletion src/storage/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,29 @@ impl StorageBuilder {
self
}

/// ジャーナルバッファをdiskにflushする際に、
/// atomicにdiskに永続化されるように安全な手順で書き出す。
///
/// これにより、
/// ジャーナルバッファを書き出している途中でプロセスがクラッシュしても
/// 再起動後には書き出し直前の状態に戻ることができる。
pub fn journal_safe_flush(&mut self, safe_flush: bool) -> &mut Self {
self.journal.buffer_options.safe_flush = safe_flush;
self
}

/// enqueue時にGoToFrontでEndOfRecordsを上書きする必要がある際は、
/// 先に新しいレコードとEndOfRecordsを書き込んでから、
/// その後にGoToFrontを書き込むようにする。
///
/// GoToFrontを書き込んでからジャーナル領域先頭にseekするという
/// 比較的長い処理をしている間にプロセスがクラッシュしても、
/// 再起動後には直前の状態の戻ることができる。
pub fn journal_safe_enqueue(&mut self, safe_enqueue: bool) -> &mut Self {
self.journal.buffer_options.safe_enqueue = safe_enqueue;
self
}

/// メトリクス用の共通設定を登録する.
///
/// デフォルト値は`MetricBuilder::new()`.
Expand Down Expand Up @@ -175,7 +198,7 @@ impl StorageBuilder {
header.block_size.contains(nvm.block_size()),
ErrorKind::InvalidInput
);
let mut journal_options = self.journal.clone();
let mut journal_options = self.journal;
journal_options.block_size = header.block_size;

// UUIDをチェック
Expand Down
2 changes: 1 addition & 1 deletion src/storage/journal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub use self::header::{JournalHeader, JournalHeaderRegion};
pub use self::nvm_buffer::JournalNvmBuffer;
pub use self::options::JournalRegionOptions;
pub use self::options::{JournalBufferOptions, JournalRegionOptions};
pub use self::record::{JournalEntry, JournalRecord};
pub use self::region::JournalRegion;

Expand Down
64 changes: 60 additions & 4 deletions src/storage/journal/nvm_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ pub struct JournalNvmBuffer<N: NonVolatileMemory> {
// ジャーナル領域が発行した読み込み要求を、
// 内部NVMのブロック境界に合うようにアライメントするために使用される。
read_buf: AlignedBytes,

// バッファの安全なflushを行うかどうかを意味するフラグ
//
// trueの場合は、バッファの先頭から512バイト以降を書き出してsyncした後に、
// 先頭から512バイトをatomicに書き出す。
//
// falseの場合は、バッファ全体をdiskに向けて単にflushする
safe_flush: bool,
}
impl<N: NonVolatileMemory> JournalNvmBuffer<N> {
/// 新しい`JournalNvmBuffer`インスタンスを生成する.
Expand All @@ -65,7 +73,7 @@ impl<N: NonVolatileMemory> JournalNvmBuffer<N> {
///
/// ただし、シーク時には、シーク地点を含まない次のブロック境界までのデータは
/// 上書きされてしまうので注意が必要.
pub fn new(nvm: N) -> Self {
pub fn new(nvm: N, safe_flush: bool) -> Self {
let block_size = nvm.block_size();
JournalNvmBuffer {
inner: nvm,
Expand All @@ -74,6 +82,7 @@ impl<N: NonVolatileMemory> JournalNvmBuffer<N> {
write_buf_offset: 0,
write_buf: AlignedBytes::new(0, block_size),
read_buf: AlignedBytes::new(0, block_size),
safe_flush,
}
}

Expand All @@ -95,13 +104,60 @@ impl<N: NonVolatileMemory> JournalNvmBuffer<N> {
}
}

/*
* バッファの内容をdiskに書き出す。
*
* 関数名が表すように、flushを意図したものであってdiskへの同期までは考慮していない。
* ただし、safe_syncがtrueの場合は、書き出し順をコントロールするために、
* 内部的にパラメタ化されているNVMのsyncメソッドを呼び出すことになる。
* ただしその場合でも、実装の都合により、メモリバッファ全体がdiskへ永続化されるとは限らない。
*/
fn flush_write_buf(&mut self) -> Result<()> {
if self.write_buf.is_empty() || !self.maybe_dirty {
return Ok(());
}

track_io!(self.inner.seek(SeekFrom::Start(self.write_buf_offset)))?;
track_io!(self.inner.write(&self.write_buf))?;
if self.safe_flush {
/*
* issue 27(https://github.com/frugalos/cannyls/issues/27)を考慮した
* 順序づいた書き込みを行う。
*
* ここで順序とは次を意味する
* 1. 書き込みバッファの512バイト以降を全て書き出す。
* 2. Diskへの同期命令を発行する。
* 3. 書き込みバッファの先頭512バイトを書き出す。
*
* 3.のステップで既存のEORを上書きするため、
* これを最後に行うことにより、DiskからEORが消えた状態になることを避ける。
*
* パフォーマンスに関する問題点:
* a. Diskへの同期命令はミリセカンド単位でのブロックを生じる。
* b. ステップ3でシークが生じるのでシーケンシャルwriteでなくなってしまう。
*
* 先頭512バイトについて:
* 先頭は512*nバイトであれば、DIRECT_IOとの兼ね合いとしては問題がない。
* ただし、「多くのHDDについては」512バイト=セクタサイズであり
* 先頭部分の書き出しがatomicな書き出しになるという利点がある。
* ただし用いているファイルシステムの実装によっては
* 実際には512バイトが分断されて書き出される可能性もあり、常にこの利点を享受できるとは限らない。
*/
let buf: &[u8] = &self.write_buf;
assert!(buf.as_ptr() as usize % 512 == 0);
assert!(buf.len() % 512 == 0);
if buf.len() > 512 {
track_io!(self
.inner
.seek(SeekFrom::Start(self.write_buf_offset + 512)))?;
track_io!(self.inner.write(&buf[512..]))?;
track!(self.inner.sync())?;
}
track_io!(self.inner.seek(SeekFrom::Start(self.write_buf_offset)))?;
track_io!(self.inner.write(&buf[..512]))?;
} else {
track_io!(self.inner.seek(SeekFrom::Start(self.write_buf_offset)))?;
track_io!(self.inner.write(&self.write_buf))?;
}

if self.write_buf.len() > self.block_size().as_u16() as usize {
// このif節では、
// バッファに末端のalignmentバイト分(= new_len)の情報を残す。
Expand Down Expand Up @@ -366,6 +422,6 @@ mod tests {

fn new_buffer() -> JournalNvmBuffer<MemoryNvm> {
let nvm = MemoryNvm::new(vec![0; 10 * 1024]);
JournalNvmBuffer::new(nvm)
JournalNvmBuffer::new(nvm, false)
}
}
18 changes: 17 additions & 1 deletion src/storage/journal/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,34 @@ use block::BlockSize;
/// ジャーナル領域の挙動を調整するためのパラメータ群.
///
/// 各オプションの説明は`StorageBuilder'のドキュメントを参照のこと.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub struct JournalRegionOptions {
pub gc_queue_size: usize,
pub sync_interval: usize,
pub block_size: BlockSize,
pub buffer_options: JournalBufferOptions,
}
impl Default for JournalRegionOptions {
fn default() -> Self {
JournalRegionOptions {
gc_queue_size: 0x1000,
sync_interval: 0x1000,
block_size: BlockSize::min(),
buffer_options: Default::default(),
}
}
}

#[derive(Debug, Clone, Copy)]
pub struct JournalBufferOptions {
pub safe_flush: bool,
pub safe_enqueue: bool,
}
impl Default for JournalBufferOptions {
fn default() -> Self {
JournalBufferOptions {
safe_flush: false,
safe_enqueue: false,
}
}
}
8 changes: 6 additions & 2 deletions src/storage/journal/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ where

let mut header_region = JournalHeaderRegion::new(header_nvm, block_size);
let header = track!(header_region.read_header())?;
let ring_buffer =
JournalRingBuffer::new(ring_buffer_nvm, header.ring_buffer_head, metric_builder);
let ring_buffer = JournalRingBuffer::new(
ring_buffer_nvm,
header.ring_buffer_head,
options.buffer_options,
metric_builder,
);

let metrics = JournalRegionMetrics::new(metric_builder, ring_buffer.metrics().clone());
let mut journal = JournalRegion {
Expand Down
101 changes: 92 additions & 9 deletions src/storage/journal/ring_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use prometrics::metrics::MetricBuilder;
use std::io::{BufReader, Read, Seek, SeekFrom};

use super::record::{EMBEDDED_DATA_OFFSET, END_OF_RECORDS_SIZE};
use super::{JournalEntry, JournalNvmBuffer, JournalRecord};
use super::{JournalBufferOptions, JournalEntry, JournalNvmBuffer, JournalRecord};
use lump::LumpId;
use metrics::JournalQueueMetrics;
use nvm::NonVolatileMemory;
Expand Down Expand Up @@ -34,6 +34,8 @@ pub struct JournalRingBuffer<N: NonVolatileMemory> {
tail: u64,

metrics: JournalQueueMetrics,

safe_enqueue: bool,
}
impl<N: NonVolatileMemory> JournalRingBuffer<N> {
pub fn head(&self) -> u64 {
Expand All @@ -51,15 +53,21 @@ impl<N: NonVolatileMemory> JournalRingBuffer<N> {
}

/// `JournalRingBuffer`インスタンスを生成する.
pub fn new(nvm: N, head: u64, metric_builder: &MetricBuilder) -> Self {
pub fn new(
nvm: N,
head: u64,
options: JournalBufferOptions,
metric_builder: &MetricBuilder,
) -> Self {
let metrics = JournalQueueMetrics::new(metric_builder);
metrics.capacity_bytes.set(nvm.capacity() as f64);
JournalRingBuffer {
nvm: JournalNvmBuffer::new(nvm),
nvm: JournalNvmBuffer::new(nvm, options.safe_flush),
unreleased_head: head,
head,
tail: head,
metrics,
safe_enqueue: options.safe_enqueue,
}
}

Expand Down Expand Up @@ -108,10 +116,21 @@ impl<N: NonVolatileMemory> JournalRingBuffer<N> {
track!(self.nvm.sync())
}

pub fn enqueue<B: AsRef<[u8]>>(
&mut self,
record: &JournalRecord<B>,
) -> Result<Option<(LumpId, JournalPortion)>> {
if self.safe_enqueue {
self.safe_enqueue(record)
} else {
self.fast_enqueue(record)
}
}

/// レコードをジャーナルの末尾に追記する.
///
/// レコードが`JournalRecord::Embed`だった場合には、データを埋め込んだ位置を結果として返す.
pub fn enqueue<B: AsRef<[u8]>>(
fn fast_enqueue<B: AsRef<[u8]>>(
&mut self,
record: &JournalRecord<B>,
) -> Result<Option<(LumpId, JournalPortion)>> {
Expand Down Expand Up @@ -157,6 +176,70 @@ impl<N: NonVolatileMemory> JournalRingBuffer<N> {
}
}

/// レコードをジャーナルの末尾に、安全に、追記する.
///
/// レコードが`JournalRecord::Embed`だった場合には、データを埋め込んだ位置を結果として返す.
fn safe_enqueue<B: AsRef<[u8]>>(
&mut self,
record: &JournalRecord<B>,
) -> Result<Option<(LumpId, JournalPortion)>> {
// GoToFrontレコードを書き出す場所を覚えるための変数
let mut pos_for_gotofront = None;

// 1. 十分な空き領域が存在するかをチェック
track!(self.check_free_space(record))?;

// 2. リングバッファの終端チェック
if self.will_overflow(record) {
// tail位置からでは空きがないので、先頭に戻って再試行
// 後で先頭から復帰するために場所を覚えておく
pos_for_gotofront = Some(self.tail);

self.metrics
.consumed_bytes_at_running
.add_u64(self.nvm.capacity() - self.tail);

// 先頭に移動した上で
// 再度、十分な空き領域が存在するかをチェック
self.tail = 0;
debug_assert!(!self.will_overflow(record));
track!(self.check_free_space(record))?;
}

// 3. レコードを書き込む
let prev_tail = self.tail;
track_io!(self.nvm.seek(SeekFrom::Start(self.tail)))?;
track!(record.write_to(&mut self.nvm))?;
self.metrics.enqueued_records_at_running.increment(record);

// 4. 終端を示すレコードも書き込む
self.tail = self.nvm.position(); // 次回の追記開始位置を保存 (`EndOfRecords`の直前)
self.metrics
.consumed_bytes_at_running
.add_u64(self.tail - prev_tail);
track!(JournalRecord::EndOfRecords::<[_; 0]>.write_to(&mut self.nvm))?;

// 5. GoToFrontを書き込む必要があれば、一度末尾までジャンプして書き込んだ後に戻ってくる。
// GoToFrontの書き出しを先に行ってしまうと、EndOfRecordsが存在しない状態が永続化される可能性がある。
if let Some(pos_for_gotofront) = pos_for_gotofront {
track!(self.nvm.sync())?; // 新しいEndOfRecordsの書き込みを先に永続化する。
track_io!(self.nvm.seek(SeekFrom::Start(pos_for_gotofront)))?;
track!(JournalRecord::GoToFront::<[_; 0]>.write_to(&mut self.nvm))?; // 古いEndOfRecordsに上書き
track_io!(self.nvm.seek(SeekFrom::Start(self.tail)))?;
}

// 6. 埋め込みPUTの場合には、インデックスに位置情報を返す
if let JournalRecord::Embed(ref lump_id, ref data) = *record {
let portion = JournalPortion {
start: Address::from_u64(prev_tail + EMBEDDED_DATA_OFFSET as u64).unwrap(),
len: data.as_ref().len() as u16,
};
Ok(Some((*lump_id, portion)))
} else {
Ok(None)
}
}

/// リングバッファの先頭からエントリ群を取り出す.
///
/// `EndOfRecords`に到達した時点で走査は終了する.
Expand Down Expand Up @@ -364,7 +447,7 @@ mod tests {
#[test]
fn append_and_read_records() -> TestResult {
let nvm = MemoryNvm::new(vec![0; 1024]);
let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());
let mut ring = JournalRingBuffer::new(nvm, 0, Default::default(), &MetricBuilder::new());

let records = vec![
record_put("000", 30, 5),
Expand Down Expand Up @@ -397,7 +480,7 @@ mod tests {
#[test]
fn read_embedded_data() -> TestResult {
let nvm = MemoryNvm::new(vec![0; 1024]);
let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());
let mut ring = JournalRingBuffer::new(nvm, 0, Default::default(), &MetricBuilder::new());

track!(ring.enqueue(&record_put("000", 30, 5)))?;
track!(ring.enqueue(&record_delete("111")))?;
Expand All @@ -415,7 +498,7 @@ mod tests {
#[test]
fn go_round_ring_buffer() -> TestResult {
let nvm = MemoryNvm::new(vec![0; 1024]);
let mut ring = JournalRingBuffer::new(nvm, 512, &MetricBuilder::new());
let mut ring = JournalRingBuffer::new(nvm, 512, Default::default(), &MetricBuilder::new());
assert_eq!(ring.head, 512);
assert_eq!(ring.tail, 512);

Expand All @@ -433,7 +516,7 @@ mod tests {
#[test]
fn full() -> TestResult {
let nvm = MemoryNvm::new(vec![0; 1024]);
let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());
let mut ring = JournalRingBuffer::new(nvm, 0, Default::default(), &MetricBuilder::new());

let record = record_put("000", 1, 2);
while ring.tail <= 1024 - record.external_size() as u64 {
Expand Down Expand Up @@ -464,7 +547,7 @@ mod tests {
#[test]
fn too_large_record() {
let nvm = MemoryNvm::new(vec![0; 1024]);
let mut ring = JournalRingBuffer::new(nvm, 0, &MetricBuilder::new());
let mut ring = JournalRingBuffer::new(nvm, 0, Default::default(), &MetricBuilder::new());

let record = record_embed("000", &vec![0; 997]);
assert_eq!(record.external_size(), 1020);
Expand Down
Loading

0 comments on commit 83556e3

Please sign in to comment.