From 9aa30d191f41a4859056f93c97da41f3fc1e8516 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Fri, 16 Feb 2024 15:07:19 +0100 Subject: [PATCH] add lock command --- crates/core/src/commands.rs | 1 + crates/core/src/commands/lock.rs | 256 +++++++++++++++++++++++++++++++ crates/core/src/error.rs | 2 + crates/core/src/lib.rs | 1 + crates/core/src/repository.rs | 18 +++ 5 files changed, 278 insertions(+) create mode 100644 crates/core/src/commands/lock.rs diff --git a/crates/core/src/commands.rs b/crates/core/src/commands.rs index e4d15a122..37b4262d0 100644 --- a/crates/core/src/commands.rs +++ b/crates/core/src/commands.rs @@ -12,6 +12,7 @@ pub mod dump; pub mod forget; pub mod init; pub mod key; +pub mod lock; pub mod merge; pub mod prune; /// The `repair` command. diff --git a/crates/core/src/commands/lock.rs b/crates/core/src/commands/lock.rs new file mode 100644 index 000000000..c034b6166 --- /dev/null +++ b/crates/core/src/commands/lock.rs @@ -0,0 +1,256 @@ +//! `lock` subcommand +use std::collections::BTreeSet; + +use chrono::{DateTime, Local}; +use derive_setters::Setters; +use log::error; +use rayon::ThreadPoolBuilder; + +use crate::{ + backend::{ + decrypt::{DecryptReadBackend, DecryptWriteBackend}, + node::NodeType, + FileType, + }, + blob::{tree::TreeStreamerOnce, BlobType}, + error::{CommandErrorKind, RepositoryErrorKind, RusticResult}, + id::Id, + index::{ + binarysorted::{IndexCollector, IndexType}, + indexer::Indexer, + GlobalIndex, ReadGlobalIndex, + }, + progress::{Progress, ProgressBars}, + repofile::{indexfile::LockOption, DeleteOption, IndexFile, SnapshotFile}, + repository::{Open, Repository}, +}; + +pub(super) mod constants { + /// The maximum number of reader threads to use for locking. + pub(super) const MAX_LOCKER_THREADS_NUM: usize = 20; +} + +#[cfg_attr(feature = "clap", derive(clap::Parser))] +#[derive(Debug, Clone, Copy, Setters)] +/// Options for the `prune` command +pub struct LockOptions { + /// Extend locks even if the files are already locked long enough + #[cfg_attr(feature = "clap", clap(long))] + always_extend_lock: bool, + + until: Option>, +} + +impl LockOptions { + pub fn lock( + &self, + repo: &Repository, + snapshots: &[SnapshotFile], + now: DateTime, + ) -> RusticResult<()> { + let pb = &repo.pb; + let be = repo.dbe(); + + let mut index_files = Vec::new(); + + let p = pb.progress_counter("reading index..."); + let mut index_collector = IndexCollector::new(IndexType::OnlyTrees); + + for index in be.stream_all::(&p)? { + let (id, index) = index?; + index_collector.extend(index.packs.clone()); + index_files.push((id, index)); + } + p.finish(); + let index = GlobalIndex::new_from_index(index_collector.into_index()); + + let snap_tress = snapshots.iter().map(|sn| sn.tree).collect(); + let packs = find_needed_packs(be, &index, snap_tress, pb)?; + self.lock_packs(repo, index_files, packs)?; + + self.lock_snapshots(repo, snapshots, now)?; + + Ok(()) + } + + pub fn lock_snapshots( + &self, + repo: &Repository, + snapshots: &[SnapshotFile], + now: DateTime, + ) -> RusticResult<()> { + let mut new_snaps = Vec::new(); + let mut remove_snaps = Vec::new(); + let mut lock_snaps = Vec::new(); + + let new_lock = DeleteOption::set_from_until(self.until); + + for snap in snapshots { + if !snap.must_keep(now) { + remove_snaps.push(snap.id); + } + + if snap.delete.needs_lock_update(&new_lock) { + new_snaps.push(SnapshotFile { + delete: new_lock, + ..snap.clone() + }); + } else if self.always_extend_lock { + lock_snaps.push(snap.id); + } + } + + // save new snapshots + let new_ids = repo.save_snapshots(new_snaps)?; + lock_snaps.extend(new_ids); + + // remove old snapshots + repo.delete_snapshots(&remove_snaps)?; + + // Do the actual locking + lock_files(repo, FileType::Snapshot, &lock_snaps, self.until)?; + + Ok(()) + } + + pub fn lock_packs( + &self, + repo: &Repository, + index_files: Vec<(Id, IndexFile)>, + packs: BTreeSet, + ) -> RusticResult<()> { + let mut lock_packs = Vec::new(); + let mut remove_index = Vec::new(); + + // Check for indexfiles-to-modify and for packs to lock + // Also already write the new index from the index files which are modified. + let p = repo.pb.progress_counter("processing index files..."); + p.set_length(index_files.len().try_into().unwrap()); + let indexer = Indexer::new_unindexed(repo.dbe().clone()).into_shared(); + for (id, mut index) in index_files { + let mut modified = false; + for pack in &mut index.packs { + if !packs.contains(&pack.id) { + continue; + } + if !pack.lock.is_locked(self.until) { + pack.lock = LockOption::set_from_until(self.until); + modified = true; + lock_packs.push(pack.id); + } else if self.always_extend_lock { + lock_packs.push(pack.id); + } + } + if modified { + for pack in index.packs { + indexer.write().unwrap().add(pack)?; + } + for pack_remove in index.packs_to_delete { + indexer.write().unwrap().add_remove(pack_remove)?; + } + remove_index.push(id); + } + p.inc(1); + } + indexer.write().unwrap().finalize()?; + p.finish(); + + // Remove old index files + let p = repo.pb.progress_counter("removing old index files..."); + repo.dbe() + .delete_list(FileType::Index, true, remove_index.iter(), p)?; + + // Do the actual locking + lock_files(repo, FileType::Pack, &lock_packs, self.until)?; + + Ok(()) + } +} + +fn lock_files( + repo: &Repository, + file_type: FileType, + ids: &[Id], + until: Option>, +) -> RusticResult<()> { + let pool = ThreadPoolBuilder::new() + .num_threads(constants::MAX_LOCKER_THREADS_NUM) + .build() + .map_err(RepositoryErrorKind::FromThreadPoolbilderError)?; + let progress_bar_ref = &repo.pb.progress_counter("locking {filetype:?} files.."); + let backend = &repo.be; + pool.in_place_scope(|scope| { + for id in ids { + scope.spawn(move |_| { + if let Err(e) = backend.lock(file_type, id, until) { + // FIXME: Use error handling + error!("lock failed for {file_type:?} {id:?}. {e}"); + }; + progress_bar_ref.inc(1); + }); + } + }); + Ok(()) +} + +/// Find packs which are needed for the given Trees +/// +/// # Arguments +/// +/// * `index` - The index to use +/// * `trees` - The trees to consider +/// * `pb` - The progress bars +/// +/// # Errors +/// +// TODO!: add errors! +fn find_needed_packs( + be: &impl DecryptReadBackend, + index: &impl ReadGlobalIndex, + trees: Vec, + pb: &impl ProgressBars, +) -> RusticResult> { + let p = pb.progress_counter("finding needed packs..."); + + let mut packs = BTreeSet::new(); + + for tree_id in &trees { + _ = packs.insert( + index + .get_id(BlobType::Tree, tree_id) + .ok_or_else(|| CommandErrorKind::IdNotFoundinIndex(*tree_id))? + .pack, + ); + } + + let mut tree_streamer = TreeStreamerOnce::new(be, index, trees, p)?; + while let Some(item) = tree_streamer.next().transpose()? { + let (_, tree) = item; + for node in tree.nodes { + match node.node_type { + NodeType::File => { + for id in node.content.iter().flatten() { + _ = packs.insert( + index + .get_id(BlobType::Data, id) + .ok_or_else(|| CommandErrorKind::IdNotFoundinIndex(*id))? + .pack, + ); + } + } + NodeType::Dir => { + let id = &node.subtree.unwrap(); + _ = packs.insert( + index + .get_id(BlobType::Tree, id) + .ok_or_else(|| CommandErrorKind::IdNotFoundinIndex(*id))? + .pack, + ); + } + _ => {} // nothing to do + } + } + } + + Ok(packs) +} diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 621010660..2459c1957 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -214,6 +214,8 @@ pub enum CommandErrorKind { FromRayonError(#[from] rayon::ThreadPoolBuildError), /// conversion to `u64` failed: `{0:?}` ConversionToU64Failed(TryFromIntError), + /// Id {0:?} not found in index + IdNotFoundinIndex(Id), } /// [`CryptoErrorKind`] describes the errors that can happen while dealing with Cryptographic functions diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 3ea6c8ae0..8d112165a 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -196,6 +196,7 @@ pub use crate::{ copy::CopySnapshot, forget::{ForgetGroup, ForgetGroups, ForgetSnapshot, KeepOptions}, key::KeyOptions, + lock::LockOptions, prune::{PruneOptions, PrunePlan, PruneStats}, repair::{index::RepairIndexOptions, snapshots::RepairSnapshotsOptions}, repoinfo::{BlobInfo, IndexInfos, PackInfo, RepoFileInfo, RepoFileInfos}, diff --git a/crates/core/src/repository.rs b/crates/core/src/repository.rs index 1320c9c7b..829d218a2 100644 --- a/crates/core/src/repository.rs +++ b/crates/core/src/repository.rs @@ -10,6 +10,7 @@ use std::{ }; use bytes::Bytes; +use chrono::Local; use derive_setters::Setters; use log::{debug, error, info}; use serde_with::{serde_as, DisplayFromStr}; @@ -37,6 +38,7 @@ use crate::{ copy::CopySnapshot, forget::{ForgetGroups, KeepOptions}, key::KeyOptions, + lock::LockOptions, prune::{PruneOptions, PrunePlan}, repair::{index::RepairIndexOptions, snapshots::RepairSnapshotsOptions}, repoinfo::{IndexInfos, RepoFileInfos}, @@ -1058,6 +1060,22 @@ impl Repository { opts.get_plan(self) } + /// Lock snapshot and pack files needed for the given snapshots + /// + /// # Arguments + /// + /// * `opts` - The lock options to use + /// * `snaps` - The snapshots to lock + /// * `until` - until when to lock. None means lock forever. + /// + /// # Errors + /// + // TODO: Document errors + pub fn lock(&self, opts: &LockOptions, snaps: &[SnapshotFile]) -> RusticResult<()> { + let now = Local::now(); + opts.lock(self, snaps, now) + } + /// Turn the repository into the `IndexedFull` state by reading and storing the index /// /// # Errors