hide more file writing details from engine (#269)

This is preparatory work for #258.

Changes:
- Move `fsync` from the writer to the file handle, so that it can be called concurrently in the future. In doing so, we have to remove sync offset tracking and deprecate `bytes_per_sync` support.
- Move `prepare_write` into pipe_log. Introduce a trait `ReactiveBytes` for this purpose (sketched after this list).
- Rotate the file during write instead of once per write group. Rotation is also delayed until the first write after the size limit is exceeded (instead of the write that exceeds it).
- Differentiate two types of I/O errors: unrecoverable errors from fsync and retriable errors from pwrite. Only bubble up errors in the latter case; panic early on unrecoverable ones.
- Also refactor the purge code and fix two cases where force-compact was not triggered.
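
For reference, a minimal sketch of the idea behind `ReactiveBytes`. This is an illustration, not the exact definition (which lives in `src/pipe_log.rs`); the `LogFileContext` fields and the `append` shape below are assumptions based on this commit's changes:

```rust
// Sketch only; the real definitions live in src/pipe_log.rs.
pub struct LogFileContext {
    // assumed: identity and format version of the target file
}

/// Bytes whose final encoding depends on the file they are written to,
/// e.g. a log batch whose checksum is salted per file (assumed semantics).
pub trait ReactiveBytes {
    /// Finalizes and returns the encoded bytes for the target file.
    fn as_bytes(&mut self, ctx: &LogFileContext) -> &[u8];
}

// This lets `PipeLog::append` take the batch itself rather than pre-encoded
// bytes, so `prepare_write` can happen inside the pipe after the active file
// is known (assumed shape):
//
//     fn append(&self, queue: LogQueue, bytes: &mut impl ReactiveBytes)
//         -> Result<FileBlockHandle>;
```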

Signed-off-by: tabokie <[email protected]>
tabokie authored Sep 6, 2022
1 parent 9751c6d commit 618eea7
Showing 14 changed files with 171 additions and 166 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
```diff
@@ -18,6 +18,8 @@
 * Add metadata deletion capability to `FileSystem` trait. Users can implement `exists_metadata` and `delete_metadata` to clean up obsolete metadata from older versions of Raft Engine.
 * Add `Engine::scan_messages` and `Engine::scan_raw_messages` for iterating over written key-values.
 * Add `Engine::get` for getting raw value.
+* Move `sync` from `env::WriteExt` to `env::Handle`.
+* Deprecate `bytes_per_sync`.
 
 ### Behavior Changes
 
```
18 changes: 9 additions & 9 deletions src/config.rs
```diff
@@ -49,11 +49,12 @@ pub struct Config {
     ///
     /// Default: "8KB"
     pub batch_compression_threshold: ReadableSize,
+    /// Deprecated.
     /// Incrementally sync log files after specified bytes have been written.
     /// Setting it to zero disables incremental sync.
     ///
     /// Default: "4MB"
-    pub bytes_per_sync: ReadableSize,
+    pub bytes_per_sync: Option<ReadableSize>,
 
     /// Version of the log file.
     ///
@@ -101,7 +102,7 @@ impl Default for Config {
             recovery_read_block_size: ReadableSize::kb(16),
             recovery_threads: 4,
             batch_compression_threshold: ReadableSize::kb(8),
-            bytes_per_sync: ReadableSize::mb(4),
+            bytes_per_sync: None,
             format_version: Version::V2,
             target_file_size: ReadableSize::mb(128),
             purge_threshold: ReadableSize::gb(10),
@@ -114,7 +115,6 @@ impl Default for Config {
         #[cfg(test)]
         {
             cfg.memory_limit = Some(ReadableSize(0));
-            cfg.format_version = Version::V2;
             cfg.enable_log_recycle = true;
         }
         cfg
@@ -132,8 +132,8 @@ impl Config {
                 self.target_file_size.0,
             )));
         }
-        if self.bytes_per_sync.0 == 0 {
-            self.bytes_per_sync = ReadableSize(u64::MAX);
+        if self.bytes_per_sync.is_some() {
+            warn!("bytes-per-sync has been deprecated.");
         }
         let min_recovery_read_block_size = ReadableSize(MIN_RECOVERY_READ_BLOCK_SIZE as u64);
         if self.recovery_read_block_size < min_recovery_read_block_size {
@@ -204,14 +204,16 @@ mod tests {
             target-file-size = "1MB"
             purge-threshold = "3MB"
             format-version = 1
+            enable-log-recycle = false
         "#;
-        let load: Config = toml::from_str(custom).unwrap();
+        let mut load: Config = toml::from_str(custom).unwrap();
         assert_eq!(load.dir, "custom_dir");
         assert_eq!(load.recovery_mode, RecoveryMode::TolerateTailCorruption);
-        assert_eq!(load.bytes_per_sync, ReadableSize::kb(2));
+        assert_eq!(load.bytes_per_sync, Some(ReadableSize::kb(2)));
         assert_eq!(load.target_file_size, ReadableSize::mb(1));
         assert_eq!(load.purge_threshold, ReadableSize::mb(3));
         assert_eq!(load.format_version, Version::V1);
+        load.sanitize().unwrap();
     }
 
     #[test]
@@ -226,7 +228,6 @@ mod tests {
         let soft_error = r#"
             recovery-read-block-size = "1KB"
             recovery-threads = 0
-            bytes-per-sync = "0KB"
             target-file-size = "5000MB"
             format-version = 2
             enable-log-recycle = true
@@ -236,7 +237,6 @@ mod tests {
         soft_sanitized.sanitize().unwrap();
         assert!(soft_sanitized.recovery_read_block_size.0 >= MIN_RECOVERY_READ_BLOCK_SIZE as u64);
         assert!(soft_sanitized.recovery_threads >= MIN_RECOVERY_THREADS);
-        assert_eq!(soft_sanitized.bytes_per_sync.0, u64::MAX);
         assert_eq!(
             soft_sanitized.purge_rewrite_threshold.unwrap(),
             soft_sanitized.target_file_size
```
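
With this change, a configured `bytes_per_sync` still parses for compatibility but only triggers a warning during sanitization. A small sketch of the resulting behavior, written in the register of the tests above and based directly on the `Config::sanitize()` change:

```rust
#[test]
fn bytes_per_sync_is_deprecated() {
    // Sketch based on the Config::sanitize() change above: the deprecated
    // setting still parses and sanitizes, it just logs a warning.
    let mut cfg = Config {
        bytes_per_sync: Some(ReadableSize::kb(2)), // deprecated: no effect on syncing
        ..Default::default()
    };
    // Logs "bytes-per-sync has been deprecated." and still succeeds.
    cfg.sanitize().unwrap();
}
```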
28 changes: 12 additions & 16 deletions src/engine.rs
```diff
@@ -147,15 +147,12 @@ where
         if let Some(mut group) = self.write_barrier.enter(&mut writer) {
             let now = Instant::now();
             let _t = StopWatch::new_with(&*ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
-            let file_context = self.pipe_log.fetch_active_file(LogQueue::Append);
             for writer in group.iter_mut() {
                 writer.entered_time = Some(now);
                 sync |= writer.sync;
                 let log_batch = writer.mut_payload();
                 let res = if !log_batch.is_empty() {
-                    log_batch.prepare_write(&file_context)?;
-                    self.pipe_log
-                        .append(LogQueue::Append, log_batch.encoded_bytes())
+                    self.pipe_log.append(LogQueue::Append, log_batch)
                 } else {
                     // TODO(tabokie): use Option<FileBlockHandle> instead.
                     Ok(FileBlockHandle {
@@ -166,16 +163,11 @@ where
                 };
                 writer.set_output(res);
             }
-            debug_assert!(
-                file_context.id == self.pipe_log.fetch_active_file(LogQueue::Append).id
-            );
             perf_context!(log_write_duration).observe_since(now);
-            if let Err(e) = self.pipe_log.maybe_sync(LogQueue::Append, sync) {
-                panic!(
-                    "Cannot sync {:?} queue due to IO error: {}",
-                    LogQueue::Append,
-                    e
-                );
+            if sync {
+                // As per trait protocol, this error should be retriable. But we panic anyway to
+                // save the trouble of propagating it to other group members.
+                self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
             }
             // Pass the perf context diff to all the writers.
             let diff = get_perf_context();
@@ -1258,8 +1250,12 @@ mod tests {
             check_purge(vec![1, 2, 3]);
         }
 
-        // 10th, rewrited
-        check_purge(vec![]);
+        // 10th, rewritten, but still needs to be compacted.
+        check_purge(vec![1, 2, 3]);
+        for rid in 1..=3 {
+            let memtable = engine.memtables.get(rid).unwrap();
+            assert_eq!(memtable.read().rewrite_count(), 50);
+        }
 
         // compact and write some new data to trigger compact again.
         for rid in 2..=50 {
@@ -1454,7 +1450,7 @@ mod tests {
         assert_eq!(engine.file_span(LogQueue::Append).0, old_active_file + 1);
         let old_active_file = engine.file_span(LogQueue::Rewrite).1;
         engine.purge_manager.must_rewrite_rewrite_queue();
-        assert_eq!(engine.file_span(LogQueue::Rewrite).0, old_active_file + 1);
+        assert!(engine.file_span(LogQueue::Rewrite).0 > old_active_file);
 
         let engine = engine.reopen();
         for rid in 1..=3 {
```
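
The write-leader path above encodes the new error policy: `append` (pwrite) errors are retriable and bubble up to callers, while `sync` (fsync) failures abort the process. A condensed, hypothetical sketch of that control flow — the function and its signature are illustrative, with names taken from the diff and crate types (`PipeLog`, `LogBatch`, `FileBlockHandle`) assumed in scope:

```rust
// Hypothetical condensation of the write-leader path; not the real code.
fn leader_append<P: PipeLog>(
    pipe_log: &P,
    batch: &mut LogBatch,
    sync: bool,
) -> Result<FileBlockHandle> {
    // Retriable: a failed pwrite leaves the caller free to retry the batch.
    let handle = pipe_log.append(LogQueue::Append, batch)?;
    if sync {
        // Unrecoverable: after a failed fsync the on-disk state is unknown,
        // so panic early instead of propagating the error.
        pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
    }
    Ok(handle)
}
```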
37 changes: 17 additions & 20 deletions src/env/default.rs
```diff
@@ -74,22 +74,6 @@ impl LogFd {
         close(self.0).map_err(|e| from_nix_error(e, "close"))
     }
 
-    /// Synchronizes all in-memory data of the file except metadata to the
-    /// filesystem.
-    pub fn sync(&self) -> IoResult<()> {
-        fail_point!("log_fd::sync::err", |_| {
-            Err(from_nix_error(nix::Error::EINVAL, "fp"))
-        });
-        #[cfg(target_os = "linux")]
-        {
-            nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync"))
-        }
-        #[cfg(not(target_os = "linux"))]
-        {
-            nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync"))
-        }
-    }
-
     /// Reads some bytes starting at `offset` from this file into the specified
     /// buffer. Returns how many bytes were read.
     pub fn read(&self, mut offset: usize, buf: &mut [u8]) -> IoResult<usize> {
@@ -168,13 +152,15 @@ impl LogFd {
 }
 
 impl Handle for LogFd {
+    #[inline]
     fn truncate(&self, offset: usize) -> IoResult<()> {
         fail_point!("log_fd::truncate::err", |_| {
             Err(from_nix_error(nix::Error::EINVAL, "fp"))
         });
         ftruncate(self.0, offset as i64).map_err(|e| from_nix_error(e, "ftruncate"))
     }
 
+    #[inline]
     fn file_size(&self) -> IoResult<usize> {
         fail_point!("log_fd::file_size::err", |_| {
             Err(from_nix_error(nix::Error::EINVAL, "fp"))
@@ -183,6 +169,21 @@ impl Handle for LogFd {
             .map(|n| n as usize)
             .map_err(|e| from_nix_error(e, "lseek"))
     }
+
+    #[inline]
+    fn sync(&self) -> IoResult<()> {
+        fail_point!("log_fd::sync::err", |_| {
+            Err(from_nix_error(nix::Error::EINVAL, "fp"))
+        });
+        #[cfg(target_os = "linux")]
+        {
+            nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync"))
+        }
+        #[cfg(not(target_os = "linux"))]
+        {
+            nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync"))
+        }
+    }
 }
 
 impl Drop for LogFd {
@@ -251,10 +252,6 @@ impl WriteExt for LogFile {
         Ok(())
     }
 
-    fn sync(&mut self) -> IoResult<()> {
-        self.inner.sync()
-    }
-
     fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> {
         self.inner.allocate(offset, size)
     }
```
3 changes: 2 additions & 1 deletion src/env/mod.rs
```diff
@@ -56,11 +56,12 @@ pub trait Handle {
 
     /// Returns the current size of this file.
     fn file_size(&self) -> Result<usize>;
+
+    fn sync(&self) -> Result<()>;
 }
 
 /// WriteExt is writer extension api
 pub trait WriteExt {
     fn truncate(&mut self, offset: usize) -> Result<()>;
-    fn sync(&mut self) -> Result<()>;
     fn allocate(&mut self, offset: usize, size: usize) -> Result<()>;
 }
```
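
Custom `FileSystem` implementations accordingly move their `sync` from the writer to the handle. A minimal hypothetical sketch backed by `std::fs::File` — `StdHandle` is not part of this crate, and `Result` is assumed to be the `std::io::Result` used by this module:

```rust
use std::fs::File;
use std::io::Result;

// Hypothetical handle type; not part of this crate.
struct StdHandle {
    file: File,
}

impl Handle for StdHandle {
    fn truncate(&self, offset: usize) -> Result<()> {
        self.file.set_len(offset as u64)
    }

    fn file_size(&self) -> Result<usize> {
        Ok(self.file.metadata()?.len() as usize)
    }

    // Now on the handle: callable through &self, without borrowing a writer.
    fn sync(&self) -> Result<()> {
        self.file.sync_data()
    }
}
```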
4 changes: 0 additions & 4 deletions src/env/obfuscated.rs
```diff
@@ -57,10 +57,6 @@ impl WriteExt for ObfuscatedWriter {
         self.0.truncate(offset)
     }
 
-    fn sync(&mut self) -> IoResult<()> {
-        self.0.sync()
-    }
-
     fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> {
         self.0.allocate(offset, size)
     }
```
17 changes: 4 additions & 13 deletions src/file_pipe_log/log_file.rs
```diff
@@ -37,10 +37,10 @@ pub(super) fn build_file_writer<F: FileSystem>(
 
 /// Append-only writer for log file. It also handles the file header write.
 pub struct LogFileWriter<F: FileSystem> {
+    handle: Arc<F::Handle>,
     writer: F::Writer,
     written: usize,
     capacity: usize,
-    last_sync: usize,
 }
 
 impl<F: FileSystem> LogFileWriter<F> {
@@ -52,10 +52,10 @@ impl<F: FileSystem> LogFileWriter<F> {
     ) -> Result<Self> {
         let file_size = handle.file_size()?;
         let mut f = Self {
+            handle,
             writer,
             written: file_size,
             capacity: file_size,
-            last_sync: file_size,
         };
         // TODO: add tests for file_size in [header_len, max_encode_len].
         if file_size < LogFileFormat::encode_len(format.version) || force_reset {
@@ -68,7 +68,6 @@ impl<F: FileSystem> LogFileWriter<F> {
 
     fn write_header(&mut self, format: LogFileFormat) -> Result<()> {
         self.writer.seek(SeekFrom::Start(0))?;
-        self.last_sync = 0;
         self.written = 0;
         let mut buf = Vec::with_capacity(LogFileFormat::encode_len(format.version));
         format.encode(&mut buf)?;
@@ -121,19 +120,11 @@
     }
 
     pub fn sync(&mut self) -> Result<()> {
-        if self.last_sync < self.written {
-            let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
-            self.writer.sync()?;
-            self.last_sync = self.written;
-        }
+        let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
+        self.handle.sync()?;
         Ok(())
     }
 
-    #[inline]
-    pub fn since_last_sync(&self) -> usize {
-        self.written - self.last_sync
-    }
-
     #[inline]
     pub fn offset(&self) -> usize {
         self.written
```
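
Storing `handle: Arc<F::Handle>` in the writer is what enables the concurrent `fsync` mentioned in the commit message: a clone of the handle can be synced from another thread while appends continue. A hypothetical sketch of that usage, assuming a `Send + Sync` handle type:

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical: sync a cloned handle off-thread while the writer stays free
// to keep appending. `Handle::sync` takes &self, so no writer lock is needed.
fn spawn_sync<H>(handle: Arc<H>) -> thread::JoinHandle<()>
where
    H: Handle + Send + Sync + 'static,
{
    thread::spawn(move || {
        // Treat fsync failure as unrecoverable, matching the engine's policy.
        handle.sync().expect("unrecoverable fsync error");
    })
}
```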