Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hide more file writing details from engine #269

Merged
merged 9 commits
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
* Add metadata deletion capability to `FileSystem` trait. Users can implement `exists_metadata` and `delete_metadata` to clean up obsolete metadata from older versions of Raft Engine.
* Add `Engine::scan_messages` and `Engine::scan_raw_messages` for iterating over written key-values.
* Add `Engine::get` for getting raw value.
* Move `sync` from `env::WriteExt` to `env::Handle`.
* Deprecate `bytes_per_sync`.

### Behavior Changes

Expand Down
18 changes: 9 additions & 9 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ pub struct Config {
///
/// Default: "8KB"
pub batch_compression_threshold: ReadableSize,
/// Deprecated.
/// Incrementally sync log files after specified bytes have been written.
/// Setting it to zero disables incremental sync.
///
/// Default: "4MB"
pub bytes_per_sync: ReadableSize,
pub bytes_per_sync: Option<ReadableSize>,

/// Version of the log file.
///
Expand Down Expand Up @@ -101,7 +102,7 @@ impl Default for Config {
recovery_read_block_size: ReadableSize::kb(16),
recovery_threads: 4,
batch_compression_threshold: ReadableSize::kb(8),
bytes_per_sync: ReadableSize::mb(4),
bytes_per_sync: None,
format_version: Version::V2,
target_file_size: ReadableSize::mb(128),
purge_threshold: ReadableSize::gb(10),
Expand All @@ -114,7 +115,6 @@ impl Default for Config {
#[cfg(test)]
{
cfg.memory_limit = Some(ReadableSize(0));
cfg.format_version = Version::V2;
cfg.enable_log_recycle = true;
}
cfg
Expand All @@ -132,8 +132,8 @@ impl Config {
self.target_file_size.0,
)));
}
if self.bytes_per_sync.0 == 0 {
self.bytes_per_sync = ReadableSize(u64::MAX);
if self.bytes_per_sync.is_some() {
warn!("bytes-per-sync has been deprecated.");
}
let min_recovery_read_block_size = ReadableSize(MIN_RECOVERY_READ_BLOCK_SIZE as u64);
if self.recovery_read_block_size < min_recovery_read_block_size {
Expand Down Expand Up @@ -204,14 +204,16 @@ mod tests {
target-file-size = "1MB"
purge-threshold = "3MB"
format-version = 1
enable-log-recycle = false
"#;
let load: Config = toml::from_str(custom).unwrap();
let mut load: Config = toml::from_str(custom).unwrap();
assert_eq!(load.dir, "custom_dir");
assert_eq!(load.recovery_mode, RecoveryMode::TolerateTailCorruption);
assert_eq!(load.bytes_per_sync, ReadableSize::kb(2));
assert_eq!(load.bytes_per_sync, Some(ReadableSize::kb(2)));
assert_eq!(load.target_file_size, ReadableSize::mb(1));
assert_eq!(load.purge_threshold, ReadableSize::mb(3));
assert_eq!(load.format_version, Version::V1);
load.sanitize().unwrap();
}

#[test]
Expand All @@ -226,7 +228,6 @@ mod tests {
let soft_error = r#"
recovery-read-block-size = "1KB"
recovery-threads = 0
bytes-per-sync = "0KB"
target-file-size = "5000MB"
format-version = 2
enable-log-recycle = true
Expand All @@ -236,7 +237,6 @@ mod tests {
soft_sanitized.sanitize().unwrap();
assert!(soft_sanitized.recovery_read_block_size.0 >= MIN_RECOVERY_READ_BLOCK_SIZE as u64);
assert!(soft_sanitized.recovery_threads >= MIN_RECOVERY_THREADS);
assert_eq!(soft_sanitized.bytes_per_sync.0, u64::MAX);
assert_eq!(
soft_sanitized.purge_rewrite_threshold.unwrap(),
soft_sanitized.target_file_size
Expand Down
28 changes: 12 additions & 16 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,12 @@ where
if let Some(mut group) = self.write_barrier.enter(&mut writer) {
let now = Instant::now();
let _t = StopWatch::new_with(&*ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
let file_context = self.pipe_log.fetch_active_file(LogQueue::Append);
for writer in group.iter_mut() {
writer.entered_time = Some(now);
sync |= writer.sync;
let log_batch = writer.mut_payload();
let res = if !log_batch.is_empty() {
log_batch.prepare_write(&file_context)?;
self.pipe_log
.append(LogQueue::Append, log_batch.encoded_bytes())
self.pipe_log.append(LogQueue::Append, log_batch)
} else {
// TODO(tabokie): use Option<FileBlockHandle> instead.
Ok(FileBlockHandle {
Expand All @@ -166,16 +163,11 @@ where
};
writer.set_output(res);
}
debug_assert!(
file_context.id == self.pipe_log.fetch_active_file(LogQueue::Append).id
);
perf_context!(log_write_duration).observe_since(now);
if let Err(e) = self.pipe_log.maybe_sync(LogQueue::Append, sync) {
panic!(
"Cannot sync {:?} queue due to IO error: {}",
LogQueue::Append,
e
);
if sync {
// As per trait protocol, this error should be retriable. But we panic anyway to
// save the trouble of propagating it to other group members.
self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
}
// Pass the perf context diff to all the writers.
let diff = get_perf_context();
Expand Down Expand Up @@ -1258,8 +1250,12 @@ mod tests {
check_purge(vec![1, 2, 3]);
}

// 10th, rewritten
check_purge(vec![]);
// 10th, rewritten, but still needs to be compacted.
check_purge(vec![1, 2, 3]);
for rid in 1..=3 {
let memtable = engine.memtables.get(rid).unwrap();
assert_eq!(memtable.read().rewrite_count(), 50);
}

// compact and write some new data to trigger compact again.
for rid in 2..=50 {
Expand Down Expand Up @@ -1454,7 +1450,7 @@ mod tests {
assert_eq!(engine.file_span(LogQueue::Append).0, old_active_file + 1);
let old_active_file = engine.file_span(LogQueue::Rewrite).1;
engine.purge_manager.must_rewrite_rewrite_queue();
assert_eq!(engine.file_span(LogQueue::Rewrite).0, old_active_file + 1);
assert!(engine.file_span(LogQueue::Rewrite).0 > old_active_file);

let engine = engine.reopen();
for rid in 1..=3 {
Expand Down
37 changes: 17 additions & 20 deletions src/env/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,6 @@ impl LogFd {
close(self.0).map_err(|e| from_nix_error(e, "close"))
}

/// Synchronizes all in-memory data of the file except metadata to the
/// filesystem.
pub fn sync(&self) -> IoResult<()> {
fail_point!("log_fd::sync::err", |_| {
Err(from_nix_error(nix::Error::EINVAL, "fp"))
});
#[cfg(target_os = "linux")]
{
nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync"))
}
#[cfg(not(target_os = "linux"))]
{
nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync"))
}
}

/// Reads some bytes starting at `offset` from this file into the specified
/// buffer. Returns how many bytes were read.
pub fn read(&self, mut offset: usize, buf: &mut [u8]) -> IoResult<usize> {
Expand Down Expand Up @@ -168,13 +152,15 @@ impl LogFd {
}

impl Handle for LogFd {
#[inline]
fn truncate(&self, offset: usize) -> IoResult<()> {
fail_point!("log_fd::truncate::err", |_| {
Err(from_nix_error(nix::Error::EINVAL, "fp"))
});
ftruncate(self.0, offset as i64).map_err(|e| from_nix_error(e, "ftruncate"))
}

#[inline]
fn file_size(&self) -> IoResult<usize> {
fail_point!("log_fd::file_size::err", |_| {
Err(from_nix_error(nix::Error::EINVAL, "fp"))
Expand All @@ -183,6 +169,21 @@ impl Handle for LogFd {
.map(|n| n as usize)
.map_err(|e| from_nix_error(e, "lseek"))
}

#[inline]
fn sync(&self) -> IoResult<()> {
fail_point!("log_fd::sync::err", |_| {
Err(from_nix_error(nix::Error::EINVAL, "fp"))
});
#[cfg(target_os = "linux")]
{
nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync"))
}
#[cfg(not(target_os = "linux"))]
{
nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync"))
}
}
}

impl Drop for LogFd {
Expand Down Expand Up @@ -251,10 +252,6 @@ impl WriteExt for LogFile {
Ok(())
}

fn sync(&mut self) -> IoResult<()> {
self.inner.sync()
}

fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> {
self.inner.allocate(offset, size)
}
Expand Down
3 changes: 2 additions & 1 deletion src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ pub trait Handle {

/// Returns the current size of this file.
fn file_size(&self) -> Result<usize>;

fn sync(&self) -> Result<()>;
}

/// WriteExt is the writer extension API.
pub trait WriteExt {
fn truncate(&mut self, offset: usize) -> Result<()>;
fn sync(&mut self) -> Result<()>;
fn allocate(&mut self, offset: usize, size: usize) -> Result<()>;
}
4 changes: 0 additions & 4 deletions src/env/obfuscated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ impl WriteExt for ObfuscatedWriter {
self.0.truncate(offset)
}

fn sync(&mut self) -> IoResult<()> {
self.0.sync()
}

fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> {
self.0.allocate(offset, size)
}
Expand Down
17 changes: 4 additions & 13 deletions src/file_pipe_log/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ pub(super) fn build_file_writer<F: FileSystem>(

/// Append-only writer for log file. It also handles the file header write.
pub struct LogFileWriter<F: FileSystem> {
handle: Arc<F::Handle>,
writer: F::Writer,
written: usize,
capacity: usize,
last_sync: usize,
}

impl<F: FileSystem> LogFileWriter<F> {
Expand All @@ -52,10 +52,10 @@ impl<F: FileSystem> LogFileWriter<F> {
) -> Result<Self> {
let file_size = handle.file_size()?;
let mut f = Self {
handle,
writer,
written: file_size,
capacity: file_size,
last_sync: file_size,
};
// TODO: add tests for file_size in [header_len, max_encode_len].
if file_size < LogFileFormat::encode_len(format.version) || force_reset {
Expand All @@ -68,7 +68,6 @@ impl<F: FileSystem> LogFileWriter<F> {

fn write_header(&mut self, format: LogFileFormat) -> Result<()> {
self.writer.seek(SeekFrom::Start(0))?;
self.last_sync = 0;
self.written = 0;
let mut buf = Vec::with_capacity(LogFileFormat::encode_len(format.version));
format.encode(&mut buf)?;
Expand Down Expand Up @@ -121,19 +120,11 @@ impl<F: FileSystem> LogFileWriter<F> {
}

pub fn sync(&mut self) -> Result<()> {
if self.last_sync < self.written {
let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
self.writer.sync()?;
self.last_sync = self.written;
}
let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
self.handle.sync()?;
Ok(())
}

#[inline]
pub fn since_last_sync(&self) -> usize {
self.written - self.last_sync
}

#[inline]
pub fn offset(&self) -> usize {
self.written
Expand Down
Loading