diff --git a/recombee_api_client/api_client.py b/recombee_api_client/api_client.py index 4b6546c..01b50d0 100644 --- a/recombee_api_client/api_client.py +++ b/recombee_api_client/api_client.py @@ -5,7 +5,7 @@ from typing import Union from enum import Enum -import requests +import httpx from hashlib import sha1 from urllib.parse import quote @@ -36,41 +36,49 @@ class RecombeeClient: :param region: region of the Recombee cluster where the database is located """ BATCH_MAX_SIZE = 10000 + CLIENT_CLS = httpx.Client def __init__(self, database_id: str, token: str, protocol: str = 'https', options: dict = None, region: Region = None): self.database_id = database_id self.token = token self.protocol = protocol - self.base_uri = self.__get_base_uri(options=options or {}, region=region) + self.base_uri = self._get_base_uri(options=options or {}, region=region) + self.client = self.CLIENT_CLS() - def send(self, request: Request) -> Union[dict, str, list]: - """ - :param request: Request to be sent to Recombee recommender - """ - + def _send(self, request: Request) -> Union[dict, str, list]: if isinstance(request, Batch) and len(request.requests) > self.BATCH_MAX_SIZE: - return self.__send_multipart_batch(request) + return self._send_multipart_batch(request) timeout = request.timeout / 1000 - uri = self.__process_request_uri(request) - uri = self.__sign_url(uri) + uri = self._process_request_uri(request) + uri = self._sign_url(uri) protocol = 'https' if request.ensure_https else self.protocol uri = protocol + '://' + self.base_uri + uri + + if request.method == 'put': + return self._put(request, uri, timeout) + elif request.method == 'get': + return self._get(request, uri, timeout) + elif request.method == 'post': + return self._post(request, uri, timeout) + elif request.method == 'delete': + return self._delete(request, uri, timeout) + + def send(self, request: Request) -> Union[dict, str, list]: + """ + :param request: Request to be sent to Recombee recommender + """ try: - if request.method == 'put': - return self.__put(request, uri, timeout) - elif request.method == 'get': - return self.__get(request, uri, timeout) - elif request.method == 'post': - return self.__post(request, uri, timeout) - elif request.method == 'delete': - return self.__delete(request, uri, timeout) - except requests.exceptions.Timeout: + response = self._send(request) + except httpx.TimeoutException: raise ApiTimeoutException(request) + self._check_errors(response, request) + return response.json() + @staticmethod - def __get_regional_base_uri(region: Region) -> str: + def _get_regional_base_uri(region: Region) -> str: uri = { Region.AP_SE: 'rapi-ap-se.recombee.com', Region.CA_EAST: 'rapi-ca-east.recombee.com', @@ -83,106 +91,114 @@ def __get_regional_base_uri(region: Region) -> str: return uri @staticmethod - def __get_base_uri(options: dict, region: str) -> str: + def _get_base_uri(options: dict, region: str) -> str: base_uri = os.environ.get('RAPI_URI') or options.get('base_uri') if region is not None: if base_uri: raise ValueError('base_uri and region cannot be specified at the same time') - base_uri = RecombeeClient.__get_regional_base_uri(region) + base_uri = RecombeeClient._get_regional_base_uri(region) return base_uri or 'rapi.recombee.com' @staticmethod - def __get_http_headers(additional_headers: dict = None) -> dict: + def _get_http_headers(additional_headers: dict = None) -> dict: headers = {'User-Agent': 'recombee-python-api-client/4.1.0'} if additional_headers: headers.update(additional_headers) return headers - def __put(self, request: Request, uri: str, timeout: int): - response = requests.put(uri, - data=json.dumps(request.get_body_parameters()), - headers=self.__get_http_headers({'Content-Type': 'application/json'}), - timeout=timeout) - self.__check_errors(response, request) - return response.json() + def _put(self, request: Request, uri: str, timeout: int): + return self.client.put(uri, + data=json.dumps(request.get_body_parameters()), + headers=self._get_http_headers({'Content-Type': 'application/json'}), + timeout=timeout) - def __get(self, request: Request, uri: str, timeout: int): - response = requests.get(uri, - headers=self.__get_http_headers(), - timeout=timeout) - self.__check_errors(response, request) - return response.json() + def _get(self, request: Request, uri: str, timeout: int): + return self.client.get(uri, + headers=self._get_http_headers(), + timeout=timeout) - def __post(self, request: Request, uri: str, timeout: int): - response = requests.post(uri, - data=json.dumps(request.get_body_parameters()), - headers=self.__get_http_headers({'Content-Type': 'application/json'}), - timeout=timeout) - self.__check_errors(response, request) - return response.json() + def _post(self, request: Request, uri: str, timeout: int): + return self.client.post(uri, + data=json.dumps(request.get_body_parameters()), + headers=self._get_http_headers({'Content-Type': 'application/json'}), + timeout=timeout) - def __delete(self, request: Request, uri: str, timeout: int): - response = requests.delete(uri, - data=json.dumps(request.get_body_parameters()), - headers=self.__get_http_headers({'Content-Type': 'application/json'}), - timeout=timeout) - self.__check_errors(response, request) - return response.json() + def _delete(self, request: Request, uri: str, timeout: int): + return self.client.delete(uri, + data=json.dumps(request.get_body_parameters()), + headers=self._get_http_headers({'Content-Type': 'application/json'}), + timeout=timeout) - def __check_errors(self, response, request: Request): + def _check_errors(self, response, request: Request): status_code = response.status_code if status_code == 200 or status_code == 201: return raise ResponseException(request, status_code, response.text) @staticmethod - def __get_list_chunks(l: list, n: int) -> list: + def _get_list_chunks(l: list, n: int) -> list: """Yield successive n-sized chunks from l.""" for i in range(0, len(l), n): yield l[i:i + n] - def __send_multipart_batch(self, batch: Batch) -> list: - requests_parts = [rqs for rqs in self.__get_list_chunks(batch.requests, self.BATCH_MAX_SIZE)] + def _send_multipart_batch(self, batch: Batch) -> list: + requests_parts = [rqs for rqs in self._get_list_chunks(batch.requests, self.BATCH_MAX_SIZE)] responses = [self.send(Batch(rqs)) for rqs in requests_parts] return sum(responses, []) - def __process_request_uri(self, request: Request) -> str: + def _process_request_uri(self, request: Request) -> str: uri = request.path - uri += self.__query_parameters_to_url(request) + uri += self._query_parameters_to_url(request) return uri - def __query_parameters_to_url(self, request: Request) -> str: + def _query_parameters_to_url(self, request: Request) -> str: ps = '' query_params = request.get_query_parameters() for name in query_params: val = query_params[name] ps += '&' if ps.find('?') != -1 else '?' - ps += "%s=%s" % (name, self.__format_query_parameter_value(val)) + ps += "%s=%s" % (name, self._format_query_parameter_value(val)) return ps @staticmethod - def __format_query_parameter_value(value) -> str: + def _format_query_parameter_value(value) -> str: if isinstance(value, list): return ','.join([quote(str(v)) for v in value]) return quote(str(value)) # Sign request with HMAC, request URI must be exactly the same # We have 30s to complete request with this token - def __sign_url(self, req_part: str) -> str: + def _sign_url(self, req_part: str) -> str: uri = '/' + self.database_id + req_part - time_part = self.__hmac_time(uri) - sign = self.__hmac_sign(uri, time_part) + time_part = self._hmac_time(uri) + sign = self._hmac_sign(uri, time_part) res = uri + time_part + '&hmac_sign=' + sign return res - def __hmac_time(self, uri: str) -> str: + def _hmac_time(self, uri: str) -> str: res = '&' if uri.find('?') != -1 else '?' res += "hmac_timestamp=%s" % int(time.time()) return res - def __hmac_sign(self, uri: str, time_part: str) -> str: + def _hmac_sign(self, uri: str, time_part: str) -> str: url = uri + time_part sign = hmac.new(str.encode(self.token), str.encode(url), sha1).hexdigest() return sign + + +class AsyncRecombeeClient(RecombeeClient): + CLIENT_CLS = httpx.AsyncClient + + async def send(self, request: Request) -> Union[dict, str, list]: + """ + :param request: Request to be sent to Recombee recommender + """ + try: + response = await self._send(request) + except httpx.TimeoutException: + raise ApiTimeoutException(request) + + self._check_errors(response, request) + return response.json() diff --git a/setup.py b/setup.py index db3c102..f6b63e7 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ packages=find_packages(exclude=['contrib', 'docs', 'tests']), - install_requires=['requests'], + install_requires=['httpx'], python_requires='>=3.4',