diff --git a/tests/common/constants.py b/tests/common/constants.py index e6392da7bde9..a7689f0868c3 100644 --- a/tests/common/constants.py +++ b/tests/common/constants.py @@ -107,3 +107,88 @@ "YzMjY1LCJleHAiOjE2NTA2NjQxNjUsImlhdCI6MTY1MDY2Mzg2NX0.R4q-vWAFXHrBSBK" "AZuHHIsGOkqlirPxEtLfjLIDiLr0" ) + +""" + { + "namespace_id": "123", + "namespace_path": "foo", + "project_id": "55235664", + "project_path": "foo/bar", + "user_id": "123", + "user_login": "user", + "user_email": "user@example.com", + "user_access_level": "owner", + "pipeline_id": "123", + "pipeline_source": "push", + "job_id": "123", + "ref": "main", + "ref_type": "branch", + "ref_path": "refs/heads/main", + "ref_protected": "true", + "runner_id": 123, + "runner_environment": "gitlab-hosted", + "sha": "93969f556a29853b507bdcd9dec6b4217a4ea2e7", + "project_visibility": "private", + "ci_config_ref_uri": "gitlab.com/foo/bar//.gitlab-ci.yml@refs/heads/main", + "ci_config_sha": "93969f556a29853b507bdcd9dec6b4217a4ea2e7", + "jti": "2a5381a8-baf5-43c5-823d-544a08a067fb", + "iat": 1750405794, + "nbf": 1750405789, + "exp": 1750409394, + "iss": "https://gitlab.com", + "sub": "project_path:foo/bar:ref_type:branch:ref:main", + "aud": "pypi" + } +""" +DUMMY_GITLAB_OIDC_JWT = ( + "eyJraWQiOiI0aTNzRkU3c3hxTlBPVDdGZHZjR0ExWlZHR0lfci10c0RYbkV1WVQ0WnFFI" + "iwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJuYW1lc3BhY2VfaWQiOiIxMjMiLCJ" + "uYW1lc3BhY2VfcGF0aCI6ImZvbyIsInByb2plY3RfaWQiOiI1NTIzNTY2NCIsInByb2pl" + "Y3RfcGF0aCI6ImZvby9iYXIiLCJ1c2VyX2lkIjoiMTIzIiwidXNlcl9sb2dpbiI6InVzZ" + "XIiLCJ1c2VyX2VtYWlsIjoidXNlckBleGFtcGxlLmNvbSIsInVzZXJfYWNjZXNzX2xldm" + "VsIjoib3duZXIiLCJwaXBlbGluZV9pZCI6IjEyMyIsInBpcGVsaW5lX3NvdXJjZSI6InB" + "1c2giLCJqb2JfaWQiOiIxMjMiLCJyZWYiOiJtYWluIiwicmVmX3R5cGUiOiJicmFuY2gi" + "LCJyZWZfcGF0aCI6InJlZnMvaGVhZHMvbWFpbiIsInJlZl9wcm90ZWN0ZWQiOiJ0cnVlI" + "iwicnVubmVyX2lkIjoxMjMsInJ1bm5lcl9lbnZpcm9ubWVudCI6ImdpdGxhYi1ob3N0ZW" + "QiLCJzaGEiOiI5Mzk2OWY1NTZhMjk4NTNiNTA3YmRjZDlkZWM2YjQyMTdhNGVhMmU3Iiw" + "icHJvamVjdF92aXNpYmlsaXR5IjoicHJpdmF0ZSIsImNpX2NvbmZpZ19yZWZfdXJpIjoi" + "Z2l0bGFiLmNvbS9mb28vYmFyLy8uZ2l0bGFiLWNpLnltbEByZWZzL2hlYWRzL21haW4iL" + "CJjaV9jb25maWdfc2hhIjoiOTM5NjlmNTU2YTI5ODUzYjUwN2JkY2Q5ZGVjNmI0MjE3YT" + "RlYTJlNyIsImp0aSI6IjJhNTM4MWE4LWJhZjUtNDNjNS04MjNkLTU0NGEwOGEwNjdmYiI" + "sImlhdCI6MTc1MDQwNTc5NCwibmJmIjoxNzUwNDA1Nzg5LCJleHAiOjE3NTA0MDkzOTQs" + "ImlzcyI6Imh0dHBzOi8vZ2l0bGFiLmNvbSIsInN1YiI6InByb2plY3RfcGF0aDpmb28vY" + "mFyOnJlZl90eXBlOmJyYW5jaDpyZWY6bWFpbiIsImF1ZCI6InB5cGkifQ.g3ceS-5KkGC" + "28nIKI-9HNf6lOPmBNyUAcQI-IjwZKpwcrWTcIM0lb6qKn1DyIqz2nE2W-SW3-hfZOIq9" + "6xJhGuZfl2bAV7uj_WwUnEoh2hSqW99T2_2bqkrLqkwcJ2w5yvEE2WtzUsxpRqEmuhxHH" + "uTOMLJPgydTqnJ2qe2oQxxWtpv_P0VjpZ_QXOk_KP6dBOHNdj7Iu7myCgybU5BTSyT33N" + "kbWMiZHHBL6Andl3dU9eQD17-BYkuoQCzLxepoAzNKgiIRL3OULljfWP8wXcLymDXaj1Q" + "A0sI9uulFYYx3ZgEmziSjv_e297kkrW3E_kbGyyGkTFxiOX6zSsKfUg" +) + +""" + { + "iss": "https://accounts.google.com", + "azp": "32555350559.apps.googleusercontent.com", + "aud": "pypi", + "sub": "111260650121185072906", + "hd": "google.com", + "email": "user@example.com", + "email_verified": "true", + "at_hash": "_LLKKivfvfme9eoQ3WcMIg", + "iat": "1650053185", + "exp": "1650056785", + "alg": "RS256", + "kid": "f1338ca26835863f671403941738a7b49e740fc0", + "typ": "JWT" +} +""" +DUMMY_GOOGLE_OIDC_JWT = ( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwczovL2FjY291bnRz" + "Lmdvb2dsZS5jb20iLCJhenAiOiIzMjU1NTM1MDU1OS5hcHBzLmdvb2dsZXVzZXJjb250Z" + "W50LmNvbSIsImF1ZCI6InB5cGkiLCJzdWIiOiIxMTEyNjA2NTAxMjExODUwNzI5MDYiLC" + 
"JoZCI6Imdvb2dsZS5jb20iLCJlbWFpbCI6InVzZXJAZXhhbXBsZS5jb20iLCJlbWFpbF9" + "2ZXJpZmllZCI6InRydWUiLCJhdF9oYXNoIjoiX0xMS0tpdmZ2Zm1lOWVvUTNXY01JZyIs" + "ImlhdCI6IjE2NTAwNTMxODUiLCJleHAiOiIxNjUwMDU2Nzg1IiwiYWxnIjoiUlMyNTYiL" + "CJraWQiOiJmMTMzOGNhMjY4MzU4NjNmNjcxNDAzOTQxNzM4YTdiNDllNzQwZmMwIiwidH" + "lwIjoiSldUIn0.wlPNSE6eTFvznJawgpa6cHC3a8sU5_VBH8si9h-sgi0" +) diff --git a/tests/conftest.py b/tests/conftest.py index 65cfa0039681..6fee2bff5ef6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -349,6 +349,9 @@ def get_app_config(database, nondefaults=None): "statuspage.url": "https://2p66nmmycsj3.statuspage.io", "warehouse.xmlrpc.cache.url": "redis://localhost:0/", "terms.revision": "initial", + "oidc.jwk_cache_url": "redis://localhost:0/", + "warehouse.oidc.audience": "pypi", + "oidc.backend": "warehouse.oidc.services.NullOIDCPublisherService", } if nondefaults: @@ -609,6 +612,28 @@ def db_request(pyramid_request, db_session, tm): return pyramid_request +@pytest.fixture +def _enable_all_oidc_providers(webtest): + flags = ( + AdminFlagValue.DISALLOW_ACTIVESTATE_OIDC, + AdminFlagValue.DISALLOW_GITLAB_OIDC, + AdminFlagValue.DISALLOW_GITHUB_OIDC, + AdminFlagValue.DISALLOW_GOOGLE_OIDC, + ) + original_flag_values = {} + db_sess = webtest.extra_environ["warehouse.db_session"] + + for flag in flags: + flag_db = db_sess.get(AdminFlag, flag.value) + original_flag_values[flag] = flag_db.enabled + flag_db.enabled = False + yield + + for flag in flags: + flag_db = db_sess.get(AdminFlag, flag.value) + flag_db.enabled = original_flag_values[flag] + + @pytest.fixture def _enable_organizations(db_request): flag = db_request.db.get(AdminFlag, AdminFlagValue.DISABLE_ORGANIZATIONS.value) diff --git a/tests/functional/forklift/test_legacy.py b/tests/functional/forklift/test_legacy.py index 1cdb3b04eb09..39bfae7573c8 100644 --- a/tests/functional/forklift/test_legacy.py +++ b/tests/functional/forklift/test_legacy.py @@ -21,10 +21,21 @@ from webob.multidict import MultiDict -from tests.common.db.oidc import GitHubPublisherFactory +from tests.common.db.oidc import ( + ActiveStatePublisherFactory, + GitHubPublisherFactory, + GitLabPublisherFactory, + GooglePublisherFactory, +) from tests.common.db.packaging import ProjectFactory, RoleFactory from warehouse.macaroons import caveats +from ...common.constants import ( + DUMMY_ACTIVESTATE_OIDC_JWT, + DUMMY_GITHUB_OIDC_JWT, + DUMMY_GITLAB_OIDC_JWT, + DUMMY_GOOGLE_OIDC_JWT, +) from ...common.db.accounts import UserFactory from ...common.db.macaroons import MacaroonFactory @@ -407,3 +418,173 @@ def test_provenance_upload(webtest): status=HTTPStatus.OK, ) assert response.json == project.releases[0].files[0].provenance.provenance + + +@pytest.mark.parametrize( + ("publisher_factory", "publisher_data", "oidc_jwt"), + [ + ( + GitHubPublisherFactory, + { + "repository_name": "bar", + "repository_owner": "foo", + "repository_owner_id": "123", + "workflow_filename": "example.yml", + "environment": "fake", + }, + DUMMY_GITHUB_OIDC_JWT, + ), + ( + ActiveStatePublisherFactory, + { + "organization": "fakeorg", + "activestate_project_name": "fakeproject", + "actor": "foo", + "actor_id": "fake", + }, + DUMMY_ACTIVESTATE_OIDC_JWT, + ), + ( + GitLabPublisherFactory, + { + "namespace": "foo", + "project": "bar", + "workflow_filepath": ".gitlab-ci.yml", + "environment": "", + }, + DUMMY_GITLAB_OIDC_JWT, + ), + ( + GooglePublisherFactory, + { + "email": "user@example.com", + "sub": "111260650121185072906", + }, + DUMMY_GOOGLE_OIDC_JWT, + ), + ], +) 
+@pytest.mark.usefixtures("_enable_all_oidc_providers") +def test_trusted_publisher_upload_ok( + webtest, publisher_factory, publisher_data, oidc_jwt +): + user = UserFactory.create(with_verified_primary_email=True, clear_pwd="password") + project = ProjectFactory.create(name="sampleproject") + RoleFactory.create(user=user, project=project, role_name="Owner") + publisher_factory.create( + projects=[project], + **publisher_data, + ) + + response = webtest.post_json( + "/_/oidc/mint-token", + params={ + "token": oidc_jwt, + }, + status=HTTPStatus.OK, + ) + + assert "success" in response.json + assert response.json["success"] + assert "token" in response.json + pypi_token = response.json["token"] + assert pypi_token.startswith("pypi-") + + with open(_ASSETS / "sampleproject-3.0.0.tar.gz", "rb") as f: + content = f.read() + + webtest.set_authorization(("Basic", ("__token__", pypi_token))) + webtest.post( + "/legacy/?:action=file_upload", + params={ + "name": "sampleproject", + "sha256_digest": ( + "117ed88e5db073bb92969a7545745fd977ee85b7019706dd256a64058f70963d" + ), + "filetype": "sdist", + "metadata_version": "2.1", + "version": "3.0.0", + }, + upload_files=[("content", "sampleproject-3.0.0.tar.gz", content)], + status=HTTPStatus.OK, + ) + + assert len(project.releases) == 1 + release = project.releases[0] + assert release.files.count() == 1 + + +@pytest.mark.parametrize( + ("publisher_factory", "publisher_data", "oidc_jwt"), + [ + ( + GitHubPublisherFactory, + { + "repository_name": "wrong", + "repository_owner": "foo", + "repository_owner_id": "123", + "workflow_filename": "example.yml", + "environment": "fake", + }, + DUMMY_GITHUB_OIDC_JWT, + ), + ( + ActiveStatePublisherFactory, + { + "organization": "wrong", + "activestate_project_name": "fakeproject", + "actor": "foo", + "actor_id": "fake", + }, + DUMMY_ACTIVESTATE_OIDC_JWT, + ), + ( + GitLabPublisherFactory, + { + "namespace": "wrong", + "project": "bar", + "workflow_filepath": ".gitlab-ci.yml", + "environment": "fake", + }, + DUMMY_GITLAB_OIDC_JWT, + ), + ( + GooglePublisherFactory, + { + "email": "wrong@example.com", + "sub": "111260650121185072906", + }, + DUMMY_GOOGLE_OIDC_JWT, + ), + ], +) +@pytest.mark.usefixtures("_enable_all_oidc_providers") +def test_trusted_publisher_upload_fails_wrong_publisher( + webtest, publisher_factory, publisher_data, oidc_jwt +): + user = UserFactory.create(with_verified_primary_email=True, clear_pwd="password") + project = ProjectFactory.create(name="sampleproject") + RoleFactory.create(user=user, project=project, role_name="Owner") + publisher_factory.create( + projects=[project], + **publisher_data, + ) + response = webtest.post_json( + "/_/oidc/mint-token", + params={ + "token": oidc_jwt, + }, + status=HTTPStatus.UNPROCESSABLE_CONTENT, + ) + + assert "token" not in response.json + assert "message" in response.json + assert response.json["message"] == "Token request failed" + assert "errors" in response.json + assert response.json["errors"] == [ + { + "code": "invalid-publisher", + "description": "valid token, but no corresponding publisher " + "(Publisher with matching claims was not found)", + } + ] diff --git a/tests/unit/manage/test_views.py b/tests/unit/manage/test_views.py index 9f8f2f723350..d45ba2d3757e 100644 --- a/tests/unit/manage/test_views.py +++ b/tests/unit/manage/test_views.py @@ -7551,6 +7551,90 @@ def test_add_oidc_publisher_already_registered_with_project( ) ] + def test_add_oidc_publisher_already_registered_after_normalization( + self, monkeypatch, db_request + ): + publisher 
= GitHubPublisher( + repository_name="some-repository", + repository_owner="some-owner", + repository_owner_id="666", + workflow_filename="some-workflow-filename.yml", + environment="some-environment", + ) + post_body = MultiDict( + { + "owner": "some-owner", + "repository": "some-repository", + "workflow_filename": "some-workflow-filename.yml", + "environment": "SOME-environment", + } + ) + db_request.user = UserFactory.create() + EmailFactory(user=db_request.user, verified=True, primary=True) + db_request.db.add(publisher) + db_request.db.flush() # To get it in the DB + + project = pretend.stub( + name="fakeproject", + oidc_publishers=[publisher], + record_event=pretend.call_recorder(lambda *a, **kw: None), + ) + + db_request.registry = pretend.stub( + settings={ + "github.token": "fake-api-token", + } + ) + db_request.flags = pretend.stub( + disallow_oidc=pretend.call_recorder(lambda f=None: False) + ) + db_request.session = pretend.stub( + flash=pretend.call_recorder(lambda *a, **kw: None) + ) + db_request.POST = post_body + + view = views.ManageOIDCPublisherViews(project, db_request) + monkeypatch.setattr( + views.GitHubPublisherForm, + "_lookup_owner", + lambda *a: {"login": "some-owner", "id": "some-owner-id"}, + ) + + monkeypatch.setattr( + view, "_hit_ratelimits", pretend.call_recorder(lambda: None) + ) + monkeypatch.setattr( + view, "_check_ratelimits", pretend.call_recorder(lambda: None) + ) + + assert view.add_github_oidc_publisher() == { + "disabled": { + "GitHub": False, + "GitLab": False, + "Google": False, + "ActiveState": False, + }, + "project": project, + "github_publisher_form": view.github_publisher_form, + "gitlab_publisher_form": view.gitlab_publisher_form, + "google_publisher_form": view.google_publisher_form, + "activestate_publisher_form": view.activestate_publisher_form, + "prefilled_provider": view.prefilled_provider, + } + assert view.metrics.increment.calls == [ + pretend.call( + "warehouse.oidc.add_publisher.attempt", + tags=["publisher:GitHub"], + ), + ] + assert project.record_event.calls == [] + assert db_request.session.flash.calls == [ + pretend.call( + f"{str(publisher)} is already registered with fakeproject", + queue="error", + ) + ] + @pytest.mark.parametrize( ("view_name", "publisher_name"), [ diff --git a/tests/unit/oidc/models/test_activestate.py b/tests/unit/oidc/models/test_activestate.py index 8797b2d451fe..1902ab039f03 100644 --- a/tests/unit/oidc/models/test_activestate.py +++ b/tests/unit/oidc/models/test_activestate.py @@ -37,14 +37,6 @@ SUBJECT = f"org:{ORG_URL_NAME}:project:{PROJECT_NAME}" -def test_lookup_strategies(): - assert ( - len(ActiveStatePublisher.__lookup_strategies__) - == len(PendingActiveStatePublisher.__lookup_strategies__) - == 1 - ) - - def new_signed_claims( sub: str = SUBJECT, actor: str = ACTOR, @@ -381,6 +373,13 @@ def test_exists(self, db_request, exists_in_db): assert publisher.exists(db_request.db) == exists_in_db + def test_lookup_no_matching_publishers(self, db_request): + signed_claims = new_signed_claims(actor_id="my_id") + + with pytest.raises(InvalidPublisherError) as e: + ActiveStatePublisher.lookup_by_claims(db_request.db, signed_claims) + assert str(e.value) == "Publisher with matching claims was not found" + class TestPendingActiveStatePublisher: def test_reify_does_not_exist_yet(self, db_request): diff --git a/tests/unit/oidc/models/test_core.py b/tests/unit/oidc/models/test_core.py index 0832e4dddd9c..bd1ed8080969 100644 --- a/tests/unit/oidc/models/test_core.py +++ b/tests/unit/oidc/models/test_core.py 
@@ -43,9 +43,8 @@ def test_check_claim_invariant():
 
 class TestOIDCPublisher:
     def test_lookup_by_claims_raises(self):
-        with pytest.raises(errors.InvalidPublisherError) as e:
+        with pytest.raises(NotImplementedError):
             _core.OIDCPublisher.lookup_by_claims(pretend.stub(), pretend.stub())
-        assert str(e.value) == "All lookup strategies exhausted"
 
     def test_oidc_publisher_not_default_verifiable(self):
         publisher = _core.OIDCPublisher(projects=[])
diff --git a/tests/unit/oidc/models/test_github.py b/tests/unit/oidc/models/test_github.py
index aafa23ff4f46..e095596d2663 100644
--- a/tests/unit/oidc/models/test_github.py
+++ b/tests/unit/oidc/models/test_github.py
@@ -96,13 +96,6 @@ def test_check_sub(claim):
 
 
 class TestGitHubPublisher:
-    def test_lookup_strategies(self):
-        assert (
-            len(github.GitHubPublisher.__lookup_strategies__)
-            == len(github.PendingGitHubPublisher.__lookup_strategies__)
-            == 2
-        )
-
     @pytest.mark.parametrize("environment", [None, "some_environment"])
     def test_lookup_fails_invalid_workflow_ref(self, environment):
         signed_claims = {
@@ -116,7 +109,8 @@ def test_lookup_fails_invalid_workflow_ref(self, environment):
 
         # The `job_workflow_ref` is malformed, so no queries are performed.
         with pytest.raises(
-            errors.InvalidPublisherError, match="All lookup strategies exhausted"
+            errors.InvalidPublisherError,
+            match="Could not extract workflow filename from OIDC claims",
         ):
             github.GitHubPublisher.lookup_by_claims(pretend.stub(), signed_claims)
 
@@ -165,6 +159,28 @@ def test_lookup_escapes(self, db_request, environment, workflow_a, workflow_b):
             == workflow
         )
 
+    def test_lookup_no_matching_publishers(self, db_request):
+        GitHubPublisherFactory(
+            id="aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
+            repository_owner="foo",
+            repository_name="bar",
+            repository_owner_id="1234",
+            workflow_filename="release.yml",
+            environment="environment",
+        )
+        signed_claims = {
+            "repository": "foo/bar",
+            "job_workflow_ref": (
+                "foo/bar/.github/workflows/release.yml@refs/heads/main"
+            ),
+            "repository_owner_id": "1234",
+            "environment": "another_environment",
+        }
+
+        with pytest.raises(errors.InvalidPublisherError) as e:
+            github.GitHubPublisher.lookup_by_claims(db_request.db, signed_claims)
+        assert str(e.value) == "Publisher with matching claims was not found"
+
     def test_github_publisher_all_known_claims(self):
         assert github.GitHubPublisher.all_known_claims() == {
             # required verifiable claims
diff --git a/tests/unit/oidc/models/test_gitlab.py b/tests/unit/oidc/models/test_gitlab.py
index 0202a3e7b893..0ff5708f43b6 100644
--- a/tests/unit/oidc/models/test_gitlab.py
+++ b/tests/unit/oidc/models/test_gitlab.py
@@ -64,13 +64,6 @@ def test_check_sub(claim):
 
 
 class TestGitLabPublisher:
-    def test_lookup_strategies(self):
-        assert (
-            len(gitlab.GitLabPublisher.__lookup_strategies__)
-            == len(gitlab.PendingGitLabPublisher.__lookup_strategies__)
-            == 2
-        )
-
     @pytest.mark.parametrize("environment", [None, "some_environment"])
     def test_lookup_fails_invalid_ci_config_ref_uri(self, environment):
        signed_claims = {
@@ -83,7 +76,8 @@ def test_lookup_fails_invalid_ci_config_ref_uri(self, environment):
 
         # The `ci_config_ref_uri` is malformed, so no queries are performed.
with pytest.raises( - errors.InvalidPublisherError, match="All lookup strategies exhausted" + errors.InvalidPublisherError, + match="Could not extract workflow filename from OIDC claims", ): gitlab.GitLabPublisher.lookup_by_claims(pretend.stub(), signed_claims) @@ -131,6 +125,15 @@ def test_lookup_escapes( == workflow_filepath ) + def test_lookup_no_matching_publisher(self, db_request): + signed_claims = { + "project_path": "foo/bar", + "ci_config_ref_uri": ("gitlab.com/foo/bar//.gitlab-ci.yml@refs/heads/main"), + } + with pytest.raises(errors.InvalidPublisherError) as e: + gitlab.GitLabPublisher.lookup_by_claims(db_request.db, signed_claims) + assert str(e.value) == "Publisher with matching claims was not found" + def test_gitlab_publisher_all_known_claims(self): assert gitlab.GitLabPublisher.all_known_claims() == { # required verifiable claims diff --git a/tests/unit/oidc/models/test_google.py b/tests/unit/oidc/models/test_google.py index 04a3c0a2de20..a63886d306a7 100644 --- a/tests/unit/oidc/models/test_google.py +++ b/tests/unit/oidc/models/test_google.py @@ -19,14 +19,6 @@ from warehouse.oidc.models import _core, google -def test_lookup_strategies(): - assert ( - len(google.GooglePublisher.__lookup_strategies__) - == len(google.PendingGooglePublisher.__lookup_strategies__) - == 2 - ) - - class TestGooglePublisher: def test_publisher_name(self): publisher = google.GooglePublisher(email="fake@example.com") @@ -196,6 +188,16 @@ def test_google_publisher_sub_is_optional(self, expected_sub, actual_sub, valid) ) assert str(e.value) == "Check failed for optional claim 'sub'" + def test_lookup_no_matching_publishers(self, db_request): + signed_claims = { + "email": "fake@example.com", + "email_verified": True, + } + + with pytest.raises(errors.InvalidPublisherError) as e: + google.GooglePublisher.lookup_by_claims(db_request.db, signed_claims) + assert str(e.value) == "Publisher with matching claims was not found" + @pytest.mark.parametrize("exists_in_db", [True, False]) def test_exists(self, db_request, exists_in_db): publisher = google.GooglePublisher( diff --git a/warehouse/oidc/models/_core.py b/warehouse/oidc/models/_core.py index 4cbc3511c2d1..7ffd2dc7bafc 100644 --- a/warehouse/oidc/models/_core.py +++ b/warehouse/oidc/models/_core.py @@ -13,7 +13,7 @@ from __future__ import annotations from collections.abc import Callable -from typing import TYPE_CHECKING, Any, TypedDict, TypeVar, Unpack +from typing import TYPE_CHECKING, Any, Self, TypedDict, TypeVar, Unpack import rfc3986 import sentry_sdk @@ -162,32 +162,15 @@ class OIDCPublisherMixin: # required and optional attributes, and thus can't be naively looked # up from a raw claim set. # - # Each subclass should explicitly override this list to contain - # class methods that take a `SignedClaims` and return a SQLAlchemy - # expression that, when queried, should produce exactly one or no result. - # This list should be ordered by specificity, e.g. selecting for the - # expression with the most optional constraints first, and ending with - # the expression with only required constraints. + # Each subclass should explicitly override this method, which takes + # a set of claims (`SignedClaims`) and returns a Publisher. + # In case that multiple publishers satisfy the given claims, the + # most specific publisher should be the one returned, i.e. the one with + # the most optional constraints satisfied. 
# - # TODO(ww): In principle this list is computable directly from - # `__required_verifiable_claims__` and `__optional_verifiable_claims__`, - # but there are a few problems: those claim sets don't map to their - # "equivalent" column (only to an instantiated property), and may not - # even have an "equivalent" column. - __lookup_strategies__: list = [] - @classmethod - def lookup_by_claims(cls, session, signed_claims: SignedClaims): - for lookup in cls.__lookup_strategies__: - query = lookup(cls, signed_claims) - if not query: - # We might not build a query if we know the claim set can't - # satisfy it. If that's the case, then we skip. - continue - - if publisher := query.with_session(session).one_or_none(): - return publisher - raise InvalidPublisherError("All lookup strategies exhausted") + def lookup_by_claims(cls, session, signed_claims: SignedClaims) -> Self: + raise NotImplementedError @classmethod def all_known_claims(cls) -> set[str]: @@ -244,7 +227,7 @@ def check_claims_existence(cls, signed_claims: SignedClaims) -> None: with sentry_sdk.new_scope() as scope: scope.fingerprint = [claim_name] sentry_sdk.capture_message( - f"JWT for {cls.__name__} is missing claim: " f"{claim_name}" + f"JWT for {cls.__name__} is missing claim: {claim_name}" ) raise InvalidPublisherError(f"Missing claim {claim_name!r}") diff --git a/warehouse/oidc/models/activestate.py b/warehouse/oidc/models/activestate.py index 9544e6ac762f..936c2b43a8eb 100644 --- a/warehouse/oidc/models/activestate.py +++ b/warehouse/oidc/models/activestate.py @@ -13,7 +13,7 @@ import urllib -from typing import Any +from typing import Any, Self from sqlalchemy import ForeignKey, String, UniqueConstraint, and_, exists from sqlalchemy.dialects.postgresql import UUID @@ -92,18 +92,6 @@ class ActiveStatePublisherMixin: "project_visibility", } - @staticmethod - def __lookup_all__(klass, signed_claims: SignedClaims): - return Query(klass).filter_by( - organization=signed_claims["organization"], - activestate_project_name=signed_claims["project"], - actor_id=signed_claims["actor_id"], - ) - - __lookup_strategies__ = [ - __lookup_all__, - ] - @property def sub(self) -> str: return f"org:{self.organization}:project:{self.activestate_project_name}" @@ -147,6 +135,17 @@ def exists(self, session) -> bool: ) ).scalar() + @classmethod + def lookup_by_claims(cls, session, signed_claims: SignedClaims) -> Self: + query: Query = Query(cls).filter_by( + organization=signed_claims["organization"], + activestate_project_name=signed_claims["project"], + actor_id=signed_claims["actor_id"], + ) + if publisher := query.with_session(session).one_or_none(): + return publisher + raise InvalidPublisherError("Publisher with matching claims was not found") + class ActiveStatePublisher(ActiveStatePublisherMixin, OIDCPublisher): __tablename__ = "activestate_oidc_publishers" diff --git a/warehouse/oidc/models/github.py b/warehouse/oidc/models/github.py index e728fbab3b11..aaa1beca02fc 100644 --- a/warehouse/oidc/models/github.py +++ b/warehouse/oidc/models/github.py @@ -12,8 +12,9 @@ import re -from typing import Any +from typing import Any, Self +from more_itertools import first_true from pypi_attestations import GitHubPublisher as GitHubIdentity, Publisher from sqlalchemy import ForeignKey, String, UniqueConstraint, and_, exists from sqlalchemy.dialects.postgresql import UUID @@ -192,49 +193,50 @@ class GitHubPublisherMixin: "ref_protected", } - @staticmethod - def __lookup_all__(klass, signed_claims: SignedClaims) -> Query | None: - # This lookup requires 
the environment claim to be present;
-        # if it isn't, bail out early.
-        if not (environment := signed_claims.get("environment")):
-            return None
-
-        repository = signed_claims["repository"]
-        repository_owner, repository_name = repository.split("/", 1)
-        workflow_ref = signed_claims["job_workflow_ref"]
-
-        if not (workflow_filename := _extract_workflow_filename(workflow_ref)):
-            return None
+    # Get the most specific publisher from a list of publishers,
+    # where publishers constrained with an environment are more
+    # specific than publishers not constrained on environment.
+    @classmethod
+    def _get_publisher_for_environment(
+        cls, publishers: list[Self], environment: str | None
+    ) -> Self | None:
+        if environment:
+            if specific_publisher := first_true(
+                publishers, pred=lambda p: p.environment == environment.lower()
+            ):
+                return specific_publisher
+
+        if general_publisher := first_true(
+            publishers, pred=lambda p: p.environment == ""
+        ):
+            return general_publisher
 
-        return Query(klass).filter_by(
-            repository_name=repository_name,
-            repository_owner=repository_owner,
-            repository_owner_id=signed_claims["repository_owner_id"],
-            environment=environment.lower(),
-            workflow_filename=workflow_filename,
-        )
+        return None
 
-    @staticmethod
-    def __lookup_no_environment__(klass, signed_claims: SignedClaims) -> Query | None:
+    @classmethod
+    def lookup_by_claims(cls, session, signed_claims: SignedClaims) -> Self:
         repository = signed_claims["repository"]
         repository_owner, repository_name = repository.split("/", 1)
-        workflow_ref = signed_claims["job_workflow_ref"]
+        job_workflow_ref = signed_claims["job_workflow_ref"]
+        environment = signed_claims.get("environment")
 
-        if not (workflow_filename := _extract_workflow_filename(workflow_ref)):
-            return None
+        if not (job_workflow_filename := _extract_workflow_filename(job_workflow_ref)):
+            raise InvalidPublisherError(
+                "Could not extract workflow filename from OIDC claims"
+            )
 
-        return Query(klass).filter_by(
+        query: Query = Query(cls).filter_by(
             repository_name=repository_name,
             repository_owner=repository_owner,
             repository_owner_id=signed_claims["repository_owner_id"],
-            environment="",
-            workflow_filename=workflow_filename,
+            workflow_filename=job_workflow_filename,
         )
+        publishers = query.with_session(session).all()
 
-    __lookup_strategies__ = [
-        __lookup_all__,
-        __lookup_no_environment__,
-    ]
+        if publisher := cls._get_publisher_for_environment(publishers, environment):
+            return publisher
+        else:
+            raise InvalidPublisherError("Publisher with matching claims was not found")
 
     @property
     def _workflow_slug(self):
diff --git a/warehouse/oidc/models/gitlab.py b/warehouse/oidc/models/gitlab.py
index baf25a921091..5ec9916d7f33 100644
--- a/warehouse/oidc/models/gitlab.py
+++ b/warehouse/oidc/models/gitlab.py
@@ -12,8 +12,9 @@
 
 import re
 
-from typing import Any
+from typing import Any, Self
 
+from more_itertools import first_true
 from pypi_attestations import GitLabPublisher as GitLabIdentity, Publisher
 from sqlalchemy import ForeignKey, String, UniqueConstraint, and_, exists
 from sqlalchemy.dialects.postgresql import UUID
@@ -188,47 +189,48 @@ class GitLabPublisherMixin:
         "groups_direct",
     }
 
-    @staticmethod
-    def __lookup_all__(klass, signed_claims: SignedClaims) -> Query | None:
-        # This lookup requires the environment claim to be present;
-        # if it isn't, bail out early.
- if not (environment := signed_claims.get("environment")): - return None - - project_path = signed_claims["project_path"] - ci_config_ref_uri = signed_claims["ci_config_ref_uri"] - namespace, project = project_path.rsplit("/", 1) - - if not (workflow_filepath := _extract_workflow_filepath(ci_config_ref_uri)): - return None + # Get the most specific publisher from a list of publishers, + # where publishers constrained with an environment are more + # specific than publishers not constrained on environment. + @classmethod + def _get_publisher_for_environment( + cls, publishers: list[Self], environment: str | None + ) -> Self | None: + if environment: + if specific_publisher := first_true( + publishers, pred=lambda p: p.environment == environment.lower() + ): + return specific_publisher + + if general_publisher := first_true( + publishers, pred=lambda p: p.environment == "" + ): + return general_publisher - return Query(klass).filter_by( - namespace=namespace, - project=project, - environment=environment, - workflow_filepath=workflow_filepath, - ) + return None - @staticmethod - def __lookup_no_environment__(klass, signed_claims: SignedClaims) -> Query | None: + @classmethod + def lookup_by_claims(cls, session, signed_claims: SignedClaims) -> Self: project_path = signed_claims["project_path"] ci_config_ref_uri = signed_claims["ci_config_ref_uri"] namespace, project = project_path.rsplit("/", 1) + environment = signed_claims.get("environment") if not (workflow_filepath := _extract_workflow_filepath(ci_config_ref_uri)): - return None + raise InvalidPublisherError( + "Could not extract workflow filename from OIDC claims" + ) - return Query(klass).filter_by( + query: Query = Query(cls).filter_by( namespace=namespace, project=project, - environment="", workflow_filepath=workflow_filepath, ) + publishers = query.with_session(session).all() + if publisher := cls._get_publisher_for_environment(publishers, environment): + return publisher - __lookup_strategies__ = [ - __lookup_all__, - __lookup_no_environment__, - ] + raise InvalidPublisherError("Publisher with matching claims was not found") @property def project_path(self): diff --git a/warehouse/oidc/models/google.py b/warehouse/oidc/models/google.py index 04476d22cc6e..5ee90e855edd 100644 --- a/warehouse/oidc/models/google.py +++ b/warehouse/oidc/models/google.py @@ -10,13 +10,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import Any +from typing import Any, Self +from more_itertools import first_true from pypi_attestations import GooglePublisher as GoogleIdentity, Publisher from sqlalchemy import ForeignKey, String, UniqueConstraint, and_, exists from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.orm import Query, mapped_column +from warehouse.oidc.errors import InvalidPublisherError from warehouse.oidc.interfaces import SignedClaims from warehouse.oidc.models._core import ( CheckClaimCallable, @@ -68,20 +70,21 @@ class GooglePublisherMixin: __unchecked_claims__ = {"azp", "google"} - @staticmethod - def __lookup_all__(klass, signed_claims: SignedClaims) -> Query | None: - return Query(klass).filter_by( - email=signed_claims["email"], sub=signed_claims["sub"] - ) + @classmethod + def lookup_by_claims(cls, session, signed_claims: SignedClaims) -> Self: + query: Query = Query(cls).filter_by(email=signed_claims["email"]) + publishers = query.with_session(session).all() + + if sub := signed_claims.get("sub"): + if specific_publisher := first_true( + publishers, pred=lambda p: p.sub == sub + ): + return specific_publisher - @staticmethod - def __lookup_no_sub__(klass, signed_claims: SignedClaims) -> Query | None: - return Query(klass).filter_by(email=signed_claims["email"], sub="") + if general_publisher := first_true(publishers, pred=lambda p: p.sub == ""): + return general_publisher - __lookup_strategies__ = [ - __lookup_all__, - __lookup_no_sub__, - ] + raise InvalidPublisherError("Publisher with matching claims was not found") @property def publisher_name(self):