From 6dd51dda10c891cc6ed4f1f052f6217894435ea3 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Sat, 5 Oct 2024 07:02:10 +0200 Subject: [PATCH 1/2] feat: Add lock_repo command --- crates/core/src/backend.rs | 31 ++++++- crates/core/src/backend/cache.rs | 9 ++ crates/core/src/backend/decrypt.rs | 9 ++ crates/core/src/backend/dry_run.rs | 9 ++ crates/core/src/backend/hotcold.rs | 9 ++ crates/core/src/backend/lock.rs | 113 ++++++++++++++++++++++++++ crates/core/src/commands.rs | 1 + crates/core/src/commands/lock.rs | 75 +++++++++++++++++ crates/core/src/error.rs | 2 + crates/core/src/repository.rs | 27 +++++- crates/core/src/repository/warm_up.rs | 7 +- 11 files changed, 286 insertions(+), 6 deletions(-) create mode 100644 crates/core/src/backend/lock.rs create mode 100644 crates/core/src/commands/lock.rs diff --git a/crates/core/src/backend.rs b/crates/core/src/backend.rs index b232ceb1..ba05d4df 100644 --- a/crates/core/src/backend.rs +++ b/crates/core/src/backend.rs @@ -6,6 +6,7 @@ pub(crate) mod dry_run; pub(crate) mod hotcold; pub(crate) mod ignore; pub(crate) mod local_destination; +pub(crate) mod lock; pub(crate) mod node; pub(crate) mod stdin; pub(crate) mod warm_up; @@ -14,8 +15,9 @@ use std::{io::Read, ops::Deref, path::PathBuf, sync::Arc}; use anyhow::Result; use bytes::Bytes; +use chrono::{DateTime, Local}; use enum_map::Enum; -use log::trace; +use log::{debug, trace}; #[cfg(test)] use mockall::mock; @@ -337,6 +339,27 @@ pub trait WriteBackend: ReadBackend { /// /// The result of the removal. fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()>; + + /// Specify if the backend is able to lock files + fn can_lock(&self) -> bool { + false + } + + /// Lock the given file. + /// + /// # Arguments + /// + /// * `tpe` - The type of the file. + /// * `id` - The id of the file. + /// * `until` - The date until when to lock. May be `None` which usually specifies a unlimited lock + /// + /// # Errors + /// + /// If the file could not be read. + fn lock(&self, tpe: FileType, id: &Id, until: Option>) -> Result<()> { + debug!("no locking implemented. {tpe:?}, {id}, {until:?}"); + Ok(()) + } } #[cfg(test)] @@ -374,6 +397,12 @@ impl WriteBackend for Arc { fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> { self.deref().remove(tpe, id, cacheable) } + fn can_lock(&self) -> bool { + self.deref().can_lock() + } + fn lock(&self, tpe: FileType, id: &Id, until: Option>) -> Result<()> { + self.deref().lock(tpe, id, until) + } } impl ReadBackend for Arc { diff --git a/crates/core/src/backend/cache.rs b/crates/core/src/backend/cache.rs index d00d9699..fc7f73b7 100644 --- a/crates/core/src/backend/cache.rs +++ b/crates/core/src/backend/cache.rs @@ -8,6 +8,7 @@ use std::{ use anyhow::Result; use bytes::Bytes; +use chrono::{DateTime, Local}; use dirs::cache_dir; use log::{trace, warn}; use walkdir::WalkDir; @@ -210,6 +211,14 @@ impl WriteBackend for CachedBackend { } self.be.remove(tpe, id, cacheable) } + + fn can_lock(&self) -> bool { + self.be.can_lock() + } + + fn lock(&self, tpe: FileType, id: &Id, until: Option>) -> Result<()> { + self.be.lock(tpe, id, until) + } } /// Backend that caches data in a directory. diff --git a/crates/core/src/backend/decrypt.rs b/crates/core/src/backend/decrypt.rs index e90d6f86..80970684 100644 --- a/crates/core/src/backend/decrypt.rs +++ b/crates/core/src/backend/decrypt.rs @@ -2,6 +2,7 @@ use std::{num::NonZeroU32, sync::Arc}; use anyhow::Result; use bytes::Bytes; +use chrono::{DateTime, Local}; use crossbeam_channel::{unbounded, Receiver}; use rayon::prelude::*; use zstd::stream::{copy_encode, decode_all, encode_all}; @@ -578,6 +579,14 @@ impl WriteBackend for DecryptBackend { fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> { self.be.remove(tpe, id, cacheable) } + + fn can_lock(&self) -> bool { + self.be.can_lock() + } + + fn lock(&self, tpe: FileType, id: &Id, until: Option>) -> Result<()> { + self.be.lock(tpe, id, until) + } } #[cfg(test)] diff --git a/crates/core/src/backend/dry_run.rs b/crates/core/src/backend/dry_run.rs index e93c9023..261bcdd6 100644 --- a/crates/core/src/backend/dry_run.rs +++ b/crates/core/src/backend/dry_run.rs @@ -1,5 +1,6 @@ use anyhow::Result; use bytes::Bytes; +use chrono::{DateTime, Local}; use zstd::decode_all; use crate::{ @@ -156,4 +157,12 @@ impl WriteBackend for DryRunBackend { self.be.remove(tpe, id, cacheable) } } + + fn can_lock(&self) -> bool { + self.be.can_lock() + } + + fn lock(&self, tpe: FileType, id: &Id, until: Option>) -> Result<()> { + self.be.lock(tpe, id, until) + } } diff --git a/crates/core/src/backend/hotcold.rs b/crates/core/src/backend/hotcold.rs index 75a1f750..798005d6 100644 --- a/crates/core/src/backend/hotcold.rs +++ b/crates/core/src/backend/hotcold.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use anyhow::Result; use bytes::Bytes; +use chrono::{DateTime, Local}; use crate::{ backend::{FileType, ReadBackend, WriteBackend}, @@ -98,4 +99,12 @@ impl WriteBackend for HotColdBackend { } Ok(()) } + + fn can_lock(&self) -> bool { + self.be.can_lock() + } + + fn lock(&self, tpe: FileType, id: &Id, until: Option>) -> Result<()> { + self.be.lock(tpe, id, until) + } } diff --git a/crates/core/src/backend/lock.rs b/crates/core/src/backend/lock.rs new file mode 100644 index 00000000..a183c037 --- /dev/null +++ b/crates/core/src/backend/lock.rs @@ -0,0 +1,113 @@ +use std::{process::Command, sync::Arc}; + +use anyhow::Result; +use bytes::Bytes; +use chrono::{DateTime, Local}; +use log::{debug, warn}; + +use crate::{ + backend::{FileType, ReadBackend, WriteBackend}, + id::Id, + CommandInput, +}; + +/// A backend which warms up files by simply accessing them. +#[derive(Clone, Debug)] +pub struct LockBackend { + /// The backend to use. + be: Arc, + /// The command to be called to lock files in the backend + command: CommandInput, +} + +impl LockBackend { + /// Creates a new `WarmUpAccessBackend`. + /// + /// # Arguments + /// + /// * `be` - The backend to use. + pub fn new_lock(be: Arc, command: CommandInput) -> Arc { + Arc::new(Self { be, command }) + } +} + +impl ReadBackend for LockBackend { + fn location(&self) -> String { + self.be.location() + } + + fn list_with_size(&self, tpe: FileType) -> Result> { + self.be.list_with_size(tpe) + } + + fn read_full(&self, tpe: FileType, id: &Id) -> Result { + self.be.read_full(tpe, id) + } + + fn read_partial( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + offset: u32, + length: u32, + ) -> Result { + self.be.read_partial(tpe, id, cacheable, offset, length) + } + + fn list(&self, tpe: FileType) -> Result> { + self.be.list(tpe) + } + + fn needs_warm_up(&self) -> bool { + self.be.needs_warm_up() + } + + fn warm_up(&self, tpe: FileType, id: &Id) -> Result<()> { + self.be.warm_up(tpe, id) + } +} + +fn path(tpe: FileType, id: &Id) -> String { + let hex_id = id.to_hex(); + match tpe { + FileType::Config => "config".into(), + FileType::Pack => format!("data/{}/{}", &hex_id[0..2], &*hex_id), + _ => format!("{}/{}", tpe.dirname(), &*hex_id), + } +} + +impl WriteBackend for LockBackend { + fn create(&self) -> Result<()> { + self.be.create() + } + + fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> { + self.be.write_bytes(tpe, id, cacheable, buf) + } + + fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> { + self.be.remove(tpe, id, cacheable) + } + + fn can_lock(&self) -> bool { + true + } + + fn lock(&self, tpe: FileType, id: &Id, until: Option>) -> Result<()> { + let until = until.map_or_else(String::new, |u| u.to_rfc3339()); + let path = path(tpe, id); + let args = self.command.args().iter().map(|c| { + c.replace("%id", &id.to_hex()) + .replace("%type", tpe.dirname()) + .replace("%path", &path) + .replace("%until", &until) + }); + debug!("calling {:?}...", self.command); + let status = Command::new(self.command.command()).args(args).status()?; + if !status.success() { + warn!("lock command was not successful for {tpe:?}, id: {id}. {status}"); + } + Ok(()) + } +} diff --git a/crates/core/src/commands.rs b/crates/core/src/commands.rs index e4d15a12..37b4262d 100644 --- a/crates/core/src/commands.rs +++ b/crates/core/src/commands.rs @@ -12,6 +12,7 @@ pub mod dump; pub mod forget; pub mod init; pub mod key; +pub mod lock; pub mod merge; pub mod prune; /// The `repair` command. diff --git a/crates/core/src/commands/lock.rs b/crates/core/src/commands/lock.rs new file mode 100644 index 00000000..08186ffb --- /dev/null +++ b/crates/core/src/commands/lock.rs @@ -0,0 +1,75 @@ +//! `lock` subcommand + +use chrono::{DateTime, Local}; +use log::error; +use rayon::ThreadPoolBuilder; + +use crate::{ + error::{CommandErrorKind, RepositoryErrorKind, RusticResult}, + progress::{Progress, ProgressBars}, + repofile::{configfile::ConfigId, IndexId, KeyId, PackId, RepoId, SnapshotId}, + repository::Repository, + WriteBackend, +}; + +pub(super) mod constants { + /// The maximum number of reader threads to use for locking. + pub(super) const MAX_LOCKER_THREADS_NUM: usize = 20; +} + +pub fn lock_repo( + repo: &Repository, + until: Option>, +) -> RusticResult<()> { + lock_all_files::(repo, until)?; + lock_all_files::(repo, until)?; + lock_all_files::(repo, until)?; + lock_all_files::(repo, until)?; + lock_all_files::(repo, until)?; + Ok(()) +} + +pub fn lock_all_files( + repo: &Repository, + until: Option>, +) -> RusticResult<()> { + if !repo.be.can_lock() { + return Err(CommandErrorKind::NoLockingConfigured.into()); + } + + let p = &repo + .pb + .progress_spinner(format!("listing {:?} files..", ID::TYPE)); + let ids: Vec = repo.list()?.collect(); + p.finish(); + lock_files(repo, &ids, until) +} + +fn lock_files( + repo: &Repository, + ids: &[ID], + until: Option>, +) -> RusticResult<()> { + let pool = ThreadPoolBuilder::new() + .num_threads(constants::MAX_LOCKER_THREADS_NUM) + .build() + .map_err(RepositoryErrorKind::FromThreadPoolbilderError)?; + let p = &repo + .pb + .progress_counter(format!("locking {:?} files..", ID::TYPE)); + p.set_length(ids.len().try_into().unwrap()); + let backend = &repo.be; + pool.in_place_scope(|scope| { + for id in ids { + scope.spawn(move |_| { + if let Err(e) = backend.lock(ID::TYPE, id, until) { + // FIXME: Use error handling + error!("lock failed for {:?} {id:?}. {e}", ID::TYPE); + }; + p.inc(1); + }); + } + }); + p.finish(); + Ok(()) +} diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 8c6c0967..e4b58732 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -229,6 +229,8 @@ pub enum CommandErrorKind { NoKeepOption, /// {0:?} FromParseError(#[from] shell_words::ParseError), + /// No locking capability configured for the backend + NoLockingConfigured, } /// [`CryptoErrorKind`] describes the errors that can happen while dealing with Cryptographic functions diff --git a/crates/core/src/repository.rs b/crates/core/src/repository.rs index 03a7d261..96bc46a2 100644 --- a/crates/core/src/repository.rs +++ b/crates/core/src/repository.rs @@ -13,6 +13,7 @@ use std::{ }; use bytes::Bytes; +use chrono::{DateTime, Local}; use derive_setters::Setters; use log::{debug, error, info}; use serde_with::{serde_as, DisplayFromStr}; @@ -23,6 +24,7 @@ use crate::{ decrypt::{DecryptBackend, DecryptReadBackend, DecryptWriteBackend}, hotcold::HotColdBackend, local_destination::LocalDestination, + lock::LockBackend, node::Node, warm_up::WarmUpAccessBackend, FileType, ReadBackend, WriteBackend, @@ -39,6 +41,7 @@ use crate::{ copy::CopySnapshot, forget::{ForgetGroups, KeepOptions}, key::KeyOptions, + lock::lock_repo, prune::{PruneOptions, PrunePlan}, repair::{ index::{index_checked_from_collector, RepairIndexOptions}, @@ -151,7 +154,7 @@ pub struct RepositoryOptions { /// Warm up needed data pack files by running the command with %id replaced by pack id #[cfg_attr( feature = "clap", - clap(long, global = true, conflicts_with = "warm_up",) + clap(long, global = true, conflicts_with = "warm_up") )] #[cfg_attr(feature = "merge", merge(strategy = conflate::option::overwrite_none))] pub warm_up_command: Option, @@ -161,6 +164,11 @@ pub struct RepositoryOptions { #[serde_as(as = "Option")] #[cfg_attr(feature = "merge", merge(strategy = conflate::option::overwrite_none))] pub warm_up_wait: Option, + + /// Lock files by running the command with %id replaced by pack id, %type by the file type, %path by file path and %until by the until date + #[cfg_attr(feature = "clap", clap(long, global = true))] + #[cfg_attr(feature = "merge", merge(strategy = conflate::option::overwrite_none))] + pub lock_command: Option, } impl RepositoryOptions { @@ -360,6 +368,10 @@ impl

Repository { be = WarmUpAccessBackend::new_warm_up(be); } + if let Some(lock_command) = opts.lock_command.as_ref() { + be = LockBackend::new_lock(be, lock_command.clone()); + } + let mut name = be.location(); if let Some(be_hot) = &be_hot { be = Arc::new(HotColdBackend::new(be, be_hot.clone())); @@ -1195,6 +1207,19 @@ impl Repository { opts.get_plan(self) } + /// Lock the complete repository, i.e. everything in the storage backend. + /// + /// # Arguments + /// + /// * `until` - until when to lock. None means lock forever. + /// + /// # Errors + /// + // TODO: Document errors + pub fn lock_repo(&self, until: Option>) -> RusticResult<()> { + lock_repo(self, until) + } + /// Turn the repository into the `IndexedFull` state by reading and storing the index /// /// # Errors diff --git a/crates/core/src/repository/warm_up.rs b/crates/core/src/repository/warm_up.rs index 14ea76da..081e0d16 100644 --- a/crates/core/src/repository/warm_up.rs +++ b/crates/core/src/repository/warm_up.rs @@ -92,13 +92,12 @@ fn warm_up_command( let p = pb.progress_counter("warming up packs..."); p.set_length(packs.len() as u64); for pack in packs { - let args: Vec<_> = command + let args = command .args() .iter() - .map(|c| c.replace("%id", &pack.to_hex())) - .collect(); + .map(|c| c.replace("%id", &pack.to_hex())); debug!("calling {command:?}..."); - let status = Command::new(command.command()).args(&args).status()?; + let status = Command::new(command.command()).args(args).status()?; if !status.success() { warn!("warm-up command was not successful for pack {pack:?}. {status}"); } From 33290a914537f83b97812faa5502ccad7e4d89e3 Mon Sep 17 00:00:00 2001 From: simonsan <14062932+simonsan@users.noreply.github.com> Date: Sun, 6 Oct 2024 16:03:14 +0200 Subject: [PATCH 2/2] Proposed changes from review Signed-off-by: simonsan <14062932+simonsan@users.noreply.github.com> --- crates/core/src/backend.rs | 7 ++++++- crates/core/src/backend/cache.rs | 4 ++++ crates/core/src/backend/decrypt.rs | 4 ++++ crates/core/src/backend/dry_run.rs | 4 ++++ crates/core/src/backend/hotcold.rs | 4 ++++ crates/core/src/backend/lock.rs | 22 ++++++++++++++++++---- crates/core/src/commands/lock.rs | 6 +++--- 7 files changed, 43 insertions(+), 8 deletions(-) diff --git a/crates/core/src/backend.rs b/crates/core/src/backend.rs index ba05d4df..24948af2 100644 --- a/crates/core/src/backend.rs +++ b/crates/core/src/backend.rs @@ -358,7 +358,12 @@ pub trait WriteBackend: ReadBackend { /// If the file could not be read. fn lock(&self, tpe: FileType, id: &Id, until: Option>) -> Result<()> { debug!("no locking implemented. {tpe:?}, {id}, {until:?}"); - Ok(()) + + if self.can_lock() { + unimplemented!("Using default implementation. No locking implemented in backend."); + } else { + Err(anyhow::anyhow!("No locking configured.")) + } } } diff --git a/crates/core/src/backend/cache.rs b/crates/core/src/backend/cache.rs index fc7f73b7..d849a6c5 100644 --- a/crates/core/src/backend/cache.rs +++ b/crates/core/src/backend/cache.rs @@ -217,6 +217,10 @@ impl WriteBackend for CachedBackend { } fn lock(&self, tpe: FileType, id: &Id, until: Option>) -> Result<()> { + if !self.can_lock() { + return Err(anyhow::anyhow!("No locking configured.")); + } + self.be.lock(tpe, id, until) } } diff --git a/crates/core/src/backend/decrypt.rs b/crates/core/src/backend/decrypt.rs index 80970684..20e373ea 100644 --- a/crates/core/src/backend/decrypt.rs +++ b/crates/core/src/backend/decrypt.rs @@ -585,6 +585,10 @@ impl WriteBackend for DecryptBackend { } fn lock(&self, tpe: FileType, id: &Id, until: Option>) -> Result<()> { + if !self.can_lock() { + return Err(anyhow::anyhow!("No locking configured.")); + } + self.be.lock(tpe, id, until) } } diff --git a/crates/core/src/backend/dry_run.rs b/crates/core/src/backend/dry_run.rs index 261bcdd6..244d8710 100644 --- a/crates/core/src/backend/dry_run.rs +++ b/crates/core/src/backend/dry_run.rs @@ -163,6 +163,10 @@ impl WriteBackend for DryRunBackend { } fn lock(&self, tpe: FileType, id: &Id, until: Option>) -> Result<()> { + if !self.can_lock() { + return Err(anyhow::anyhow!("No locking configured.")); + } + self.be.lock(tpe, id, until) } } diff --git a/crates/core/src/backend/hotcold.rs b/crates/core/src/backend/hotcold.rs index 798005d6..e98fd7f7 100644 --- a/crates/core/src/backend/hotcold.rs +++ b/crates/core/src/backend/hotcold.rs @@ -105,6 +105,10 @@ impl WriteBackend for HotColdBackend { } fn lock(&self, tpe: FileType, id: &Id, until: Option>) -> Result<()> { + if !self.can_lock() { + return Err(anyhow::anyhow!("No locking configured.")); + } + self.be.lock(tpe, id, until) } } diff --git a/crates/core/src/backend/lock.rs b/crates/core/src/backend/lock.rs index a183c037..0575a6cb 100644 --- a/crates/core/src/backend/lock.rs +++ b/crates/core/src/backend/lock.rs @@ -3,7 +3,7 @@ use std::{process::Command, sync::Arc}; use anyhow::Result; use bytes::Bytes; use chrono::{DateTime, Local}; -use log::{debug, warn}; +use log::{debug, error}; use crate::{ backend::{FileType, ReadBackend, WriteBackend}, @@ -68,7 +68,7 @@ impl ReadBackend for LockBackend { } } -fn path(tpe: FileType, id: &Id) -> String { +fn path_to_id_from_file_type(tpe: FileType, id: &Id) -> String { let hex_id = id.to_hex(); match tpe { FileType::Config => "config".into(), @@ -95,19 +95,33 @@ impl WriteBackend for LockBackend { } fn lock(&self, tpe: FileType, id: &Id, until: Option>) -> Result<()> { + if !self.can_lock() { + return Err(anyhow::anyhow!("No locking configured.")); + } + let until = until.map_or_else(String::new, |u| u.to_rfc3339()); - let path = path(tpe, id); + + let path = path_to_id_from_file_type(tpe, id); + let args = self.command.args().iter().map(|c| { c.replace("%id", &id.to_hex()) .replace("%type", tpe.dirname()) .replace("%path", &path) .replace("%until", &until) }); + debug!("calling {:?}...", self.command); + let status = Command::new(self.command.command()).args(args).status()?; + if !status.success() { - warn!("lock command was not successful for {tpe:?}, id: {id}. {status}"); + error!("lock command was not successful for {tpe:?}, id: {id}. {status}"); + + return Err(anyhow::anyhow!( + "lock command was not successful for {tpe:?}, id: {id}. {status}" + )); } + Ok(()) } } diff --git a/crates/core/src/commands/lock.rs b/crates/core/src/commands/lock.rs index 08186ffb..47b93118 100644 --- a/crates/core/src/commands/lock.rs +++ b/crates/core/src/commands/lock.rs @@ -62,9 +62,9 @@ fn lock_files( pool.in_place_scope(|scope| { for id in ids { scope.spawn(move |_| { - if let Err(e) = backend.lock(ID::TYPE, id, until) { - // FIXME: Use error handling - error!("lock failed for {:?} {id:?}. {e}", ID::TYPE); + if let Err(err) = backend.lock(ID::TYPE, id, until) { + // FIXME: Use error handling, e.g. use a channel to collect the errors + error!("lock failed for {:?} {id:?}. {err}", ID::TYPE); }; p.inc(1); });