Skip to content

Commit

Permalink
Merge pull request #80 from neiljerram/read-timeout-handling
Browse files Browse the repository at this point in the history
Read timeout handling
  • Loading branch information
Neil Jerram authored Mar 21, 2017
2 parents 447bb36 + 4689f2c commit 3f79ae7
Showing 1 changed file with 44 additions and 19 deletions.
63 changes: 44 additions & 19 deletions controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ def __init__(self):
self.add_handler(RESOURCE_TYPE_POD, TYPE_DELETED,
delete_pod)

self._last_resource_version = {}
"""
Keeps track of last received version for each resource type.
"""

def add_handler(self, resource_type, event_type, handler):
"""
Adds an event handler for the given event type (ADD, DELETE) for the
Expand Down Expand Up @@ -295,20 +300,33 @@ def _process_update(self, event_type, resource_type, resource):
def _manage_resource(self, resource_type):
"""
Routine for a worker thread. Syncs with API for the given resource
and starts a watch. If an error occurs within the watch, will re-sync
with the API and re-start the watch.
and starts a watch. If an error occurs within the watch, will resync
with the API and restart the watch.
"""
sync_needed = True
while True:
try:
# Sync existing resources for this type.
resource_version = self._sync_resources(resource_type)
if sync_needed:
# Sync existing resources for this type.
self._sync_resources(resource_type)

# There are many exception conditions below for which we would
# need to sync again. Even though sync isn't needed in the
# most mainline case - read timeout - we save some lines of
# code by setting sync_needed True here, and resetting it below
# in the cases where it isn't needed.
sync_needed = True

# Start a watch from the latest resource_version.
self._watch_resource(resource_type, resource_version)
self._watch_resource(resource_type)
except requests.exceptions.ConnectTimeout as e:
_log.warning("Connection attempt timed out: %s ...%s", resource_type, e)
except requests.ConnectionError as e:
_log.warning("Connection error: %s ...%s", resource_type, e)
if "Read timed out" in str(e):
_log.debug("Normal read time out for %s", resource_type)
sync_needed = False
else:
_log.warning("Connection error: %s ...%s", resource_type, e)
except requests.exceptions.ChunkedEncodingError:
_log.exception("Read error querying: %s", resource_type)
except requests.HTTPError:
Expand All @@ -320,23 +338,29 @@ def _manage_resource(self, resource_type):
except Exception:
_log.exception("Unhandled exception killed %s manager", resource_type)
finally:
# Sleep for a second so that we don't tight-loop.
_log.warning("Re-starting watch on resource: %s",
resource_type)
time.sleep(1)
if sync_needed:
# Sleep for a second so that we don't tight-loop.
_log.info("Restarting watch on resource: %s",
resource_type)
time.sleep(1)
else:
_log.debug("Restarting watch on resource: %s",
resource_type)

def _watch_resource(self, resource_type, resource_version):
def _watch_resource(self, resource_type):
"""
Watch the given resource type starting at the given resource version.
Add any events to the event queue.
"""
path = WATCH_URLS[resource_type] % self.k8s_api
_log.info("Starting watch on: %s", path)
_log.debug("Starting watch on: %s", path)
while True:
# Attempt to stream API resources.
response = self._api_get(path,
stream=True,
resource_version=resource_version)
response = self._api_get(
path,
stream=True,
resource_version=self._last_resource_version[resource_type]
)
_log.debug("Watch response for %s: %s", path, response)

# Check for successful response, raise error if not.
Expand Down Expand Up @@ -377,9 +401,10 @@ def _watch_resource(self, resource_type, resource_version):

# Extract the latest resource version.
new_ver = resource["metadata"]["resourceVersion"]
_log.debug("Update resourceVersion, was: %s, now: %s",
resource_version, new_ver)
resource_version = new_ver
_log.info("Update resourceVersion, was: %s, now: %s",
self._last_resource_version[resource_type],
new_ver)
self._last_resource_version[resource_type] = new_ver

def _sync_resources(self, resource_type):
"""
Expand Down Expand Up @@ -420,7 +445,7 @@ def _sync_resources(self, resource_type):

_log.info("Done getting %s(s) - new resourceVersion: %s",
resource_type, resource_version)
return resource_version
self._last_resource_version[resource_type] = resource_version

def _api_get(self, path, stream, resource_version=None):
"""
Expand Down

0 comments on commit 3f79ae7

Please sign in to comment.