From 94a322f234902fe4913887dedb07a15107a27e04 Mon Sep 17 00:00:00 2001 From: nannan00 <17491932+nannan00@users.noreply.github.com> Date: Fri, 23 Aug 2024 17:10:24 +0800 Subject: [PATCH] fix: cr --- .../bkuser/apis/open_v1/authentications.py | 24 ++++++++++--------- .../bkuser/apis/open_v2/authentications.py | 24 ++++++++++--------- src/bk-user/tests/apis/open_v1/conftest.py | 2 +- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/bk-user/bkuser/apis/open_v1/authentications.py b/src/bk-user/bkuser/apis/open_v1/authentications.py index b04aa57f6..656de20e8 100644 --- a/src/bk-user/bkuser/apis/open_v1/authentications.py +++ b/src/bk-user/bkuser/apis/open_v1/authentications.py @@ -11,12 +11,14 @@ import base64 import logging +from typing import Any, Dict import jwt from django.conf import settings from django.contrib.auth import get_user_model from rest_framework import exceptions from rest_framework.authentication import BaseAuthentication +from rest_framework.request import Request from bkuser.common.cache import cachedmethod from bkuser.common.constants import BKNonEntityUser @@ -35,8 +37,8 @@ def authenticate(self, request): if not credentials: return None - verified, payload = self.verify_credentials(credentials=credentials) - if not verified: + payload = self.verify_credentials(credentials=credentials) + if payload is None: return None username = self._get_username_from_jwt_payload(payload) @@ -47,7 +49,7 @@ def authenticate(self, request): return self._get_or_create_user(username), None - def get_credentials(self, request): + def get_credentials(self, request: Request) -> Dict[str, str] | None: """从 Header 头获取接口认证的凭证信息""" credentials = { "jwt": request.META.get("HTTP_X_BKAPI_JWT"), @@ -59,17 +61,17 @@ def get_credentials(self, request): return None - def verify_credentials(self, credentials): + def verify_credentials(self, credentials: Dict[str, str]) -> Dict[str, Any] | None: """JWT 校验并解析出调用方信息""" public_key = self._get_jwt_public_key(credentials["from"]) # Note: 不从 jwt header 里取 kid 判断是网关还是 ESB 签发的,在不同环境可能不准确 jwt_payload = self._decode_jwt(credentials["jwt"], public_key) if not jwt_payload: - return False, None + return None - return True, jwt_payload + return jwt_payload - def _decode_jwt(self, content, public_key): + def _decode_jwt(self, content: str, public_key: str) -> Dict[str, Any] | None: """解析 JWT""" try: jwt_header = jwt.get_unverified_header(content) @@ -79,7 +81,7 @@ def _decode_jwt(self, content, public_key): logger.exception("decode jwt fail, jwt: %s", content) return None - def _get_username_from_jwt_payload(self, jwt_payload): + def _get_username_from_jwt_payload(self, jwt_payload: Dict[str, Any]) -> str: """从 jwt payload 里获取 username""" user = jwt_payload.get("user", {}) verified = user.get("verified", False) @@ -96,7 +98,7 @@ def _get_username_from_jwt_payload(self, jwt_payload): # 匿名用户 return BKNonEntityUser.BK__ANONYMOUS_USER.value - def _get_app_code_from_jwt_payload(self, jwt_payload): + def _get_app_code_from_jwt_payload(self, jwt_payload: Dict[str, Any]) -> str: """从 jwt payload 里获取 app_code""" app = jwt_payload.get("app", {}) @@ -120,7 +122,7 @@ def _get_or_create_user(self, username): ) return user - def _get_apigw_public_key(self): + def _get_apigw_public_key(self) -> str: """ 获取 APIGW 的 Public Key 由于配置文件里的 public key 是来自环境变量,且使用 base64 编码,因此需要解码 @@ -139,7 +141,7 @@ def _get_apigw_public_key(self): return public_key @cachedmethod(timeout=None) # 缓存不过期,除非重新部署服务 - def _get_jwt_public_key(self, request_from): + def _get_jwt_public_key(self, request_from: str) -> str: """根据来源,获取 Jwt Public Key""" # TODO 理论上 open_v2 只接 ESB,open_v3 只接 APIGW,后续新增 open_v3 后可以分离该 Auth 类逻辑 if request_from == "apigw": diff --git a/src/bk-user/bkuser/apis/open_v2/authentications.py b/src/bk-user/bkuser/apis/open_v2/authentications.py index b04aa57f6..656de20e8 100644 --- a/src/bk-user/bkuser/apis/open_v2/authentications.py +++ b/src/bk-user/bkuser/apis/open_v2/authentications.py @@ -11,12 +11,14 @@ import base64 import logging +from typing import Any, Dict import jwt from django.conf import settings from django.contrib.auth import get_user_model from rest_framework import exceptions from rest_framework.authentication import BaseAuthentication +from rest_framework.request import Request from bkuser.common.cache import cachedmethod from bkuser.common.constants import BKNonEntityUser @@ -35,8 +37,8 @@ def authenticate(self, request): if not credentials: return None - verified, payload = self.verify_credentials(credentials=credentials) - if not verified: + payload = self.verify_credentials(credentials=credentials) + if payload is None: return None username = self._get_username_from_jwt_payload(payload) @@ -47,7 +49,7 @@ def authenticate(self, request): return self._get_or_create_user(username), None - def get_credentials(self, request): + def get_credentials(self, request: Request) -> Dict[str, str] | None: """从 Header 头获取接口认证的凭证信息""" credentials = { "jwt": request.META.get("HTTP_X_BKAPI_JWT"), @@ -59,17 +61,17 @@ def get_credentials(self, request): return None - def verify_credentials(self, credentials): + def verify_credentials(self, credentials: Dict[str, str]) -> Dict[str, Any] | None: """JWT 校验并解析出调用方信息""" public_key = self._get_jwt_public_key(credentials["from"]) # Note: 不从 jwt header 里取 kid 判断是网关还是 ESB 签发的,在不同环境可能不准确 jwt_payload = self._decode_jwt(credentials["jwt"], public_key) if not jwt_payload: - return False, None + return None - return True, jwt_payload + return jwt_payload - def _decode_jwt(self, content, public_key): + def _decode_jwt(self, content: str, public_key: str) -> Dict[str, Any] | None: """解析 JWT""" try: jwt_header = jwt.get_unverified_header(content) @@ -79,7 +81,7 @@ def _decode_jwt(self, content, public_key): logger.exception("decode jwt fail, jwt: %s", content) return None - def _get_username_from_jwt_payload(self, jwt_payload): + def _get_username_from_jwt_payload(self, jwt_payload: Dict[str, Any]) -> str: """从 jwt payload 里获取 username""" user = jwt_payload.get("user", {}) verified = user.get("verified", False) @@ -96,7 +98,7 @@ def _get_username_from_jwt_payload(self, jwt_payload): # 匿名用户 return BKNonEntityUser.BK__ANONYMOUS_USER.value - def _get_app_code_from_jwt_payload(self, jwt_payload): + def _get_app_code_from_jwt_payload(self, jwt_payload: Dict[str, Any]) -> str: """从 jwt payload 里获取 app_code""" app = jwt_payload.get("app", {}) @@ -120,7 +122,7 @@ def _get_or_create_user(self, username): ) return user - def _get_apigw_public_key(self): + def _get_apigw_public_key(self) -> str: """ 获取 APIGW 的 Public Key 由于配置文件里的 public key 是来自环境变量,且使用 base64 编码,因此需要解码 @@ -139,7 +141,7 @@ def _get_apigw_public_key(self): return public_key @cachedmethod(timeout=None) # 缓存不过期,除非重新部署服务 - def _get_jwt_public_key(self, request_from): + def _get_jwt_public_key(self, request_from: str) -> str: """根据来源,获取 Jwt Public Key""" # TODO 理论上 open_v2 只接 ESB,open_v3 只接 APIGW,后续新增 open_v3 后可以分离该 Auth 类逻辑 if request_from == "apigw": diff --git a/src/bk-user/tests/apis/open_v1/conftest.py b/src/bk-user/tests/apis/open_v1/conftest.py index 3912f6bdf..eef7be2c7 100644 --- a/src/bk-user/tests/apis/open_v1/conftest.py +++ b/src/bk-user/tests/apis/open_v1/conftest.py @@ -32,7 +32,7 @@ def open_v1_api_client() -> APIClient: return_value={"jwt": "jwt", "from": "esb"}, ), mock.patch( "bkuser.apis.open_v1.authentications.ESBAuthentication.verify_credentials", - return_value=(True, {"user": {"verify": False}, "app": {"verified": True, "bk_app_code": "bk_paas"}}), + return_value={"user": {"verify": False}, "app": {"verified": True, "bk_app_code": "bk_paas"}}, ): yield client