# Heal cache entries with missing source distributions (#7559)
## Summary

`uv cache prune --ci` will remove the source distribution directory. If
we then need to build a _different_ wheel from the same source
distribution (e.g., for a package that ships Python minor
version-specific wheels), we fail, because we expect the source to be
there.

Now, if the source is missing, we re-download it. It would be slightly
easier to just _ignore_ that revision, but that would mean we'd also
lose the already-built wheels -- so if you ran against many Python
versions, we'd continuously lose the cached data.
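
In effect, healing keeps the revision and its built wheels and only restores the source, failing loudly if the re-fetched archive no longer matches the recorded hashes. A rough sketch of that decision, using hypothetical stand-ins for uv's cache types:

```rust
use std::path::Path;

// Hypothetical stand-ins for uv's `Revision` and error types.
struct Revision {
    hashes: Vec<String>,
}

#[derive(Debug)]
enum HealError {
    HashMismatch,
}

// Keep the revision (and the wheels already built from it); only re-fetch
// the source when its directory is gone, and verify the re-fetched archive
// still matches the hashes recorded in the revision.
fn heal_revision(
    source_dir: &Path,
    revision: Revision,
    refetch: impl FnOnce() -> Vec<String>,
) -> Result<Revision, HealError> {
    if source_dir.is_dir() {
        // Source is present; nothing to heal.
        return Ok(revision);
    }
    let hashes = refetch();
    if hashes != revision.hashes {
        return Err(HealError::HashMismatch);
    }
    Ok(revision)
}

fn main() {
    let revision = Revision { hashes: vec!["sha256:abc".into()] };
    let healed = heal_revision(Path::new("/nonexistent"), revision, || {
        vec!["sha256:abc".into()]
    });
    assert!(healed.is_ok());
}
```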

Closes #7543.

## Test Plan

We can add tests, but they'd _need_ to build non-pure-Python wheels,
which tends to be expensive...

For reference:

```console
$ cargo run venv --python 3.12
$ cargo run pip install mercurial==6.8.1 --verbose
$ cargo run cache prune --ci
$ cargo run venv --python 3.11
$ cargo run pip install mercurial==6.8.1 --verbose
```

I also did this with a local `.tar.gz` that I downloaded from PyPI.
charliermarsh authored Sep 19, 2024
1 parent 5de6d23 commit f3463b3
Showing 5 changed files with 138 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


5 changes: 5 additions & 0 deletions crates/uv-cache/src/lib.rs
```diff
@@ -45,6 +45,11 @@ impl CacheEntry {
         Self(path.into())
     }
 
+    /// Return the cache entry's parent directory.
+    pub fn shard(&self) -> CacheShard {
+        CacheShard(self.dir().to_path_buf())
+    }
+
     /// Convert the [`CacheEntry`] into a [`PathBuf`].
     #[inline]
     pub fn into_path_buf(self) -> PathBuf {
```
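
The new `shard()` accessor lets code hop from an entry back to its parent directory and derive sibling entries; `heal_url_revision` in the `source/mod.rs` diff below uses it as `entry.shard().entry(HTTP_REVISION)` to locate the revision pointer alongside the missing source. A rough sketch with simplified stand-in types (the paths are hypothetical):

```rust
use std::path::{Path, PathBuf};

// Simplified stand-ins for uv's cache types, for illustration only.
struct CacheShard(PathBuf);
struct CacheEntry(PathBuf);

impl CacheEntry {
    /// The directory containing this entry.
    fn dir(&self) -> &Path {
        self.0.parent().expect("cache entry has a parent directory")
    }

    /// Return the cache entry's parent directory, as a shard.
    fn shard(&self) -> CacheShard {
        CacheShard(self.dir().to_path_buf())
    }
}

impl CacheShard {
    /// A named entry within this shard.
    fn entry(&self, name: &str) -> CacheEntry {
        CacheEntry(self.0.join(name))
    }
}

fn main() {
    let source = CacheEntry(PathBuf::from("cache/built-wheels/abc123/source.tar.gz"));
    // Derive a sibling entry in the same revision directory.
    let revision = source.shard().entry("revision.http");
    assert_eq!(revision.0, PathBuf::from("cache/built-wheels/abc123/revision.http"));
}
```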
1 change: 1 addition & 0 deletions crates/uv-distribution/Cargo.toml
```diff
@@ -37,6 +37,7 @@ anyhow = { workspace = true }
 fs-err = { workspace = true }
 futures = { workspace = true }
 nanoid = { workspace = true }
+owo-colors = { workspace = true }
 reqwest = { workspace = true }
 reqwest-middleware = { workspace = true }
 rmp-serde = { workspace = true }
```
3 changes: 3 additions & 0 deletions crates/uv-distribution/src/error.rs
```diff
@@ -1,5 +1,6 @@
 use std::path::PathBuf;
 
+use owo_colors::OwoColorize;
 use tokio::task::JoinError;
 use url::Url;
 use zip::result::ZipError;
@@ -93,6 +94,8 @@ pub enum Error {
     MetadataLowering(#[from] MetadataError),
     #[error("Distribution not found at: {0}")]
     NotFound(Url),
+    #[error("Attempted to re-extract the source distribution for `{0}`, but the hashes didn't match. Run `{}` to clear the cache.", "uv cache clean".green())]
+    CacheHeal(String),
 
     /// A generic request middleware error happened while making a request.
     /// Refer to the error message for more details.
```
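
The new variant uses thiserror's trailing format arguments to colorize the remediation hint at display time. A minimal, self-contained sketch of the same pattern, assuming `thiserror` and `owo-colors` as dependencies (the `DemoError` type is hypothetical):

```rust
use owo_colors::OwoColorize;
use thiserror::Error;

#[derive(Debug, Error)]
enum DemoError {
    // Trailing format arguments after the message are evaluated when the
    // error is displayed, so the hint renders green on terminals that
    // support ANSI colors.
    #[error("hashes didn't match for `{0}`; run `{}` to clear the cache", "uv cache clean".green())]
    CacheHeal(String),
}

fn main() {
    println!("{}", DemoError::CacheHeal("mercurial==6.8.1".to_string()));
}
```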
138 changes: 128 additions & 10 deletions crates/uv-distribution/src/source/mod.rs
```diff
@@ -23,7 +23,7 @@ use platform_tags::Tags;
 use pypi_types::{HashDigest, Metadata12, Metadata23, RequiresTxt};
 use reqwest::Response;
 use tokio_util::compat::FuturesAsyncReadCompatExt;
-use tracing::{debug, info_span, instrument, Instrument};
+use tracing::{debug, info_span, instrument, warn, Instrument};
 use url::Url;
 use uv_cache::{Cache, CacheBucket, CacheEntry, CacheShard, Removal, WheelCache};
 use uv_cache_info::CacheInfo;
```
```diff
@@ -425,6 +425,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         // Scope all operations to the revision. Within the revision, there's no need to check for
         // freshness, since entries have to be fresher than the revision itself.
         let cache_shard = cache_shard.shard(revision.id());
+        let source_dist_entry = cache_shard.entry(filename);
 
         // If there are build settings, we need to scope to a cache shard.
         let config_settings = self.build_context.config_settings();
```
```diff
@@ -439,13 +440,29 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             return Ok(built_wheel.with_hashes(revision.into_hashes()));
         }
 
+        // Otherwise, we need to build a wheel. Before building, ensure that the source is present.
+        let revision = if source_dist_entry.path().is_dir() {
+            revision
+        } else {
+            self.heal_url_revision(
+                source,
+                filename,
+                ext,
+                url,
+                &source_dist_entry,
+                revision,
+                hashes,
+                client,
+            )
+            .await?
+        };
+
         let task = self
             .reporter
             .as_ref()
             .map(|reporter| reporter.on_build_start(source));
 
         // Build the source distribution.
-        let source_dist_entry = cache_shard.entry(filename);
         let (disk_filename, wheel_filename, metadata) = self
             .build_distribution(source, source_dist_entry.path(), subdirectory, &cache_shard)
             .await?;
```
```diff
@@ -527,6 +544,23 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             });
         }
 
+        // Otherwise, we need a wheel.
+        let revision = if source_dist_entry.path().is_dir() {
+            revision
+        } else {
+            self.heal_url_revision(
+                source,
+                filename,
+                ext,
+                url,
+                &source_dist_entry,
+                revision,
+                hashes,
+                client,
+            )
+            .await?
+        };
+
         // If there are build settings, we need to scope to a cache shard.
         let config_settings = self.build_context.config_settings();
         let cache_shard = if config_settings.is_empty() {
```
```diff
@@ -686,6 +720,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         // Scope all operations to the revision. Within the revision, there's no need to check for
         // freshness, since entries have to be fresher than the revision itself.
         let cache_shard = cache_shard.shard(revision.id());
+        let source_entry = cache_shard.entry("source");
 
         // If there are build settings, we need to scope to a cache shard.
         let config_settings = self.build_context.config_settings();
```
```diff
@@ -700,9 +735,14 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             return Ok(built_wheel);
         }
 
-        let source_entry = cache_shard.entry("source");
+        // Otherwise, we need to build a wheel, which requires a source distribution.
+        let revision = if source_entry.path().is_dir() {
+            revision
+        } else {
+            self.heal_archive_revision(source, resource, &source_entry, revision, hashes)
+                .await?
+        };
 
-        // Otherwise, we need to build a wheel.
         let task = self
             .reporter
             .as_ref()
```
```diff
@@ -785,6 +825,14 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             });
         }
 
+        // Otherwise, we need a source distribution.
+        let revision = if source_entry.path().is_dir() {
+            revision
+        } else {
+            self.heal_archive_revision(source, resource, &source_entry, revision, hashes)
+                .await?
+        };
+
         // If the backend supports `prepare_metadata_for_build_wheel`, use it.
         if let Some(metadata) = self
             .build_metadata(source, source_entry.path(), None)
```
```diff
@@ -1346,6 +1394,66 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         ))
     }
 
+    /// Heal a [`Revision`] for a local archive.
+    async fn heal_archive_revision(
+        &self,
+        source: &BuildableSource<'_>,
+        resource: &PathSourceUrl<'_>,
+        entry: &CacheEntry,
+        revision: Revision,
+        hashes: HashPolicy<'_>,
+    ) -> Result<Revision, Error> {
+        warn!("Re-extracting missing source distribution: {source}");
+        let hashes = self
+            .persist_archive(&resource.path, resource.ext, entry.path(), hashes)
+            .await?;
+        if hashes != revision.hashes() {
+            return Err(Error::CacheHeal(source.to_string()));
+        }
+        Ok(revision.with_hashes(hashes))
+    }
+
+    /// Heal a [`Revision`] for a remote archive.
+    async fn heal_url_revision(
+        &self,
+        source: &BuildableSource<'_>,
+        filename: &str,
+        ext: SourceDistExtension,
+        url: &Url,
+        entry: &CacheEntry,
+        revision: Revision,
+        hashes: HashPolicy<'_>,
+        client: &ManagedClient<'_>,
+    ) -> Result<Revision, Error> {
+        warn!("Re-downloading missing source distribution: {source}");
+        let cache_entry = entry.shard().entry(HTTP_REVISION);
+        let download = |response| {
+            async {
+                let hashes = self
+                    .download_archive(response, source, filename, ext, entry.path(), hashes)
+                    .await?;
+                if hashes != revision.hashes() {
+                    return Err(Error::CacheHeal(source.to_string()));
+                }
+                Ok(revision.with_hashes(hashes))
+            }
+            .boxed_local()
+            .instrument(info_span!("download", source_dist = %source))
+        };
+        client
+            .managed(|client| async move {
+                client
+                    .cached_client()
+                    .skip_cache(Self::request(url.clone(), client)?, &cache_entry, download)
+                    .await
+                    .map_err(|err| match err {
+                        CachedClientError::Callback(err) => err,
+                        CachedClientError::Client(err) => Error::Client(err),
+                    })
+            })
+            .await
+    }
+
     /// Download and unzip a source distribution into the cache from an HTTP response.
     async fn download_archive(
         &self,
```
```diff
@@ -1395,9 +1503,14 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         fs_err::tokio::create_dir_all(target.parent().expect("Cache entry to have parent"))
             .await
             .map_err(Error::CacheWrite)?;
-        rename_with_retry(extracted, target)
-            .await
-            .map_err(Error::CacheWrite)?;
+        if let Err(err) = rename_with_retry(extracted, target).await {
+            // If the directory already exists, accept it.
+            if target.is_dir() {
+                warn!("Directory already exists: {}", target.display());
+            } else {
+                return Err(Error::CacheWrite(err));
+            }
+        }
 
         Ok(hashes)
     }
```
```diff
@@ -1448,9 +1561,14 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         fs_err::tokio::create_dir_all(target.parent().expect("Cache entry to have parent"))
             .await
             .map_err(Error::CacheWrite)?;
-        rename_with_retry(extracted, &target)
-            .await
-            .map_err(Error::CacheWrite)?;
+        if let Err(err) = rename_with_retry(extracted, target).await {
+            // If the directory already exists, accept it.
+            if target.is_dir() {
+                warn!("Directory already exists: {}", target.display());
+            } else {
+                return Err(Error::CacheWrite(err));
+            }
+        }
 
         Ok(hashes)
     }
```
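
Accepting an existing target directory makes the final rename idempotent under concurrent healing: if two builds race to restore the same source, the loser's rename fails, but the winner has already produced an equivalent directory. A standalone sketch of the pattern, using `std::fs::rename` in place of uv's `rename_with_retry` (paths are hypothetical):

```rust
use std::fs;
use std::io;
use std::path::Path;

// Move a freshly extracted directory into its final cache location,
// tolerating a concurrent writer that got there first.
fn persist_extracted(extracted: &Path, target: &Path) -> io::Result<()> {
    fs::create_dir_all(target.parent().expect("target has a parent"))?;
    if let Err(err) = fs::rename(extracted, target) {
        // If the directory already exists, another process won the race;
        // its contents are equivalent, so accept them.
        if target.is_dir() {
            eprintln!("directory already exists: {}", target.display());
        } else {
            return Err(err);
        }
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let tmp = std::env::temp_dir().join("uv-demo");
    let extracted = tmp.join("extracted");
    let target = tmp.join("cache").join("source");
    fs::create_dir_all(&extracted)?;
    persist_extracted(&extracted, &target)
}
```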
