diff --git a/Cargo.lock b/Cargo.lock
index c62426cf6..d610a90b2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -95,9 +95,9 @@ checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247"
[[package]]
name = "arc-swap"
-version = "1.7.0"
+version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7b3d0060af21e8d11a926981cc00c6c1541aa91dd64b9f881985c3da1094425f"
+checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "argh"
@@ -118,7 +118,7 @@ dependencies = [
"argh_shared",
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -235,7 +235,7 @@ dependencies = [
"regex",
"rustc-hash",
"shlex",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -406,7 +406,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -534,7 +534,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -607,7 +607,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -694,7 +694,7 @@ checksum = "323d8b61c76be2c16eb2d72d007f1542fdeb3760fdf2e2cae219fc0da3db0c09"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -742,7 +742,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -837,9 +837,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "itoa"
-version = "1.0.10"
+version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c"
+checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "jobserver"
@@ -958,9 +958,9 @@ dependencies = [
[[package]]
name = "memchr"
-version = "2.7.1"
+version = "2.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149"
+checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d"
[[package]]
name = "minimal-lexical"
@@ -1183,7 +1183,7 @@ dependencies = [
"pest_meta",
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -1227,9 +1227,9 @@ checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
[[package]]
name = "platforms"
-version = "3.3.0"
+version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "626dec3cac7cc0e1577a2ec3fc496277ec2baa084bebad95bb6fdbfae235f84c"
+checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7"
[[package]]
name = "powerfmt"
@@ -1245,12 +1245,12 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "prettyplease"
-version = "0.2.16"
+version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a41cf62165e97c7f814d2221421dbb9afcbcdb0a88068e5ea206e19951c2cbb5"
+checksum = "8d3928fb5db768cb86f891ff014f0144589297e3c6a1aba6ed7cecfdace270c7"
dependencies = [
"proc-macro2",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -1290,9 +1290,9 @@ dependencies = [
[[package]]
name = "quick_cache"
-version = "0.4.1"
+version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "58c20af3800cee5134b79a3bd4a3d4b583c16ccfa5f53338f46400851a5b3819"
+checksum = "b1380629287ed1247c1e0fcc6d43efdcec508b65382c9ab775cc8f3df7ca07b0"
dependencies = [
"ahash",
"equivalent",
@@ -1397,9 +1397,9 @@ dependencies = [
[[package]]
name = "rayon"
-version = "1.9.0"
+version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e4963ed1bc86e4f3ee217022bd855b297cef07fb9eac5dfa1f788b220b49b3bd"
+checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
dependencies = [
"either",
"rayon-core",
@@ -1438,14 +1438,14 @@ dependencies = [
[[package]]
name = "regex"
-version = "1.10.3"
+version = "1.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15"
+checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.4.6",
- "regex-syntax 0.8.2",
+ "regex-syntax 0.8.3",
]
[[package]]
@@ -1465,7 +1465,7 @@ checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea"
dependencies = [
"aho-corasick",
"memchr",
- "regex-syntax 0.8.2",
+ "regex-syntax 0.8.3",
]
[[package]]
@@ -1476,9 +1476,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
-version = "0.8.2"
+version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
+checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
[[package]]
name = "ring"
@@ -1657,7 +1657,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -1808,9 +1808,9 @@ dependencies = [
[[package]]
name = "syn"
-version = "2.0.53"
+version = "2.0.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032"
+checksum = "002a1b3dbf967edfafc32655d0f377ab0bb7b994aa1d32c8cc7e9b8bf3ebb8f0"
dependencies = [
"proc-macro2",
"quote",
@@ -1879,7 +1879,7 @@ checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -1971,7 +1971,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustc-hash",
- "syn 2.0.53",
+ "syn 2.0.55",
"tl-scheme",
]
@@ -1990,9 +1990,9 @@ dependencies = [
[[package]]
name = "tokio"
-version = "1.36.0"
+version = "1.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
+checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
dependencies = [
"backtrace",
"bytes",
@@ -2015,7 +2015,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -2063,7 +2063,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -2418,7 +2418,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
"wasm-bindgen-shared",
]
@@ -2440,7 +2440,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@@ -2698,7 +2698,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.53",
+ "syn 2.0.55",
]
[[package]]
@@ -2709,9 +2709,9 @@ checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d"
[[package]]
name = "zstd-sys"
-version = "2.0.9+zstd.1.5.5"
+version = "2.0.10+zstd.1.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656"
+checksum = "c253a4914af5bafc8fa8c86ee400827e83cf6ec01195ec1f1ed8441bf00d65aa"
dependencies = [
"cc",
"pkg-config",
diff --git a/storage/src/db/file_db/mapped_file.rs b/storage/src/db/file_db/mapped_file.rs
index 897c29f38..4594397aa 100644
--- a/storage/src/db/file_db/mapped_file.rs
+++ b/storage/src/db/file_db/mapped_file.rs
@@ -1,41 +1,32 @@
-use std::fs;
+use std::fs::File;
+use std::os::fd::AsRawFd;
use std::path::Path;
-use anyhow::Result;
-
-use crate::FileDb;
-
/// Memory buffer that is mapped to a file
pub struct MappedFile {
- file_db: FileDb,
+ file: File,
length: usize,
ptr: *mut libc::c_void,
}
impl MappedFile {
/// Opens a file and maps it to memory. Resizes the file to `length` bytes.
- pub fn new<P>(path: &P, length: usize) -> Result<Self>
- where
- P: AsRef<Path>,
- {
- let file_db = FileDb::new(
- path,
- fs::OpenOptions::new()
- .write(true)
- .read(true)
- .truncate(true)
- .create(true),
- )?;
- file_db.file.set_len(length as u64)?;
+ pub fn new<P: AsRef<Path>>(path: P, length: usize) -> std::io::Result<Self> {
+ let file = std::fs::OpenOptions::new()
+ .write(true)
+ .read(true)
+ .truncate(true)
+ .create(true)
+ .open(path)?;
- Self::from_existing_file(file_db)
+ file.set_len(length as u64)?;
+
+ Self::from_existing_file(file)
}
/// Opens an existing file and maps it to memory
- pub fn from_existing_file(file_db: FileDb) -> Result<Self> {
- use std::os::unix::io::AsRawFd;
-
- let length = file_db.file.metadata()?.len() as usize;
+ pub fn from_existing_file(file: File) -> std::io::Result<Self> {
+ let length = file.metadata()?.len() as usize;
// SAFETY: File was opened successfully, file mode is RW, offset is aligned
let ptr = unsafe {
@@ -44,24 +35,20 @@ impl MappedFile {
length,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_SHARED,
- file_db.file.as_raw_fd(),
+ file.as_raw_fd(),
0,
)
};
if ptr == libc::MAP_FAILED {
- return Err(std::io::Error::last_os_error().into());
+ return Err(std::io::Error::last_os_error());
}
if unsafe { libc::madvise(ptr, length, libc::MADV_RANDOM) } != 0 {
- return Err(std::io::Error::last_os_error().into());
+ return Err(std::io::Error::last_os_error());
}
- Ok(Self {
- file_db,
- length,
- ptr,
- })
+ Ok(Self { file, length, ptr })
}
/// Mapped buffer length in bytes
@@ -102,8 +89,8 @@ impl Drop for MappedFile {
panic!("failed to unmap file: {}", std::io::Error::last_os_error());
}
- let _ = self.file_db.file.set_len(0);
- let _ = self.file_db.file.sync_all();
+ let _ = self.file.set_len(0);
+ let _ = self.file.sync_all();
}
}
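Not part of the patch: a minimal usage sketch of the reworked `MappedFile` API above, assuming the `crate::db` re-export that other files in this diff rely on; the path and length are illustrative.

```rust
use crate::db::MappedFile; // assumed re-export, mirrors `use crate::db::{Db, FileDb, TempFile}` below

fn map_scratch(dir: &std::path::Path) -> std::io::Result<()> {
    // Creates (or truncates) the file, resizes it to 4096 bytes and maps it read-write.
    let mapped = MappedFile::new(dir.join("scratch.bin"), 4096)?;

    // Dropping the mapping unmaps the buffer, truncates the file to 0 and syncs it.
    drop(mapped);
    Ok(())
}
```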
diff --git a/storage/src/db/file_db/mod.rs b/storage/src/db/file_db/mod.rs
index fbff8234a..be5c69323 100644
--- a/storage/src/db/file_db/mod.rs
+++ b/storage/src/db/file_db/mod.rs
@@ -1,60 +1,149 @@
-use std::fs::File;
-use std::io::{Read, Seek, SeekFrom, Write};
+use std::fs::{File, OpenOptions};
+use std::os::fd::AsRawFd;
use std::path::{Path, PathBuf};
+use std::sync::Arc;
-pub use mapped_file::MappedFile;
+pub use self::mapped_file::MappedFile;
+pub use self::temp_file::TempFile;
mod mapped_file;
+mod temp_file;
-pub struct FileDb {
- file: File,
- _path: PathBuf,
-}
+#[derive(Clone)]
+pub struct FileDb(Arc<FileDbInner>);
impl FileDb {
- pub fn new<P>(path: P, options: &mut std::fs::OpenOptions) -> std::io::Result<Self>
+ pub fn new<P>(root: P) -> Self
where
P: AsRef<Path>,
{
- let file = options.open(&path)?;
+ Self(Arc::new(FileDbInner {
+ base_dir: root.as_ref().to_path_buf(),
+ }))
+ }
- Ok(Self {
- file,
- _path: PathBuf::from(path.as_ref()),
- })
+ pub fn path(&self) -> &Path {
+ &self.0.base_dir
}
- pub fn file(&self) -> &File {
- &self.file
+ pub fn ensure_exists(&self) -> std::io::Result<()> {
+ std::fs::create_dir_all(&self.0.base_dir)
+ }
+
+ pub fn create_dir_all<P: AsRef<Path>>(&self, rel_path: P) -> std::io::Result<()> {
+ std::fs::create_dir_all(self.0.base_dir.join(rel_path))
+ }
+
+ pub fn remove_file<P: AsRef<Path>>(&self, rel_path: P) -> std::io::Result<()> {
+ std::fs::remove_file(self.0.base_dir.join(rel_path))
+ }
+
+ pub fn file<P: AsRef<Path>>(&self, rel_path: P) -> FileBuilder {
+ FileBuilder {
+ path: self.0.base_dir.join(rel_path.as_ref()),
+ options: std::fs::OpenOptions::new(),
+ prealloc: None,
+ }
}
-}
-impl Write for FileDb {
- fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- self.file.write(buf)
+ pub fn subdir<P: AsRef<Path>>(&self, rel_path: P) -> Self {
+ Self(Arc::new(FileDbInner {
+ base_dir: self.0.base_dir.join(rel_path),
+ }))
}
- #[inline]
- fn flush(&mut self) -> std::io::Result<()> {
- self.file.flush()
+ pub fn file_exists<P: AsRef<Path>>(&self, rel_path: P) -> bool {
+ self.path().join(rel_path).is_file()
}
+
+ pub fn entries(&self) -> std::io::Result<std::fs::ReadDir> {
+ std::fs::read_dir(&self.0.base_dir)
+ }
+}
+
+struct FileDbInner {
+ base_dir: PathBuf,
}
-impl Read for FileDb {
- fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
- let bytes = self.file.read(buf)?;
- Ok(bytes)
+pub struct FileBuilder {
+ path: PathBuf,
+ options: OpenOptions,
+ prealloc: Option<usize>,
+}
+
+impl FileBuilder {
+ pub fn open(&self) -> std::io::Result<File> {
+ let file = self.options.open(&self.path)?;
+ if let Some(prealloc) = self.prealloc {
+ alloc_file(&file, prealloc)?;
+ }
+ Ok(file)
+ }
+
+ pub fn open_as_temp(&self) -> std::io::Result<TempFile> {
+ let file = self.open()?;
+ Ok(TempFile::new(self.path.clone(), file))
+ }
+
+ pub fn open_as_mapped(&self) -> std::io::Result<MappedFile> {
+ match self.prealloc {
+ Some(length) => MappedFile::new(&self.path, length),
+ None => MappedFile::from_existing_file(self.open()?),
+ }
+ }
+
+ pub fn append(&mut self, append: bool) -> &mut Self {
+ self.options.append(append);
+ self
+ }
+
+ pub fn create(&mut self, create: bool) -> &mut Self {
+ self.options.create(create);
+ self
+ }
+
+ pub fn create_new(&mut self, create_new: bool) -> &mut Self {
+ self.options.create_new(create_new);
+ self
+ }
+
+ pub fn read(&mut self, read: bool) -> &mut Self {
+ self.options.read(read);
+ self
+ }
+
+ pub fn truncate(&mut self, truncate: bool) -> &mut Self {
+ self.options.truncate(truncate);
+ self
+ }
+
+ pub fn write(&mut self, write: bool) -> &mut Self {
+ self.options.write(write);
+ self
+ }
+
+ pub fn prealloc(&mut self, prealloc: usize) -> &mut Self {
+ self.prealloc = Some(prealloc);
+ self
}
}
-impl Seek for FileDb {
- fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
- self.file.seek(pos)
+#[cfg(not(target_os = "macos"))]
+fn alloc_file(file: &File, len: usize) -> std::io::Result<()> {
+ let res = unsafe { libc::posix_fallocate(file.as_raw_fd(), 0, len as i64) };
+ if res == 0 {
+ Ok(())
+ } else {
+ Err(std::io::Error::last_os_error())
}
}
-impl From<FileDb> for File {
- fn from(val: FileDb) -> Self {
- val.file
+#[cfg(target_os = "macos")]
+pub fn alloc_file(file: &File, len: usize) -> std::io::Result<()> {
+ let res = unsafe { libc::ftruncate(file.as_raw_fd(), len as i64) };
+ if res < 0 {
+ Err(std::io::Error::last_os_error())
+ } else {
+ Ok(())
}
}
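Not part of the patch: a sketch of how the new `FileDb` handle and `FileBuilder` compose, using only the methods added above; the directory layout and file names are illustrative.

```rust
use crate::db::FileDb; // assumed re-export, as used elsewhere in this diff

fn example() -> std::io::Result<()> {
    // `FileDb` is now a cheaply clonable handle to a base directory.
    let files = FileDb::new("/var/lib/node/files");
    files.ensure_exists()?;

    // Subdirectories are just new handles rooted deeper in the tree.
    let states = files.subdir("states");
    states.ensure_exists()?;

    // `FileBuilder` mirrors `OpenOptions` and can preallocate via `alloc_file`.
    let _state_file = states
        .file("current.boc")
        .create(true)
        .write(true)
        .truncate(true)
        .prealloc(1 << 20)
        .open()?;

    Ok(())
}
```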
diff --git a/storage/src/db/file_db/temp_file.rs b/storage/src/db/file_db/temp_file.rs
new file mode 100644
index 000000000..afa233b73
--- /dev/null
+++ b/storage/src/db/file_db/temp_file.rs
@@ -0,0 +1,67 @@
+use std::fs::File;
+use std::mem::ManuallyDrop;
+use std::path::PathBuf;
+
+pub struct TempFile {
+ file: ManuallyDrop<File>,
+ file_path: Option<PathBuf>,
+}
+
+impl TempFile {
+ pub fn new(path: PathBuf, file: File) -> Self {
+ Self {
+ file: ManuallyDrop::new(file),
+ file_path: Some(path),
+ }
+ }
+
+ pub fn disarm(mut self) -> File {
+ self.file_path = None;
+
+ // SAFETY: File will not be dropped as `file_path` is `None`.
+ unsafe { ManuallyDrop::take(&mut self.file) }
+ }
+}
+
+impl AsRef<File> for TempFile {
+ #[inline]
+ fn as_ref(&self) -> &File {
+ &self.file
+ }
+}
+
+impl AsMut<File> for TempFile {
+ #[inline]
+ fn as_mut(&mut self) -> &mut File {
+ &mut self.file
+ }
+}
+
+impl std::ops::Deref for TempFile {
+ type Target = File;
+
+ #[inline]
+ fn deref(&self) -> &Self::Target {
+ &self.file
+ }
+}
+
+impl std::ops::DerefMut for TempFile {
+ #[inline]
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.file
+ }
+}
+
+impl Drop for TempFile {
+ fn drop(&mut self) {
+ if let Some(file_path) = self.file_path.take() {
+ // SAFETY: File will only be dropped once.
+ unsafe { ManuallyDrop::drop(&mut self.file) };
+
+ if let Err(e) = std::fs::remove_file(&file_path) {
+ tracing::error!(path = %file_path.display(), "failed to remove file: {e:?}");
+ }
+ }
+ }
+}
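Not part of the patch: a sketch of the `TempFile` contract introduced above, the file is deleted when the guard is dropped unless `disarm()` is called, which hands back the inner `File`. Names are illustrative.

```rust
use std::io::Write;

use crate::db::{FileDb, TempFile}; // assumed re-exports, as used elsewhere in this diff

fn example(files: &FileDb) -> std::io::Result<()> {
    // `open_as_temp` wraps the opened file in a delete-on-drop guard.
    let mut tmp: TempFile = files
        .file("state.tmp")
        .create(true)
        .write(true)
        .truncate(true)
        .open_as_temp()?;

    // `TempFile` derefs to `File`, so ordinary I/O works.
    tmp.write_all(b"partial data")?;

    // Dropping `tmp` here would remove "state.tmp"; disarming keeps it.
    let _kept: std::fs::File = tmp.disarm();
    Ok(())
}
```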
diff --git a/storage/src/db/kv_db/migrations/mod.rs b/storage/src/db/kv_db/migrations/mod.rs
index 710c980eb..17a01170f 100644
--- a/storage/src/db/kv_db/migrations/mod.rs
+++ b/storage/src/db/kv_db/migrations/mod.rs
@@ -34,7 +34,7 @@ impl VersionProvider for NodeStateVersionProvider {
let slice = version.as_ref();
slice
.try_into()
- .map_err(|_| weedb::Error::InvalidDbVersion)
+ .map_err(|_e| weedb::Error::InvalidDbVersion)
.map(Some)
}
None => Ok(None),
diff --git a/storage/src/lib.rs b/storage/src/lib.rs
index 73b31cfdc..e671a026c 100644
--- a/storage/src/lib.rs
+++ b/storage/src/lib.rs
@@ -8,11 +8,14 @@ pub use self::store::*;
mod db;
mod models;
mod store;
-mod utils;
-pub struct Storage {
- file_db_path: PathBuf,
+mod util {
+ pub use self::stored_value::*;
+
+ mod stored_value;
+}
+pub struct Storage {
runtime_storage: Arc<RuntimeStorage>,
block_handle_storage: Arc<BlockHandleStorage>,
block_storage: Arc<BlockStorage>,
@@ -28,27 +31,24 @@ impl Storage {
file_db_path: PathBuf,
max_cell_cache_size_bytes: u64,
) -> anyhow::Result<Arc<Self>> {
+ let files_dir = FileDb::new(file_db_path);
+
let block_handle_storage = Arc::new(BlockHandleStorage::new(db.clone()));
let runtime_storage = Arc::new(RuntimeStorage::new(block_handle_storage.clone()));
let block_storage = Arc::new(BlockStorage::new(db.clone(), block_handle_storage.clone())?);
let shard_state_storage = ShardStateStorage::new(
db.clone(),
+ &files_dir,
block_handle_storage.clone(),
block_storage.clone(),
- file_db_path.clone(),
max_cell_cache_size_bytes,
)?;
- let persistent_state_storage = PersistentStateStorage::new(
- file_db_path.clone(),
- db.clone(),
- block_handle_storage.clone(),
- )?;
+ let persistent_state_storage =
+ PersistentStateStorage::new(db.clone(), &files_dir, block_handle_storage.clone())?;
let node_state_storage = NodeStateStorage::new(db.clone());
let block_connection_storage = BlockConnectionStorage::new(db);
Ok(Arc::new(Self {
- file_db_path,
-
block_handle_storage,
block_storage,
shard_state_storage,
@@ -59,32 +59,37 @@ impl Storage {
}))
}
- #[inline(always)]
+ #[inline]
pub fn runtime_storage(&self) -> &RuntimeStorage {
&self.runtime_storage
}
- #[inline(always)]
+ #[inline]
pub fn persistent_state_storage(&self) -> &PersistentStateStorage {
&self.persistent_state_storage
}
- #[inline(always)]
+ #[inline]
pub fn block_handle_storage(&self) -> &BlockHandleStorage {
&self.block_handle_storage
}
- #[inline(always)]
+ #[inline]
+ pub fn block_storage(&self) -> &BlockStorage {
+ &self.block_storage
+ }
+
+ #[inline]
pub fn block_connection_storage(&self) -> &BlockConnectionStorage {
&self.block_connection_storage
}
- #[inline(always)]
+ #[inline]
pub fn shard_state_storage(&self) -> &ShardStateStorage {
&self.shard_state_storage
}
- #[inline(always)]
+ #[inline]
pub fn node_state(&self) -> &NodeStateStorage {
&self.node_state_storage
}
diff --git a/storage/src/models/block_meta.rs b/storage/src/models/block_meta.rs
index b5bbcb4f4..3581ab629 100644
--- a/storage/src/models/block_meta.rs
+++ b/storage/src/models/block_meta.rs
@@ -4,7 +4,7 @@ use anyhow::Result;
use bytes::Buf;
use everscale_types::models::BlockInfo;
-use crate::utils::{StoredValue, StoredValueBuffer};
+use crate::util::{StoredValue, StoredValueBuffer};
#[derive(Debug, Copy, Clone)]
pub struct BlockMetaData {
@@ -30,16 +30,6 @@ pub struct BriefBlockInfo {
pub after_split: bool,
}
-impl BriefBlockInfo {
- pub fn with_mc_seqno(self, mc_seqno: u32) -> BlockMetaData {
- BlockMetaData {
- is_key_block: self.is_key_block,
- gen_utime: self.gen_utime,
- mc_ref_seqno: Some(mc_seqno),
- }
- }
-}
-
impl From<&BlockInfo> for BriefBlockInfo {
fn from(info: &BlockInfo) -> Self {
Self {
diff --git a/storage/src/store/block/mod.rs b/storage/src/store/block/mod.rs
index e9d683f1e..1e1b45efd 100644
--- a/storage/src/store/block/mod.rs
+++ b/storage/src/store/block/mod.rs
@@ -17,7 +17,7 @@ use tycho_block_util::block::{
};
use crate::db::*;
-use crate::utils::*;
+use crate::util::*;
use crate::{models::*, BlockHandleStorage, HandleCreationStatus};
pub struct BlockStorage {
diff --git a/storage/src/store/block_connection/mod.rs b/storage/src/store/block_connection/mod.rs
index 3a6dd2f55..17d0a139c 100644
--- a/storage/src/store/block_connection/mod.rs
+++ b/storage/src/store/block_connection/mod.rs
@@ -5,7 +5,7 @@ use everscale_types::models::*;
use crate::db::*;
use crate::models::*;
-use crate::utils::*;
+use crate::util::*;
/// Stores relations between blocks
pub struct BlockConnectionStorage {
diff --git a/storage/src/store/block_handle/mod.rs b/storage/src/store/block_handle/mod.rs
index 1790047c9..15341fed9 100644
--- a/storage/src/store/block_handle/mod.rs
+++ b/storage/src/store/block_handle/mod.rs
@@ -8,7 +8,7 @@ use tycho_util::FastDashMap;
use crate::db::*;
use crate::models::*;
-use crate::utils::*;
+use crate::util::*;
pub struct BlockHandleStorage {
db: Arc<Db>,
diff --git a/storage/src/store/node_state/mod.rs b/storage/src/store/node_state/mod.rs
index a9ae5926a..d8e3f4fcb 100644
--- a/storage/src/store/node_state/mod.rs
+++ b/storage/src/store/node_state/mod.rs
@@ -5,7 +5,7 @@ use everscale_types::models::*;
use parking_lot::Mutex;
use crate::db::*;
-use crate::utils::*;
+use crate::util::*;
pub struct NodeStateStorage {
db: Arc<Db>,
diff --git a/storage/src/store/persistent_state/cell_writer.rs b/storage/src/store/persistent_state/cell_writer.rs
index 036ba6a19..bd3f6f3dd 100644
--- a/storage/src/store/persistent_state/cell_writer.rs
+++ b/storage/src/store/persistent_state/cell_writer.rs
@@ -1,46 +1,38 @@
use std::collections::hash_map;
-use std::fs;
-use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
-use std::os::unix::io::AsRawFd;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use anyhow::{Context, Result};
-use everscale_types::cell::CellDescriptor;
+use everscale_types::cell::{CellDescriptor, HashBytes};
use smallvec::SmallVec;
-
use tycho_util::FastHashMap;
-use crate::db::Db;
-use crate::FileDb;
+use crate::db::{Db, FileDb, TempFile};
pub struct CellWriter<'a> {
db: &'a Db,
- base_path: &'a Path,
+ states_dir: &'a FileDb,
+ block_root_hash: &'a HashBytes,
}
impl<'a> CellWriter<'a> {
#[allow(unused)]
- pub fn new(db: &'a Db, base_path: &'a Path) -> Self {
- Self { db, base_path }
+ pub fn new(db: &'a Db, states_dir: &'a FileDb, block_root_hash: &'a HashBytes) -> Self {
+ Self {
+ db,
+ states_dir,
+ block_root_hash,
+ }
}
#[allow(unused)]
pub fn write(&self, root_hash: &[u8; 32], is_cancelled: Option<Arc<AtomicBool>>) -> Result<()> {
- // Open target file in advance to get the error immediately (if any)
- let file_db = FileDb::new(
- self.base_path,
- fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true),
- )?;
-
// Load cells from db in reverse order into the temp file
tracing::info!("started loading cells");
- let mut intermediate = write_rev_cells(self.db, self.base_path, root_hash, &is_cancelled)
+ let mut intermediate = self
+ .write_rev(root_hash, &is_cancelled)
.context("Failed to write reversed cells data")?;
tracing::info!("finished loading cells");
let cell_count = intermediate.cell_sizes.len() as u32;
@@ -49,14 +41,22 @@ impl<'a> CellWriter<'a> {
let offset_size =
std::cmp::min(number_of_bytes_to_fit(intermediate.total_size), 8) as usize;
- // Reserve space for the file
- alloc_file(
- file_db.file(),
- 22 + offset_size * (1 + cell_count as usize) + (intermediate.total_size as usize),
- )?;
+ // Compute file size
+ let file_size =
+ 22 + offset_size * (1 + cell_count as usize) + (intermediate.total_size as usize);
+
+ // Create states file
+ let mut file = self
+ .states_dir
+ .file(self.file_name())
+ .create(true)
+ .write(true)
+ .truncate(true)
+ .prealloc(file_size)
+ .open()?;
// Write cells data in BOC format
- let mut buffer = std::io::BufWriter::with_capacity(FILE_BUFFER_LEN / 2, file_db.file());
+ let mut buffer = std::io::BufWriter::with_capacity(FILE_BUFFER_LEN / 2, file);
// Header | current len: 0
let flags = 0b1000_0000u8 | (REF_SIZE as u8);
@@ -128,194 +128,194 @@ impl<'a> CellWriter<'a> {
}
pub fn remove(&self) -> Result<()> {
- fs::remove_file(self.base_path).context(format!(
- "Failed to remove persistent state file {:?}",
- self.base_path
+ let file_name = self.file_name();
+ self.states_dir.remove_file(&file_name).context(format!(
+ "Failed to remove persistent state file {}",
+ self.states_dir.path().join(file_name).display()
))
}
-}
-struct IntermediateState {
- file: File,
- cell_sizes: Vec<u8>,
- total_size: u64,
- _remove_on_drop: RemoveOnDrop,
-}
-
-fn write_rev_cells<P: AsRef<Path>>(
- db: &Db,
- base_path: P,
- root_hash: &[u8; 32],
- is_cancelled: &Option<Arc<AtomicBool>>,
-) -> Result<IntermediateState> {
- enum StackItem {
- New([u8; 32]),
- Loaded(LoadedCell),
- }
-
- struct LoadedCell {
- hash: [u8; 32],
- descriptor: CellDescriptor,
- data: SmallVec<[u8; 128]>,
- indices: SmallVec<[u32; 4]>,
- }
+ fn write_rev(
+ &self,
+ root_hash: &[u8; 32],
+ is_cancelled: &Option<Arc<AtomicBool>>,
+ ) -> Result<IntermediateState> {
+ enum StackItem {
+ New([u8; 32]),
+ Loaded(LoadedCell),
+ }
- let file_path = base_path.as_ref().with_extension("temp");
+ struct LoadedCell {
+ hash: [u8; 32],
+ descriptor: CellDescriptor,
+ data: SmallVec<[u8; 128]>,
+ indices: SmallVec<[u32; 4]>,
+ }
- let file_db = FileDb::new(
- &file_path,
- fs::OpenOptions::new()
- .read(true)
- .write(true)
+ let mut file = self
+ .states_dir
+ .file(self.file_name().with_extension("temp"))
.create(true)
- .truncate(true),
- )?;
- let remove_on_drop = RemoveOnDrop(file_path);
+ .write(true)
+ .read(true)
+ .truncate(true)
+ .open_as_temp()?;
- let raw = db.raw().as_ref();
- let read_options = db.cells.read_config();
- let cf = db.cells.cf();
+ let raw = self.db.raw().as_ref();
+ let read_options = self.db.cells.read_config();
+ let cf = self.db.cells.cf();
- let mut references_buffer = SmallVec::<[[u8; 32]; 4]>::with_capacity(4);
+ let mut references_buffer = SmallVec::<[[u8; 32]; 4]>::with_capacity(4);
- let mut indices = FastHashMap::default();
- let mut remap = FastHashMap::default();
- let mut cell_sizes = Vec::<u8>::with_capacity(FILE_BUFFER_LEN);
- let mut stack = Vec::with_capacity(32);
+ let mut indices = FastHashMap::default();
+ let mut remap = FastHashMap::default();
+ let mut cell_sizes = Vec::<u8>::with_capacity(FILE_BUFFER_LEN);
+ let mut stack = Vec::with_capacity(32);
- let mut total_size = 0u64;
- let mut iteration = 0u32;
- let mut remap_index = 0u32;
+ let mut total_size = 0u64;
+ let mut iteration = 0u32;
+ let mut remap_index = 0u32;
- stack.push((iteration, StackItem::New(*root_hash)));
- indices.insert(*root_hash, (iteration, false));
+ stack.push((iteration, StackItem::New(*root_hash)));
+ indices.insert(*root_hash, (iteration, false));
- let mut temp_file_buffer = std::io::BufWriter::with_capacity(FILE_BUFFER_LEN, file_db.into());
+ let mut temp_file_buffer = std::io::BufWriter::with_capacity(FILE_BUFFER_LEN, &mut *file);
- while let Some((index, data)) = stack.pop() {
- if let Some(is_cancelled) = is_cancelled {
- if iteration % 1000 == 0 && is_cancelled.load(Ordering::Relaxed) {
- anyhow::bail!("Persistent state writing cancelled.")
+ while let Some((index, data)) = stack.pop() {
+ if let Some(is_cancelled) = is_cancelled {
+ if iteration % 1000 == 0 && is_cancelled.load(Ordering::Relaxed) {
+ anyhow::bail!("Persistent state writing cancelled.")
+ }
}
- }
- match data {
- StackItem::New(hash) => {
- let value = raw
- .get_pinned_cf_opt(&cf, hash, read_options)?
- .ok_or(CellWriterError::CellNotFound)?;
+ match data {
+ StackItem::New(hash) => {
+ let value = raw
+ .get_pinned_cf_opt(&cf, hash, read_options)?
+ .ok_or(CellWriterError::CellNotFound)?;
- let value = match crate::refcount::strip_refcount(value.as_ref()) {
- Some(bytes) => bytes,
- None => {
- return Err(CellWriterError::CellNotFound.into());
+ let value = match crate::refcount::strip_refcount(value.as_ref()) {
+ Some(bytes) => bytes,
+ None => {
+ return Err(CellWriterError::CellNotFound.into());
+ }
+ };
+ if value.is_empty() {
+ return Err(CellWriterError::InvalidCell.into());
}
- };
- if value.is_empty() {
- return Err(CellWriterError::InvalidCell.into());
- }
- let (descriptor, data) = deserialize_cell(value, &mut references_buffer)
- .ok_or(CellWriterError::InvalidCell)?;
+ let (descriptor, data) = deserialize_cell(value, &mut references_buffer)
+ .ok_or(CellWriterError::InvalidCell)?;
- let mut reference_indices = SmallVec::with_capacity(references_buffer.len());
+ let mut reference_indices = SmallVec::with_capacity(references_buffer.len());
- let mut indices_buffer = [0; 4];
- let mut keys = [std::ptr::null(); 4];
- let mut preload_count = 0;
+ let mut indices_buffer = [0; 4];
+ let mut keys = [std::ptr::null(); 4];
+ let mut preload_count = 0;
- for hash in &references_buffer {
- let index = match indices.entry(*hash) {
- hash_map::Entry::Vacant(entry) => {
- remap_index += 1;
-
- entry.insert((remap_index, false));
+ for hash in &references_buffer {
+ let index = match indices.entry(*hash) {
+ hash_map::Entry::Vacant(entry) => {
+ remap_index += 1;
- indices_buffer[preload_count] = remap_index;
- keys[preload_count] = hash.as_ptr();
- preload_count += 1;
+ entry.insert((remap_index, false));
- remap_index
- }
- hash_map::Entry::Occupied(entry) => {
- let (remap_index, written) = *entry.get();
- if !written {
indices_buffer[preload_count] = remap_index;
keys[preload_count] = hash.as_ptr();
preload_count += 1;
- }
- remap_index
- }
- };
- reference_indices.push(index);
- }
+ remap_index
+ }
+ hash_map::Entry::Occupied(entry) => {
+ let (remap_index, written) = *entry.get();
+ if !written {
+ indices_buffer[preload_count] = remap_index;
+ keys[preload_count] = hash.as_ptr();
+ preload_count += 1;
+ }
+ remap_index
+ }
+ };
- stack.push((
- index,
- StackItem::Loaded(LoadedCell {
- hash,
- descriptor,
- data: SmallVec::from_slice(data),
- indices: reference_indices,
- }),
- ));
-
- if preload_count > 0 {
- indices_buffer[..preload_count].reverse();
- keys[..preload_count].reverse();
-
- for i in 0..preload_count {
- let index = indices_buffer[i];
- let hash = unsafe { *keys[i].cast::<[u8; 32]>() };
- stack.push((index, StackItem::New(hash)));
+ reference_indices.push(index);
}
- }
- references_buffer.clear();
- }
- StackItem::Loaded(loaded) => {
- match remap.entry(index) {
- hash_map::Entry::Vacant(entry) => {
- entry.insert(iteration.to_be_bytes());
+ stack.push((
+ index,
+ StackItem::Loaded(LoadedCell {
+ hash,
+ descriptor,
+ data: SmallVec::from_slice(data),
+ indices: reference_indices,
+ }),
+ ));
+
+ if preload_count > 0 {
+ indices_buffer[..preload_count].reverse();
+ keys[..preload_count].reverse();
+
+ for i in 0..preload_count {
+ let index = indices_buffer[i];
+ let hash = unsafe { *keys[i].cast::<[u8; 32]>() };
+ stack.push((index, StackItem::New(hash)));
+ }
}
- hash_map::Entry::Occupied(_) => continue,
- };
- if let Some((_, written)) = indices.get_mut(&loaded.hash) {
- *written = true;
+ references_buffer.clear();
}
+ StackItem::Loaded(loaded) => {
+ match remap.entry(index) {
+ hash_map::Entry::Vacant(entry) => {
+ entry.insert(iteration.to_be_bytes());
+ }
+ hash_map::Entry::Occupied(_) => continue,
+ };
- iteration += 1;
- if iteration % 100000 == 0 {
- tracing::info!(iteration);
- }
+ if let Some((_, written)) = indices.get_mut(&loaded.hash) {
+ *written = true;
+ }
- let cell_size = 2 + loaded.data.len() + loaded.indices.len() * REF_SIZE;
- cell_sizes.push(cell_size as u8);
- total_size += cell_size as u64;
-
- temp_file_buffer.write_all(&[loaded.descriptor.d1, loaded.descriptor.d2])?;
- temp_file_buffer.write_all(&loaded.data)?;
- for index in loaded.indices {
- let index = remap.get(&index).with_context(|| {
- format!("Child not found. Iteration {iteration}. Child {index}")
- })?;
- temp_file_buffer.write_all(index)?;
+ iteration += 1;
+ if iteration % 100000 == 0 {
+ tracing::info!(iteration);
+ }
+
+ let cell_size = 2 + loaded.data.len() + loaded.indices.len() * REF_SIZE;
+ cell_sizes.push(cell_size as u8);
+ total_size += cell_size as u64;
+
+ temp_file_buffer.write_all(&[loaded.descriptor.d1, loaded.descriptor.d2])?;
+ temp_file_buffer.write_all(&loaded.data)?;
+ for index in loaded.indices {
+ let index = remap.get(&index).with_context(|| {
+ format!("Child not found. Iteration {iteration}. Child {index}")
+ })?;
+ temp_file_buffer.write_all(index)?;
+ }
}
}
}
+
+ drop(temp_file_buffer);
+
+ file.flush()?;
+
+ Ok(IntermediateState {
+ file,
+ cell_sizes,
+ total_size,
+ })
}
- let mut file: File = temp_file_buffer.into_inner()?;
- file.flush()?;
+ fn file_name(&self) -> PathBuf {
+ PathBuf::from(self.block_root_hash.to_string())
+ }
+}
- Ok(IntermediateState {
- file,
- cell_sizes,
- total_size,
- _remove_on_drop: remove_on_drop,
- })
+struct IntermediateState {
+ file: TempFile,
+ cell_sizes: Vec<u8>,
+ total_size: u64,
}
fn deserialize_cell<'a>(
@@ -355,40 +355,10 @@ fn deserialize_cell<'a>(
Some((descriptor, data))
}
-#[cfg(not(target_os = "macos"))]
-fn alloc_file(file: &File, len: usize) -> std::io::Result<()> {
- let res = unsafe { libc::posix_fallocate(file.as_raw_fd(), 0, len as i64) };
- if res == 0 {
- Ok(())
- } else {
- Err(std::io::Error::last_os_error())
- }
-}
-
-#[cfg(target_os = "macos")]
-pub fn alloc_file(file: &File, len: usize) -> std::io::Result<()> {
- let res = unsafe { libc::ftruncate(file.as_raw_fd(), len as i64) };
- if res < 0 {
- Err(std::io::Error::last_os_error())
- } else {
- Ok(())
- }
-}
-
fn number_of_bytes_to_fit(l: u64) -> u32 {
8 - l.leading_zeros() / 8
}
-struct RemoveOnDrop(PathBuf);
-
-impl Drop for RemoveOnDrop {
- fn drop(&mut self) {
- if let Err(e) = fs::remove_file(&self.0) {
- tracing::error!(path = %self.0.display(), "failed to remove file: {e:?}");
- }
- }
-}
-
struct Index {
value_len: usize,
offset: usize,
diff --git a/storage/src/store/persistent_state/mod.rs b/storage/src/store/persistent_state/mod.rs
index 038d68e8c..da6858b2c 100644
--- a/storage/src/store/persistent_state/mod.rs
+++ b/storage/src/store/persistent_state/mod.rs
@@ -1,4 +1,3 @@
-use std::fs;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -17,28 +16,30 @@ use crate::FileDb;
mod cell_writer;
const KEY_BLOCK_UTIME_STEP: u32 = 86400;
+const BASE_DIR: &str = "states";
pub struct PersistentStateStorage {
- block_handle_storage: Arc<BlockHandleStorage>,
- storage_path: PathBuf,
db: Arc<Db>,
+ storage_dir: FileDb,
+ block_handle_storage: Arc<BlockHandleStorage>,
is_cancelled: Arc<AtomicBool>,
}
impl PersistentStateStorage {
pub fn new(
- file_db_path: PathBuf,
db: Arc<Db>,
+ files_dir: &FileDb,
block_handle_storage: Arc<BlockHandleStorage>,
) -> Result<Self> {
- let dir = file_db_path.join("states");
- fs::create_dir_all(&dir)?;
- let is_cancelled = Arc::new(Default::default());
+ let storage_dir = files_dir.subdir(BASE_DIR);
+ storage_dir.ensure_exists()?;
+
+ let is_cancelled = Arc::new(AtomicBool::new(false));
Ok(Self {
- block_handle_storage,
- storage_path: dir,
db,
+ storage_dir,
+ block_handle_storage,
is_cancelled,
})
}
@@ -51,12 +52,13 @@ impl PersistentStateStorage {
) -> Result<()> {
let block_id = *block_id;
let root_hash = *root_hash;
- let db = self.db.clone();
let is_cancelled = Some(self.is_cancelled.clone());
- let base_path = self.get_state_file_path(mc_block_id, &block_id);
+
+ let db = self.db.clone();
+ let states_dir = self.prepare_persistent_states_dir(mc_block_id)?;
tokio::task::spawn_blocking(move || {
- let cell_writer = cell_writer::CellWriter::new(&db, &base_path);
+ let cell_writer = cell_writer::CellWriter::new(&db, &states_dir, &block_id.root_hash);
match cell_writer.write(&root_hash.0, is_cancelled) {
Ok(()) => {
tracing::info!(
@@ -87,18 +89,20 @@ impl PersistentStateStorage {
offset: u64,
size: u64,
) -> Option {
- let path = self.get_state_file_path(mc_block_id, block_id);
+ let path = self
+ .mc_states_dir(mc_block_id)
+ .join(block_id.root_hash.to_string());
tokio::task::spawn_blocking(move || {
// TODO: cache file handles
- let mut file_db = FileDb::new(path, fs::OpenOptions::new().read(true)).ok()?;
+ let mut file = std::fs::OpenOptions::new().read(true).open(path).ok()?;
- if let Err(e) = file_db.seek(SeekFrom::Start(offset)) {
+ if let Err(e) = file.seek(SeekFrom::Start(offset)) {
tracing::error!("failed to seek state file offset: {e:?}");
return None;
}
- let mut buf_reader = BufReader::new(file_db.file());
+ let mut buf_reader = BufReader::new(file);
let mut result = BytesMut::zeroed(size as usize);
let mut result_cursor = 0;
@@ -133,24 +137,22 @@ impl PersistentStateStorage {
pub fn state_exists(&self, mc_block_id: &BlockId, block_id: &BlockId) -> bool {
// TODO: cache file handles
- self.get_state_file_path(mc_block_id, block_id).is_file()
+ self.mc_states_dir(mc_block_id)
+ .join(block_id.root_hash.to_string())
+ .is_file()
}
- pub fn prepare_persistent_states_dir(&self, mc_block: &BlockId) -> Result<()> {
- let dir_path = mc_block.seqno.to_string();
- let path = self.storage_path.join(dir_path);
- if !path.exists() {
+ pub fn prepare_persistent_states_dir(&self, mc_block: &BlockId) -> Result<FileDb> {
+ let states_dir = self.storage_dir.subdir(mc_block.seqno.to_string());
+ if !states_dir.path().is_dir() {
tracing::info!(mc_block = %mc_block, "creating persistent state directory");
- fs::create_dir(path)?;
+ states_dir.ensure_exists()?;
}
- Ok(())
+ Ok(states_dir)
}
- fn get_state_file_path(&self, mc_block_id: &BlockId, block_id: &BlockId) -> PathBuf {
- self.storage_path
- .clone()
- .join(mc_block_id.seqno.to_string())
- .join(block_id.root_hash.to_string())
+ fn mc_states_dir(&self, mc_block_id: &BlockId) -> PathBuf {
+ self.storage_dir.path().join(mc_block_id.seqno.to_string())
}
pub fn cancel(&self) {
@@ -197,7 +199,7 @@ impl PersistentStateStorage {
let mut directories_to_remove: Vec<PathBuf> = Vec::new();
let mut files_to_remove: Vec<PathBuf> = Vec::new();
- for entry in fs::read_dir(&self.storage_path)?.flatten() {
+ for entry in self.storage_dir.entries()?.flatten() {
let path = entry.path();
if path.is_file() {
@@ -220,14 +222,14 @@ impl PersistentStateStorage {
for dir in directories_to_remove {
tracing::info!(dir = %dir.display(), "removing an old persistent state directory");
- if let Err(e) = fs::remove_dir_all(&dir) {
+ if let Err(e) = std::fs::remove_dir_all(&dir) {
tracing::error!(dir = %dir.display(), "failed to remove an old persistent state: {e:?}");
}
}
for file in files_to_remove {
tracing::info!(file = %file.display(), "removing file");
- if let Err(e) = fs::remove_file(&file) {
+ if let Err(e) = std::fs::remove_file(&file) {
tracing::error!(file = %file.display(), "failed to remove file: {e:?}");
}
}
diff --git a/storage/src/store/shard_state/mod.rs b/storage/src/store/shard_state/mod.rs
index 8533eb200..f4a3bdeda 100644
--- a/storage/src/store/shard_state/mod.rs
+++ b/storage/src/store/shard_state/mod.rs
@@ -1,4 +1,3 @@
-use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
@@ -13,7 +12,7 @@ use self::cell_storage::*;
use self::replace_transaction::ShardStateReplaceTransaction;
use crate::db::*;
-use crate::utils::*;
+use crate::util::*;
use crate::{models::BlockHandle, BlockHandleStorage, BlockStorage};
mod cell_storage;
@@ -21,13 +20,15 @@ mod entries_buffer;
mod replace_transaction;
mod shard_state_reader;
+const DOWNLOADS_DIR: &str = "downloads";
+
pub struct ShardStateStorage {
db: Arc<Db>,
+ downloads_dir: FileDb,
block_handle_storage: Arc<BlockHandleStorage>,
block_storage: Arc<BlockStorage>,
cell_storage: Arc<CellStorage>,
- downloads_dir: Arc<PathBuf>,
gc_lock: tokio::sync::Mutex<()>,
min_ref_mc_state: Arc,
@@ -38,12 +39,14 @@ pub struct ShardStateStorage {
impl ShardStateStorage {
pub fn new(
db: Arc<Db>,
+ files_dir: &FileDb,
block_handle_storage: Arc<BlockHandleStorage>,
block_storage: Arc<BlockStorage>,
- file_db_path: PathBuf,
cache_size_bytes: u64,
) -> Result<Self> {
- let downloads_dir = prepare_file_db_dir(file_db_path, "downloads")?;
+ let downloads_dir = files_dir.subdir(DOWNLOADS_DIR);
+ downloads_dir.ensure_exists()?;
+
let cell_storage = CellStorage::new(db.clone(), cache_size_bytes);
let res = Self {
@@ -155,9 +158,9 @@ impl ShardStateStorage {
pub fn begin_replace(&'_ self, block_id: &BlockId) -> Result<ShardStateReplaceTransaction<'_>> {
ShardStateReplaceTransaction::new(
&self.db,
+ &self.downloads_dir,
&self.cell_storage,
&self.min_ref_mc_state,
- self.downloads_dir.as_ref(),
block_id,
)
}
@@ -359,12 +362,6 @@ pub struct ShardStateStorageMetrics {
pub max_new_sc_cell_count: usize,
}
-fn prepare_file_db_dir(file_db_path: PathBuf, folder: &str) -> Result<Arc<PathBuf>> {
- let dir = Arc::new(file_db_path.join(folder));
- std::fs::create_dir_all(dir.as_ref())?;
- Ok(dir)
-}
-
#[derive(thiserror::Error, Debug)]
enum ShardStateStorageError {
#[error("Not found")]
diff --git a/storage/src/store/shard_state/replace_transaction.rs b/storage/src/store/shard_state/replace_transaction.rs
index 8ffde75e6..b33421ddc 100644
--- a/storage/src/store/shard_state/replace_transaction.rs
+++ b/storage/src/store/shard_state/replace_transaction.rs
@@ -1,5 +1,6 @@
+use std::fs::File;
use std::io::Write;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{Context, Result};
@@ -10,7 +11,7 @@ use super::cell_storage::*;
use super::entries_buffer::*;
use super::shard_state_reader::*;
use crate::db::*;
-use crate::utils::*;
+use crate::util::*;
use tycho_block_util::state::*;
use tycho_util::progress_bar::*;
@@ -27,17 +28,14 @@ pub struct ShardStateReplaceTransaction<'a> {
}
impl<'a> ShardStateReplaceTransaction<'a> {
- pub fn new<P>(
+ pub fn new(
db: &'a Db,
+ downloads_dir: &FileDb,
cell_storage: &'a Arc<CellStorage>,
min_ref_mc_state: &'a Arc,
- path: P,
block_id: &BlockId,
- ) -> Result<Self>
- where
- P: AsRef<Path>,
- {
- let file_ctx = FilesContext::new(path, block_id)?;
+ ) -> Result<Self> {
+ let file_ctx = FilesContext::new(downloads_dir, block_id)?;
Ok(Self {
db,
@@ -475,14 +473,11 @@ impl<'a> FinalizationContext<'a> {
struct FilesContext {
cells_path: PathBuf,
hashes_path: PathBuf,
- cells_file: Option<FileDb>,
+ cells_file: Option<File>,
}
impl FilesContext {
- pub fn new<P>(root_path: P, block_id: &BlockId) -> Result<Self>
- where
- P: AsRef<Path>,
- {
+ pub fn new(downloads_dir: &FileDb, block_id: &BlockId) -> Result<Self> {
let block_id = format!(
"({},{:016x},{})",
block_id.shard.workchain(),
@@ -490,26 +485,25 @@ impl FilesContext {
block_id.seqno
);
- let cells_path = root_path.as_ref().join(format!("state_cells_{block_id}"));
- let hashes_path = root_path.as_ref().join(format!("state_hashes_{block_id}"));
+ let cells_file_name = format!("state_cells_{block_id}");
+ let hashes_file_name = format!("state_hashes_{block_id}");
- let cells_file = Some(FileDb::new(
- &cells_path,
- std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true),
- )?);
+ let cells_file = downloads_dir
+ .file(&cells_file_name)
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open()?;
Ok(Self {
- cells_file,
- cells_path,
- hashes_path,
+ cells_path: downloads_dir.path().join(cells_file_name),
+ hashes_path: downloads_dir.path().join(hashes_file_name),
+ cells_file: Some(cells_file),
})
}
- pub fn cells_file(&mut self) -> Result<&mut FileDb> {
+ pub fn cells_file(&mut self) -> Result<&mut File> {
match &mut self.cells_file {
Some(file) => Ok(file),
None => Err(FilesContextError::AlreadyFinalized.into()),
diff --git a/storage/src/utils/stored_value.rs b/storage/src/util/stored_value.rs
similarity index 97%
rename from storage/src/utils/stored_value.rs
rename to storage/src/util/stored_value.rs
index deaea8198..c37374412 100644
--- a/storage/src/utils/stored_value.rs
+++ b/storage/src/util/stored_value.rs
@@ -4,7 +4,6 @@ use smallvec::SmallVec;
use anyhow::Result;
use everscale_types::cell::HashBytes;
use everscale_types::models::{BlockId, BlockIdShort, ShardIdent};
-use tycho_util::byte_reader::ByteOrderRead;
/// A trait for writing or reading data from a stack-allocated buffer
pub trait StoredValue {
@@ -104,8 +103,12 @@ impl StoredValue for BlockId {
let shard = ShardIdent::deserialize(reader)?;
let seqno = reader.get_u32();
- let root_hash = HashBytes::from(reader.read_u256()?);
- let file_hash = HashBytes::from(reader.read_u256()?);
+
+ let mut root_hash = HashBytes::default();
+ root_hash.0.copy_from_slice(&reader[..32]);
+ let mut file_hash = HashBytes::default();
+ file_hash.0.copy_from_slice(&reader[32..]);
+
Ok(Self {
shard,
seqno,
diff --git a/storage/src/utils/mod.rs b/storage/src/utils/mod.rs
deleted file mode 100644
index d28b57e54..000000000
--- a/storage/src/utils/mod.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-pub use self::stored_value::*;
-
-mod stored_value;
diff --git a/storage/tests/global-config.json b/storage/tests/global-config.json
deleted file mode 100644
index bd4533a2f..000000000
--- a/storage/tests/global-config.json
+++ /dev/null
@@ -1,22 +0,0 @@
-{
- "@type": "config.global",
- "dht": {
- "@type": "dht.config.global",
- "k": 6,
- "a": 3,
- "static_nodes": {
- "@type": "dht.nodes",
- "nodes": []
- }
- },
- "validator": {
- "@type": "validator.config.global",
- "zero_state": {
- "workchain": -1,
- "shard": -9223372036854775808,
- "seqno": 0,
- "root_hash": "WP/KGheNr/cF3lQhblQzyb0ufYUAcNM004mXhHq56EU=",
- "file_hash": "0nC4eylStbp9qnCq8KjDYb789NjS25L5ZA1UQwcIOOQ="
- }
- }
-}
\ No newline at end of file
diff --git a/storage/tests/mod.rs b/storage/tests/mod.rs
index 46294c55a..06ccfc2c8 100644
--- a/storage/tests/mod.rs
+++ b/storage/tests/mod.rs
@@ -1,11 +1,10 @@
-use anyhow::{anyhow, Result};
-use base64::prelude::BASE64_STANDARD;
-use base64::Engine;
+use std::str::FromStr;
+
+use anyhow::Result;
use bytesize::ByteSize;
use everscale_types::boc::Boc;
-use everscale_types::cell::{Cell, DynCell, HashBytes};
-use everscale_types::models::{BlockId, ShardIdent, ShardState};
-use serde::{Deserialize, Deserializer};
+use everscale_types::cell::{Cell, DynCell};
+use everscale_types::models::{BlockId, ShardState};
use tycho_block_util::state::ShardStateStuff;
use tycho_storage::{BlockMetaData, Db, DbOptions, Storage};
@@ -38,104 +37,6 @@ impl ShardStateCombined {
}
}
-#[derive(Deserialize)]
-struct GlobalConfigJson {
- validator: ValidatorJson,
-}
-
-#[derive(Deserialize)]
-struct ValidatorJson {
- zero_state: BlockIdJson,
-}
-
-#[derive(Debug, Default)]
-pub struct BlockIdJson {
- workchain: i32,
- shard: u64,
- seqno: u32,
- root_hash: HashBytes,
- file_hash: HashBytes,
-}
-
-impl<'de> Deserialize<'de> for BlockIdJson {
- fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
- where
- D: Deserializer<'de>,
- {
- use serde::de::Error;
-
- #[derive(Deserialize)]
- struct BlockIdJsonHelper {
- workchain: i32,
- shard: i64,
- seqno: u32,
- root_hash: String,
- file_hash: String,
- }
-
- let BlockIdJsonHelper {
- workchain,
- shard,
- seqno,
- root_hash,
- file_hash,
- } = BlockIdJsonHelper::deserialize(deserializer)?;
-
- let shard = shard as u64;
-
- let mut result = Self {
- workchain,
- shard,
- seqno,
- ..Default::default()
- };
-
- result.root_hash =
- HashBytes::from_slice(&BASE64_STANDARD.decode(root_hash).map_err(Error::custom)?);
-
- result.file_hash =
- HashBytes::from_slice(&BASE64_STANDARD.decode(file_hash).map_err(Error::custom)?);
-
- Ok(result)
- }
-}
-
-impl TryFrom<BlockIdJson> for BlockId {
- type Error = anyhow::Error;
-
- fn try_from(value: BlockIdJson) -> Result<Self, Self::Error> {
- Ok(Self {
- shard: ShardIdent::new(value.workchain, value.shard)
- .ok_or(anyhow!("Invalid ShardIdent"))?,
- seqno: value.seqno,
- root_hash: value.root_hash,
- file_hash: value.file_hash,
- })
- }
-}
-
-#[derive(Debug)]
-struct GlobalConfig {
- block_id: BlockId,
-}
-
-impl GlobalConfig {
- pub fn from_file(path: impl AsRef<std::path::Path>) -> Result<Self> {
- let data = std::fs::read_to_string(path.as_ref())?;
- Ok(serde_json::from_str::<GlobalConfigJson>(&data)?.try_into()?)
- }
-}
-
-impl TryFrom<GlobalConfigJson> for GlobalConfig {
- type Error = anyhow::Error;
-
- fn try_from(value: GlobalConfigJson) -> Result<Self, Self::Error> {
- Ok(Self {
- block_id: value.validator.zero_state.try_into()?,
- })
- }
-}
-
fn compare_cells(orig_cell: &DynCell, stored_cell: &DynCell) {
assert_eq!(orig_cell.repr_hash(), stored_cell.repr_hash());
@@ -177,17 +78,17 @@ async fn persistent_storage_everscale() -> Result<()> {
// Read zerostate
let zero_state_raw = ShardStateCombined::from_file("tests/everscale_zerostate.boc")?;
- // Read global config
- let global_config = GlobalConfig::from_file("tests/global-config.json")?;
+ // Parse block id
+ let block_id = BlockId::from_str("-1:8000000000000000:0:58ffca1a178daff705de54216e5433c9bd2e7d850070d334d38997847ab9e845:d270b87b2952b5ba7daa70aaf0a8c361befcf4d8d2db92f9640d5443070838e4")?;
// Write zerostate to db
let (handle, _) = storage.block_handle_storage().create_or_load_handle(
- &global_config.block_id,
+ &block_id,
BlockMetaData::zero_state(zero_state_raw.gen_utime().unwrap()),
)?;
let zerostate = ShardStateStuff::new(
- global_config.block_id,
+ block_id,
zero_state_raw.cell.clone(),
storage.shard_state_storage().min_ref_mc_state(),
)?;
diff --git a/util/src/byte_reader.rs b/util/src/byte_reader.rs
deleted file mode 100644
index 5eb65aeda..000000000
--- a/util/src/byte_reader.rs
+++ /dev/null
@@ -1,11 +0,0 @@
-pub trait ByteOrderRead {
- fn read_u256(&mut self) -> std::io::Result<[u8; 32]>;
-}
-
-impl<T: std::io::Read> ByteOrderRead for T {
- fn read_u256(&mut self) -> std::io::Result<[u8; 32]> {
- let mut buf = [0; 32];
- self.read_exact(&mut buf)?;
- Ok(buf)
- }
-}
diff --git a/util/src/lib.rs b/util/src/lib.rs
index 1aa5a8d6e..40cbda55e 100644
--- a/util/src/lib.rs
+++ b/util/src/lib.rs
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::collections::HashSet;
-pub mod byte_reader;
pub mod progress_bar;
pub mod serde_helpers;
pub mod time;