From 1ceb24d7ef719c6298daf7bec150e125488ac3ce Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Thu, 11 Jul 2024 16:50:08 +0800 Subject: [PATCH] Add request timeout (#121) * Add request timeout [ML-5598](https://iguazio.atlassian.net/browse/ML-5598) * lint --- v3io/dataplane/kv_timestamp.py | 10 +--- v3io/dataplane/transport/httpclient.py | 63 ++++++++++++++------------ 2 files changed, 34 insertions(+), 39 deletions(-) diff --git a/v3io/dataplane/kv_timestamp.py b/v3io/dataplane/kv_timestamp.py index f3d2d59..fdda3c2 100644 --- a/v3io/dataplane/kv_timestamp.py +++ b/v3io/dataplane/kv_timestamp.py @@ -13,13 +13,12 @@ # limitations under the License. # import datetime -import sys # used only n py2 BASE_DATETIME = datetime.datetime(1970, 1, 1) -def _get_timestamp_from_datetime_py3(dt): +def _get_timestamp_from_datetime(dt): return dt.astimezone(datetime.timezone.utc).timestamp() @@ -27,13 +26,6 @@ def _get_timestamp_from_datetime_py2(dt): return (dt - BASE_DATETIME).total_seconds() -# _get_timestamp_from_datetime is python version specific. resolve this once -if sys.version_info[0] >= 3: - _get_timestamp_from_datetime = _get_timestamp_from_datetime_py3 -else: - _get_timestamp_from_datetime = _get_timestamp_from_datetime_py2 - - def encode(dt): timestamp = _get_timestamp_from_datetime(dt) diff --git a/v3io/dataplane/transport/httpclient.py b/v3io/dataplane/transport/httpclient.py index 63924d4..ab8ce73 100644 --- a/v3io/dataplane/transport/httpclient.py +++ b/v3io/dataplane/transport/httpclient.py @@ -16,7 +16,6 @@ import queue import socket import ssl -import sys import v3io.dataplane.request import v3io.dataplane.response @@ -25,6 +24,9 @@ class Transport(abstract.Transport): + _connection_timeout_seconds = 20 + _request_max_retries = 2 + def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, verbosity=None): super(Transport, self).__init__(logger, endpoint, max_connections, timeout, verbosity) @@ -36,24 +38,19 @@ def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, ve # create the pool connection self._create_connections(self.max_connections, self._host, self._ssl_context) - # python 2 and 3 have different exceptions - if sys.version_info[0] >= 3: - self._wait_response_exceptions = ( - http.client.RemoteDisconnected, - ConnectionResetError, - ConnectionRefusedError, - http.client.ResponseNotReady, - ) - self._send_request_exceptions = ( - BrokenPipeError, - http.client.CannotSendRequest, - http.client.RemoteDisconnected, - ) - self._get_status_and_headers = self._get_status_and_headers_py3 - else: - self._wait_response_exceptions = (http.client.BadStatusLine, socket.error) - self._send_request_exceptions = (http.client.CannotSendRequest, http.client.BadStatusLine) - self._get_status_and_headers = self._get_status_and_headers_py2 + self._wait_response_exceptions = ( + http.client.RemoteDisconnected, + ConnectionResetError, + ConnectionRefusedError, + http.client.ResponseNotReady, + ) + self._send_request_exceptions = ( + BrokenPipeError, + http.client.CannotSendRequest, + http.client.RemoteDisconnected, + socket.timeout, + ) + self._get_status_and_headers = self._get_status_and_headers_py3 def close(self): # Ignore redundant calls to close @@ -154,20 +151,27 @@ def _send_request_on_connection(self, request, connection): self.log( "Tx", connection=connection, method=request.method, path=path, headers=request.headers, body=request.body ) + starting_offset = 0 is_body_seekable = request.body and hasattr(request.body, "seek") and hasattr(request.body, "tell") if is_body_seekable: starting_offset = request.body.tell() - try: + + retries_left = self._request_max_retries + while True: try: connection.request(request.method, path, request.body, request.headers) + break except self._send_request_exceptions as e: self._logger.debug_with( - "Disconnected while attempting to send. Recreating connection and retrying", + f"Disconnected while attempting to send request – " + f"{retries_left} out of {self._request_max_retries} retries left.", e=type(e), e_msg=e, - connection=connection, ) + if retries_left == 0: + raise + retries_left -= 1 connection.close() if is_body_seekable: # If the first connection fails, the pointer of the body might move at the size @@ -176,12 +180,11 @@ def _send_request_on_connection(self, request, connection): request.body.seek(starting_offset) connection = self._create_connection(self._host, self._ssl_context) request.transport.connection_used = connection - connection.request(request.method, path, request.body, request.headers) - except BaseException as e: - self._logger.error_with( - "Unhandled exception while sending request", e=type(e), e_msg=e, connection=connection - ) - raise e + except BaseException as e: + self._logger.error_with( + "Unhandled exception while sending request", e=type(e), e_msg=e, connection=connection + ) + raise e return request @@ -192,9 +195,9 @@ def _create_connections(self, num_connections, host, ssl_context): def _create_connection(self, host, ssl_context): if ssl_context is None: - return http.client.HTTPConnection(host) + return http.client.HTTPConnection(host, timeout=self._connection_timeout_seconds) - return http.client.HTTPSConnection(host, context=ssl_context) + return http.client.HTTPSConnection(host, timeout=self._connection_timeout_seconds, context=ssl_context) def _parse_endpoint(self, endpoint): if endpoint.startswith("http://"):