Skip to content

Commit

Permalink
feat!(commands): Add possibility for locking
Browse files Browse the repository at this point in the history
  • Loading branch information
aawsome committed Sep 27, 2024
1 parent c504c29 commit 1b9d988
Show file tree
Hide file tree
Showing 13 changed files with 459 additions and 23 deletions.
21 changes: 21 additions & 0 deletions crates/core/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::{io::Read, ops::Deref, path::PathBuf, sync::Arc};

use anyhow::Result;
use bytes::Bytes;
use chrono::{DateTime, Local};
use enum_map::Enum;
use log::trace;

Expand Down Expand Up @@ -337,6 +338,26 @@ pub trait WriteBackend: ReadBackend {
///
/// The result of the removal.
fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()>;

/// Specify if the backend is able to lock files
///
/// Returns `false` by default; backends which support locking must
/// override this to return `true`.
fn can_lock(&self) -> bool {
    false
}

/// Lock the given file.
///
/// # Arguments
///
/// * `tpe` - The type of the file.
/// * `id` - The id of the file.
/// * `until` - The date until when to lock. May be `None` which usually specifies an unlimited lock
///
/// # Errors
///
/// If the file could not be locked.
///
/// # Note
///
/// The default implementation is a no-op; backends which are able to
/// lock (i.e. return `true` from `can_lock`) must override it.
fn lock(&self, _tpe: FileType, _id: &Id, _until: Option<DateTime<Local>>) -> Result<()> {
    Ok(())
}
}

#[cfg(test)]
Expand Down
17 changes: 10 additions & 7 deletions crates/core/src/backend/decrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,18 @@ pub trait DecryptWriteBackend: WriteBackend + Clone + 'static {
&self,
list: I,
p: impl Progress,
) -> RusticResult<()> {
) -> RusticResult<Vec<F::Id>> {
p.set_length(list.len() as u64);
list.par_bridge().try_for_each(|file| -> RusticResult<_> {
_ = self.save_file(file)?;
p.inc(1);
Ok(())
})?;
let ids = list
.par_bridge()
.map(|file| -> RusticResult<F::Id> {
let id = self.save_file(file)?.into();
p.inc(1);
Ok(id)
})
.collect::<RusticResult<_>>()?;
p.finish();
Ok(())
Ok(ids)
}

/// Deletes the given list of files.
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod dump;
pub mod forget;
pub mod init;
pub mod key;
pub mod lock;
pub mod merge;
pub mod prune;
/// The `repair` command.
Expand Down
5 changes: 2 additions & 3 deletions crates/core/src/commands/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use log::trace;
use rayon::prelude::{IntoParallelRefIterator, ParallelBridge, ParallelIterator};

use crate::{
backend::{decrypt::DecryptWriteBackend, node::NodeType},
backend::node::NodeType,
blob::{packer::Packer, tree::TreeStreamerOnce, BlobId, BlobType},
error::RusticResult,
index::{indexer::Indexer, ReadIndex},
Expand Down Expand Up @@ -130,8 +130,7 @@ pub(crate) fn copy<'a, Q, R: IndexedFull, P: ProgressBars, S: IndexedIds>(
_ = tree_packer.finalize()?;
indexer.write().unwrap().finalize()?;

let p = pb.progress_counter("saving snapshots...");
be_dest.save_list(snaps.iter(), p)?;
let _ = repo_dest.save_snapshots(snaps)?;
Ok(())
}

Expand Down
8 changes: 5 additions & 3 deletions crates/core/src/commands/forget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,10 +524,12 @@ impl KeepOptions {

while let Some(sn) = iter.next() {
let (keep, reasons) = {
if sn.must_keep(now) {
(true, vec!["snapshot"])
if sn.is_locked(now) {
(true, vec!["locked"])
} else if sn.must_keep(now) {
(true, vec!["delete mark"])
} else if sn.must_delete(now) {
(false, vec!["snapshot"])
(false, vec!["delete mark"])
} else {
let reasons =
group_keep.matches(&sn, last.as_ref(), iter.peek().is_some(), latest_time);
Expand Down
279 changes: 279 additions & 0 deletions crates/core/src/commands/lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
//! `lock` subcommand
use std::collections::BTreeSet;

use chrono::{DateTime, Local};
use derive_setters::Setters;
use log::error;
use rayon::ThreadPoolBuilder;

use crate::{
backend::{
decrypt::{DecryptReadBackend, DecryptWriteBackend},
node::NodeType,
},
blob::{tree::TreeStreamerOnce, BlobType},
error::{CommandErrorKind, RepositoryErrorKind, RusticResult},
index::{
binarysorted::{IndexCollector, IndexType},
indexer::Indexer,
GlobalIndex, ReadGlobalIndex,
},
progress::{Progress, ProgressBars},
repofile::{IndexFile, IndexId, KeyId, PackId, RepoId, SnapshotFile, SnapshotId},
repository::{Open, Repository},
BlobId, TreeId,
};

pub(super) mod constants {
    /// The maximum number of reader threads to use for locking.
    ///
    /// Bounds the size of the thread pool built in `lock_files` so that
    /// locking many files does not spawn one thread per file.
    pub(super) const MAX_LOCKER_THREADS_NUM: usize = 20;
}

#[derive(Debug, Clone, Default, Copy, Setters)]
/// Options for the `lock` command
pub struct LockOptions {
    /// Extend locks even if the files are already locked long enough
    always_extend_lock: bool,

    /// Specify until when to extend the lock. If None, lock forever
    until: Option<DateTime<Local>>,
}

impl LockOptions {
    /// Lock the given snapshots and corresponding pack files
    ///
    /// This reads the index, determines the packs needed for the snapshots'
    /// trees, locks those packs (rewriting index files whose lock times
    /// change) and finally locks the snapshot files themselves.
    ///
    /// # Errors
    ///
    /// If reading the index, writing modified files or locking in the
    /// backend fails.
    pub fn lock<P: ProgressBars, S: Open>(
        &self,
        repo: &Repository<P, S>,
        snapshots: &[SnapshotFile],
        now: DateTime<Local>,
    ) -> RusticResult<()> {
        let pb = &repo.pb;
        let be = repo.dbe();

        let mut index_files = Vec::new();

        // Read all index files; keep them (with their ids) for possible
        // modification in `lock_packs` and collect a full in-memory index.
        let p = pb.progress_counter("reading index...");
        let mut index_collector = IndexCollector::new(IndexType::Full);
        for index in be.stream_all::<IndexFile>(&p)? {
            let (id, index) = index?;
            index_collector.extend(index.packs.clone());
            index_files.push((id, index));
        }
        let index = GlobalIndex::new_from_index(index_collector.into_index());
        p.finish();

        // was `snap_tress` — renamed to fix the typo
        let snap_trees = snapshots.iter().map(|sn| sn.tree).collect();
        let packs = find_needed_packs(be, &index, snap_trees, pb)?;
        self.lock_packs(repo, index_files, packs)?;

        self.lock_snapshots(repo, snapshots, now)?;

        Ok(())
    }

    /// Lock the given snapshot files.
    ///
    /// Snapshots whose current lock does not reach `self.until` are
    /// rewritten with the new lock time (removing the old file unless it
    /// must be kept); snapshots already locked long enough are re-locked in
    /// the backend only when `always_extend_lock` is set.
    fn lock_snapshots<P: ProgressBars, S: Open>(
        &self,
        repo: &Repository<P, S>,
        snapshots: &[SnapshotFile],
        now: DateTime<Local>,
    ) -> RusticResult<()> {
        let mut new_snaps = Vec::new();
        let mut remove_snaps = Vec::new();
        let mut lock_snaps = Vec::new();

        for snap in snapshots {
            if !snap.delete.is_locked(self.until) {
                // lock time needs extending: write a new snapshot file
                new_snaps.push(SnapshotFile {
                    delete: self.until.into(),
                    ..snap.clone()
                });
                if !snap.must_keep(now) {
                    remove_snaps.push(snap.id);
                }
            } else if self.always_extend_lock {
                lock_snaps.push(snap.id);
            }
        }

        // save new snapshots; the new ids are the ones to lock
        let new_ids = repo.save_snapshots(new_snaps)?;
        lock_snaps.extend(new_ids);

        // remove old snapshots
        repo.delete_snapshots(&remove_snaps)?;

        // Do the actual locking
        lock_files(repo, &lock_snaps, self.until)?;

        Ok(())
    }

    /// Lock the given pack files.
    ///
    /// Index files containing packs whose lock time changes are rewritten
    /// (via a fresh indexer) and the superseded index files are removed;
    /// finally the pack files are locked in the backend.
    fn lock_packs<P: ProgressBars, S: Open>(
        &self,
        repo: &Repository<P, S>,
        index_files: Vec<(IndexId, IndexFile)>,
        packs: BTreeSet<PackId>,
    ) -> RusticResult<()> {
        let mut lock_packs = Vec::new();
        let mut remove_index = Vec::new();

        // Check for indexfiles-to-modify and for packs to lock
        // Also already write the new index from the index files which are modified.
        let p = repo.pb.progress_counter("processing index files...");
        p.set_length(index_files.len().try_into().unwrap());
        let indexer = Indexer::new_unindexed(repo.dbe().clone()).into_shared();
        for (id, mut index) in index_files {
            let mut modified = false;
            for pack in &mut index.packs {
                if !packs.contains(&pack.id) {
                    continue;
                }
                if !pack.lock.is_locked(self.until) {
                    pack.lock = self.until.into();
                    modified = true;
                    lock_packs.push(pack.id);
                } else if self.always_extend_lock {
                    lock_packs.push(pack.id);
                }
            }
            if modified {
                // re-write the whole (modified) index file content
                for pack in index.packs {
                    indexer.write().unwrap().add(pack)?;
                }
                for pack_remove in index.packs_to_delete {
                    indexer.write().unwrap().add_remove(pack_remove)?;
                }
                remove_index.push(id);
            }
            p.inc(1);
        }
        indexer.write().unwrap().finalize()?;
        p.finish();

        // Remove old index files
        let p = repo.pb.progress_counter("removing old index files...");
        repo.dbe().delete_list(true, remove_index.iter(), p)?;

        // Do the actual locking
        lock_files(repo, &lock_packs, self.until)?;

        Ok(())
    }
}

/// Lock the complete repository until the given date.
///
/// This locks all key, snapshot, index and pack files.
///
/// # Arguments
///
/// * `repo` - The repository to lock
/// * `until` - The date until when to lock. If `None`, lock forever
///
/// # Errors
///
/// If listing or locking the files fails.
pub fn lock_repo<P: ProgressBars, S>(
    repo: &Repository<P, S>,
    until: Option<DateTime<Local>>,
) -> RusticResult<()> {
    lock_all_files::<P, S, KeyId>(repo, until)?;
    lock_all_files::<P, S, SnapshotId>(repo, until)?;
    lock_all_files::<P, S, IndexId>(repo, until)?;
    lock_all_files::<P, S, PackId>(repo, until)?;
    Ok(())
}

/// Lock all repository files of the file type given by `ID`.
///
/// Lists every id of that type and delegates the locking to `lock_files`.
pub fn lock_all_files<P: ProgressBars, S, ID: RepoId>(
    repo: &Repository<P, S>,
    until: Option<DateTime<Local>>,
) -> RusticResult<()> {
    let spinner = repo
        .pb
        .progress_spinner(format!("listing {:?} files..", ID::TYPE));
    let all_ids = repo.list()?.collect::<Vec<ID>>();
    spinner.finish();
    lock_files(repo, &all_ids, until)
}

/// Lock the given file ids of one file type in the backend.
///
/// Locking is performed concurrently on a dedicated, bounded thread pool.
/// A failure to lock a single file is only logged and does not abort the
/// other lock operations (see the FIXME below).
fn lock_files<P: ProgressBars, S, ID: RepoId>(
    repo: &Repository<P, S>,
    ids: &[ID],
    until: Option<DateTime<Local>>,
) -> RusticResult<()> {
    // bounded pool: don't spawn one OS thread per file
    let pool = ThreadPoolBuilder::new()
        .num_threads(constants::MAX_LOCKER_THREADS_NUM)
        .build()
        .map_err(RepositoryErrorKind::FromThreadPoolbilderError)?;
    let p = &repo
        .pb
        .progress_counter(format!("locking {:?} files..", ID::TYPE));
    p.set_length(ids.len().try_into().unwrap());
    let backend = &repo.be;
    // in_place_scope blocks until every spawned lock task has finished,
    // which lets the tasks borrow `backend`, `p` and the ids
    pool.in_place_scope(|scope| {
        for id in ids {
            scope.spawn(move |_| {
                if let Err(e) = backend.lock(ID::TYPE, id, until) {
                    // FIXME: Use error handling
                    error!("lock failed for {:?} {id:?}. {e}", ID::TYPE);
                };
                p.inc(1);
            });
        }
    });
    p.finish();
    Ok(())
}

/// Find packs which are needed for the given Trees
///
/// # Arguments
///
/// * `index` - The index to use
/// * `trees` - The trees to consider
/// * `pb` - The progress bars
///
/// # Errors
///
// TODO!: add errors!
fn find_needed_packs(
be: &impl DecryptReadBackend,
index: &impl ReadGlobalIndex,
trees: Vec<TreeId>,
pb: &impl ProgressBars,
) -> RusticResult<BTreeSet<PackId>> {
let p = pb.progress_counter("finding needed packs...");

let mut packs = BTreeSet::new();

for tree_id in &trees {
let blob_id = BlobId::from(*tree_id);
_ = packs.insert(
index
.get_id(BlobType::Tree, &blob_id)
.ok_or_else(|| CommandErrorKind::BlobIdNotFoundinIndex(blob_id))?
.pack,
);
}

let mut tree_streamer = TreeStreamerOnce::new(be, index, trees, p)?;
while let Some(item) = tree_streamer.next().transpose()? {
let (_, tree) = item;
for node in tree.nodes {
match node.node_type {
NodeType::File => {
for id in node.content.iter().flatten() {
let blob_id = BlobId::from(*id);
_ = packs.insert(
index
.get_id(BlobType::Data, &blob_id)
.ok_or_else(|| CommandErrorKind::BlobIdNotFoundinIndex(blob_id))?
.pack,
);
}
}
NodeType::Dir => {
let id = &node.subtree.unwrap();
let blob_id = BlobId::from(*id);
_ = packs.insert(
index
.get_id(BlobType::Tree, &blob_id)
.ok_or_else(|| CommandErrorKind::BlobIdNotFoundinIndex(blob_id))?
.pack,
);
}
_ => {} // nothing to do
}
}
}

Ok(packs)
}
Loading

0 comments on commit 1b9d988

Please sign in to comment.