Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add read-data-subset to CheckOptions; allow to check given trees #262

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ gethostname = "0.5.0"
humantime = "2.1.0"
itertools = "0.13.0"
quick_cache = "0.6.2"
safe-transmute = "0.11.3"
shell-words = "1.1.0"
strum = { version = "0.26.3", features = ["derive"] }
zstd = "0.13.2"
Expand Down
281 changes: 232 additions & 49 deletions crates/core/src/commands/check.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,122 @@
//! `check` subcommand
use std::collections::HashMap;
use std::{
collections::{BTreeSet, HashMap},
str::FromStr,
};

use bytes::Bytes;
use bytesize::ByteSize;
use derive_setters::Setters;
use itertools::Itertools;
use log::{debug, error, warn};
use rayon::prelude::{IntoParallelIterator, ParallelBridge, ParallelIterator};
use rand::{prelude::SliceRandom, thread_rng, Rng};
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use zstd::stream::decode_all;

use crate::{
backend::{cache::Cache, decrypt::DecryptReadBackend, node::NodeType, FileType, ReadBackend},
blob::{tree::TreeStreamerOnce, BlobId, BlobType},
crypto::hasher::hash,
error::{RusticErrorKind, RusticResult},
error::{CommandErrorKind, RusticErrorKind, RusticResult},
id::Id,
index::{
binarysorted::{IndexCollector, IndexType},
GlobalIndex, ReadGlobalIndex,
},
progress::{Progress, ProgressBars},
repofile::{
packfile::PackId, IndexFile, IndexPack, PackHeader, PackHeaderLength, PackHeaderRef,
SnapshotFile,
},
repository::{Open, Repository},
TreeId,
};

#[derive(Clone, Copy, Debug, Default)]
/// Options to specify which subset of packs will be read when checking pack data
pub enum ReadSubsetOption {
    #[default]
    /// Read all pack files
    All,
    /// Read a random subset of pack files with (approximately) the given percentage of total size
    Percentage(f64),
    /// Read a random subset of pack files with (approximately) the given total size
    Size(u64),
    /// Read a deterministic, id-based subset `(n, m)` of pack files: using `1/m` .. `m/m` in
    /// separate runs covers every pack file exactly once
    IdSubSet((u32, u32)),
}

impl ReadSubsetOption {
fn apply(self, packs: impl IntoIterator<Item = IndexPack>) -> Vec<IndexPack> {

Check warning on line 48 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L48

Added line #L48 was not covered by tests
self.apply_with_rng(packs, &mut thread_rng())
}

fn apply_with_rng(
self,
packs: impl IntoIterator<Item = IndexPack>,
rng: &mut impl Rng,
) -> Vec<IndexPack> {
fn id_matches_n_m(id: &Id, n: u32, m: u32) -> bool {
let short_id: u32 = id.transmute();

Check warning on line 58 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L57-L58

Added lines #L57 - L58 were not covered by tests
short_id % m == n % m
}

let mut total_size: u64 = 0;
let mut packs: Vec<_> = packs

Check warning on line 63 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L63

Added line #L63 was not covered by tests
.into_iter()
.inspect(|p| total_size += u64::from(p.pack_size()))
.collect();

// Apply read-subset option
if let Some(mut size) = match self {
Self::All => None,

Check warning on line 70 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L70

Added line #L70 was not covered by tests
// we need some casts to compute percentage...
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_precision_loss)]
#[allow(clippy::cast_sign_loss)]
Self::Percentage(p) => Some((total_size as f64 * p / 100.0) as u64),
Self::Size(s) => Some(s),
Self::IdSubSet((n, m)) => {
packs.retain(|p| id_matches_n_m(&p.id, n, m));
None

Check warning on line 79 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L79

Added line #L79 was not covered by tests
}
} {
// random subset of given size is required
packs.shuffle(rng);
packs.retain(|p| {

Check warning on line 84 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L84

Added line #L84 was not covered by tests
let p_size = u64::from(p.pack_size());
if size > p_size {
size = size.saturating_sub(p_size);
true

Check warning on line 88 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L88

Added line #L88 was not covered by tests
} else {
false

Check warning on line 90 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L90

Added line #L90 was not covered by tests
}
});
}
packs
}
}

impl FromStr for ReadSubsetOption {
type Err = CommandErrorKind;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let result = if s == "all" {
Self::All
} else if let Some(p) = s.strip_suffix('%') {
// try to read percentage
Self::Percentage(p.parse()?)
} else if let Some((n, m)) = s.split_once('/') {
// try to read n/m
Self::IdSubSet((n.parse()?, m.parse()?))
} else {
Self::Size(
ByteSize::from_str(s)
.map_err(CommandErrorKind::FromByteSizeParser)?

Check warning on line 112 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L112

Added line #L112 was not covered by tests
.as_u64(),
)
};
Ok(result)
}
}

#[cfg_attr(feature = "clap", derive(clap::Parser))]
#[derive(Clone, Copy, Debug, Default, Setters)]
#[setters(into)]
Expand All @@ -34,9 +126,13 @@
#[cfg_attr(feature = "clap", clap(long, conflicts_with = "no_cache"))]
pub trust_cache: bool,

/// Read all data blobs
/// Also read and check pack files
#[cfg_attr(feature = "clap", clap(long))]
pub read_data: bool,

/// Read and check pack files
#[cfg_attr(feature = "clap", clap(long, default_value = "all"))]
pub read_data_subset: ReadSubsetOption,
}

impl CheckOptions {
Expand All @@ -54,7 +150,11 @@
/// # Errors
///
/// If the repository is corrupted
pub(crate) fn run<P: ProgressBars, S: Open>(self, repo: &Repository<P, S>) -> RusticResult<()> {
pub(crate) fn run<P: ProgressBars, S: Open>(
self,
repo: &Repository<P, S>,
trees: Vec<TreeId>,
) -> RusticResult<()> {
let be = repo.dbe();
let cache = repo.cache();
let hot_be = &repo.be_hot;
Expand Down Expand Up @@ -105,37 +205,32 @@
}
}

let total_pack_size: u64 = index_collector
.data_packs()
.iter()
.map(|(_, size)| u64::from(*size))
.sum::<u64>()
+ index_collector
.tree_packs()
.iter()
.map(|(_, size)| u64::from(*size))
.sum::<u64>();

let index_be = GlobalIndex::new_from_index(index_collector.into_index());

check_snapshots(be, &index_be, pb)?;
let packs = check_trees(be, &index_be, trees, pb)?;

if self.read_data {
let packs = index_be
.into_index()
.into_iter()
.filter(|p| packs.contains(&p.id));

let packs = self.read_data_subset.apply(packs);

Check warning on line 218 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L218

Added line #L218 was not covered by tests

repo.warm_up_wait(packs.iter().map(|pack| pack.id))?;

let total_pack_size = packs.iter().map(|pack| u64::from(pack.pack_size())).sum();
let p = pb.progress_bytes("reading pack data...");
p.set_length(total_pack_size);

index_be
.into_index()
.into_iter()
.par_bridge()
.for_each(|pack| {
let id = pack.id;
let data = be.read_full(FileType::Pack, &id).unwrap();
match check_pack(be, pack, data, &p) {
Ok(()) => {}
Err(err) => error!("Error reading pack {id} : {err}",),
}
});
packs.into_par_iter().for_each(|pack| {
let id = pack.id;
let data = be.read_full(FileType::Pack, &id).unwrap();
match check_pack(be, pack, data, &p) {
Ok(()) => {}
Err(err) => error!("Error reading pack {id} : {err}",),

Check warning on line 231 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L230-L231

Added lines #L230 - L231 were not covered by tests
}
});
p.finish();
}
Ok(())
Expand Down Expand Up @@ -375,19 +470,13 @@
/// # Errors
///
/// If a snapshot or tree is missing or has a different size
fn check_snapshots(
fn check_trees(
be: &impl DecryptReadBackend,
index: &impl ReadGlobalIndex,
snap_trees: Vec<TreeId>,
pb: &impl ProgressBars,
) -> RusticResult<()> {
let p = pb.progress_counter("reading snapshots...");
let snap_trees: Vec<_> = be
.stream_all::<SnapshotFile>(&p)?
.iter()
.map_ok(|(_, snap)| snap.tree)
.try_collect()?;
p.finish();

) -> RusticResult<BTreeSet<PackId>> {
let mut packs = BTreeSet::new();
let p = pb.progress_counter("checking trees...");
let mut tree_streamer = TreeStreamerOnce::new(be, index, snap_trees, p)?;
while let Some(item) = tree_streamer.next().transpose()? {
Expand All @@ -404,12 +493,17 @@
error!("file {:?} blob {} has null ID", path.join(node.name()), i);
}

if !index.has_data(id) {
error!(
"file {:?} blob {} is missing in index",
path.join(node.name()),
id
);
match index.get_data(id) {
None => {
error!(

Check warning on line 498 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L498

Added line #L498 was not covered by tests
"file {:?} blob {} is missing in index",
path.join(node.name()),

Check warning on line 500 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L500

Added line #L500 was not covered by tests
id
);
}
Some(entry) => {
_ = packs.insert(entry.pack);
}
}
}
},
Expand All @@ -423,7 +517,18 @@
Some(tree) if tree.is_null() => {
error!("dir {:?} subtree has null ID", path.join(node.name()));
}
_ => {} // subtree is ok
Some(id) => match index.get_tree(&id) {
None => {
error!(

Check warning on line 522 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L522

Added line #L522 was not covered by tests
"dir {:?} subtree blob {} is missing in index",
path.join(node.name()),
id
);
}
Some(entry) => {
_ = packs.insert(entry.pack);
}
}, // subtree is ok
}
}

Expand All @@ -432,7 +537,7 @@
}
}

Ok(())
Ok(packs)
}

/// Check if a pack is valid
Expand Down Expand Up @@ -519,3 +624,81 @@

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use insta::assert_ron_snapshot;
use rand::{rngs::StdRng, Rng, SeedableRng};
use rstest::{fixture, rstest};

const PACK_SIZE: u32 = 100_000_000;

#[fixture]
fn rng() -> StdRng {
    // fixed seed so every test run sees the same pseudo-random sequence
    StdRng::seed_from_u64(5)
}

/// Build 500 index packs with random ids and random sizes below `PACK_SIZE`
/// (blob lists are left empty — only id and size matter for subset selection).
fn test_packs(rng: &mut impl Rng) -> Vec<IndexPack> {
    let mut packs = Vec::with_capacity(500);
    for _ in 0..500 {
        let pack = IndexPack {
            id: PackId::from(Id::random_from_rng(rng)),
            blobs: Vec::new(),
            time: None,
            size: Some(rng.gen_range(0..PACK_SIZE)),
        };
        packs.push(pack);
    }
    packs
}

// Parse each case string into a ReadSubsetOption, apply it to a seeded
// deterministic pack list, check the selected total size against the
// variant's bound, and pin the exact selected ids with an insta snapshot
// (one snapshot per case string).
#[rstest]
#[case("all")]
#[case("5/12")]
#[case("5%")]
#[case("250MiB")]
fn test_read_subset(mut rng: StdRng, #[case] s: &str) {
    // helper: total size of a pack list
    let size =
        |packs: &[IndexPack]| -> u64 { packs.iter().map(|p| u64::from(p.pack_size())).sum() };

    let test_packs = test_packs(&mut rng);
    let total_size = size(&test_packs);

    let subset: ReadSubsetOption = s.parse().unwrap();
    let packs = subset.apply_with_rng(test_packs, &mut rng);
    let test_size = size(&packs);

    match subset {
        // "all" must keep the full size
        ReadSubsetOption::All => assert_eq!(test_size, total_size),
        #[allow(clippy::cast_possible_truncation)]
        #[allow(clippy::cast_precision_loss)]
        #[allow(clippy::cast_sign_loss)]
        // percentage/size subsets never exceed their budget
        ReadSubsetOption::Percentage(s) => assert!(test_size <= (total_size as f64 * s) as u64),
        ReadSubsetOption::Size(size) => {
            assert!(test_size <= size && size <= test_size + u64::from(PACK_SIZE));
        }
        // no size bound for id-based subsets; the snapshot below pins the result
        ReadSubsetOption::IdSubSet(_) => {}
    };

    let ids: Vec<_> = packs.iter().map(|pack| pack.id).collect();
    assert_ron_snapshot!(s, ids);
}

/// Check that the id-based subsets `1/m` .. `m/m` partition the pack set:
/// every pack is selected by exactly one of the `m` runs.
#[test] // was missing — without it the harness never runs this test
fn test_read_subset_n_m() {
    let test_packs = test_packs(&mut thread_rng());
    let mut all_packs: BTreeSet<_> = test_packs.iter().map(|pack| pack.id).collect();

    // each run may only select packs not selected by any previous run
    let mut run_with = |s: &str| {
        let subset: ReadSubsetOption = s.parse().unwrap();
        let packs = subset.apply(test_packs.clone());
        for pack in packs {
            assert!(all_packs.remove(&pack.id));
        }
    };

    run_with("1/5");
    run_with("2/5");
    run_with("3/5");
    run_with("4/5");
    run_with("5/5");

    // after all 5 runs every pack has been covered exactly once
    assert!(all_packs.is_empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
source: crates/core/src/commands/check.rs
expression: ids
---
[
Id("50ec36b55a9b35de779a2757571d2e4b16f6d1ca3ff73ade120901262c2b265d"),
Id("d07f392b1dfebfdf50bb10a4a7857b543cbe246f84549af8902dfde9737fd425"),
Id("f879520cb3e653a0c77dcc37dcd813a71ec7d972a4a6185b13f2ed240c6e72e1"),
Id("8ba2cc8cf44054f35be413f12ba83969deb01f1c44660822cee1b960d69a7526"),
Id("e6bae2c9c6b9d8b8a72b45590ffc6c8e034083d6a8180877f6d270537a1ac214"),
Id("f77e206f69693ae3490de38ce00f5e89ae7db4808b770c60e07d64815ee0478d"),
Id("20b51c8c49aff07d7063c76a863cbdcea845989ef79d4a3f8ff599687eaebe48"),
Id("9630b7b1e6329e7c28eb0eeb4e0df36bbf45acf3ba5de4a0403b77e47216857a"),
Id("33e053041d2de235e03cc219a8b8300d8f1e35ee034c45f4613ea782d5e672f2"),
Id("7583c1099bf604771a03af7627f4122a59da07db7358484b8543e881a7939b3f"),
Id("a94f61701a165181c6940584ca0cd2c2355e5e1eb65a3a295fc4d1c02fa81138"),
Id("5ae22a813d32049b56ac2760a5a34b8f66e30b5232f66bb8eae420c7022197e8"),
]
Loading
Loading