diff --git a/src/batch/mod.rs b/src/batch/mod.rs index bcfefa9..ce7a936 100644 --- a/src/batch/mod.rs +++ b/src/batch/mod.rs @@ -95,6 +95,24 @@ impl Batch { return Err(crate::Error::Poisoned); } + let batch_seqno = self.keyspace.seqno.next(); + + let _ = journal_writer.write_batch(self.data.iter(), self.data.len(), batch_seqno); + + if let Some(mode) = self.durability { + if let Err(e) = journal_writer.persist(mode) { + self.keyspace.is_poisoned.store(true, Ordering::Release); + + log::error!( + "persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}" + ); + + return Err(crate::Error::Poisoned); + } + } + + drop(journal_writer); + // NOTE: Fully (write) lock, so the batch can be committed atomically log::trace!("batch: Acquiring partitions lock"); let partitions = self.keyspace.partitions.write().expect("lock is poisoned"); @@ -127,11 +145,6 @@ impl Batch { lock_map }; - let batch_seqno = self.keyspace.seqno.next(); - - let items = self.data.iter().collect::>(); - let _ = journal_writer.write_batch(&items, batch_seqno)?; - #[allow(clippy::mutable_key_type)] let mut partitions_with_possible_stall = HashSet::new(); @@ -164,20 +177,6 @@ impl Batch { drop(locked_memtables); drop(partitions); - if let Some(mode) = self.durability { - if let Err(e) = journal_writer.flush(mode) { - self.keyspace.is_poisoned.store(true, Ordering::Release); - - log::error!( - "flush failed, which is a FATAL, and possibly hardware-related, failure: {e:?}" - ); - - return Err(crate::Error::Poisoned); - } - } - - drop(journal_writer); - // IMPORTANT: Add batch size to current write buffer size // Otherwise write buffer growth is unbounded when using batches self.keyspace.write_buffer_manager.allocate(batch_size); diff --git a/src/journal/mod.rs b/src/journal/mod.rs index 7cd9e85..52e59ee 100644 --- a/src/journal/mod.rs +++ b/src/journal/mod.rs @@ -124,10 +124,12 @@ mod tests { let mut writer = journal.get_writer(); writer.write_batch( - &[ - &BatchItem::new("default", *b"a", *b"a", ValueType::Value), - &BatchItem::new("default", *b"b", *b"b", ValueType::Value), - ], + [ + BatchItem::new("default", *b"a", *b"a", ValueType::Value), + BatchItem::new("default", *b"b", *b"b", ValueType::Value), + ] + .iter(), + 2, 0, )?; writer.rotate()?; @@ -155,28 +157,34 @@ mod tests { let mut writer = journal.get_writer(); writer.write_batch( - &[ - &BatchItem::new("default", *b"a", *b"a", ValueType::Value), - &BatchItem::new("default", *b"b", *b"b", ValueType::Value), - ], + [ + BatchItem::new("default", *b"a", *b"a", ValueType::Value), + BatchItem::new("default", *b"b", *b"b", ValueType::Value), + ] + .iter(), + 2, 0, )?; writer.rotate()?; writer.write_batch( - &[ - &BatchItem::new("default2", *b"c", *b"c", ValueType::Value), - &BatchItem::new("default2", *b"d", *b"d", ValueType::Value), - ], + [ + BatchItem::new("default2", *b"c", *b"c", ValueType::Value), + BatchItem::new("default2", *b"d", *b"d", ValueType::Value), + ] + .iter(), + 2, 1, )?; writer.rotate()?; writer.write_batch( - &[ - &BatchItem::new("default3", *b"c", *b"c", ValueType::Value), - &BatchItem::new("default3", *b"d", *b"d", ValueType::Value), - ], + [ + BatchItem::new("default3", *b"c", *b"c", ValueType::Value), + BatchItem::new("default3", *b"d", *b"d", ValueType::Value), + ] + .iter(), + 2, 1, )?; } @@ -211,10 +219,12 @@ mod tests { let mut writer = journal.get_writer(); writer.write_batch( - &[ - &BatchItem::new("default", *b"a", *b"a", ValueType::Value), - &BatchItem::new("default", *b"b", *b"b", ValueType::Value), - ], + [ + BatchItem::new("default", *b"a", *b"a", ValueType::Value), + BatchItem::new("default", *b"b", *b"b", ValueType::Value), + ] + .iter(), + 2, 0, )?; writer.rotate()?; @@ -240,23 +250,22 @@ mod tests { let path = dir.path().join("0"); let values = [ - &BatchItem::new("default", *b"abc", *b"def", ValueType::Value), - &BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value), + BatchItem::new("default", *b"abc", *b"def", ValueType::Value), + BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value), ]; { let journal = Journal::create_new(&path)?; - journal.get_writer().write_batch(&values, 0)?; + journal + .get_writer() + .write_batch(values.iter(), values.len(), 0)?; } { let journal = Journal::from_file(&path)?; let reader = journal.get_reader()?; let collected = reader.flatten().collect::>(); - assert_eq!( - values.into_iter().cloned().collect::>(), - collected.first().unwrap().items - ); + assert_eq!(values.to_vec(), collected.first().unwrap().items); } // Mangle journal @@ -270,10 +279,7 @@ mod tests { let journal = Journal::from_file(&path)?; let reader = journal.get_reader()?; let collected = reader.flatten().collect::>(); - assert_eq!( - values.into_iter().cloned().collect::>(), - collected.first().unwrap().items - ); + assert_eq!(values.to_vec(), collected.first().unwrap().items); } // Mangle journal @@ -287,10 +293,7 @@ mod tests { let journal = Journal::from_file(&path)?; let reader = journal.get_reader()?; let collected = reader.flatten().collect::>(); - assert_eq!( - values.into_iter().cloned().collect::>(), - collected.first().unwrap().items - ); + assert_eq!(values.to_vec(), collected.first().unwrap().items); } Ok(()) @@ -302,23 +305,22 @@ mod tests { let path = dir.path().join("0"); let values = [ - &BatchItem::new("default", *b"abc", *b"def", ValueType::Value), - &BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value), + BatchItem::new("default", *b"abc", *b"def", ValueType::Value), + BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value), ]; { let journal = Journal::create_new(&path)?; - journal.get_writer().write_batch(&values, 0)?; + journal + .get_writer() + .write_batch(values.iter(), values.len(), 0)?; } { let journal = Journal::from_file(&path)?; let reader = journal.get_reader()?; let collected = reader.flatten().collect::>(); - assert_eq!( - values.into_iter().cloned().collect::>(), - collected.first().unwrap().items - ); + assert_eq!(values.to_vec(), collected.first().unwrap().items); } // Mangle journal @@ -337,10 +339,7 @@ mod tests { let journal = Journal::from_file(&path)?; let reader = journal.get_reader()?; let collected = reader.flatten().collect::>(); - assert_eq!( - values.into_iter().cloned().collect::>(), - collected.first().unwrap().items - ); + assert_eq!(values.to_vec(), collected.first().unwrap().items); } // Mangle journal @@ -359,10 +358,7 @@ mod tests { let journal = Journal::from_file(&path)?; let reader = journal.get_reader()?; let collected = reader.flatten().collect::>(); - assert_eq!( - values.into_iter().cloned().collect::>(), - collected.first().unwrap().items - ); + assert_eq!(values.to_vec(), collected.first().unwrap().items); } Ok(()) @@ -374,23 +370,22 @@ mod tests { let path = dir.path().join("0"); let values = [ - &BatchItem::new("default", *b"abc", *b"def", ValueType::Value), - &BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value), + BatchItem::new("default", *b"abc", *b"def", ValueType::Value), + BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value), ]; { let journal = Journal::create_new(&path)?; - journal.get_writer().write_batch(&values, 0)?; + journal + .get_writer() + .write_batch(values.iter(), values.len(), 0)?; } { let journal = Journal::from_file(&path)?; let reader = journal.get_reader()?; let collected = reader.flatten().collect::>(); - assert_eq!( - values.into_iter().cloned().collect::>(), - collected.first().unwrap().items - ); + assert_eq!(values.to_vec(), collected.first().unwrap().items); } // Mangle journal @@ -404,10 +399,7 @@ mod tests { let journal = Journal::from_file(&path)?; let reader = journal.get_reader()?; let collected = reader.flatten().collect::>(); - assert_eq!( - values.into_iter().cloned().collect::>(), - collected.first().unwrap().items - ); + assert_eq!(values.to_vec(), collected.first().unwrap().items); } // Mangle journal @@ -421,10 +413,7 @@ mod tests { let journal = Journal::from_file(&path)?; let reader = journal.get_reader()?; let collected = reader.flatten().collect::>(); - assert_eq!( - values.into_iter().cloned().collect::>(), - collected.first().unwrap().items - ); + assert_eq!(values.to_vec(), collected.first().unwrap().items); } Ok(()) @@ -436,23 +425,22 @@ mod tests { let path = dir.path().join("0"); let values = [ - &BatchItem::new("default", *b"abc", *b"def", ValueType::Value), - &BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value), + BatchItem::new("default", *b"abc", *b"def", ValueType::Value), + BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value), ]; { let journal = Journal::create_new(&path)?; - journal.get_writer().write_batch(&values, 0)?; + journal + .get_writer() + .write_batch(values.iter(), values.len(), 0)?; } { let journal = Journal::from_file(&path)?; let reader = journal.get_reader()?; let collected = reader.flatten().collect::>(); - assert_eq!( - values.into_iter().cloned().collect::>(), - collected.first().unwrap().items - ); + assert_eq!(values.to_vec(), collected.first().unwrap().items); } // Mangle journal @@ -473,10 +461,7 @@ mod tests { let journal = Journal::from_file(&path)?; let reader = journal.get_reader()?; let collected = reader.flatten().collect::>(); - assert_eq!( - values.into_iter().cloned().collect::>(), - collected.first().unwrap().items - ); + assert_eq!(values.to_vec(), collected.first().unwrap().items); } // Mangle journal @@ -497,10 +482,7 @@ mod tests { let journal = Journal::from_file(&path)?; let reader = journal.get_reader()?; let collected = reader.flatten().collect::>(); - assert_eq!( - values.into_iter().cloned().collect::>(), - collected.first().unwrap().items - ); + assert_eq!(values.to_vec(), collected.first().unwrap().items); } Ok(()) diff --git a/src/journal/writer.rs b/src/journal/writer.rs index fe8cbd4..ab5be78 100644 --- a/src/journal/writer.rs +++ b/src/journal/writer.rs @@ -214,8 +214,13 @@ impl Writer { Ok(byte_count) } - pub fn write_batch(&mut self, items: &[&BatchItem], seqno: SeqNo) -> crate::Result { - if items.is_empty() { + pub fn write_batch<'a>( + &mut self, + items: impl Iterator, + batch_size: usize, + seqno: SeqNo, + ) -> crate::Result { + if batch_size == 0 { return Ok(0); } @@ -225,7 +230,7 @@ impl Writer { // NOTE: entries.len() is surely never > u32::MAX #[allow(clippy::cast_possible_truncation)] - let item_count = items.len() as u32; + let item_count = batch_size as u32; let mut hasher = xxhash_rust::xxh3::Xxh3::new(); let mut byte_count = 0;