Skip to content

Commit

Permalink
fix: Batch::commit being too optimistic
Browse files Browse the repository at this point in the history
also remove heap allocation in Batch::commit
  • Loading branch information
marvin-j97 committed Dec 13, 2024
1 parent 0d6cbd2 commit 6566556
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 102 deletions.
37 changes: 18 additions & 19 deletions src/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,24 @@ impl Batch {
return Err(crate::Error::Poisoned);
}

let batch_seqno = self.keyspace.seqno.next();

let _ = journal_writer.write_batch(self.data.iter(), self.data.len(), batch_seqno);

if let Some(mode) = self.durability {
if let Err(e) = journal_writer.persist(mode) {
self.keyspace.is_poisoned.store(true, Ordering::Release);

log::error!(
"persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
);

return Err(crate::Error::Poisoned);
}
}

drop(journal_writer);

// NOTE: Fully (write) lock, so the batch can be committed atomically
log::trace!("batch: Acquiring partitions lock");
let partitions = self.keyspace.partitions.write().expect("lock is poisoned");
Expand Down Expand Up @@ -127,11 +145,6 @@ impl Batch {
lock_map
};

let batch_seqno = self.keyspace.seqno.next();

let items = self.data.iter().collect::<Vec<_>>();
let _ = journal_writer.write_batch(&items, batch_seqno)?;

#[allow(clippy::mutable_key_type)]
let mut partitions_with_possible_stall = HashSet::new();

Expand Down Expand Up @@ -164,20 +177,6 @@ impl Batch {
drop(locked_memtables);
drop(partitions);

if let Some(mode) = self.durability {
if let Err(e) = journal_writer.flush(mode) {
self.keyspace.is_poisoned.store(true, Ordering::Release);

log::error!(
"flush failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
);

return Err(crate::Error::Poisoned);
}
}

drop(journal_writer);

// IMPORTANT: Add batch size to current write buffer size
// Otherwise write buffer growth is unbounded when using batches
self.keyspace.write_buffer_manager.allocate(batch_size);
Expand Down
142 changes: 62 additions & 80 deletions src/journal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ mod tests {
let mut writer = journal.get_writer();

writer.write_batch(
&[
&BatchItem::new("default", *b"a", *b"a", ValueType::Value),
&BatchItem::new("default", *b"b", *b"b", ValueType::Value),
],
[
BatchItem::new("default", *b"a", *b"a", ValueType::Value),
BatchItem::new("default", *b"b", *b"b", ValueType::Value),
]
.iter(),
2,
0,
)?;
writer.rotate()?;
Expand Down Expand Up @@ -155,28 +157,34 @@ mod tests {
let mut writer = journal.get_writer();

writer.write_batch(
&[
&BatchItem::new("default", *b"a", *b"a", ValueType::Value),
&BatchItem::new("default", *b"b", *b"b", ValueType::Value),
],
[
BatchItem::new("default", *b"a", *b"a", ValueType::Value),
BatchItem::new("default", *b"b", *b"b", ValueType::Value),
]
.iter(),
2,
0,
)?;
writer.rotate()?;

writer.write_batch(
&[
&BatchItem::new("default2", *b"c", *b"c", ValueType::Value),
&BatchItem::new("default2", *b"d", *b"d", ValueType::Value),
],
[
BatchItem::new("default2", *b"c", *b"c", ValueType::Value),
BatchItem::new("default2", *b"d", *b"d", ValueType::Value),
]
.iter(),
2,
1,
)?;
writer.rotate()?;

writer.write_batch(
&[
&BatchItem::new("default3", *b"c", *b"c", ValueType::Value),
&BatchItem::new("default3", *b"d", *b"d", ValueType::Value),
],
[
BatchItem::new("default3", *b"c", *b"c", ValueType::Value),
BatchItem::new("default3", *b"d", *b"d", ValueType::Value),
]
.iter(),
2,
1,
)?;
}
Expand Down Expand Up @@ -211,10 +219,12 @@ mod tests {
let mut writer = journal.get_writer();

writer.write_batch(
&[
&BatchItem::new("default", *b"a", *b"a", ValueType::Value),
&BatchItem::new("default", *b"b", *b"b", ValueType::Value),
],
[
BatchItem::new("default", *b"a", *b"a", ValueType::Value),
BatchItem::new("default", *b"b", *b"b", ValueType::Value),
]
.iter(),
2,
0,
)?;
writer.rotate()?;
Expand All @@ -240,23 +250,22 @@ mod tests {
let path = dir.path().join("0");

let values = [
&BatchItem::new("default", *b"abc", *b"def", ValueType::Value),
&BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value),
BatchItem::new("default", *b"abc", *b"def", ValueType::Value),
BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value),
];

{
let journal = Journal::create_new(&path)?;
journal.get_writer().write_batch(&values, 0)?;
journal
.get_writer()
.write_batch(values.iter(), values.len(), 0)?;
}

{
let journal = Journal::from_file(&path)?;
let reader = journal.get_reader()?;
let collected = reader.flatten().collect::<Vec<_>>();
assert_eq!(
values.into_iter().cloned().collect::<Vec<_>>(),
collected.first().unwrap().items
);
assert_eq!(values.to_vec(), collected.first().unwrap().items);
}

// Mangle journal
Expand All @@ -270,10 +279,7 @@ mod tests {
let journal = Journal::from_file(&path)?;
let reader = journal.get_reader()?;
let collected = reader.flatten().collect::<Vec<_>>();
assert_eq!(
values.into_iter().cloned().collect::<Vec<_>>(),
collected.first().unwrap().items
);
assert_eq!(values.to_vec(), collected.first().unwrap().items);
}

// Mangle journal
Expand All @@ -287,10 +293,7 @@ mod tests {
let journal = Journal::from_file(&path)?;
let reader = journal.get_reader()?;
let collected = reader.flatten().collect::<Vec<_>>();
assert_eq!(
values.into_iter().cloned().collect::<Vec<_>>(),
collected.first().unwrap().items
);
assert_eq!(values.to_vec(), collected.first().unwrap().items);
}

Ok(())
Expand All @@ -302,23 +305,22 @@ mod tests {
let path = dir.path().join("0");

let values = [
&BatchItem::new("default", *b"abc", *b"def", ValueType::Value),
&BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value),
BatchItem::new("default", *b"abc", *b"def", ValueType::Value),
BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value),
];

{
let journal = Journal::create_new(&path)?;
journal.get_writer().write_batch(&values, 0)?;
journal
.get_writer()
.write_batch(values.iter(), values.len(), 0)?;
}

{
let journal = Journal::from_file(&path)?;
let reader = journal.get_reader()?;
let collected = reader.flatten().collect::<Vec<_>>();
assert_eq!(
values.into_iter().cloned().collect::<Vec<_>>(),
collected.first().unwrap().items
);
assert_eq!(values.to_vec(), collected.first().unwrap().items);
}

// Mangle journal
Expand All @@ -337,10 +339,7 @@ mod tests {
let journal = Journal::from_file(&path)?;
let reader = journal.get_reader()?;
let collected = reader.flatten().collect::<Vec<_>>();
assert_eq!(
values.into_iter().cloned().collect::<Vec<_>>(),
collected.first().unwrap().items
);
assert_eq!(values.to_vec(), collected.first().unwrap().items);
}

// Mangle journal
Expand All @@ -359,10 +358,7 @@ mod tests {
let journal = Journal::from_file(&path)?;
let reader = journal.get_reader()?;
let collected = reader.flatten().collect::<Vec<_>>();
assert_eq!(
values.into_iter().cloned().collect::<Vec<_>>(),
collected.first().unwrap().items
);
assert_eq!(values.to_vec(), collected.first().unwrap().items);
}

Ok(())
Expand All @@ -374,23 +370,22 @@ mod tests {
let path = dir.path().join("0");

let values = [
&BatchItem::new("default", *b"abc", *b"def", ValueType::Value),
&BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value),
BatchItem::new("default", *b"abc", *b"def", ValueType::Value),
BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value),
];

{
let journal = Journal::create_new(&path)?;
journal.get_writer().write_batch(&values, 0)?;
journal
.get_writer()
.write_batch(values.iter(), values.len(), 0)?;
}

{
let journal = Journal::from_file(&path)?;
let reader = journal.get_reader()?;
let collected = reader.flatten().collect::<Vec<_>>();
assert_eq!(
values.into_iter().cloned().collect::<Vec<_>>(),
collected.first().unwrap().items
);
assert_eq!(values.to_vec(), collected.first().unwrap().items);
}

// Mangle journal
Expand All @@ -404,10 +399,7 @@ mod tests {
let journal = Journal::from_file(&path)?;
let reader = journal.get_reader()?;
let collected = reader.flatten().collect::<Vec<_>>();
assert_eq!(
values.into_iter().cloned().collect::<Vec<_>>(),
collected.first().unwrap().items
);
assert_eq!(values.to_vec(), collected.first().unwrap().items);
}

// Mangle journal
Expand All @@ -421,10 +413,7 @@ mod tests {
let journal = Journal::from_file(&path)?;
let reader = journal.get_reader()?;
let collected = reader.flatten().collect::<Vec<_>>();
assert_eq!(
values.into_iter().cloned().collect::<Vec<_>>(),
collected.first().unwrap().items
);
assert_eq!(values.to_vec(), collected.first().unwrap().items);
}

Ok(())
Expand All @@ -436,23 +425,22 @@ mod tests {
let path = dir.path().join("0");

let values = [
&BatchItem::new("default", *b"abc", *b"def", ValueType::Value),
&BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value),
BatchItem::new("default", *b"abc", *b"def", ValueType::Value),
BatchItem::new("default", *b"yxc", *b"ghj", ValueType::Value),
];

{
let journal = Journal::create_new(&path)?;
journal.get_writer().write_batch(&values, 0)?;
journal
.get_writer()
.write_batch(values.iter(), values.len(), 0)?;
}

{
let journal = Journal::from_file(&path)?;
let reader = journal.get_reader()?;
let collected = reader.flatten().collect::<Vec<_>>();
assert_eq!(
values.into_iter().cloned().collect::<Vec<_>>(),
collected.first().unwrap().items
);
assert_eq!(values.to_vec(), collected.first().unwrap().items);
}

// Mangle journal
Expand All @@ -473,10 +461,7 @@ mod tests {
let journal = Journal::from_file(&path)?;
let reader = journal.get_reader()?;
let collected = reader.flatten().collect::<Vec<_>>();
assert_eq!(
values.into_iter().cloned().collect::<Vec<_>>(),
collected.first().unwrap().items
);
assert_eq!(values.to_vec(), collected.first().unwrap().items);
}

// Mangle journal
Expand All @@ -497,10 +482,7 @@ mod tests {
let journal = Journal::from_file(&path)?;
let reader = journal.get_reader()?;
let collected = reader.flatten().collect::<Vec<_>>();
assert_eq!(
values.into_iter().cloned().collect::<Vec<_>>(),
collected.first().unwrap().items
);
assert_eq!(values.to_vec(), collected.first().unwrap().items);
}

Ok(())
Expand Down
11 changes: 8 additions & 3 deletions src/journal/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,13 @@ impl Writer {
Ok(byte_count)
}

pub fn write_batch(&mut self, items: &[&BatchItem], seqno: SeqNo) -> crate::Result<usize> {
if items.is_empty() {
pub fn write_batch<'a>(
&mut self,
items: impl Iterator<Item = &'a BatchItem>,
batch_size: usize,
seqno: SeqNo,
) -> crate::Result<usize> {
if batch_size == 0 {
return Ok(0);
}

Expand All @@ -225,7 +230,7 @@ impl Writer {

// NOTE: entries.len() is surely never > u32::MAX
#[allow(clippy::cast_possible_truncation)]
let item_count = items.len() as u32;
let item_count = batch_size as u32;

let mut hasher = xxhash_rust::xxh3::Xxh3::new();
let mut byte_count = 0;
Expand Down

0 comments on commit 6566556

Please sign in to comment.