Skip to content

Commit

Permalink
Add request timeout (#121)
Browse files Browse the repository at this point in the history
* Add request timeout

[ML-5598](https://iguazio.atlassian.net/browse/ML-5598)

* lint
  • Loading branch information
gtopper authored Jul 11, 2024
1 parent b16fbf8 commit 1ceb24d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 39 deletions.
10 changes: 1 addition & 9 deletions v3io/dataplane/kv_timestamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,19 @@
# limitations under the License.
#
import datetime
import sys

# used only n py2
BASE_DATETIME = datetime.datetime(1970, 1, 1)


def _get_timestamp_from_datetime_py3(dt):
def _get_timestamp_from_datetime(dt):
return dt.astimezone(datetime.timezone.utc).timestamp()


def _get_timestamp_from_datetime_py2(dt):
return (dt - BASE_DATETIME).total_seconds()


# _get_timestamp_from_datetime is python version specific. resolve this once
if sys.version_info[0] >= 3:
_get_timestamp_from_datetime = _get_timestamp_from_datetime_py3
else:
_get_timestamp_from_datetime = _get_timestamp_from_datetime_py2


def encode(dt):
timestamp = _get_timestamp_from_datetime(dt)

Expand Down
63 changes: 33 additions & 30 deletions v3io/dataplane/transport/httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import queue
import socket
import ssl
import sys

import v3io.dataplane.request
import v3io.dataplane.response
Expand All @@ -25,6 +24,9 @@


class Transport(abstract.Transport):
_connection_timeout_seconds = 20
_request_max_retries = 2

def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, verbosity=None):
super(Transport, self).__init__(logger, endpoint, max_connections, timeout, verbosity)

Expand All @@ -36,24 +38,19 @@ def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, ve
# create the pool connection
self._create_connections(self.max_connections, self._host, self._ssl_context)

# python 2 and 3 have different exceptions
if sys.version_info[0] >= 3:
self._wait_response_exceptions = (
http.client.RemoteDisconnected,
ConnectionResetError,
ConnectionRefusedError,
http.client.ResponseNotReady,
)
self._send_request_exceptions = (
BrokenPipeError,
http.client.CannotSendRequest,
http.client.RemoteDisconnected,
)
self._get_status_and_headers = self._get_status_and_headers_py3
else:
self._wait_response_exceptions = (http.client.BadStatusLine, socket.error)
self._send_request_exceptions = (http.client.CannotSendRequest, http.client.BadStatusLine)
self._get_status_and_headers = self._get_status_and_headers_py2
self._wait_response_exceptions = (
http.client.RemoteDisconnected,
ConnectionResetError,
ConnectionRefusedError,
http.client.ResponseNotReady,
)
self._send_request_exceptions = (
BrokenPipeError,
http.client.CannotSendRequest,
http.client.RemoteDisconnected,
socket.timeout,
)
self._get_status_and_headers = self._get_status_and_headers_py3

def close(self):
# Ignore redundant calls to close
Expand Down Expand Up @@ -154,20 +151,27 @@ def _send_request_on_connection(self, request, connection):
self.log(
"Tx", connection=connection, method=request.method, path=path, headers=request.headers, body=request.body
)

starting_offset = 0
is_body_seekable = request.body and hasattr(request.body, "seek") and hasattr(request.body, "tell")
if is_body_seekable:
starting_offset = request.body.tell()
try:

retries_left = self._request_max_retries
while True:
try:
connection.request(request.method, path, request.body, request.headers)
break
except self._send_request_exceptions as e:
self._logger.debug_with(
"Disconnected while attempting to send. Recreating connection and retrying",
f"Disconnected while attempting to send request – "
f"{retries_left} out of {self._request_max_retries} retries left.",
e=type(e),
e_msg=e,
connection=connection,
)
if retries_left == 0:
raise
retries_left -= 1
connection.close()
if is_body_seekable:
# If the first connection fails, the pointer of the body might move at the size
Expand All @@ -176,12 +180,11 @@ def _send_request_on_connection(self, request, connection):
request.body.seek(starting_offset)
connection = self._create_connection(self._host, self._ssl_context)
request.transport.connection_used = connection
connection.request(request.method, path, request.body, request.headers)
except BaseException as e:
self._logger.error_with(
"Unhandled exception while sending request", e=type(e), e_msg=e, connection=connection
)
raise e
except BaseException as e:
self._logger.error_with(
"Unhandled exception while sending request", e=type(e), e_msg=e, connection=connection
)
raise e

return request

Expand All @@ -192,9 +195,9 @@ def _create_connections(self, num_connections, host, ssl_context):

def _create_connection(self, host, ssl_context):
if ssl_context is None:
return http.client.HTTPConnection(host)
return http.client.HTTPConnection(host, timeout=self._connection_timeout_seconds)

return http.client.HTTPSConnection(host, context=ssl_context)
return http.client.HTTPSConnection(host, timeout=self._connection_timeout_seconds, context=ssl_context)

def _parse_endpoint(self, endpoint):
if endpoint.startswith("http://"):
Expand Down

0 comments on commit 1ceb24d

Please sign in to comment.