diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2d2b9962..8dd8aca4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,8 @@
 * Add metadata deletion capability to `FileSystem` trait. Users can implement `exists_metadata` and `delete_metadata` to clean up obsolete metadata from older versions of Raft Engine.
 * Add `Engine::scan_messages` and `Engine::scan_raw_messages` for iterating over written key-values.
 * Add `Engine::get` for getting raw value.
+* Move `sync` from `env::WriteExt` to `env::Handle`.
+* Deprecate `bytes_per_sync`.

 ### Behavior Changes

diff --git a/src/config.rs b/src/config.rs
index 7ae133fe..5851cb71 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -49,11 +49,12 @@ pub struct Config {
     ///
     /// Default: "8KB"
     pub batch_compression_threshold: ReadableSize,
+    /// Deprecated.
     /// Incrementally sync log files after specified bytes have been written.
     /// Setting it to zero disables incremental sync.
     ///
     /// Default: "4MB"
-    pub bytes_per_sync: ReadableSize,
+    pub bytes_per_sync: Option<ReadableSize>,

     /// Version of the log file.
     ///
@@ -101,7 +102,7 @@ impl Default for Config {
             recovery_read_block_size: ReadableSize::kb(16),
             recovery_threads: 4,
             batch_compression_threshold: ReadableSize::kb(8),
-            bytes_per_sync: ReadableSize::mb(4),
+            bytes_per_sync: None,
             format_version: Version::V2,
             target_file_size: ReadableSize::mb(128),
             purge_threshold: ReadableSize::gb(10),
@@ -114,7 +115,6 @@ impl Default for Config {
         #[cfg(test)]
         {
             cfg.memory_limit = Some(ReadableSize(0));
-            cfg.format_version = Version::V2;
             cfg.enable_log_recycle = true;
         }
         cfg
@@ -132,8 +132,8 @@ impl Config {
                 self.target_file_size.0,
             )));
         }
-        if self.bytes_per_sync.0 == 0 {
-            self.bytes_per_sync = ReadableSize(u64::MAX);
+        if self.bytes_per_sync.is_some() {
+            warn!("bytes-per-sync has been deprecated.");
         }
         let min_recovery_read_block_size = ReadableSize(MIN_RECOVERY_READ_BLOCK_SIZE as u64);
         if self.recovery_read_block_size < min_recovery_read_block_size {
@@ -204,14 +204,16 @@ mod tests {
             target-file-size = "1MB"
             purge-threshold = "3MB"
             format-version = 1
+            enable-log-recycle = false
         "#;
-        let load: Config = toml::from_str(custom).unwrap();
+        let mut load: Config = toml::from_str(custom).unwrap();
         assert_eq!(load.dir, "custom_dir");
         assert_eq!(load.recovery_mode, RecoveryMode::TolerateTailCorruption);
-        assert_eq!(load.bytes_per_sync, ReadableSize::kb(2));
+        assert_eq!(load.bytes_per_sync, Some(ReadableSize::kb(2)));
         assert_eq!(load.target_file_size, ReadableSize::mb(1));
         assert_eq!(load.purge_threshold, ReadableSize::mb(3));
         assert_eq!(load.format_version, Version::V1);
+        load.sanitize().unwrap();
     }

     #[test]
@@ -226,7 +228,6 @@ mod tests {
         let soft_error = r#"
             recovery-read-block-size = "1KB"
             recovery-threads = 0
-            bytes-per-sync = "0KB"
             target-file-size = "5000MB"
             format-version = 2
             enable-log-recycle = true
@@ -236,7 +237,6 @@ mod tests {
         soft_sanitized.sanitize().unwrap();
         assert!(soft_sanitized.recovery_read_block_size.0 >= MIN_RECOVERY_READ_BLOCK_SIZE as u64);
         assert!(soft_sanitized.recovery_threads >= MIN_RECOVERY_THREADS);
-        assert_eq!(soft_sanitized.bytes_per_sync.0, u64::MAX);
         assert_eq!(
             soft_sanitized.purge_rewrite_threshold.unwrap(),
             soft_sanitized.target_file_size
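Changing the deprecated field to an `Option` keeps old TOML files parseable: a config that still sets `bytes-per-sync` deserializes to `Some(..)` and merely logs a warning in `sanitize`, instead of failing hard. A minimal sketch of this deprecation pattern outside Raft Engine (the `Settings` type is a hypothetical stand-in, assuming the `serde` and `toml` crates):

```rust
use serde::Deserialize;

#[derive(Deserialize)]
#[serde(rename_all = "kebab-case")]
struct Settings {
    target_file_size: u64,
    // Deprecated: kept as an Option so old config files still parse.
    bytes_per_sync: Option<u64>,
}

fn main() {
    let s: Settings =
        toml::from_str("target-file-size = 1048576\nbytes-per-sync = 4096\n").unwrap();
    if s.bytes_per_sync.is_some() {
        eprintln!("warning: bytes-per-sync has been deprecated.");
    }
}
```

This is also why the test above now ends with `load.sanitize().unwrap()`: the deprecated key must survive deserialization and surface only as a warning.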
diff --git a/src/engine.rs b/src/engine.rs
index 78e28c2e..bf9df9ef 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -147,15 +147,12 @@ where
         if let Some(mut group) = self.write_barrier.enter(&mut writer) {
             let now = Instant::now();
             let _t = StopWatch::new_with(&*ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
-            let file_context = self.pipe_log.fetch_active_file(LogQueue::Append);
             for writer in group.iter_mut() {
                 writer.entered_time = Some(now);
                 sync |= writer.sync;
                 let log_batch = writer.mut_payload();
                 let res = if !log_batch.is_empty() {
-                    log_batch.prepare_write(&file_context)?;
-                    self.pipe_log
-                        .append(LogQueue::Append, log_batch.encoded_bytes())
+                    self.pipe_log.append(LogQueue::Append, log_batch)
                 } else {
                     // TODO(tabokie): use Option<FileBlockHandle> instead.
                     Ok(FileBlockHandle {
@@ -166,16 +163,11 @@ where
                 };
                 writer.set_output(res);
             }
-            debug_assert!(
-                file_context.id == self.pipe_log.fetch_active_file(LogQueue::Append).id
-            );
             perf_context!(log_write_duration).observe_since(now);
-            if let Err(e) = self.pipe_log.maybe_sync(LogQueue::Append, sync) {
-                panic!(
-                    "Cannot sync {:?} queue due to IO error: {}",
-                    LogQueue::Append,
-                    e
-                );
+            if sync {
+                // As per trait protocol, this error should be retriable. But we panic anyway to
+                // save the trouble of propagating it to other group members.
+                self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
             }
             // Pass the perf context diff to all the writers.
             let diff = get_perf_context();
@@ -1258,8 +1250,12 @@ mod tests {
             check_purge(vec![1, 2, 3]);
         }
-        // 10th, rewrited
-        check_purge(vec![]);
+        // 10th, rewritten, but still needs to be compacted.
+        check_purge(vec![1, 2, 3]);
+        for rid in 1..=3 {
+            let memtable = engine.memtables.get(rid).unwrap();
+            assert_eq!(memtable.read().rewrite_count(), 50);
+        }

         // compact and write some new data to trigger compact again.
         for rid in 2..=50 {
@@ -1454,7 +1450,7 @@ mod tests {
         assert_eq!(engine.file_span(LogQueue::Append).0, old_active_file + 1);
         let old_active_file = engine.file_span(LogQueue::Rewrite).1;
         engine.purge_manager.must_rewrite_rewrite_queue();
-        assert_eq!(engine.file_span(LogQueue::Rewrite).0, old_active_file + 1);
+        assert!(engine.file_span(LogQueue::Rewrite).0 > old_active_file);

         let engine = engine.reopen();
         for rid in 1..=3 {
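On the caller side nothing changes shape: `Engine::write` still takes a `sync` flag, but a `true` flag now translates into exactly one explicit `PipeLog::sync` issued by the write-group leader after all appends, rather than a hint passed to `maybe_sync`. A usage sketch against the public API (directory name illustrative; `LogBatch::delete` used to keep the example independent of entry encoding):

```rust
use raft_engine::{Config, Engine, LogBatch};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let engine = Engine::open(Config {
        dir: "raft-engine-demo".to_owned(),
        ..Default::default()
    })?;

    let mut batch = LogBatch::default();
    batch.delete(1, b"stale-key".to_vec());
    // sync = true: the write-group leader appends every member's batch and
    // then issues a single sync for the whole group.
    engine.write(&mut batch, true)?;

    let mut batch = LogBatch::default();
    batch.delete(1, b"another-stale-key".to_vec());
    // sync = false: skip the sync; durability rides on a later synced write.
    engine.write(&mut batch, false)?;
    Ok(())
}
```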
diff --git a/src/env/default.rs b/src/env/default.rs
index 4c5de77d..451ee9fe 100644
--- a/src/env/default.rs
+++ b/src/env/default.rs
@@ -74,22 +74,6 @@ impl LogFd {
         close(self.0).map_err(|e| from_nix_error(e, "close"))
     }

-    /// Synchronizes all in-memory data of the file except metadata to the
-    /// filesystem.
-    pub fn sync(&self) -> IoResult<()> {
-        fail_point!("log_fd::sync::err", |_| {
-            Err(from_nix_error(nix::Error::EINVAL, "fp"))
-        });
-        #[cfg(target_os = "linux")]
-        {
-            nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync"))
-        }
-        #[cfg(not(target_os = "linux"))]
-        {
-            nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync"))
-        }
-    }
-
     /// Reads some bytes starting at `offset` from this file into the specified
     /// buffer. Returns how many bytes were read.
     pub fn read(&self, mut offset: usize, buf: &mut [u8]) -> IoResult<usize> {
@@ -168,6 +152,7 @@ impl LogFd {
 }

 impl Handle for LogFd {
+    #[inline]
     fn truncate(&self, offset: usize) -> IoResult<()> {
         fail_point!("log_fd::truncate::err", |_| {
             Err(from_nix_error(nix::Error::EINVAL, "fp"))
@@ -175,6 +160,7 @@ impl Handle for LogFd {
         ftruncate(self.0, offset as i64).map_err(|e| from_nix_error(e, "ftruncate"))
     }

+    #[inline]
     fn file_size(&self) -> IoResult<usize> {
         fail_point!("log_fd::file_size::err", |_| {
             Err(from_nix_error(nix::Error::EINVAL, "fp"))
@@ -183,6 +169,21 @@ impl Handle for LogFd {
             .map(|n| n as usize)
             .map_err(|e| from_nix_error(e, "lseek"))
     }
+
+    #[inline]
+    fn sync(&self) -> IoResult<()> {
+        fail_point!("log_fd::sync::err", |_| {
+            Err(from_nix_error(nix::Error::EINVAL, "fp"))
+        });
+        #[cfg(target_os = "linux")]
+        {
+            nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync"))
+        }
+        #[cfg(not(target_os = "linux"))]
+        {
+            nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync"))
+        }
+    }
 }

 impl Drop for LogFd {
@@ -251,10 +252,6 @@ impl WriteExt for LogFile {
         Ok(())
     }

-    fn sync(&mut self) -> IoResult<()> {
-        self.inner.sync()
-    }
-
     fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> {
         self.inner.allocate(offset, size)
     }
diff --git a/src/env/mod.rs b/src/env/mod.rs
index 50cce189..6ae4bf9d 100644
--- a/src/env/mod.rs
+++ b/src/env/mod.rs
@@ -56,11 +56,12 @@ pub trait Handle {

     /// Returns the current size of this file.
     fn file_size(&self) -> Result<usize>;
+
+    fn sync(&self) -> Result<()>;
 }

 /// WriteExt is writer extension api
 pub trait WriteExt {
     fn truncate(&mut self, offset: usize) -> Result<()>;
-    fn sync(&mut self) -> Result<()>;
     fn allocate(&mut self, offset: usize, size: usize) -> Result<()>;
 }

diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs
index 18057696..831f5343 100644
--- a/src/env/obfuscated.rs
+++ b/src/env/obfuscated.rs
@@ -57,10 +57,6 @@ impl WriteExt for ObfuscatedWriter {
         self.0.truncate(offset)
     }

-    fn sync(&mut self) -> IoResult<()> {
-        self.0.sync()
-    }
-
     fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> {
         self.0.allocate(offset, size)
     }
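The effect of the move: `sync` is now available on the shared, reference-counted file handle (`&self`) instead of requiring exclusive access to the writer (`&mut self`), which is what lets the pipe flush a file without holding its writer. A minimal sketch of a handle shaped like the revised trait, built over `std::fs::File` (the `DemoHandle` type is hypothetical, not part of the crate):

```rust
use std::fs::File;
use std::io::Result as IoResult;

// Hypothetical handle over a std file, mirroring the revised `env::Handle`.
struct DemoHandle(File);

impl DemoHandle {
    fn truncate(&self, offset: usize) -> IoResult<()> {
        self.0.set_len(offset as u64)
    }

    fn file_size(&self) -> IoResult<usize> {
        Ok(self.0.metadata()?.len() as usize)
    }

    // `sync` sits beside `truncate`/`file_size` now: it takes `&self`, so a
    // cloned Arc of the handle can flush the file while the exclusive writer
    // is held elsewhere.
    fn sync(&self) -> IoResult<()> {
        self.0.sync_data()
    }
}
```

On Linux the crate's own `LogFd::sync` uses `fdatasync` to skip the metadata flush; `File::sync_data` is the portable equivalent.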
diff --git a/src/file_pipe_log/log_file.rs b/src/file_pipe_log/log_file.rs
index 5a621ff6..f977f695 100644
--- a/src/file_pipe_log/log_file.rs
+++ b/src/file_pipe_log/log_file.rs
@@ -37,10 +37,10 @@ pub(super) fn build_file_writer(

 /// Append-only writer for log file. It also handles the file header write.
 pub struct LogFileWriter<F: FileSystem> {
+    handle: Arc<F::Handle>,
     writer: F::Writer,
     written: usize,
     capacity: usize,
-    last_sync: usize,
 }

 impl<F: FileSystem> LogFileWriter<F> {
@@ -52,10 +52,10 @@
     ) -> Result<Self> {
         let file_size = handle.file_size()?;
         let mut f = Self {
+            handle,
             writer,
             written: file_size,
             capacity: file_size,
-            last_sync: file_size,
         };
         // TODO: add tests for file_size in [header_len, max_encode_len].
         if file_size < LogFileFormat::encode_len(format.version) || force_reset {
@@ -68,7 +68,6 @@

     fn write_header(&mut self, format: LogFileFormat) -> Result<()> {
         self.writer.seek(SeekFrom::Start(0))?;
-        self.last_sync = 0;
         self.written = 0;
         let mut buf = Vec::with_capacity(LogFileFormat::encode_len(format.version));
         format.encode(&mut buf)?;
@@ -121,19 +120,11 @@
     }

     pub fn sync(&mut self) -> Result<()> {
-        if self.last_sync < self.written {
-            let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
-            self.writer.sync()?;
-            self.last_sync = self.written;
-        }
+        let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
+        self.handle.sync()?;
         Ok(())
     }

-    #[inline]
-    pub fn since_last_sync(&self) -> usize {
-        self.written - self.last_sync
-    }
-
     #[inline]
     pub fn offset(&self) -> usize {
         self.written
diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs
index 870d568a..81f65c72 100644
--- a/src/file_pipe_log/pipe.rs
+++ b/src/file_pipe_log/pipe.rs
@@ -14,7 +14,9 @@ use crate::config::Config;
 use crate::env::FileSystem;
 use crate::event_listener::EventListener;
 use crate::metrics::*;
-use crate::pipe_log::{FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog};
+use crate::pipe_log::{
+    FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog, ReactiveBytes,
+};
 use crate::{perf_context, Error, Result};

 use super::format::{FileNameExt, LogFileFormat};
@@ -119,7 +121,6 @@ pub(super) struct SinglePipe<F: FileSystem> {
     dir: String,
     file_format: LogFileFormat,
     target_file_size: usize,
-    bytes_per_sync: usize,

     file_system: Arc<F>,
     listeners: Vec<Arc<dyn EventListener>>,
@@ -223,7 +224,6 @@
             dir: cfg.dir.clone(),
             file_format: LogFileFormat::new(cfg.format_version, alignment),
             target_file_size: cfg.target_file_size.0 as usize,
-            bytes_per_sync: cfg.bytes_per_sync.0 as usize,

             file_system,
             listeners,
@@ -349,12 +349,24 @@
         reader.read(handle)
     }

-    fn append(&self, bytes: &[u8]) -> Result<FileBlockHandle> {
+    fn append<T: ReactiveBytes + ?Sized>(&self, bytes: &mut T) -> Result<FileBlockHandle> {
         fail_point!("file_pipe_log::append");
         let mut active_file = self.active_file.lock();
+        if active_file.writer.offset() >= self.target_file_size {
+            if let Err(e) = self.rotate_imp(&mut active_file) {
+                panic!(
+                    "error when rotate [{:?}:{}]: {}",
+                    self.queue, active_file.seq, e
+                );
+            }
+        }
+
         let seq = active_file.seq;
-        #[cfg(feature = "failpoints")]
         let format = active_file.format;
+        let ctx = LogFileContext {
+            id: FileId::new(self.queue, seq),
+            version: format.version,
+        };
         let writer = &mut active_file.writer;

         #[cfg(feature = "failpoints")]
@@ -378,7 +390,7 @@
             }
         }
         let start_offset = writer.offset();
-        if let Err(e) = writer.write(bytes, self.target_file_size) {
+        if let Err(e) = writer.write(bytes.as_bytes(&ctx), self.target_file_size) {
             if let Err(te) = writer.truncate() {
                 panic!(
                     "error when truncate {} after error: {}, get: {}",
@@ -401,15 +413,11 @@
         Ok(handle)
     }

-    fn maybe_sync(&self, force: bool) -> Result<()> {
+    fn sync(&self) -> Result<()> {
         let mut active_file = self.active_file.lock();
         let seq = active_file.seq;
         let writer = &mut active_file.writer;
-        if writer.offset() >= self.target_file_size {
-            if let Err(e) = self.rotate_imp(&mut active_file) {
-                panic!("error when rotate [{:?}:{}]: {}", self.queue, seq, e);
-            }
-        } else if writer.since_last_sync() >= self.bytes_per_sync || force {
+        {
             let _t = StopWatch::new(perf_context!(log_sync_duration));
             if let Err(e) = writer.sync() {
                 panic!("error when sync [{:?}:{}]: {}", self.queue, seq, e,);
@@ -468,14 +476,6 @@ impl<F: FileSystem> SinglePipe<F> {
         self.flush_metrics(current.total_len);
         Ok((current.first_seq_in_use - prev.first_seq_in_use) as usize)
     }
-
-    fn fetch_active_file(&self) -> LogFileContext {
-        let files = self.files.read();
-        LogFileContext {
-            id: FileId::new(self.queue, files.first_seq + files.fds.len() as u64 - 1),
-            version: files.fds.back().unwrap().format.version,
-        }
-    }
 }

 /// A [`PipeLog`] implementation that stores data in filesystem.
@@ -516,13 +516,17 @@ impl<F: FileSystem> PipeLog for DualPipes<F> {
     }

     #[inline]
-    fn append(&self, queue: LogQueue, bytes: &[u8]) -> Result<FileBlockHandle> {
+    fn append<T: ReactiveBytes + ?Sized>(
+        &self,
+        queue: LogQueue,
+        bytes: &mut T,
+    ) -> Result<FileBlockHandle> {
         self.pipes[queue as usize].append(bytes)
     }

     #[inline]
-    fn maybe_sync(&self, queue: LogQueue, force: bool) -> Result<()> {
-        self.pipes[queue as usize].maybe_sync(force)
+    fn sync(&self, queue: LogQueue) -> Result<()> {
+        self.pipes[queue as usize].sync()
     }

     #[inline]
@@ -544,11 +548,6 @@
     fn purge_to(&self, file_id: FileId) -> Result<usize> {
         self.pipes[file_id.queue as usize].purge_to(file_id.seq)
     }
-
-    #[inline]
-    fn fetch_active_file(&self, queue: LogQueue) -> LogFileContext {
-        self.pipes[queue as usize].fetch_active_file()
-    }
 }

 #[cfg(test)]
@@ -614,7 +613,6 @@ mod tests {
         let cfg = Config {
             dir: path.to_owned(),
             target_file_size: ReadableSize::kb(1),
-            bytes_per_sync: ReadableSize::kb(32),
             ..Default::default()
         };
         let queue = LogQueue::Append;
@@ -626,17 +624,17 @@

         // generate file 1, 2, 3
         let content: Vec<u8> = vec![b'a'; 1024];
-        let file_handle = pipe_log.append(queue, &content).unwrap();
-        pipe_log.maybe_sync(queue, false).unwrap();
+        let file_handle = pipe_log.append(queue, &mut &content).unwrap();
         assert_eq!(file_handle.id.seq, 1);
         assert_eq!(file_handle.offset, header_size);
-        assert_eq!(pipe_log.file_span(queue).1, 2);
+        assert_eq!(pipe_log.file_span(queue).1, 1);

-        let file_handle = pipe_log.append(queue, &content).unwrap();
-        pipe_log.maybe_sync(queue, false).unwrap();
+        let file_handle = pipe_log.append(queue, &mut &content).unwrap();
         assert_eq!(file_handle.id.seq, 2);
         assert_eq!(file_handle.offset, header_size);
-        assert_eq!(pipe_log.file_span(queue).1, 3);
+        assert_eq!(pipe_log.file_span(queue).1, 2);
+
+        pipe_log.rotate(queue).unwrap();

         // purge file 1
         assert_eq!(pipe_log.purge_to(FileId { queue, seq: 2 }).unwrap(), 1);
@@ -647,13 +645,11 @@

         // append position
         let s_content = b"short content".to_vec();
-        let file_handle = pipe_log.append(queue, &s_content).unwrap();
-        pipe_log.maybe_sync(queue, false).unwrap();
+        let file_handle = pipe_log.append(queue, &mut &s_content).unwrap();
         assert_eq!(file_handle.id.seq, 3);
         assert_eq!(file_handle.offset, header_size);

-        let file_handle = pipe_log.append(queue, &s_content).unwrap();
-        pipe_log.maybe_sync(queue, false).unwrap();
+        let file_handle = pipe_log.append(queue, &mut &s_content).unwrap();
         assert_eq!(file_handle.id.seq, 3);
         assert_eq!(
             file_handle.offset,
@@ -679,11 +675,6 @@
         // leave only 1 file to truncate
         pipe_log.purge_to(FileId { queue, seq: 3 }).unwrap();
         assert_eq!(pipe_log.file_span(queue), (3, 3));
-
-        // fetch active file
-        let file_context = pipe_log.fetch_active_file(LogQueue::Append);
-        assert_eq!(file_context.version, cfg.format_version);
-        assert_eq!(file_context.id.seq, 3);
     }

     #[test]
@@ -770,7 +761,6 @@
         let cfg = Config {
             dir: path.to_owned(),
             target_file_size: ReadableSize(1),
-            bytes_per_sync: ReadableSize::kb(32),
             // super large capacity for recycling
             purge_threshold: ReadableSize::mb(100),
             enable_log_recycle: true,
@@ -787,9 +777,10 @@ mod tests {
         }
         let mut handles = Vec::new();
         for i in 0..10 {
-            handles.push(pipe_log.append(&content(i)).unwrap());
-            pipe_log.maybe_sync(true).unwrap();
+            handles.push(pipe_log.append(&mut &content(i)).unwrap());
+            pipe_log.sync().unwrap();
         }
+        pipe_log.rotate().unwrap();
         let (first, last) = pipe_log.file_span();
         assert_eq!(pipe_log.purge_to(last).unwrap() as u64, last - first);
         // Try to read stale file.
@@ -808,8 +799,8 @@
         // Try to reuse.
         let mut handles = Vec::new();
         for i in 0..10 {
-            handles.push(pipe_log.append(&content(i + 1)).unwrap());
-            pipe_log.maybe_sync(true).unwrap();
+            handles.push(pipe_log.append(&mut &content(i + 1)).unwrap());
+            pipe_log.sync().unwrap();
         }
         // Verify the data.
         for (i, handle) in handles.into_iter().enumerate() {
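Two behavioral consequences fall out of this file: rotation moved from `maybe_sync` into `append` (a full active file is rotated at the start of the next append), and durability became an explicit `sync` call. The `&mut &content` spelling in the tests comes from the blanket `ReactiveBytes` impl for `&T where T: AsRef<[u8]>` in `src/pipe_log.rs` below. A condensed sketch of the new call pattern (a hypothetical crate-internal helper; `PipeLog`, `LogQueue`, and `Result` as defined in this crate):

```rust
use crate::pipe_log::{LogQueue, PipeLog};
use crate::Result;

// Hypothetical helper: append two records, then make both durable at once.
fn write_two_records(pipe: &impl PipeLog, queue: LogQueue) -> Result<()> {
    let content: Vec<u8> = vec![b'a'; 1024];

    // Plain bytes satisfy `ReactiveBytes` through its blanket impl for `&T`,
    // hence the `&mut &[u8]`-shaped argument.
    pipe.append(queue, &mut content.as_slice())?;
    pipe.append(queue, &mut content.as_slice())?;

    // Durability is now a separate, explicit step: one sync covers both
    // appends. If the active file is already full, the next append rotates.
    pipe.sync(queue)
}
```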
diff --git a/src/log_batch.rs b/src/log_batch.rs
index 5d3f0676..714120cd 100644
--- a/src/log_batch.rs
+++ b/src/log_batch.rs
@@ -11,7 +11,7 @@ use protobuf::Message;
 use crate::codec::{self, NumberEncoder};
 use crate::memtable::EntryIndex;
 use crate::metrics::StopWatch;
-use crate::pipe_log::{FileBlockHandle, FileId, LogFileContext};
+use crate::pipe_log::{FileBlockHandle, FileId, LogFileContext, ReactiveBytes};
 use crate::util::{crc32, lz4};
 use crate::{perf_context, Error, Result};

@@ -902,6 +902,13 @@ impl LogBatch {
     }
 }

+impl ReactiveBytes for LogBatch {
+    fn as_bytes(&mut self, ctx: &LogFileContext) -> &[u8] {
+        self.prepare_write(ctx).unwrap();
+        self.encoded_bytes()
+    }
+}
+
 /// Verifies the checksum of a slice of bytes that sequentially holds data and
 /// checksum. The checksum field may be signed by XOR-ing with a u32.
 fn verify_checksum_with_signature(buf: &[u8], signature: Option<u32>) -> Result<()> {
diff --git a/src/memtable.rs b/src/memtable.rs
index 2c6fc055..0cb80fbc 100644
--- a/src/memtable.rs
+++ b/src/memtable.rs
@@ -721,16 +721,14 @@ impl MemTable {
         }
     }

-    /// Returns the number of entries smaller than or equal to `gate`.
-    pub fn entries_count_before(&self, mut gate: FileId) -> usize {
-        gate.seq += 1;
-        let idx = self
-            .entry_indexes
-            .binary_search_by_key(&gate, |ei| ei.entries.unwrap().id);
-        match idx {
-            Ok(idx) => idx,
-            Err(idx) => idx,
-        }
+    /// Returns true if there are at least `count` entries in log files with
+    /// sequence number smaller than or equal to `gate`.
+    #[inline]
+    pub fn has_at_least_some_entries_before(&self, gate: FileId, count: usize) -> bool {
+        debug_assert!(count > 0);
+        self.entry_indexes
+            .get(count - 1)
+            .map_or(false, |ei| ei.entries.unwrap().id.seq <= gate.seq)
     }

     /// Returns the region ID.
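The replacement turns an exact count (a binary search, O(log n)) into a threshold test that is O(1): entry indexes are ordered by file sequence, so if the entry at position `count - 1` sits in a file at or before `gate`, then at least `count` entries do. A toy illustration of the invariant, with plain slices and made-up numbers:

```rust
fn main() {
    // File seqs of one region's entries, oldest first (nondecreasing).
    let entry_file_seqs: Vec<u64> = vec![3, 3, 4, 7, 9];
    let (gate, count) = (7u64, 4usize);

    // "At least `count` entries in files at or before `gate`" needs only one
    // lookup: if the count-th oldest entry is old enough, so are all before it.
    let heavy = entry_file_seqs
        .get(count - 1)
        .map_or(false, |seq| *seq <= gate);
    assert!(heavy); // the 4th oldest entry lives in file 7 <= gate 7
}
```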
diff --git a/src/pipe_log.rs b/src/pipe_log.rs
index ff392cb6..aa9b7925 100644
--- a/src/pipe_log.rs
+++ b/src/pipe_log.rs
@@ -146,23 +146,45 @@ impl LogFileContext {
     }
 }

+/// Some bytes whose value might depend on the file they are written to.
+pub trait ReactiveBytes {
+    fn as_bytes(&mut self, ctx: &LogFileContext) -> &[u8];
+}
+
+impl<T> ReactiveBytes for &T
+where
+    T: AsRef<[u8]> + ?Sized,
+{
+    fn as_bytes(&mut self, _ctx: &LogFileContext) -> &[u8] {
+        (*self).as_ref()
+    }
+}
+
 /// A `PipeLog` serves reads and writes over multiple queues of log files.
+///
+/// # Safety
+///
+/// The pipe will panic if it encounters an unrecoverable failure. Otherwise
+/// the operations on it should be atomic, i.e. a failed operation will not
+/// affect other ones, and the user can still use it afterwards without
+/// breaking consistency.
 pub trait PipeLog: Sized {
     /// Reads some bytes from the specified position.
     fn read_bytes(&self, handle: FileBlockHandle) -> Result<Vec<u8>>;

     /// Appends some bytes to the specified log queue. Returns file position of
     /// the written bytes.
-    ///
-    /// The result of `fetch_active_file` will not be affected by this method.
-    fn append(&self, queue: LogQueue, bytes: &[u8]) -> Result<FileBlockHandle>;
+    fn append<T: ReactiveBytes + ?Sized>(
+        &self,
+        queue: LogQueue,
+        bytes: &mut T,
+    ) -> Result<FileBlockHandle>;

-    /// Hints it to synchronize buffered writes. The synchronization is
-    /// mandotory when `sync` is true.
+    /// Synchronizes all buffered writes.
     ///
     /// This operation might incur a great latency overhead. It's advised to
     /// call it once every batch of writes.
-    fn maybe_sync(&self, queue: LogQueue, sync: bool) -> Result<()>;
+    fn sync(&self, queue: LogQueue) -> Result<()>;

     /// Returns the smallest and largest file sequence number, still in use,
     /// of the specified log queue.
@@ -194,8 +216,4 @@ pub trait PipeLog: Sized {
     ///
     /// Returns the number of deleted files.
     fn purge_to(&self, file_id: FileId) -> Result<usize>;
-
-    /// Returns [`LogFileContext`] of the active file in the specific
-    /// log queue.
-    fn fetch_active_file(&self, queue: LogQueue) -> LogFileContext;
 }
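`ReactiveBytes` exists so the pipe can defer encoding until the destination file is known; `LogBatch` uses it (above) to sign its checksum per file version once `append` has picked the active file. A self-contained mirror of the idea, with renamed stand-in types (`FileCtx` and `SaltedPayload` are invented for illustration, not crate API):

```rust
#[derive(Clone, Copy)]
struct FileCtx {
    seq: u64, // stands in for LogFileContext's file id/version
}

trait Reactive {
    fn as_bytes(&mut self, ctx: &FileCtx) -> &[u8];
}

// A payload whose trailing bytes depend on the destination file, the same way
// a LogBatch re-signs its checksum for the file it lands in.
struct SaltedPayload {
    body: Vec<u8>,
    buf: Vec<u8>, // re-encoded lazily, once the target file is known
}

impl Reactive for SaltedPayload {
    fn as_bytes(&mut self, ctx: &FileCtx) -> &[u8] {
        self.buf.clear();
        self.buf.extend_from_slice(&self.body);
        self.buf.extend_from_slice(&ctx.seq.to_le_bytes()); // file-dependent tail
        &self.buf
    }
}

fn main() {
    let mut p = SaltedPayload {
        body: b"entry".to_vec(),
        buf: Vec::new(),
    };
    let in_file_1 = p.as_bytes(&FileCtx { seq: 1 }).to_vec();
    let in_file_2 = p.as_bytes(&FileCtx { seq: 2 }).to_vec();
    assert_ne!(in_file_1, in_file_2); // same payload, file-dependent encoding
}
```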
diff --git a/src/purge.rs b/src/purge.rs
index 9be81c5b..cfefd219 100644
--- a/src/purge.rs
+++ b/src/purge.rs
@@ -76,7 +76,7 @@ where
         let mut should_compact = HashSet::new();
         if self.needs_rewrite_log_files(LogQueue::Rewrite) {
             should_compact.extend(self.rewrite_rewrite_queue()?);
-            self.purge_to(
+            self.rescan_memtables_and_purge_stale_files(
                 LogQueue::Rewrite,
                 self.pipe_log.file_span(LogQueue::Rewrite).1,
             )?;
@@ -110,7 +110,10 @@ where
             if append_queue_barrier == first_append && first_append < latest_append {
                 warn!("Unable to purge expired files: blocked by barrier");
             }
-            self.purge_to(LogQueue::Append, append_queue_barrier)?;
+            self.rescan_memtables_and_purge_stale_files(
+                LogQueue::Append,
+                append_queue_barrier,
+            )?;
         }
         Ok(should_compact.into_iter().collect())
     }
@@ -139,7 +142,7 @@ where
         if exit_after_step == Some(2) {
             return;
         }
-        self.purge_to(
+        self.rescan_memtables_and_purge_stale_files(
             LogQueue::Append,
             self.pipe_log.file_span(LogQueue::Append).1,
         )
@@ -150,7 +153,7 @@ where
     pub fn must_rewrite_rewrite_queue(&self) {
         let _lk = self.force_rewrite_candidates.try_lock().unwrap();
         self.rewrite_rewrite_queue().unwrap();
-        self.purge_to(
+        self.rescan_memtables_and_purge_stale_files(
             LogQueue::Rewrite,
             self.pipe_log.file_span(LogQueue::Rewrite).1,
         )
@@ -208,23 +211,32 @@ where
         let mut new_candidates = HashMap::with_capacity(rewrite_candidates.len());
         let memtables = self.memtables.collect(|t| {
-            if let Some(f) = t.min_file_seq(LogQueue::Append) {
-                let sparse = t
-                    .entries_count_before(FileId::new(LogQueue::Append, rewrite_watermark))
-                    < MAX_REWRITE_ENTRIES_PER_REGION;
-                // counter is the times that target region triggers force compact.
-                let compact_counter = rewrite_candidates.get(&t.region_id()).unwrap_or(&0);
-                if f < compact_watermark
-                    && !sparse
-                    && *compact_counter < MAX_COUNT_BEFORE_FORCE_REWRITE
-                {
+            let min_append_seq = t.min_file_seq(LogQueue::Append).unwrap_or(u64::MAX);
+            let old = min_append_seq < compact_watermark || t.rewrite_count() > 0;
+            let has_something_to_rewrite = min_append_seq <= rewrite_watermark;
+            let append_heavy = t.has_at_least_some_entries_before(
+                FileId::new(LogQueue::Append, rewrite_watermark),
+                MAX_REWRITE_ENTRIES_PER_REGION + t.rewrite_count(),
+            );
+            let full_heavy = t.has_at_least_some_entries_before(
+                FileId::new(LogQueue::Append, rewrite_watermark),
+                MAX_REWRITE_ENTRIES_PER_REGION,
+            );
+            // counter is the times that target region triggers force compact.
+            let compact_counter = rewrite_candidates.get(&t.region_id()).unwrap_or(&0);
+            if old && full_heavy {
+                if *compact_counter < MAX_COUNT_BEFORE_FORCE_REWRITE {
+                    // repeatedly ask user to compact these heavy regions.
                     should_compact.push(t.region_id());
                     new_candidates.insert(t.region_id(), *compact_counter + 1);
-                } else if f < rewrite_watermark {
-                    return sparse || *compact_counter >= MAX_COUNT_BEFORE_FORCE_REWRITE;
+                    return false;
+                } else {
+                    // user is not responsive, do the rewrite ourselves.
+                    should_compact.push(t.region_id());
+                    return has_something_to_rewrite;
                 }
             }
-            false
+            !append_heavy && has_something_to_rewrite
         });

         self.rewrite_memtables(
@@ -266,7 +278,7 @@ where
     }

     // Exclusive.
-    fn purge_to(&self, queue: LogQueue, seq: FileSeq) -> Result<()> {
+    fn rescan_memtables_and_purge_stale_files(&self, queue: LogQueue, seq: FileSeq) -> Result<()> {
         let min_seq = self.memtables.fold(seq, |min, t| {
             t.min_file_seq(queue).map_or(min, |m| std::cmp::min(min, m))
         });
@@ -347,16 +359,15 @@ where
         rewrite_watermark: Option<FileSeq>,
         sync: bool,
     ) -> Result<()> {
-        let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
-        if len == 0 {
-            return self.pipe_log.maybe_sync(LogQueue::Rewrite, sync);
+        if log_batch.is_empty() {
+            debug_assert!(sync);
+            return self.pipe_log.sync(LogQueue::Rewrite);
+        }
+        log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
+        let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?;
+        if sync {
+            self.pipe_log.sync(LogQueue::Rewrite)?
         }
-        let file_context = self.pipe_log.fetch_active_file(LogQueue::Rewrite);
-        log_batch.prepare_write(&file_context)?;
-        let file_handle = self
-            .pipe_log
-            .append(LogQueue::Rewrite, log_batch.encoded_bytes())?;
-        self.pipe_log.maybe_sync(LogQueue::Rewrite, sync)?;
         log_batch.finish_write(file_handle);
         self.memtables.apply_rewrite_writes(
             log_batch.drain(),
diff --git a/tests/failpoints/test_engine.rs b/tests/failpoints/test_engine.rs
index d042f35a..7a51d717 100644
--- a/tests/failpoints/test_engine.rs
+++ b/tests/failpoints/test_engine.rs
@@ -127,7 +127,7 @@ fn test_pipe_log_listeners() {
             assert_eq!(hook.0[&LogQueue::Append].appends(), i);
             assert_eq!(hook.0[&LogQueue::Append].applys(), i);
         }
-        assert_eq!(hook.0[&LogQueue::Append].files(), 11);
+        assert_eq!(hook.0[&LogQueue::Append].files(), 10);

         engine.purge_expired_files().unwrap();
         assert_eq!(hook.0[&LogQueue::Append].purged(), 8);
@@ -154,7 +154,7 @@
         assert_eq!(hook.0[&LogQueue::Append].applys(), 32);

         engine.purge_expired_files().unwrap();
-        assert_eq!(hook.0[&LogQueue::Append].purged(), 13);
+        assert_eq!(hook.0[&LogQueue::Append].purged(), 14);
         assert_eq!(hook.0[&LogQueue::Rewrite].purged(), rewrite_files as u64);

         // Write region 3 without applying.
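The new candidate selection in `src/purge.rs` reads as a small decision procedure: regions that are old and truly heavy are first asked (via compaction hints) to shed entries, for up to `MAX_COUNT_BEFORE_FORCE_REWRITE` rounds, and only then force-rewritten; everything else is rewritten only if it is light and actually has entries at or below the watermark. A condensed restatement of that logic (side effects of scheduling compactions are elided; names mirror the locals in the diff):

```rust
// Condensed restatement of the selection above; `max` stands for
// MAX_COUNT_BEFORE_FORCE_REWRITE.
fn rewrite_now(
    old: bool,
    full_heavy: bool,
    append_heavy: bool,
    has_something_to_rewrite: bool,
    compact_counter: u32,
    max: u32,
) -> bool {
    if old && full_heavy {
        if compact_counter < max {
            // Nag the user to compact; revisit on the next purge round.
            return false;
        }
        // The user was unresponsive for `max` rounds: rewrite it ourselves.
        return has_something_to_rewrite;
    }
    // Light regions are rewritten now; heavy-but-young ones wait.
    !append_heavy && has_something_to_rewrite
}
```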
diff --git a/tests/failpoints/test_io_error.rs b/tests/failpoints/test_io_error.rs
index 219cc630..52302327 100644
--- a/tests/failpoints/test_io_error.rs
+++ b/tests/failpoints/test_io_error.rs
@@ -84,7 +84,6 @@ fn test_file_write_error() {
         .unwrap();
     let cfg = Config {
         dir: dir.path().to_str().unwrap().to_owned(),
-        bytes_per_sync: ReadableSize::kb(1024),
         target_file_size: ReadableSize::kb(1024),
         ..Default::default()
     };
@@ -133,7 +132,6 @@ fn test_file_rotate_error() {
         .unwrap();
     let cfg = Config {
         dir: dir.path().to_str().unwrap().to_owned(),
-        bytes_per_sync: ReadableSize::kb(1024),
         target_file_size: ReadableSize::kb(4),
         ..Default::default()
     };
@@ -150,6 +148,9 @@
     engine
         .write(&mut generate_batch(1, 3, 4, Some(&entry)), false)
         .unwrap();
+    engine
+        .write(&mut generate_batch(1, 4, 5, Some(&entry)), false)
+        .unwrap();
     assert_eq!(engine.file_span(LogQueue::Append).1, 1);
     // The next write will be followed by a rotate.
     {
@@ -209,7 +210,6 @@ fn test_concurrent_write_error() {
         .unwrap();
     let cfg = Config {
         dir: dir.path().to_str().unwrap().to_owned(),
-        bytes_per_sync: ReadableSize::kb(1024),
         target_file_size: ReadableSize::kb(1024),
         ..Default::default()
     };
@@ -295,7 +295,6 @@ fn test_non_atomic_write_error() {
         .unwrap();
     let cfg = Config {
         dir: dir.path().to_str().unwrap().to_owned(),
-        bytes_per_sync: ReadableSize::kb(1024),
         target_file_size: ReadableSize::kb(1024),
         ..Default::default()
     };