Skip to content

Commit

Permalink
refactor(storage): fix writing cells to file; finish tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pashinov authored and 0xdeafbeef committed Mar 20, 2024
1 parent bfa9305 commit 644f95d
Show file tree
Hide file tree
Showing 15 changed files with 235 additions and 244 deletions.
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,4 @@ target/
perf.data*
.scratch

.DS_Store
storage/tmp/
.DS_Store
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ serde_json = "1.0.114"
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-test = "0.2"
tempfile = "3.10"

[lints]
workspace = true
10 changes: 9 additions & 1 deletion storage/src/db/file_db/mapped_file.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fs;
use std::path::Path;

use anyhow::Result;
Expand All @@ -17,7 +18,14 @@ impl MappedFile {
where
P: AsRef<Path>,
{
let file_db = FileDb::open(path)?;
let file_db = FileDb::new(
path,
fs::OpenOptions::new()
.write(true)
.read(true)
.truncate(true)
.create(true),
)?;
file_db.file.set_len(length as u64)?;

Self::from_existing_file(file_db)
Expand Down
54 changes: 22 additions & 32 deletions storage/src/db/file_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,67 +2,57 @@ use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

use anyhow::{Context, Result};
use everscale_types::models::*;

pub use mapped_file::MappedFile;

mod mapped_file;

pub struct FileDb {
file: File,
path: PathBuf,
_path: PathBuf,
}

impl FileDb {
pub fn open<P>(path: P) -> Result<Self>
pub fn new<P>(path: P, options: &mut std::fs::OpenOptions) -> std::io::Result<Self>
where
P: AsRef<Path>,
{
let file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.read(true)
.open(&path)
.context(format!("Failed to create file {:?}", path.as_ref()))?;
let file = options.open(&path)?;

Ok(Self {
file,
path: PathBuf::from(path.as_ref()),
_path: PathBuf::from(path.as_ref()),
})
}

pub fn write(&mut self, buf: &[u8]) -> Result<()> {
self.file.write(buf)?;
Ok(())
}

pub fn write_all(&mut self, buf: &[u8]) -> Result<()> {
self.file.write_all(buf)?;
Ok(())
}

pub fn flush(&mut self) -> Result<()> {
self.file.flush()?;
Ok(())
pub fn file(&self) -> &File {
&self.file
}
}

pub fn seek(&mut self, pos: SeekFrom) -> Result<()> {
self.file.seek(pos)?;
Ok(())
impl Write for FileDb {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.file.write(buf)
}

pub fn file(&self) -> &File {
&self.file
#[inline]
fn flush(&mut self) -> std::io::Result<()> {
self.file.flush()
}
}

pub fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
impl Read for FileDb {
    /// Reads bytes from the underlying file into `buf`, returning the
    /// number of bytes read (delegates to [`File::read`]).
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Delegate directly; binding the result just to re-wrap it in `Ok`
        // (`let bytes = ...?; Ok(bytes)`) is a needless round-trip.
        self.file.read(buf)
    }
}

impl Seek for FileDb {
    /// Forwards seek operations to the wrapped [`File`] handle.
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        Seek::seek(&mut self.file, pos)
    }
}

impl Into<File> for FileDb {
fn into(self) -> File {
self.file
Expand Down
1 change: 0 additions & 1 deletion storage/src/db/kv_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::thread::available_parallelism;

use anyhow::{Context, Result};
use bytesize::ByteSize;
use serde::{Deserialize, Serialize};
use weedb::{Caches, WeeDb};

pub use weedb::Stats as RocksdbStats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use anyhow::{Context, Result};
use everscale_types::cell::CellDescriptor;
use smallvec::SmallVec;

use tycho_util::FastHashMap;
Expand All @@ -29,8 +30,13 @@ impl<'a> CellWriter<'a> {
#[allow(unused)]
pub fn write(&self, root_hash: &[u8; 32], is_cancelled: Option<Arc<AtomicBool>>) -> Result<()> {
// Open target file in advance to get the error immediately (if any)
let file_path = self.base_path.join(hex::encode(root_hash));
let file_db = FileDb::open(file_path)?;
let file_db = FileDb::new(
self.base_path,
fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true),
)?;

// Load cells from db in reverse order into the temp file
tracing::info!("started loading cells");
Expand Down Expand Up @@ -99,13 +105,13 @@ impl<'a> CellWriter<'a> {
.file
.read_exact(&mut cell_buffer[..cell_size as usize])?;

let d1 = cell_buffer[0];
let d2 = cell_buffer[1];
let ref_count = (d1 & 7) as usize;
let data_size = ((d2 >> 1) + (d2 & 1 != 0) as u8) as usize;
let descriptor = CellDescriptor {
d1: cell_buffer[0],
d2: cell_buffer[1],
};

let ref_offset = 2 + data_size;
for r in 0..ref_count {
let ref_offset = 2 + descriptor.byte_len() as usize;
for r in 0..descriptor.reference_count() as usize {
let ref_offset = ref_offset + r * REF_SIZE;
let slice = &mut cell_buffer[ref_offset..ref_offset + REF_SIZE];

Expand All @@ -121,11 +127,10 @@ impl<'a> CellWriter<'a> {
Ok(())
}

pub fn remove(&self, root_hash: &[u8; 32]) -> Result<()> {
let file_path = self.base_path.join(hex::encode(root_hash));
fs::remove_file(&file_path).context(format!(
pub fn remove(&self) -> Result<()> {
fs::remove_file(&self.base_path).context(format!(
"Failed to remove persistent state file {:?}",
file_path
self.base_path
))
}
}
Expand All @@ -150,18 +155,21 @@ fn write_rev_cells<P: AsRef<Path>>(

struct LoadedCell {
hash: [u8; 32],
d1: u8,
d2: u8,
descriptor: CellDescriptor,
data: SmallVec<[u8; 128]>,
indices: SmallVec<[u32; 4]>,
}

let file_path = base_path
.as_ref()
.join(hex::encode(root_hash))
.with_extension("temp");
let file_path = base_path.as_ref().with_extension("temp");

let file_db = FileDb::open(&file_path)?;
let file_db = FileDb::new(
&file_path,
fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true),
)?;
let remove_on_drop = RemoveOnDrop(file_path);

let raw = db.raw().as_ref();
Expand Down Expand Up @@ -197,12 +205,17 @@ fn write_rev_cells<P: AsRef<Path>>(
.get_pinned_cf_opt(&cf, hash, read_options)?
.ok_or(CellWriterError::CellNotFound)?;

let value = value.as_ref();
let value = match crate::refcount::strip_refcount(value.as_ref()) {
Some(bytes) => bytes,
None => {
return Err(CellWriterError::CellNotFound.into());
}
};
if value.is_empty() {
return Err(CellWriterError::InvalidCell.into());
}

let (d1, d2, data) = deserialize_cell(&value[1..], &mut references_buffer)
let (descriptor, data) = deserialize_cell(value, &mut references_buffer)
.ok_or(CellWriterError::InvalidCell)?;

let mut reference_indices = SmallVec::with_capacity(references_buffer.len());
Expand Down Expand Up @@ -242,8 +255,7 @@ fn write_rev_cells<P: AsRef<Path>>(
index,
StackItem::Loaded(LoadedCell {
hash,
d1,
d2,
descriptor,
data: SmallVec::from_slice(data),
indices: reference_indices,
}),
Expand Down Expand Up @@ -283,7 +295,7 @@ fn write_rev_cells<P: AsRef<Path>>(
cell_sizes.push(cell_size as u8);
total_size += cell_size as u64;

temp_file_buffer.write_all(&[loaded.d1, loaded.d2])?;
temp_file_buffer.write_all(&[loaded.descriptor.d1, loaded.descriptor.d2])?;
temp_file_buffer.write_all(&loaded.data)?;
for index in loaded.indices {
let index = remap.get(&index).with_context(|| {
Expand All @@ -309,64 +321,38 @@ fn write_rev_cells<P: AsRef<Path>>(
fn deserialize_cell<'a>(
value: &'a [u8],
references_buffer: &mut SmallVec<[[u8; 32]; 4]>,
) -> Option<(u8, u8, &'a [u8])> {
) -> Option<(CellDescriptor, &'a [u8])> {
let mut index = Index {
value_len: value.len(),
offset: 0,
};

index.require(3)?;
let cell_type = value[*index];
index.advance(1);
let bit_length = u16::from_le_bytes((&value[*index..*index + 2]).try_into().unwrap());
index.advance(2);
index.require(4)?;
let mut descriptor = CellDescriptor::new([value[*index], value[*index + 1]]);
descriptor.d1 &= !CellDescriptor::STORE_HASHES_MASK;

let d2 = (((bit_length >> 2) as u8) & !0b1) | ((bit_length % 8 != 0) as u8);
index.advance(2);
let bit_length = u16::from_le_bytes([value[*index], value[*index + 1]]);
index.advance(2);

// TODO: Replace with `(big_length + 7) / 8`
let data_len = ((d2 >> 1) + u8::from(d2 & 1 != 0)) as usize;
let data_len = descriptor.byte_len() as usize;
index.require(data_len)?;
let data = &value[*index..*index + data_len];
index.advance(data_len);

// NOTE: additional byte is required here due to internal structure
index.advance(((bit_length + 8) / 8) as usize);

index.require(1)?;
let level_mask = value[*index];
// skip store_hashes
index.advance(2);

index.require(2)?;
let has_hashes = value[*index];
index.advance(1);
if has_hashes != 0 {
let count = value[*index];
index.advance(1 + (count * 32) as usize);
}

index.require(2)?;
let has_depths = value[*index];
index.advance(1);
if has_depths != 0 {
let count = value[*index];
index.advance(1 + (count * 2) as usize);
}

index.require(1)?;
let reference_count = value[*index];
index.advance(1);
assert_eq!((bit_length as usize + 7) / 8, data_len);

let d1 = reference_count | (((cell_type != 0x01) as u8) << 3) | (level_mask << 5);
index.advance((32 + 2) * descriptor.hash_count() as usize);

for _ in 0..reference_count {
for _ in 0..descriptor.reference_count() {
index.require(32)?;
let mut hash = [0; 32];
hash.copy_from_slice(&value[*index..*index + 32]);
references_buffer.push(hash);
index.advance(32);
}

Some((d1, d2, data))
Some((descriptor, data))
}

#[cfg(not(target_os = "macos"))]
Expand Down Expand Up @@ -411,7 +397,7 @@ struct Index {
impl Index {
#[inline(always)]
fn require(&self, len: usize) -> Option<()> {
if self.offset + len < self.value_len {
if self.offset + len <= self.value_len {
Some(())
} else {
None
Expand Down
Loading

0 comments on commit 644f95d

Please sign in to comment.