Skip to content

feat!: Add read-data-subset to CheckOptions; allow to check given trees #262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Sep 30, 2024
280 changes: 231 additions & 49 deletions crates/core/src/commands/check.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,121 @@
//! `check` subcommand
use std::collections::HashMap;
use std::{
collections::{BTreeSet, HashMap},
str::FromStr,
};

use bytes::Bytes;
use bytesize::ByteSize;
use derive_setters::Setters;
use itertools::Itertools;
use log::{debug, error, warn};
use rayon::prelude::{IntoParallelIterator, ParallelBridge, ParallelIterator};
use rand::{prelude::SliceRandom, thread_rng, Rng};
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use zstd::stream::decode_all;

use crate::{
backend::{cache::Cache, decrypt::DecryptReadBackend, node::NodeType, FileType, ReadBackend},
blob::{tree::TreeStreamerOnce, BlobId, BlobType},
crypto::hasher::hash,
error::{RusticErrorKind, RusticResult},
error::{CommandErrorKind, RusticErrorKind, RusticResult},
id::Id,
index::{
binarysorted::{IndexCollector, IndexType},
GlobalIndex, ReadGlobalIndex,
},
progress::{Progress, ProgressBars},
repofile::{
packfile::PackId, IndexFile, IndexPack, PackHeader, PackHeaderLength, PackHeaderRef,
SnapshotFile,
},
repository::{Open, Repository},
TreeId,
};

#[derive(Clone, Copy, Debug, Default)]
/// Options to specify which subset of packs will be read
pub enum ReadSubsetOption {
#[default]
/// Read all pack files
All,
/// Read a random subset of pack files with (approximately) the given percentage of total size
Percentage(f64),
/// Read a random subset of pack files with (approximately) the given size
Size(u64),
/// Read a subset of packfiles based on Ids: Using (1,n) .. (n,n) in separate runs will cover all pack files
IdSubSet((u32, u32)),
}

impl ReadSubsetOption {
fn apply(self, packs: impl IntoIterator<Item = IndexPack>) -> Vec<IndexPack> {

Check warning on line 48 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L48

Added line #L48 was not covered by tests
self.apply_with_rng(packs, &mut thread_rng())
}

fn apply_with_rng(
self,
packs: impl IntoIterator<Item = IndexPack>,
rng: &mut impl Rng,
) -> Vec<IndexPack> {
fn id_matches_n_m(id: &Id, n: u32, m: u32) -> bool {

Check warning on line 57 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L57

Added line #L57 was not covered by tests
id.as_u32() % m == n % m
}

let mut total_size: u64 = 0;
let mut packs: Vec<_> = packs

Check warning on line 62 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L62

Added line #L62 was not covered by tests
.into_iter()
.inspect(|p| total_size += u64::from(p.pack_size()))
.collect();

// Apply read-subset option
if let Some(mut size) = match self {
Self::All => None,

Check warning on line 69 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L69

Added line #L69 was not covered by tests
// we need some casts to compute percentage...
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_precision_loss)]
#[allow(clippy::cast_sign_loss)]
Self::Percentage(p) => Some((total_size as f64 * p / 100.0) as u64),
Self::Size(s) => Some(s),
Self::IdSubSet((n, m)) => {
packs.retain(|p| id_matches_n_m(&p.id, n, m));
None

Check warning on line 78 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L78

Added line #L78 was not covered by tests
}
} {
// random subset of given size is required
packs.shuffle(rng);
packs.retain(|p| {

Check warning on line 83 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L83

Added line #L83 was not covered by tests
let p_size = u64::from(p.pack_size());
if size > p_size {
size = size.saturating_sub(p_size);
true

Check warning on line 87 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L87

Added line #L87 was not covered by tests
} else {
false

Check warning on line 89 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L89

Added line #L89 was not covered by tests
}
});
}
packs
}
}

impl FromStr for ReadSubsetOption {
type Err = CommandErrorKind;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let result = if s == "all" {
Self::All
} else if let Some(p) = s.strip_suffix('%') {
// try to read percentage
Self::Percentage(p.parse()?)
} else if let Some((n, m)) = s.split_once('/') {
// try to read n/m
Self::IdSubSet((n.parse()?, m.parse()?))
} else {
Self::Size(
ByteSize::from_str(s)
.map_err(CommandErrorKind::FromByteSizeParser)?

Check warning on line 111 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L111

Added line #L111 was not covered by tests
.as_u64(),
)
};
Ok(result)
}
}

#[cfg_attr(feature = "clap", derive(clap::Parser))]
#[derive(Clone, Copy, Debug, Default, Setters)]
#[setters(into)]
Expand All @@ -34,9 +125,13 @@
#[cfg_attr(feature = "clap", clap(long, conflicts_with = "no_cache"))]
pub trust_cache: bool,

/// Read all data blobs
/// Also read and check pack files
#[cfg_attr(feature = "clap", clap(long))]
pub read_data: bool,

/// Read and check pack files
#[cfg_attr(feature = "clap", clap(long, default_value = "all"))]
pub read_data_subset: ReadSubsetOption,
}

impl CheckOptions {
Expand All @@ -54,7 +149,11 @@
/// # Errors
///
/// If the repository is corrupted
pub(crate) fn run<P: ProgressBars, S: Open>(self, repo: &Repository<P, S>) -> RusticResult<()> {
pub(crate) fn run<P: ProgressBars, S: Open>(
self,
repo: &Repository<P, S>,
trees: Vec<TreeId>,
) -> RusticResult<()> {
let be = repo.dbe();
let cache = repo.cache();
let hot_be = &repo.be_hot;
Expand Down Expand Up @@ -105,37 +204,32 @@
}
}

let total_pack_size: u64 = index_collector
.data_packs()
.iter()
.map(|(_, size)| u64::from(*size))
.sum::<u64>()
+ index_collector
.tree_packs()
.iter()
.map(|(_, size)| u64::from(*size))
.sum::<u64>();

let index_be = GlobalIndex::new_from_index(index_collector.into_index());

check_snapshots(be, &index_be, pb)?;
let packs = check_trees(be, &index_be, trees, pb)?;

if self.read_data {
let packs = index_be
.into_index()
.into_iter()
.filter(|p| packs.contains(&p.id));

let packs = self.read_data_subset.apply(packs);

Check warning on line 217 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L217

Added line #L217 was not covered by tests

repo.warm_up_wait(packs.iter().map(|pack| pack.id))?;

let total_pack_size = packs.iter().map(|pack| u64::from(pack.pack_size())).sum();
let p = pb.progress_bytes("reading pack data...");
p.set_length(total_pack_size);

index_be
.into_index()
.into_iter()
.par_bridge()
.for_each(|pack| {
let id = pack.id;
let data = be.read_full(FileType::Pack, &id).unwrap();
match check_pack(be, pack, data, &p) {
Ok(()) => {}
Err(err) => error!("Error reading pack {id} : {err}",),
}
});
packs.into_par_iter().for_each(|pack| {
let id = pack.id;
let data = be.read_full(FileType::Pack, &id).unwrap();
match check_pack(be, pack, data, &p) {
Ok(()) => {}
Err(err) => error!("Error reading pack {id} : {err}",),

Check warning on line 230 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L229-L230

Added lines #L229 - L230 were not covered by tests
}
});
p.finish();
}
Ok(())
Expand Down Expand Up @@ -375,19 +469,13 @@
/// # Errors
///
/// If a snapshot or tree is missing or has a different size
fn check_snapshots(
fn check_trees(
be: &impl DecryptReadBackend,
index: &impl ReadGlobalIndex,
snap_trees: Vec<TreeId>,
pb: &impl ProgressBars,
) -> RusticResult<()> {
let p = pb.progress_counter("reading snapshots...");
let snap_trees: Vec<_> = be
.stream_all::<SnapshotFile>(&p)?
.iter()
.map_ok(|(_, snap)| snap.tree)
.try_collect()?;
p.finish();

) -> RusticResult<BTreeSet<PackId>> {
let mut packs = BTreeSet::new();
let p = pb.progress_counter("checking trees...");
let mut tree_streamer = TreeStreamerOnce::new(be, index, snap_trees, p)?;
while let Some(item) = tree_streamer.next().transpose()? {
Expand All @@ -404,12 +492,17 @@
error!("file {:?} blob {} has null ID", path.join(node.name()), i);
}

if !index.has_data(id) {
error!(
"file {:?} blob {} is missing in index",
path.join(node.name()),
id
);
match index.get_data(id) {
None => {
error!(

Check warning on line 497 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L497

Added line #L497 was not covered by tests
"file {:?} blob {} is missing in index",
path.join(node.name()),

Check warning on line 499 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L499

Added line #L499 was not covered by tests
id
);
}
Some(entry) => {
_ = packs.insert(entry.pack);
}
}
}
},
Expand All @@ -423,7 +516,18 @@
Some(tree) if tree.is_null() => {
error!("dir {:?} subtree has null ID", path.join(node.name()));
}
_ => {} // subtree is ok
Some(id) => match index.get_tree(&id) {
None => {
error!(

Check warning on line 521 in crates/core/src/commands/check.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/commands/check.rs#L521

Added line #L521 was not covered by tests
"dir {:?} subtree blob {} is missing in index",
path.join(node.name()),
id
);
}
Some(entry) => {
_ = packs.insert(entry.pack);
}
}, // subtree is ok
}
}

Expand All @@ -432,7 +536,7 @@
}
}

Ok(())
Ok(packs)
}

/// Check if a pack is valid
Expand Down Expand Up @@ -519,3 +623,81 @@

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use insta::assert_ron_snapshot;
use rand::{rngs::StdRng, Rng, SeedableRng};
use rstest::{fixture, rstest};

const PACK_SIZE: u32 = 100_000_000;

#[fixture]
fn rng() -> StdRng {
StdRng::seed_from_u64(5)
}
fn test_packs(rng: &mut impl Rng) -> Vec<IndexPack> {
(0..500)
.map(|_| IndexPack {
id: PackId::from(Id::random_from_rng(rng)),
blobs: Vec::new(),
time: None,
size: Some(rng.gen_range(0..PACK_SIZE)),
})
.collect()
}

#[rstest]
#[case("all")]
#[case("5/12")]
#[case("5%")]
#[case("250MiB")]
fn test_read_subset(mut rng: StdRng, #[case] s: &str) {
let size =
|packs: &[IndexPack]| -> u64 { packs.iter().map(|p| u64::from(p.pack_size())).sum() };

let test_packs = test_packs(&mut rng);
let total_size = size(&test_packs);

let subset: ReadSubsetOption = s.parse().unwrap();
let packs = subset.apply_with_rng(test_packs, &mut rng);
let test_size = size(&packs);

match subset {
ReadSubsetOption::All => assert_eq!(test_size, total_size),
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_precision_loss)]
#[allow(clippy::cast_sign_loss)]
ReadSubsetOption::Percentage(s) => assert!(test_size <= (total_size as f64 * s) as u64),
ReadSubsetOption::Size(size) => {
assert!(test_size <= size && size <= test_size + u64::from(PACK_SIZE));
}
ReadSubsetOption::IdSubSet(_) => {}
};

let ids: Vec<_> = packs.iter().map(|pack| (pack.id, pack.size)).collect();
assert_ron_snapshot!(s, ids);
}

fn test_read_subset_n_m() {
let test_packs = test_packs(&mut thread_rng());
let mut all_packs: BTreeSet<_> = test_packs.iter().map(|pack| pack.id).collect();

let mut run_with = |s: &str| {
let subset: ReadSubsetOption = s.parse().unwrap();
let packs = subset.apply(test_packs.clone());
for pack in packs {
assert!(all_packs.remove(&pack.id));
}
};

run_with("1/5");
run_with("2/5");
run_with("3/5");
run_with("4/5");
run_with("5/5");

assert!(all_packs.is_empty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
source: crates/core/src/commands/check.rs
expression: ids
---
[
(Id("50ec36b55a9b35de779a2757571d2e4b16f6d1ca3ff73ade120901262c2b265d"), Some(25543042)),
(Id("d07f392b1dfebfdf50bb10a4a7857b543cbe246f84549af8902dfde9737fd425"), Some(16489215)),
(Id("f879520cb3e653a0c77dcc37dcd813a71ec7d972a4a6185b13f2ed240c6e72e1"), Some(3342498)),
(Id("8ba2cc8cf44054f35be413f12ba83969deb01f1c44660822cee1b960d69a7526"), Some(46887261)),
(Id("e6bae2c9c6b9d8b8a72b45590ffc6c8e034083d6a8180877f6d270537a1ac214"), Some(47685315)),
(Id("f77e206f69693ae3490de38ce00f5e89ae7db4808b770c60e07d64815ee0478d"), Some(9217773)),
(Id("20b51c8c49aff07d7063c76a863cbdcea845989ef79d4a3f8ff599687eaebe48"), Some(83677501)),
(Id("9630b7b1e6329e7c28eb0eeb4e0df36bbf45acf3ba5de4a0403b77e47216857a"), Some(24144078)),
(Id("33e053041d2de235e03cc219a8b8300d8f1e35ee034c45f4613ea782d5e672f2"), Some(2254122)),
(Id("7583c1099bf604771a03af7627f4122a59da07db7358484b8543e881a7939b3f"), Some(68829)),
(Id("a94f61701a165181c6940584ca0cd2c2355e5e1eb65a3a295fc4d1c02fa81138"), Some(2347337)),
(Id("5ae22a813d32049b56ac2760a5a34b8f66e30b5232f66bb8eae420c7022197e8"), Some(452910)),
]
Loading