From 1653c7510db3b30c66594a4327b8860791136fd2 Mon Sep 17 00:00:00 2001 From: Neil Jerram Date: Tue, 21 Mar 2017 12:27:48 +0000 Subject: [PATCH 1/3] Don't resync or sleep on normal read timeout --- controller.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/controller.py b/controller.py index 49b90620..ccdfff67 100644 --- a/controller.py +++ b/controller.py @@ -298,17 +298,30 @@ def _manage_resource(self, resource_type): and starts a watch. If an error occurs within the watch, will re-sync with the API and re-start the watch. """ + sync_needed = True while True: try: - # Sync existing resources for this type. - resource_version = self._sync_resources(resource_type) + if sync_needed: + # Sync existing resources for this type. + resource_version = self._sync_resources(resource_type) + + # There are many exception conditions below for which we would + # need to sync again. Even though sync isn't needed in the + # most mainline case - read timeout - we save some lines of + # code by setting sync_needed True here, and resetting it below + # in the cases where it isn't needed. + sync_needed = True # Start a watch from the latest resource_version. 
self._watch_resource(resource_type, resource_version) except requests.exceptions.ConnectTimeout as e: _log.warning("Connection attempt timed out: %s ...%s", resource_type, e) except requests.ConnectionError as e: - _log.warning("Connection error: %s ...%s", resource_type, e) + if "Read timed out" in str(e): + _log.debug("Normal read time out for %s", resource_type) + sync_needed = False + else: + _log.warning("Connection error: %s ...%s", resource_type, e) except requests.exceptions.ChunkedEncodingError: _log.exception("Read error querying: %s", resource_type) except requests.HTTPError: @@ -320,10 +333,14 @@ def _manage_resource(self, resource_type): except Exception: _log.exception("Unhandled exception killed %s manager", resource_type) finally: - # Sleep for a second so that we don't tight-loop. - _log.warning("Re-starting watch on resource: %s", - resource_type) - time.sleep(1) + if sync_needed: + # Sleep for a second so that we don't tight-loop. + _log.warning("Re-starting watch on resource: %s", + resource_type) + time.sleep(1) + else: + _log.debug ("Re-starting watch on resource: %s", + resource_type) def _watch_resource(self, resource_type, resource_version): """ From 5d7beb0400daf2840d48e57e87416fa78ab86562 Mon Sep 17 00:00:00 2001 From: Neil Jerram Date: Tue, 21 Mar 2017 12:35:34 +0000 Subject: [PATCH 2/3] Downgrade logs that are overly prevalent in normal operation The warning log for restarting a watch after an exception is unnecessarily severe, given that there has already been another warning or exception log describing the situation. The info log for starting a watch now occurs 3 times every 10 seconds, in normal operation, so should be debug. (Also fix "re-" misspellings.) 
--- controller.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/controller.py b/controller.py index ccdfff67..7c864985 100644 --- a/controller.py +++ b/controller.py @@ -295,8 +295,8 @@ def _process_update(self, event_type, resource_type, resource): def _manage_resource(self, resource_type): """ Routine for a worker thread. Syncs with API for the given resource - and starts a watch. If an error occurs within the watch, will re-sync - with the API and re-start the watch. + and starts a watch. If an error occurs within the watch, will resync + with the API and restart the watch. """ sync_needed = True while True: @@ -335,12 +335,12 @@ def _manage_resource(self, resource_type): finally: if sync_needed: # Sleep for a second so that we don't tight-loop. - _log.warning("Re-starting watch on resource: %s", - resource_type) + _log.info("Restarting watch on resource: %s", + resource_type) time.sleep(1) else: - _log.debug ("Re-starting watch on resource: %s", - resource_type) + _log.debug("Restarting watch on resource: %s", + resource_type) def _watch_resource(self, resource_type, resource_version): """ @@ -348,7 +348,7 @@ def _watch_resource(self, resource_type, resource_version): Add any events to the event queue. """ path = WATCH_URLS[resource_type] % self.k8s_api - _log.info("Starting watch on: %s", path) + _log.debug("Starting watch on: %s", path) while True: # Attempt to stream API resources. response = self._api_get(path, From 4689f2c9f9e9dc38468cdb1409f5eeaa0848c3f4 Mon Sep 17 00:00:00 2001 From: Neil Jerram Date: Tue, 21 Mar 2017 14:12:37 +0000 Subject: [PATCH 3/3] Fix version tracking on read timeout after an update As we now time out and restart reads (aka watches) after 10 seconds, what was happening was: - start a new read, from resource version X - get an update from k8s (a genuine one, e.g. new namespace), with resource version Y > X - continue the existing read... 
(because that's what 'stream=True' means) - ...until it times out after another 10s: requests raises an exception that is caught in _manage_resource - _manage_resource correctly skips calling _sync_resources, then calls _watch_resource again with resource version X - k8s returns a 410 error because X is too old; the call should have had version Y: exception is raised and caught in _manage_resource - this exception causes us to resync, thus defeating our attempts to avoid an (unnecessary) resync. We can't pass the new resource version back up to _manage_resource when the read timeout exception is raised, so rework version tracking to use a dict in the Controller object. --- controller.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/controller.py b/controller.py index 7c864985..a16a9a29 100644 --- a/controller.py +++ b/controller.py @@ -97,6 +97,11 @@ def __init__(self): self.add_handler(RESOURCE_TYPE_POD, TYPE_DELETED, delete_pod) + self._last_resource_version = {} + """ + Keeps track of last received version for each resource type. + """ + def add_handler(self, resource_type, event_type, handler): """ Adds an event handler for the given event type (ADD, DELETE) for the @@ -303,7 +308,7 @@ def _manage_resource(self, resource_type): try: if sync_needed: # Sync existing resources for this type. - resource_version = self._sync_resources(resource_type) + self._sync_resources(resource_type) # There are many exception conditions below for which we would # need to sync again. Even though sync isn't needed in the @@ -313,7 +318,7 @@ def _manage_resource(self, resource_type): sync_needed = True # Start a watch from the latest resource_version. 
- self._watch_resource(resource_type, resource_version) + self._watch_resource(resource_type) except requests.exceptions.ConnectTimeout as e: _log.warning("Connection attempt timed out: %s ...%s", resource_type, e) except requests.ConnectionError as e: @@ -342,7 +347,7 @@ def _manage_resource(self, resource_type): _log.debug("Restarting watch on resource: %s", resource_type) - def _watch_resource(self, resource_type, resource_version): + def _watch_resource(self, resource_type): """ Watch the given resource type starting at the given resource version. Add any events to the event queue. @@ -351,9 +356,11 @@ def _watch_resource(self, resource_type, resource_version): _log.debug("Starting watch on: %s", path) while True: # Attempt to stream API resources. - response = self._api_get(path, - stream=True, - resource_version=resource_version) + response = self._api_get( + path, + stream=True, + resource_version=self._last_resource_version[resource_type] + ) _log.debug("Watch response for %s: %s", path, response) # Check for successful response, raise error if not. @@ -394,9 +401,10 @@ def _watch_resource(self, resource_type, resource_version): # Extract the latest resource version. new_ver = resource["metadata"]["resourceVersion"] - _log.debug("Update resourceVersion, was: %s, now: %s", - resource_version, new_ver) - resource_version = new_ver + _log.info("Update resourceVersion, was: %s, now: %s", + self._last_resource_version[resource_type], + new_ver) + self._last_resource_version[resource_type] = new_ver def _sync_resources(self, resource_type): """ @@ -437,7 +445,7 @@ def _sync_resources(self, resource_type): _log.info("Done getting %s(s) - new resourceVersion: %s", resource_type, resource_version) - return resource_version + self._last_resource_version[resource_type] = resource_version def _api_get(self, path, stream, resource_version=None): """