diff --git a/tests/unit/forklift/test_legacy.py b/tests/unit/forklift/test_legacy.py index 463e0cb9cddd..41c542153eeb 100644 --- a/tests/unit/forklift/test_legacy.py +++ b/tests/unit/forklift/test_legacy.py @@ -2611,6 +2611,81 @@ def storage_service_store(path, file_path, *, meta): # Ensure that a Filename object has been created. db_request.db.query(Filename).filter(Filename.filename == filename).one() + @pytest.mark.parametrize( + "project_name, filename_prefix, version", + [ + ("flufl.enum", "flufl_enum", "1.0.0"), + ("foo-.bar", "foo_bar", "1.0.0"), + ("leo", "leo", "6.7.9-9"), + ("leo_something", "leo-something", "6.7.9-9"), + ], + ) + def test_upload_succeeds_pep625_normalized_filename( + self, + monkeypatch, + db_request, + pyramid_config, + metrics, + project_name, + filename_prefix, + version, + ): + user = UserFactory.create() + EmailFactory.create(user=user) + project = ProjectFactory.create(name=project_name) + RoleFactory.create(user=user, project=project) + + filename = f"{filename_prefix}-{version}.tar.gz" + filebody = _get_whl_testdata(name=project_name, version=version) + + @pretend.call_recorder + def storage_service_store(path, file_path, *, meta): + with open(file_path, "rb") as fp: + if file_path.endswith(".metadata"): + assert fp.read() == b"Fake metadata" + else: + assert fp.read() == filebody + + storage_service = pretend.stub(store=storage_service_store) + + db_request.find_service = pretend.call_recorder( + lambda svc, name=None, context=None: { + IFileStorage: storage_service, + IMetricsService: metrics, + }.get(svc) + ) + + monkeypatch.setattr(legacy, "_is_valid_dist_file", lambda *a, **kw: True) + + pyramid_config.testing_securitypolicy(identity=user) + db_request.user = user + db_request.user_agent = "warehouse-tests/6.6.6" + db_request.POST = MultiDict( + { + "metadata_version": "1.2", + "name": project.name, + "version": version, + "filetype": "sdist", + "pyversion": "source", + "md5_digest": hashlib.md5(filebody).hexdigest(), + "content": pretend.stub( + filename=filename, + file=io.BytesIO(filebody), + type="application/zip", + ), + } + ) + + resp = legacy.file_upload(db_request) + + assert resp.status_code == 200 + + # Ensure that a File object has been created. + db_request.db.query(File).filter(File.filename == filename).one() + + # Ensure that a Filename object has been created. + db_request.db.query(Filename).filter(Filename.filename == filename).one() + def test_upload_succeeds_with_wheel_after_sdist( self, tmpdir, monkeypatch, pyramid_config, db_request, metrics ): diff --git a/warehouse/forklift/legacy.py b/warehouse/forklift/legacy.py index a9c07d59c7b4..75901f829410 100644 --- a/warehouse/forklift/legacy.py +++ b/warehouse/forklift/legacy.py @@ -804,9 +804,25 @@ def file_upload(request): # we can split on the first hyphen. filename.partition("-")[0] if filename.endswith(".whl") - # For source releases, we know that the version should not contain any - # hyphens, so we can split on the last hyphen to get the project name. - else filename.rpartition("-")[0] + # For source releases, the version might contain a hyphen as a + # post-release separator, so we get the prefix by removing the provided + # version. + # Per 625, the version should be normalized, but we aren't currently + # enforcing this, so we permit a filename with either the exact + # provided version if it contains a hyphen, or any version that doesn't + # contain a hyphen. + else ( + # A hyphen is being used for a post-release separator, so partition + # the prefix twice + filename.rpartition("-")[0].rpartition("-")[0] + # Check if the provided version contains a hyphen and the same + # version is being used in the filename + if "-" in form.version.data + and filename.endswith(f"-{form.version.data}.tar.gz") + # The only hyphen should be between the prefix and the version, so + # we only need to partition the prefix once + else filename.rpartition("-")[0] + ) ) # Normalize the prefix in the filename. Eventually this should be unnecessary once