From 01c2e30c620b1160c61903e91cf4dfb5c78881df Mon Sep 17 00:00:00 2001 From: ramnes Date: Mon, 2 Oct 2023 17:16:40 +0200 Subject: [PATCH 1/4] Move from requests to httpx --- recombee_api_client/api_client.py | 35 ++++++++++++++++--------------- setup.py | 2 +- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/recombee_api_client/api_client.py b/recombee_api_client/api_client.py index 4b6546c..ce4386d 100644 --- a/recombee_api_client/api_client.py +++ b/recombee_api_client/api_client.py @@ -5,7 +5,7 @@ from typing import Union from enum import Enum -import requests +import httpx from hashlib import sha1 from urllib.parse import quote @@ -43,6 +43,7 @@ def __init__(self, database_id: str, token: str, protocol: str = 'https', option self.protocol = protocol self.base_uri = self.__get_base_uri(options=options or {}, region=region) + self.client = httpx.Client() def send(self, request: Request) -> Union[dict, str, list]: """ @@ -66,7 +67,7 @@ def send(self, request: Request) -> Union[dict, str, list]: return self.__post(request, uri, timeout) elif request.method == 'delete': return self.__delete(request, uri, timeout) - except requests.exceptions.Timeout: + except httpx.TimeoutException: raise ApiTimeoutException(request) @staticmethod @@ -100,33 +101,33 @@ def __get_http_headers(additional_headers: dict = None) -> dict: return headers def __put(self, request: Request, uri: str, timeout: int): - response = requests.put(uri, - data=json.dumps(request.get_body_parameters()), - headers=self.__get_http_headers({'Content-Type': 'application/json'}), - timeout=timeout) + response = self.client.put(uri, + data=json.dumps(request.get_body_parameters()), + headers=self.__get_http_headers({'Content-Type': 'application/json'}), + timeout=timeout) self.__check_errors(response, request) return response.json() def __get(self, request: Request, uri: str, timeout: int): - response = requests.get(uri, - headers=self.__get_http_headers(), - timeout=timeout) + response = self.client.get(uri, + headers=self.__get_http_headers(), + timeout=timeout) self.__check_errors(response, request) return response.json() def __post(self, request: Request, uri: str, timeout: int): - response = requests.post(uri, - data=json.dumps(request.get_body_parameters()), - headers=self.__get_http_headers({'Content-Type': 'application/json'}), - timeout=timeout) + response = self.client.post(uri, + data=json.dumps(request.get_body_parameters()), + headers=self.__get_http_headers({'Content-Type': 'application/json'}), + timeout=timeout) self.__check_errors(response, request) return response.json() def __delete(self, request: Request, uri: str, timeout: int): - response = requests.delete(uri, - data=json.dumps(request.get_body_parameters()), - headers=self.__get_http_headers({'Content-Type': 'application/json'}), - timeout=timeout) + response = self.client.delete(uri, + data=json.dumps(request.get_body_parameters()), + headers=self.__get_http_headers({'Content-Type': 'application/json'}), + timeout=timeout) self.__check_errors(response, request) return response.json() diff --git a/setup.py b/setup.py index db3c102..f6b63e7 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ packages=find_packages(exclude=['contrib', 'docs', 'tests']), - install_requires=['requests'], + install_requires=['httpx'], python_requires='>=3.4', From b5770b4bf94ff845185074f3c1deac2ce713f1d9 Mon Sep 17 00:00:00 2001 From: ramnes Date: Mon, 2 Oct 2023 17:28:58 +0200 Subject: [PATCH 2/4] Remove mangling to make inheritance simpler --- recombee_api_client/api_client.py | 76 +++++++++++++++---------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/recombee_api_client/api_client.py b/recombee_api_client/api_client.py index ce4386d..e8f2ca4 100644 --- a/recombee_api_client/api_client.py +++ b/recombee_api_client/api_client.py @@ -42,7 +42,7 @@ def __init__(self, database_id: str, token: str, protocol: str = 'https', option self.token = token self.protocol = protocol - self.base_uri = self.__get_base_uri(options=options or {}, region=region) + self.base_uri = self._get_base_uri(options=options or {}, region=region) self.client = httpx.Client() def send(self, request: Request) -> Union[dict, str, list]: @@ -51,27 +51,27 @@ def send(self, request: Request) -> Union[dict, str, list]: """ if isinstance(request, Batch) and len(request.requests) > self.BATCH_MAX_SIZE: - return self.__send_multipart_batch(request) + return self._send_multipart_batch(request) timeout = request.timeout / 1000 - uri = self.__process_request_uri(request) - uri = self.__sign_url(uri) + uri = self._process_request_uri(request) + uri = self._sign_url(uri) protocol = 'https' if request.ensure_https else self.protocol uri = protocol + '://' + self.base_uri + uri try: if request.method == 'put': - return self.__put(request, uri, timeout) + return self._put(request, uri, timeout) elif request.method == 'get': - return self.__get(request, uri, timeout) + return self._get(request, uri, timeout) elif request.method == 'post': - return self.__post(request, uri, timeout) + return self._post(request, uri, timeout) elif request.method == 'delete': - return self.__delete(request, uri, timeout) + return self._delete(request, uri, timeout) except httpx.TimeoutException: raise ApiTimeoutException(request) @staticmethod - def __get_regional_base_uri(region: Region) -> str: + def _get_regional_base_uri(region: Region) -> str: uri = { Region.AP_SE: 'rapi-ap-se.recombee.com', Region.CA_EAST: 'rapi-ca-east.recombee.com', @@ -84,106 +84,106 @@ def __get_regional_base_uri(region: Region) -> str: return uri @staticmethod - def __get_base_uri(options: dict, region: str) -> str: + def _get_base_uri(options: dict, region: str) -> str: base_uri = os.environ.get('RAPI_URI') or options.get('base_uri') if region is not None: if base_uri: raise ValueError('base_uri and region cannot be specified at the same time') - base_uri = RecombeeClient.__get_regional_base_uri(region) + base_uri = RecombeeClient._get_regional_base_uri(region) return base_uri or 'rapi.recombee.com' @staticmethod - def __get_http_headers(additional_headers: dict = None) -> dict: + def _get_http_headers(additional_headers: dict = None) -> dict: headers = {'User-Agent': 'recombee-python-api-client/4.1.0'} if additional_headers: headers.update(additional_headers) return headers - def __put(self, request: Request, uri: str, timeout: int): + def _put(self, request: Request, uri: str, timeout: int): response = self.client.put(uri, data=json.dumps(request.get_body_parameters()), - headers=self.__get_http_headers({'Content-Type': 'application/json'}), + headers=self._get_http_headers({'Content-Type': 'application/json'}), timeout=timeout) - self.__check_errors(response, request) + self._check_errors(response, request) return response.json() - def __get(self, request: Request, uri: str, timeout: int): + def _get(self, request: Request, uri: str, timeout: int): response = self.client.get(uri, - headers=self.__get_http_headers(), + headers=self._get_http_headers(), timeout=timeout) - self.__check_errors(response, request) + self._check_errors(response, request) return response.json() - def __post(self, request: Request, uri: str, timeout: int): + def _post(self, request: Request, uri: str, timeout: int): response = self.client.post(uri, data=json.dumps(request.get_body_parameters()), - headers=self.__get_http_headers({'Content-Type': 'application/json'}), + headers=self._get_http_headers({'Content-Type': 'application/json'}), timeout=timeout) - self.__check_errors(response, request) + self._check_errors(response, request) return response.json() - def __delete(self, request: Request, uri: str, timeout: int): + def _delete(self, request: Request, uri: str, timeout: int): response = self.client.delete(uri, data=json.dumps(request.get_body_parameters()), - headers=self.__get_http_headers({'Content-Type': 'application/json'}), + headers=self._get_http_headers({'Content-Type': 'application/json'}), timeout=timeout) - self.__check_errors(response, request) + self._check_errors(response, request) return response.json() - def __check_errors(self, response, request: Request): + def _check_errors(self, response, request: Request): status_code = response.status_code if status_code == 200 or status_code == 201: return raise ResponseException(request, status_code, response.text) @staticmethod - def __get_list_chunks(l: list, n: int) -> list: + def _get_list_chunks(l: list, n: int) -> list: """Yield successive n-sized chunks from l.""" for i in range(0, len(l), n): yield l[i:i + n] - def __send_multipart_batch(self, batch: Batch) -> list: - requests_parts = [rqs for rqs in self.__get_list_chunks(batch.requests, self.BATCH_MAX_SIZE)] + def _send_multipart_batch(self, batch: Batch) -> list: + requests_parts = [rqs for rqs in self._get_list_chunks(batch.requests, self.BATCH_MAX_SIZE)] responses = [self.send(Batch(rqs)) for rqs in requests_parts] return sum(responses, []) - def __process_request_uri(self, request: Request) -> str: + def _process_request_uri(self, request: Request) -> str: uri = request.path - uri += self.__query_parameters_to_url(request) + uri += self._query_parameters_to_url(request) return uri - def __query_parameters_to_url(self, request: Request) -> str: + def _query_parameters_to_url(self, request: Request) -> str: ps = '' query_params = request.get_query_parameters() for name in query_params: val = query_params[name] ps += '&' if ps.find('?') != -1 else '?' - ps += "%s=%s" % (name, self.__format_query_parameter_value(val)) + ps += "%s=%s" % (name, self._format_query_parameter_value(val)) return ps @staticmethod - def __format_query_parameter_value(value) -> str: + def _format_query_parameter_value(value) -> str: if isinstance(value, list): return ','.join([quote(str(v)) for v in value]) return quote(str(value)) # Sign request with HMAC, request URI must be exactly the same # We have 30s to complete request with this token - def __sign_url(self, req_part: str) -> str: + def _sign_url(self, req_part: str) -> str: uri = '/' + self.database_id + req_part - time_part = self.__hmac_time(uri) - sign = self.__hmac_sign(uri, time_part) + time_part = self._hmac_time(uri) + sign = self._hmac_sign(uri, time_part) res = uri + time_part + '&hmac_sign=' + sign return res - def __hmac_time(self, uri: str) -> str: + def _hmac_time(self, uri: str) -> str: res = '&' if uri.find('?') != -1 else '?' res += "hmac_timestamp=%s" % int(time.time()) return res - def __hmac_sign(self, uri: str, time_part: str) -> str: + def _hmac_sign(self, uri: str, time_part: str) -> str: url = uri + time_part sign = hmac.new(str.encode(self.token), str.encode(url), sha1).hexdigest() return sign From 7ad7789f2c0474097e4ad9861c22d7013eacf75a Mon Sep 17 00:00:00 2001 From: ramnes Date: Mon, 2 Oct 2023 17:31:58 +0200 Subject: [PATCH 3/4] Keep HTTP handlers as simple as possible --- recombee_api_client/api_client.py | 49 ++++++++++++++----------------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/recombee_api_client/api_client.py b/recombee_api_client/api_client.py index e8f2ca4..4ae730f 100644 --- a/recombee_api_client/api_client.py +++ b/recombee_api_client/api_client.py @@ -60,16 +60,19 @@ def send(self, request: Request) -> Union[dict, str, list]: uri = protocol + '://' + self.base_uri + uri try: if request.method == 'put': - return self._put(request, uri, timeout) + response = self._put(request, uri, timeout) elif request.method == 'get': - return self._get(request, uri, timeout) + response = self._get(request, uri, timeout) elif request.method == 'post': - return self._post(request, uri, timeout) + response = self._post(request, uri, timeout) elif request.method == 'delete': - return self._delete(request, uri, timeout) + response = self._delete(request, uri, timeout) except httpx.TimeoutException: raise ApiTimeoutException(request) + self._check_errors(response, request) + return response.json() + @staticmethod def _get_regional_base_uri(region: Region) -> str: uri = { @@ -101,35 +104,27 @@ def _get_http_headers(additional_headers: dict = None) -> dict: return headers def _put(self, request: Request, uri: str, timeout: int): - response = self.client.put(uri, - data=json.dumps(request.get_body_parameters()), - headers=self._get_http_headers({'Content-Type': 'application/json'}), - timeout=timeout) - self._check_errors(response, request) - return response.json() + return self.client.put(uri, + data=json.dumps(request.get_body_parameters()), + headers=self._get_http_headers({'Content-Type': 'application/json'}), + timeout=timeout) def _get(self, request: Request, uri: str, timeout: int): - response = self.client.get(uri, - headers=self._get_http_headers(), - timeout=timeout) - self._check_errors(response, request) - return response.json() + return self.client.get(uri, + headers=self._get_http_headers(), + timeout=timeout) def _post(self, request: Request, uri: str, timeout: int): - response = self.client.post(uri, - data=json.dumps(request.get_body_parameters()), - headers=self._get_http_headers({'Content-Type': 'application/json'}), - timeout=timeout) - self._check_errors(response, request) - return response.json() + return self.client.post(uri, + data=json.dumps(request.get_body_parameters()), + headers=self._get_http_headers({'Content-Type': 'application/json'}), + timeout=timeout) def _delete(self, request: Request, uri: str, timeout: int): - response = self.client.delete(uri, - data=json.dumps(request.get_body_parameters()), - headers=self._get_http_headers({'Content-Type': 'application/json'}), - timeout=timeout) - self._check_errors(response, request) - return response.json() + return self.client.delete(uri, + data=json.dumps(request.get_body_parameters()), + headers=self._get_http_headers({'Content-Type': 'application/json'}), + timeout=timeout) def _check_errors(self, response, request: Request): status_code = response.status_code From 5b9af6f11504b3894fe58bfe7a00f498555c175d Mon Sep 17 00:00:00 2001 From: ramnes Date: Mon, 2 Oct 2023 17:34:00 +0200 Subject: [PATCH 4/4] Implement AsyncRecombeeClient --- recombee_api_client/api_client.py | 48 ++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/recombee_api_client/api_client.py b/recombee_api_client/api_client.py index 4ae730f..01b50d0 100644 --- a/recombee_api_client/api_client.py +++ b/recombee_api_client/api_client.py @@ -36,6 +36,7 @@ class RecombeeClient: :param region: region of the Recombee cluster where the database is located """ BATCH_MAX_SIZE = 10000 + CLIENT_CLS = httpx.Client def __init__(self, database_id: str, token: str, protocol: str = 'https', options: dict = None, region: Region = None): self.database_id = database_id @@ -43,13 +44,9 @@ def __init__(self, database_id: str, token: str, protocol: str = 'https', option self.protocol = protocol self.base_uri = self._get_base_uri(options=options or {}, region=region) - self.client = httpx.Client() + self.client = self.CLIENT_CLS() - def send(self, request: Request) -> Union[dict, str, list]: - """ - :param request: Request to be sent to Recombee recommender - """ - + def _send(self, request: Request) -> Union[dict, str, list]: if isinstance(request, Batch) and len(request.requests) > self.BATCH_MAX_SIZE: return self._send_multipart_batch(request) @@ -58,15 +55,22 @@ def send(self, request: Request) -> Union[dict, str, list]: uri = self._sign_url(uri) protocol = 'https' if request.ensure_https else self.protocol uri = protocol + '://' + self.base_uri + uri + + if request.method == 'put': + return self._put(request, uri, timeout) + elif request.method == 'get': + return self._get(request, uri, timeout) + elif request.method == 'post': + return self._post(request, uri, timeout) + elif request.method == 'delete': + return self._delete(request, uri, timeout) + + def send(self, request: Request) -> Union[dict, str, list]: + """ + :param request: Request to be sent to Recombee recommender + """ try: - if request.method == 'put': - response = self._put(request, uri, timeout) - elif request.method == 'get': - response = self._get(request, uri, timeout) - elif request.method == 'post': - response = self._post(request, uri, timeout) - elif request.method == 'delete': - response = self._delete(request, uri, timeout) + response = self._send(request) except httpx.TimeoutException: raise ApiTimeoutException(request) @@ -182,3 +186,19 @@ def _hmac_sign(self, uri: str, time_part: str) -> str: url = uri + time_part sign = hmac.new(str.encode(self.token), str.encode(url), sha1).hexdigest() return sign + + +class AsyncRecombeeClient(RecombeeClient): + CLIENT_CLS = httpx.AsyncClient + + async def send(self, request: Request) -> Union[dict, str, list]: + """ + :param request: Request to be sent to Recombee recommender + """ + try: + response = await self._send(request) + except httpx.TimeoutException: + raise ApiTimeoutException(request) + + self._check_errors(response, request) + return response.json()