From 087d36b0f899b251f6c7aaf01caef14f96b1b351 Mon Sep 17 00:00:00 2001 From: Xinye Tao Date: Thu, 8 Sep 2022 17:00:09 +0800 Subject: [PATCH] fix panic when reusing a was-empty batch (#267) * add a test case Signed-off-by: tabokie * fix the bug Signed-off-by: tabokie * update changelog Signed-off-by: tabokie * Address comment Signed-off-by: Xinye Tao Signed-off-by: tabokie Signed-off-by: Xinye Tao --- CHANGELOG.md | 1 + src/engine.rs | 66 +++++++++++++++++++++++++++++++++------------------ 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8dd8aca4..3294cfaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * Unconditionally tolerate `fallocate` failures as a fix to its portability issue. Errors other than `EOPNOTSUPP` will still emit a warning. * Avoid leaving fractured write after failure by reseeking the file writer. Panic if the reseek fails as well. +* Fix panic when an empty batch is written to engine and then reused. ### New Features diff --git a/src/engine.rs b/src/engine.rs index bf9df9ef..df14cfa2 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -136,8 +136,12 @@ where /// bytes. If `sync` is true, the write will be followed by a call to /// `fdatasync` on the log file. pub fn write(&self, log_batch: &mut LogBatch, mut sync: bool) -> Result { + if log_batch.is_empty() { + return Ok(0); + } let start = Instant::now(); let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?; + debug_assert!(len > 0); let block_handle = { let mut writer = Writer::new(log_batch, sync); // Snapshot and clear the current perf context temporarily, so the write group @@ -151,16 +155,7 @@ where writer.entered_time = Some(now); sync |= writer.sync; let log_batch = writer.mut_payload(); - let res = if !log_batch.is_empty() { - self.pipe_log.append(LogQueue::Append, log_batch) - } else { - // TODO(tabokie): use Option instead. - Ok(FileBlockHandle { - id: FileId::new(LogQueue::Append, 0), - offset: 0, - len: 0, - }) - }; + let res = self.pipe_log.append(LogQueue::Append, log_batch); writer.set_output(res); } perf_context!(log_write_duration).observe_since(now); @@ -185,20 +180,17 @@ where set_perf_context(perf_context); writer.finish()? }; - let mut now = Instant::now(); - if len > 0 { - log_batch.finish_write(block_handle); - self.memtables.apply_append_writes(log_batch.drain()); - for listener in &self.listeners { - listener.post_apply_memtables(block_handle.id); - } - let end = Instant::now(); - let apply_duration = end.saturating_duration_since(now); - ENGINE_WRITE_APPLY_DURATION_HISTOGRAM.observe(apply_duration.as_secs_f64()); - perf_context!(apply_duration).observe(apply_duration); - now = end; - } + log_batch.finish_write(block_handle); + self.memtables.apply_append_writes(log_batch.drain()); + for listener in &self.listeners { + listener.post_apply_memtables(block_handle.id); + } + let end = Instant::now(); + let apply_duration = end.saturating_duration_since(now); + ENGINE_WRITE_APPLY_DURATION_HISTOGRAM.observe(apply_duration.as_secs_f64()); + perf_context!(apply_duration).observe(apply_duration); + now = end; ENGINE_WRITE_DURATION_HISTOGRAM.observe(now.saturating_duration_since(start).as_secs_f64()); ENGINE_WRITE_SIZE_HISTOGRAM.observe(len as f64); Ok(len) @@ -1393,6 +1385,34 @@ mod tests { ); } + #[test] + fn test_empty_batch() { + let dir = tempfile::Builder::new() + .prefix("test_empty_batch") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + ..Default::default() + }; + let engine = + RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); + let data = vec![b'x'; 16]; + let cases = [[false, false], [false, true], [true, true]]; + for (i, writes) in cases.iter().enumerate() { + let rid = i as u64; + let mut batch = LogBatch::default(); + for &has_data in writes { + if has_data { + batch.put(rid, b"key".to_vec(), data.clone()); + } + engine.write(&mut batch, true).unwrap(); + assert!(batch.is_empty()); + } + } + } + #[test] fn test_dirty_recovery() { let dir = tempfile::Builder::new()