feat(cli): archive utils for normal sync
pashinov committed Jun 21, 2024
1 parent 79b66c6 commit 6b2569b
Showing 7 changed files with 384 additions and 111 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock


2 changes: 2 additions & 0 deletions block-util/Cargo.toml
@@ -15,10 +15,12 @@ arc-swap = { workspace = true }
bytes = { workspace = true }
everscale-types = { workspace = true }
hex = { workspace = true }
libc = { workspace = true }
parking_lot = { workspace = true }
sha2 = { workspace = true }
smallvec = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

# local deps
tycho-util = { workspace = true }
77 changes: 77 additions & 0 deletions block-util/src/archive/mod.rs
@@ -1,15 +1,92 @@
use std::collections::BTreeMap;

use bytes::Bytes;
use everscale_types::models::{Block, BlockId, BlockProof};

pub use self::entry_id::{ArchiveEntryId, GetFileName};
pub use self::reader::{ArchiveEntry, ArchiveReader, ArchiveReaderError, ArchiveVerifier};
use crate::block::{BlockProofStuff, BlockStuff};

mod entry_id;
mod reader;
mod writer;

pub const ARCHIVE_PREFIX: [u8; 4] = u32::to_le_bytes(0xae8fdd01);
pub const ARCHIVE_ENTRY_PREFIX: [u8; 2] = u16::to_le_bytes(0x1e8b);
pub const ARCHIVE_ENTRY_HEADER_LEN: usize = ARCHIVE_ENTRY_PREFIX.len() + 2 + 4; // magic + filename len + data len
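
// NOTE: An illustrative sketch, not part of this commit. Given the
// constants above, one archive entry is laid out as the entry magic,
// a little-endian u16 filename length, a little-endian u32 data length,
// then the filename bytes followed by the data bytes (the payload order
// past the header is an assumption inferred from ARCHIVE_ENTRY_HEADER_LEN).
fn write_entry_sketch(out: &mut Vec<u8>, filename: &str, data: &[u8]) {
    out.extend_from_slice(&ARCHIVE_ENTRY_PREFIX);
    out.extend_from_slice(&(filename.len() as u16).to_le_bytes());
    out.extend_from_slice(&(data.len() as u32).to_le_bytes());
    out.extend_from_slice(filename.as_bytes());
    out.extend_from_slice(data);
}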

pub struct Archive {
    pub mc_block_ids: BTreeMap<u32, BlockId>,
    pub blocks: BTreeMap<BlockId, ArchiveDataEntry>,
}

impl Archive {
    pub fn new(data: &[u8]) -> anyhow::Result<Self> {
        let reader = ArchiveReader::new(data)?;

        let mut res = Archive {
            mc_block_ids: Default::default(),
            blocks: Default::default(),
        };

        for entry_data in reader {
            let entry = entry_data?;
            match ArchiveEntryId::from_filename(entry.name)? {
                ArchiveEntryId::Block(id) => {
                    let block = BlockStuff::deserialize_checked(&id, entry.data)?.into_block();

                    res.blocks.entry(id).or_default().block =
                        Some(WithArchiveData::new(block, entry.data.to_vec()));

                    if id.shard.is_masterchain() {
                        res.mc_block_ids.insert(id.seqno, id);
                    }
                }
                ArchiveEntryId::Proof(id) if id.shard.workchain() == -1 => {
                    let proof = BlockProofStuff::deserialize(&id, entry.data, false)?
                        .proof()
                        .clone();

                    res.blocks.entry(id).or_default().proof =
                        Some(WithArchiveData::new(proof, entry.data.to_vec()));
                    res.mc_block_ids.insert(id.seqno, id);
                }
                ArchiveEntryId::ProofLink(id) if id.shard.workchain() != -1 => {
                    let proof = BlockProofStuff::deserialize(&id, entry.data, true)?
                        .proof()
                        .clone();

                    res.blocks.entry(id).or_default().proof =
                        Some(WithArchiveData::new(proof, entry.data.to_vec()));
                }
                _ => continue,
            }
        }

        Ok(res)
    }

    pub fn lowest_mc_id(&self) -> Option<&BlockId> {
        self.mc_block_ids.values().next()
    }

    pub fn highest_mc_id(&self) -> Option<&BlockId> {
        self.mc_block_ids.values().next_back()
    }

    pub fn get_block_by_id(&self, id: &BlockId) -> Option<Block> {
        self.blocks
            .get(id)
            .and_then(|entry| entry.block.as_ref().map(|x| x.data.clone()))
    }
}

#[derive(Default)]
pub struct ArchiveDataEntry {
    pub block: Option<WithArchiveData<Block>>,
    pub proof: Option<WithArchiveData<BlockProof>>,
}

#[derive(Clone)]
pub enum ArchiveData {
    /// The raw data is known.
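A consumer of this API could parse a downloaded archive and inspect its masterchain range roughly like this (a sketch; the import path and surrounding function are assumptions, only the `Archive` methods come from this diff):

use tycho_block_util::archive::Archive; // assumed import path

fn inspect(archive_bytes: &[u8]) -> anyhow::Result<()> {
    let archive = Archive::new(archive_bytes)?;

    // mc_block_ids is a BTreeMap keyed by seqno, so the range bounds
    // are simply the first and last entries.
    if let (Some(lo), Some(hi)) = (archive.lowest_mc_id(), archive.highest_mc_id()) {
        println!("archive covers mc seqno {}..={}", lo.seqno, hi.seqno);
        let _block = archive.get_block_by_id(hi);
    }

    Ok(())
}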
194 changes: 194 additions & 0 deletions block-util/src/archive/writer.rs
@@ -0,0 +1,194 @@
#![allow(clippy::disallowed_types)]
use std::fs::File;
use std::io::{IoSlice, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use anyhow::{Context, Result};
use parking_lot::Mutex;

use crate::archive::Archive;

pub struct ArchiveWriter {
    pool_state: Arc<ArchiveWritersPoolState>,
    state: ArchiveWriterState,
}

impl ArchiveWriter {
    pub fn parse_archive(&self) -> Result<Archive> {
        match &self.state {
            ArchiveWriterState::InMemory(buffer) => Archive::new(buffer),
            ArchiveWriterState::File { file, .. } => {
                let mapped_file =
                    FileWriter::new(file).context("Failed to map temp archive file")?;

                Archive::new(mapped_file.as_slice())
            }
        }
    }

    fn acquire_memory(&mut self, additional: usize) -> std::io::Result<()> {
        if let ArchiveWriterState::InMemory(buffer) = &self.state {
            let move_to_file = {
                let mut acquired_memory = self.pool_state.acquired_memory.lock();
                if *acquired_memory + additional > self.pool_state.save_to_disk_threshold {
                    *acquired_memory -= buffer.len();
                    true
                } else {
                    *acquired_memory += additional;
                    false
                }
            };

            if move_to_file {
                let (path, mut file) = self.pool_state.acquire_file()?;
                file.write_all(buffer)?;
                self.state = ArchiveWriterState::File { path, file };
            }
        }

        Ok(())
    }
}

impl Drop for ArchiveWriter {
    fn drop(&mut self) {
        match &self.state {
            ArchiveWriterState::InMemory(buffer) => {
                *self.pool_state.acquired_memory.lock() -= buffer.len();
            }
            ArchiveWriterState::File { path, .. } => {
                if let Err(e) = std::fs::remove_file(path) {
                    tracing::error!(
                        target: "sync",
                        path = %path.display(),
                        "failed to remove temp archive file: {e:?}"
                    );
                }
            }
        }
    }
}

impl Write for ArchiveWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.acquire_memory(buf.len())?;

        match &mut self.state {
            ArchiveWriterState::InMemory(buffer) => buffer.write(buf),
            ArchiveWriterState::File { file, .. } => file.write(buf),
        }
    }

    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
        let len = bufs.iter().map(|b| b.len()).sum();

        self.acquire_memory(len)?;

        match &mut self.state {
            ArchiveWriterState::InMemory(buffer) => {
                buffer.reserve(len);
                for buf in bufs {
                    buffer.extend_from_slice(buf);
                }
                Ok(len)
            }
            ArchiveWriterState::File { file, .. } => file.write_vectored(bufs),
        }
    }

    #[inline(always)]
    fn flush(&mut self) -> std::io::Result<()> {
        match &mut self.state {
            ArchiveWriterState::InMemory(_) => Ok(()),
            ArchiveWriterState::File { file, .. } => file.flush(),
        }
    }
}

struct ArchiveWritersPoolState {
    save_to_disk_threshold: usize,
    // NOTE: A plain `AtomicUsize` is not used here because the
    // InMemory-to-File transition must read and update the counter
    // as a single step under the lock.
    acquired_memory: Mutex<usize>,
    temp_file_index: AtomicUsize,
    base_path: PathBuf,
}

impl ArchiveWritersPoolState {
    fn acquire_file(&self) -> std::io::Result<(PathBuf, File)> {
        let temp_file_index = self.temp_file_index.fetch_add(1, Ordering::AcqRel);
        let path = self
            .base_path
            .join(format!("temp_archive{temp_file_index:04}"));

        let file = std::fs::OpenOptions::new()
            .write(true)
            .read(true)
            .create(true)
            .truncate(true)
            .open(&path)?;

        Ok((path, file))
    }
}

enum ArchiveWriterState {
    InMemory(Vec<u8>),
    File { path: PathBuf, file: File },
}

struct FileWriter<'a> {
    _file: &'a File,
    length: usize,
    ptr: *mut libc::c_void,
}

impl<'a> FileWriter<'a> {
    fn new(file: &'a File) -> std::io::Result<Self> {
        use std::os::unix::io::AsRawFd;

        let length = file.metadata()?.len() as usize;

        // SAFETY: The file was opened successfully, the mapping is
        // read-only (PROT_READ), and the offset (0) is page-aligned.
        let ptr = unsafe {
            libc::mmap(
                std::ptr::null_mut(),
                length,
                libc::PROT_READ,
                libc::MAP_SHARED,
                file.as_raw_fd(),
                0,
            )
        };
        if ptr == libc::MAP_FAILED {
            return Err(std::io::Error::last_os_error());
        }

        if unsafe { libc::madvise(ptr, length, libc::MADV_SEQUENTIAL) } != 0 {
            return Err(std::io::Error::last_os_error());
        }

        Ok(Self {
            _file: file,
            length,
            ptr,
        })
    }

    fn as_slice(&self) -> &[u8] {
        unsafe { std::slice::from_raw_parts(self.ptr as *const u8, self.length) }
    }
}

impl Drop for FileWriter<'_> {
    fn drop(&mut self) {
        // SAFETY: File still exists, ptr and length were initialized once on creation
        if unsafe { libc::munmap(self.ptr, self.length) } != 0 {
            // TODO: how to handle this?
            let e = std::io::Error::last_os_error();
            panic!("failed to unmap temp archive file: {e:?}");
        }
    }
}
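Since `ArchiveWriter` implements `std::io::Write`, download code can stream archive chunks into it and only pay for a temp file once the pool-wide memory threshold is crossed. A rough usage sketch (only the `Write` impl and `parse_archive` come from this diff; obtaining a writer from the pool is assumed):

use std::io::Write;

fn collect_archive(mut writer: ArchiveWriter, chunks: &[&[u8]]) -> anyhow::Result<Archive> {
    for chunk in chunks {
        // Small archives stay in memory; once save_to_disk_threshold is
        // exceeded across the pool, the buffer is spilled to a temp file.
        writer.write_all(chunk)?;
    }
    writer.flush()?;

    // Parses either the in-memory buffer or an mmap of the temp file.
    writer.parse_archive()
}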
8 changes: 8 additions & 0 deletions cli/src/node/mod.rs
@@ -437,6 +437,10 @@ impl Node {
            }
        };

        if !self.is_synced()? {
            // start normal sync
        }

        Ok(last_key_block_id)
    }

@@ -585,6 +589,10 @@ impl Node {

        Ok(())
    }

    pub fn is_synced(&self) -> Result<bool> {
        todo!()
    }
}

struct CollatorStateSubscriber {
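`is_synced` is left as a stub for now. One common shape for such a check, purely as an illustration of where this is headed (not what this commit implements), compares the last applied masterchain block's creation time against the local clock:

// Hypothetical sketch: `last_applied_mc_block_utime()` and the 600-second
// freshness window are assumptions, not part of this commit.
pub fn is_synced(&self) -> Result<bool> {
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_secs() as u32;
    let last_utime = self.last_applied_mc_block_utime()?;
    Ok(now.saturating_sub(last_utime) < 600)
}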
