diff --git a/controller.py b/controller.py index 49b90620..a16a9a29 100644 --- a/controller.py +++ b/controller.py @@ -97,6 +97,11 @@ def __init__(self): self.add_handler(RESOURCE_TYPE_POD, TYPE_DELETED, delete_pod) + self._last_resource_version = {} + """ + Keeps track of last received version for each resource type. + """ + def add_handler(self, resource_type, event_type, handler): """ Adds an event handler for the given event type (ADD, DELETE) for the @@ -295,20 +300,33 @@ def _process_update(self, event_type, resource_type, resource): def _manage_resource(self, resource_type): """ Routine for a worker thread. Syncs with API for the given resource - and starts a watch. If an error occurs within the watch, will re-sync - with the API and re-start the watch. + and starts a watch. If an error occurs within the watch, will resync + with the API and restart the watch. """ + sync_needed = True while True: try: - # Sync existing resources for this type. - resource_version = self._sync_resources(resource_type) + if sync_needed: + # Sync existing resources for this type. + self._sync_resources(resource_type) + + # There are many exception conditions below for which we would + # need to sync again. Even though sync isn't needed in the + # most mainline case - read timeout - we save some lines of + # code by setting sync_needed True here, and resetting it below + # in the cases where it isn't needed. + sync_needed = True # Start a watch from the latest resource_version. - self._watch_resource(resource_type, resource_version) + self._watch_resource(resource_type) except requests.exceptions.ConnectTimeout as e: _log.warning("Connection attempt timed out: %s ...%s", resource_type, e) except requests.ConnectionError as e: - _log.warning("Connection error: %s ...%s", resource_type, e) + if "Read timed out" in str(e): + _log.debug("Normal read time out for %s", resource_type) + sync_needed = False + else: + _log.warning("Connection error: %s ...%s", resource_type, e) except requests.exceptions.ChunkedEncodingError: _log.exception("Read error querying: %s", resource_type) except requests.HTTPError: @@ -320,23 +338,29 @@ def _manage_resource(self, resource_type): except Exception: _log.exception("Unhandled exception killed %s manager", resource_type) finally: - # Sleep for a second so that we don't tight-loop. - _log.warning("Re-starting watch on resource: %s", - resource_type) - time.sleep(1) + if sync_needed: + # Sleep for a second so that we don't tight-loop. + _log.info("Restarting watch on resource: %s", + resource_type) + time.sleep(1) + else: + _log.debug("Restarting watch on resource: %s", + resource_type) - def _watch_resource(self, resource_type, resource_version): + def _watch_resource(self, resource_type): """ Watch the given resource type starting at the given resource version. Add any events to the event queue. """ path = WATCH_URLS[resource_type] % self.k8s_api - _log.info("Starting watch on: %s", path) + _log.debug("Starting watch on: %s", path) while True: # Attempt to stream API resources. - response = self._api_get(path, - stream=True, - resource_version=resource_version) + response = self._api_get( + path, + stream=True, + resource_version=self._last_resource_version[resource_type] + ) _log.debug("Watch response for %s: %s", path, response) # Check for successful response, raise error if not. @@ -377,9 +401,10 @@ def _watch_resource(self, resource_type, resource_version): # Extract the latest resource version. new_ver = resource["metadata"]["resourceVersion"] - _log.debug("Update resourceVersion, was: %s, now: %s", - resource_version, new_ver) - resource_version = new_ver + _log.info("Update resourceVersion, was: %s, now: %s", + self._last_resource_version[resource_type], + new_ver) + self._last_resource_version[resource_type] = new_ver def _sync_resources(self, resource_type): """ @@ -420,7 +445,7 @@ def _sync_resources(self, resource_type): _log.info("Done getting %s(s) - new resourceVersion: %s", resource_type, resource_version) - return resource_version + self._last_resource_version[resource_type] = resource_version def _api_get(self, path, stream, resource_version=None): """