Skip to content

Commit

Permalink
refactor: make PackageCache multi-process safe (#837)
Browse files Browse the repository at this point in the history
  • Loading branch information
baszalmstra committed Sep 3, 2024
1 parent c02c992 commit 3c4d098
Show file tree
Hide file tree
Showing 41 changed files with 965 additions and 579 deletions.
File renamed without changes.
4 changes: 1 addition & 3 deletions .github/workflows/rust-compile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ jobs:
submodules: recursive
- uses: actions-rust-lang/setup-rust-toolchain@v1
- run: |
for package in $(cargo metadata --no-deps --format-version=1 | jq -r '.packages[] | .name'); do
cargo rustdoc -p "$package" --all-features -- -D warnings -W unreachable-pub
done
RUSTDOCFLAGS="-Dwarnings -Wunreachable-pub" cargo doc --no-deps --all --all-features
format_and_lint:
name: Format and Lint
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ async-compression = { version = "0.4.8", features = [
"bzip2",
"zstd",
] }
async-fd-lock = "0.2.0"
fs4 = "0.9.1"
async-trait = "0.1.80"
axum = { version = "0.7.5", default-features = false, features = [
"tokio",
Expand Down Expand Up @@ -113,7 +115,7 @@ retry-policies = { version = "0.4.0", default-features = false }
rmp-serde = { version = "1.2.0" }
rstest = { version = "0.21.0" }
rstest_reuse = "0.7.0"
simd-json = { version = "0.13.10" , features = ["serde_impl"]}
simd-json = { version = "0.13.10", features = ["serde_impl"] }
serde = { version = "1.0.198" }
serde_json = { version = "1.0.116" }
serde_repr = "0.1"
Expand Down
27 changes: 14 additions & 13 deletions crates/rattler/src/install/installer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,22 @@ use std::{
sync::Arc,
};

use super::{unlink_package, AppleCodeSignBehavior, InstallDriver, InstallOptions, Transaction};
use crate::install::link_script::LinkScriptError;
use crate::{
default_cache_dir,
install::{clobber_registry::ClobberedPath, link_script::PrePostLinkResult},
package_cache::PackageCache,
};
pub use error::InstallerError;
use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt};
#[cfg(feature = "indicatif")]
pub use indicatif::{
DefaultProgressFormatter, IndicatifReporter, IndicatifReporterBuilder, Placement,
ProgressFormatter,
};
use rattler_cache::package_cache::CacheLock;
use rattler_cache::package_cache::CacheReporter;
use rattler_conda_types::{
prefix_record::{Link, LinkType},
Platform, PrefixRecord, RepoDataRecord,
Expand All @@ -26,14 +35,6 @@ use reqwest::Client;
use simple_spawn_blocking::tokio::run_blocking_task;
use tokio::{sync::Semaphore, task::JoinError};

use super::{unlink_package, AppleCodeSignBehavior, InstallDriver, InstallOptions, Transaction};
use crate::install::link_script::LinkScriptError;
use crate::{
default_cache_dir,
install::{clobber_registry::ClobberedPath, link_script::PrePostLinkResult},
package_cache::{CacheReporter, PackageCache},
};

/// An installer that can install packages into a prefix.
#[derive(Default)]
pub struct Installer {
Expand Down Expand Up @@ -366,7 +367,7 @@ impl Installer {
let cache_index = r.on_populate_cache_start(idx, &record);
(r, cache_index)
});
let cache_path = populate_cache(
let cache_lock = populate_cache(
&record,
downloader,
&package_cache,
Expand All @@ -376,7 +377,7 @@ impl Installer {
if let Some((reporter, index)) = populate_cache_report {
reporter.on_populate_cache_complete(index);
}
Ok((cache_path, record))
Ok((cache_lock, record))
})
.map_err(JoinError::try_into_panic)
.map(|res| match res {
Expand Down Expand Up @@ -405,14 +406,14 @@ impl Installer {
}

// Install the package if it was fetched.
if let Some((cached_path, record)) = package_to_install.await? {
if let Some((cache_lock, record)) = package_to_install.await? {
let reporter = reporter
.as_deref()
.map(|r| (r, r.on_link_start(idx, &record)));
link_package(
&record,
prefix.as_ref(),
&cached_path,
cache_lock.path(),
base_install_options.clone(),
driver,
)
Expand Down Expand Up @@ -519,7 +520,7 @@ async fn populate_cache(
downloader: reqwest_middleware::ClientWithMiddleware,
cache: &PackageCache,
reporter: Option<(Arc<dyn Reporter>, usize)>,
) -> Result<PathBuf, InstallerError> {
) -> Result<CacheLock, InstallerError> {
struct CacheReporterBridge {
reporter: Arc<dyn Reporter>,
cache_index: usize,
Expand Down
11 changes: 6 additions & 5 deletions crates/rattler/src/install/link_script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::{
borrow::Borrow,
collections::{HashMap, HashSet},
fmt::{Display, Formatter},
path::Path,
};

Expand Down Expand Up @@ -57,11 +58,11 @@ impl LinkScriptType {
}
}

impl ToString for LinkScriptType {
fn to_string(&self) -> String {
impl Display for LinkScriptType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
LinkScriptType::PreUnlink => "pre-unlink".to_string(),
LinkScriptType::PostLink => "post-link".to_string(),
LinkScriptType::PreUnlink => write!(f, "pre-unlink"),
LinkScriptType::PostLink => write!(f, "post-link"),
}
}
}
Expand Down Expand Up @@ -103,7 +104,7 @@ pub fn run_link_scripts<'a>(
let mut messages = HashMap::<PackageName, String>::new();
for record in prefix_records {
let prec = &record.repodata_record.package_record;
let link_file = target_prefix.join(&link_script_type.get_path(prec, platform));
let link_file = target_prefix.join(link_script_type.get_path(prec, platform));

if link_file.exists() {
env.insert(
Expand Down
4 changes: 2 additions & 2 deletions crates/rattler/src/install/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ mod test {
async move {
// Populate the cache
let package_info = ArchiveIdentifier::try_from_url(&package_url).unwrap();
let package_dir = package_cache
let package_cache_lock = package_cache
.get_or_fetch_from_url(
package_info,
package_url.clone(),
Expand All @@ -811,7 +811,7 @@ mod test {

// Install the package to the prefix
link_package(
&package_dir,
package_cache_lock.path(),
prefix_path,
install_driver,
InstallOptions {
Expand Down
12 changes: 6 additions & 6 deletions crates/rattler/src/install/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ pub async fn install_package_to_environment(

// Create the conda-meta directory if it doesnt exist yet.
let target_prefix = target_prefix.to_path_buf();
match tokio::task::spawn_blocking(move || {
let result = tokio::task::spawn_blocking(move || {
let conda_meta_path = target_prefix.join("conda-meta");
std::fs::create_dir_all(&conda_meta_path)?;

// Write the conda-meta information
let pkg_meta_path = conda_meta_path.join(prefix_record.file_name());
prefix_record.write_to_path(pkg_meta_path, true)
})
.await
{
.await;
match result {
Ok(result) => Ok(result?),
Err(err) => {
if let Ok(panic) = err.try_into_panic() {
Expand Down Expand Up @@ -95,7 +95,7 @@ pub async fn execute_operation(
default_retry_policy(),
None,
)
.map_ok(|cache_dir| Some((install_record.clone(), cache_dir)))
.map_ok(|cache_lock| Some((install_record.clone(), cache_lock)))
.map_err(anyhow::Error::from)
.await
.unwrap()
Expand All @@ -104,10 +104,10 @@ pub async fn execute_operation(
};

// If there is a package to install, do that now.
if let Some((record, package_dir)) = install_package {
if let Some((record, package_cache_lock)) = install_package {
install_package_to_environment(
target_prefix,
package_dir,
package_cache_lock.path().to_path_buf(),
record.clone(),
install_driver,
install_options,
Expand Down
6 changes: 5 additions & 1 deletion crates/rattler_cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ readme.workspace = true

[dependencies]
anyhow.workspace = true
dashmap.workspace = true
dirs.workspace = true
futures.workspace = true
fxhash.workspace = true
itertools.workspace = true
parking_lot.workspace = true
Expand All @@ -20,12 +22,14 @@ rattler_digest = { version = "1.0.1", path = "../rattler_digest", default-featur
rattler_networking = { version = "0.21.2", path = "../rattler_networking", default-features = false }
rattler_package_streaming = { version = "0.22.4", path = "../rattler_package_streaming", default-features = false, features = ["reqwest"] }
reqwest.workspace = true
tokio.workspace = true
tokio = { workspace = true, features = ["macros"] }
tracing.workspace = true
url.workspace = true
thiserror.workspace = true
reqwest-middleware.workspace = true
digest.workspace = true
fs4 = { workspace = true, features = ["fs-err-tokio"] }
simple_spawn_blocking = { version = "1.0.0", path = "../simple_spawn_blocking"}

[dev-dependencies]
assert_matches.workspace = true
Expand Down
64 changes: 64 additions & 0 deletions crates/rattler_cache/src/package_cache/cache_key.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use rattler_conda_types::package::ArchiveIdentifier;
use rattler_conda_types::PackageRecord;
use rattler_digest::Sha256Hash;
use std::fmt::{Display, Formatter};

/// Provides a unique identifier for packages in the cache.
/// TODO: This could not be unique over multiple subdir. How to handle?
/// TODO: Wouldn't it be better to cache based on hashes?
#[derive(Debug, Hash, Clone, Eq, PartialEq)]
pub struct CacheKey {
name: String,
version: String,
build_string: String,
sha256: Option<Sha256Hash>,
}

impl CacheKey {
/// Adds a sha256 hash of the archive.
pub fn with_sha256(mut self, sha256: Sha256Hash) -> Self {
self.sha256 = Some(sha256);
self
}

/// Potentially adds a sha256 hash of the archive.
pub fn with_opt_sha256(mut self, sha256: Option<Sha256Hash>) -> Self {
self.sha256 = sha256;
self
}
}

impl CacheKey {
/// Return the sha256 hash of the package if it is known.
pub fn sha256(&self) -> Option<Sha256Hash> {
self.sha256
}
}

impl From<ArchiveIdentifier> for CacheKey {
fn from(pkg: ArchiveIdentifier) -> Self {
CacheKey {
name: pkg.name,
version: pkg.version,
build_string: pkg.build_string,
sha256: None,
}
}
}

impl From<&PackageRecord> for CacheKey {
fn from(record: &PackageRecord) -> Self {
Self {
name: record.name.as_normalized().to_string(),
version: record.version.to_string(),
build_string: record.build.clone(),
sha256: record.sha256,
}
}
}

impl Display for CacheKey {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}-{}-{}", &self.name, &self.version, &self.build_string)
}
}
Loading

0 comments on commit 3c4d098

Please sign in to comment.