From 6dbc195de0f9c511577fc4a950831761ef144de2 Mon Sep 17 00:00:00 2001 From: banesullivan Date: Thu, 16 Jun 2022 11:43:33 -0600 Subject: [PATCH 1/4] Abstract caching and support entrypoints --- large_image/cache_util/base.py | 79 ++++++++++++++++++++++++++ large_image/cache_util/cachefactory.py | 67 +++++++++++++++------- large_image/cache_util/memcache.py | 59 +++++++++---------- large_image/config.py | 2 +- 4 files changed, 155 insertions(+), 52 deletions(-) create mode 100644 large_image/cache_util/base.py diff --git a/large_image/cache_util/base.py b/large_image/cache_util/base.py new file mode 100644 index 000000000..46fd059da --- /dev/null +++ b/large_image/cache_util/base.py @@ -0,0 +1,79 @@ +import hashlib +import time + +import cachetools + + +class BaseCache(cachetools.Cache): + """Base interface to cachetools.Cache for use with large-image.""" + + def __init__(self, *args, getsizeof=None, **kwargs): + super().__init__(*args, getsizeof=getsizeof, **kwargs) + self.lastError = {} + self.throttleErrors = 10 # seconds between logging errors + + def logError(self, err, func, msg): + """ + Log errors, but throttle them so as not to spam the logs. + + :param err: error to log. + :param func: function to use for logging. This is something like + logprint.exception or logger.error. + :param msg: the message to log. + """ + curtime = time.time() + key = (err, func) + if (curtime - self.lastError.get(key, {}).get('time', 0) > self.throttleErrors): + skipped = self.lastError.get(key, {}).get('skipped', 0) + if skipped: + msg += ' (%d similar messages)' % skipped + self.lastError[key] = {'time': curtime, 'skipped': 0} + func(msg) + else: + self.lastError[key]['skipped'] += 1 + + def __repr__(self): + raise NotImplementedError + + def __iter__(self): + raise NotImplementedError + + def __len__(self): + raise NotImplementedError + + def __contains__(self, key): + raise NotImplementedError + + def __delitem__(self, key): + raise NotImplementedError + + def _hashKey(self, key): + return hashlib.sha256(key.encode()).hexdigest() + + def __getitem__(self, key): + # hashedKey = self._hashKey(key) + raise NotImplementedError + + def __setitem__(self, key, value): + # hashedKey = self._hashKey(key) + raise NotImplementedError + + @property + def curritems(self): + raise NotImplementedError + + @property + def currsize(self): + raise NotImplementedError + + @property + def maxsize(self): + raise NotImplementedError + + def clear(self): + raise NotImplementedError + + @staticmethod + def getCache(): + # return cache, cacheLock + raise NotImplementedError diff --git a/large_image/cache_util/cachefactory.py b/large_image/cache_util/cachefactory.py index ffdd8f461..eca594724 100644 --- a/large_image/cache_util/cachefactory.py +++ b/large_image/cache_util/cachefactory.py @@ -14,9 +14,9 @@ # limitations under the License. ############################################################################# - import math import threading +from collections import OrderedDict import cachetools @@ -25,6 +25,11 @@ except ImportError: psutil = None +try: + from importlib.metadata import entry_points +except ImportError: + from importlib_metadata import entry_points + from .. import config try: @@ -32,6 +37,33 @@ except ImportError: MemCache = None +availableCaches = OrderedDict() + + +def loadCaches(entryPointName='large_image.cache', sourceDict=availableCaches): + """ + Load all caches from entrypoints and add them to the + availableCaches dictionary. + + :param entryPointName: the name of the entry points to load. + :param sourceDict: a dictionary to populate with the loaded caches. + """ + epoints = entry_points() + if entryPointName in epoints: + for entryPoint in epoints[entryPointName]: + try: + cacheClass = entryPoint.load() + sourceDict[entryPoint.name.lower()] = cacheClass + config.getConfig('logprint').debug('Loaded cache %s' % entryPoint.name) + except Exception: + config.getConfig('logprint').exception( + 'Failed to load cache %s' % entryPoint.name) + # Load memcached last for now + if MemCache is not None: + # TODO: put this in an entry point for a new package + availableCaches['memcached'] = MemCache + # NOTE: `python` cache is viewed as a fallback and isn't listed in `availableCaches` + def pickAvailableCache(sizeEach, portion=8, maxItems=None, cacheName=None): """ @@ -89,32 +121,23 @@ def getCacheSize(self, numItems, cacheName=None): return numItems def getCache(self, numItems=None, cacheName=None): + loadCaches() # memcached is the fallback default, if available. - cacheBackend = config.getConfig('cache_backend', 'python') + cacheBackend = config.getConfig('cache_backend', None) + + if cacheBackend is None and len(availableCaches): + cacheBackend = next(iter(availableCaches)) + config.getConfig('logprint').info('Automatically setting `%s` as cache_backend from availableCaches' % cacheBackend) + config.setConfig('cache_backend', cacheBackend) + if cacheBackend: cacheBackend = str(cacheBackend).lower() + cache = None - if cacheBackend == 'memcached' and MemCache and numItems is None: - # lock needed because pylibmc(memcached client) is not threadsafe - cacheLock = threading.Lock() + # TODO: why have this numItems check? + if numItems is None and cacheBackend in availableCaches: + cache, cacheLock = availableCaches[cacheBackend].getCache() - # check if credentials and location exist, otherwise assume - # location is 127.0.0.1 (localhost) with no password - url = config.getConfig('cache_memcached_url') - if not url: - url = '127.0.0.1' - memcachedUsername = config.getConfig('cache_memcached_username') - if not memcachedUsername: - memcachedUsername = None - memcachedPassword = config.getConfig('cache_memcached_password') - if not memcachedPassword: - memcachedPassword = None - try: - cache = MemCache(url, memcachedUsername, memcachedPassword, - mustBeAvailable=True) - except Exception: - config.getConfig('logger').info('Cannot use memcached for caching.') - cache = None if cache is None: # fallback backend cacheBackend = 'python' cache = cachetools.LRUCache(self.getCacheSize(numItems, cacheName=cacheName)) diff --git a/large_image/cache_util/memcache.py b/large_image/cache_util/memcache.py index a3088b12c..2d75b019c 100644 --- a/large_image/cache_util/memcache.py +++ b/large_image/cache_util/memcache.py @@ -14,15 +14,14 @@ # limitations under the License. ############################################################################# -import hashlib +import threading import time -import cachetools - from .. import config +from .base import BaseCache -class MemCache(cachetools.Cache): +class MemCache(BaseCache): """Use memcached as the backing cache.""" def __init__(self, url='127.0.0.1', username=None, password=None, @@ -56,8 +55,6 @@ def __init__(self, url='127.0.0.1', username=None, password=None, self._client['large_image_cache_test'] = time.time() self._clientParams = (url, dict( binary=True, username=username, password=password, behaviors=behaviors)) - self.lastError = {} - self.throttleErrors = 10 # seconds between logging errors def __repr__(self): return "Memcache doesn't list its keys" @@ -75,31 +72,11 @@ def __contains__(self, key): return None def __delitem__(self, key): - hashedKey = hashlib.sha256(key.encode()).hexdigest() + hashedKey = self._hashKey(key) del self._client[hashedKey] - def logError(self, err, func, msg): - """ - Log errors, but throttle them so as not to spam the logs. - - :param err: error to log. - :param func: function to use for logging. This is something like - logprint.exception or logger.error. - :param msg: the message to log. - """ - curtime = time.time() - key = (err, func) - if (curtime - self.lastError.get(key, {}).get('time', 0) > self.throttleErrors): - skipped = self.lastError.get(key, {}).get('skipped', 0) - if skipped: - msg += ' (%d similar messages)' % skipped - self.lastError[key] = {'time': curtime, 'skipped': 0} - func(msg) - else: - self.lastError[key]['skipped'] += 1 - def __getitem__(self, key): - hashedKey = hashlib.sha256(key.encode()).hexdigest() + hashedKey = self._hashKey(key) try: return self._client[hashedKey] except KeyError: @@ -114,7 +91,7 @@ def __getitem__(self, key): return self.__missing__(key) def __setitem__(self, key, value): - hashedKey = hashlib.sha256(key.encode()).hexdigest() + hashedKey = self._hashKey(key) try: self._client[hashedKey] = value except (TypeError, KeyError) as exc: @@ -166,3 +143,27 @@ def _getStat(self, key): def clear(self): self._client.flush_all() + + @staticmethod + def getCache(): + # lock needed because pylibmc(memcached client) is not threadsafe + cacheLock = threading.Lock() + + # check if credentials and location exist, otherwise assume + # location is 127.0.0.1 (localhost) with no password + url = config.getConfig('cache_memcached_url') + if not url: + url = '127.0.0.1' + memcachedUsername = config.getConfig('cache_memcached_username') + if not memcachedUsername: + memcachedUsername = None + memcachedPassword = config.getConfig('cache_memcached_password') + if not memcachedPassword: + memcachedPassword = None + try: + cache = MemCache(url, memcachedUsername, memcachedPassword, + mustBeAvailable=True) + except Exception: + config.getConfig('logger').info('Cannot use memcached for caching.') + cache = None + return cache, cacheLock diff --git a/large_image/config.py b/large_image/config.py index e6b61b99a..6573f9d1c 100644 --- a/large_image/config.py +++ b/large_image/config.py @@ -12,7 +12,7 @@ 'logprint': fallbackLogger, # For tiles - 'cache_backend': 'python', # 'python' or 'memcached' + 'cache_backend': None, # 'python' or 'memcached' # 'python' cache can use 1/(val) of the available memory 'cache_python_memory_portion': 32, # cache_memcached_url may be a list From 0ed2b36618a6a285c4bb87a0c79cf7bcccfdcc50 Mon Sep 17 00:00:00 2001 From: banesullivan Date: Thu, 16 Jun 2022 13:12:22 -0600 Subject: [PATCH 2/4] Cleanup --- large_image/cache_util/base.py | 11 +++--- large_image/cache_util/cache.py | 6 ++- large_image/cache_util/cachefactory.py | 51 +++++++++++++++----------- large_image/cache_util/memcache.py | 4 +- 4 files changed, 42 insertions(+), 30 deletions(-) diff --git a/large_image/cache_util/base.py b/large_image/cache_util/base.py index 46fd059da..1eaa471a8 100644 --- a/large_image/cache_util/base.py +++ b/large_image/cache_util/base.py @@ -1,4 +1,5 @@ import hashlib +import threading import time import cachetools @@ -39,13 +40,13 @@ def __iter__(self): raise NotImplementedError def __len__(self): - raise NotImplementedError + raise NotImplementedError def __contains__(self, key): raise NotImplementedError def __delitem__(self, key): - raise NotImplementedError + raise NotImplementedError def _hashKey(self, key): return hashlib.sha256(key.encode()).hexdigest() @@ -68,12 +69,12 @@ def currsize(self): @property def maxsize(self): - raise NotImplementedError + raise NotImplementedError def clear(self): - raise NotImplementedError + raise NotImplementedError @staticmethod - def getCache(): + def getCache() -> tuple['BaseCache', threading.Lock]: # return cache, cacheLock raise NotImplementedError diff --git a/large_image/cache_util/cache.py b/large_image/cache_util/cache.py index 790071061..8ae303a2c 100644 --- a/large_image/cache_util/cache.py +++ b/large_image/cache_util/cache.py @@ -142,7 +142,11 @@ def __new__(metacls, name, bases, namespace, **kwargs): # noqa - N804 cacheName = cls if LruCacheMetaclass.namedCaches.get(cacheName) is None: - cache, cacheLock = CacheFactory().getCache(maxSize, cacheName=cacheName) + cache, cacheLock = CacheFactory().getCache( + numItems=maxSize, + cacheName=cacheName, + inProcess=True, + ) LruCacheMetaclass.namedCaches[cacheName] = (cache, cacheLock) config.getConfig('logger').debug( 'Created LRU Cache for %r with %d maximum size' % (cacheName, cache.maxsize)) diff --git a/large_image/cache_util/cachefactory.py b/large_image/cache_util/cachefactory.py index eca594724..21da6bfbd 100644 --- a/large_image/cache_util/cachefactory.py +++ b/large_image/cache_util/cachefactory.py @@ -16,7 +16,6 @@ import math import threading -from collections import OrderedDict import cachetools @@ -37,10 +36,12 @@ except ImportError: MemCache = None -availableCaches = OrderedDict() +# DO NOT MANUALLY ADD ANYTHING TO `_availableCaches` +# use entrypoints and let loadCaches fill in `_availableCaches` +_availableCaches = {} -def loadCaches(entryPointName='large_image.cache', sourceDict=availableCaches): +def loadCaches(entryPointName='large_image.cache', sourceDict=_availableCaches): """ Load all caches from entrypoints and add them to the availableCaches dictionary. @@ -48,20 +49,23 @@ def loadCaches(entryPointName='large_image.cache', sourceDict=availableCaches): :param entryPointName: the name of the entry points to load. :param sourceDict: a dictionary to populate with the loaded caches. """ + if len(_availableCaches): + return epoints = entry_points() if entryPointName in epoints: for entryPoint in epoints[entryPointName]: try: cacheClass = entryPoint.load() sourceDict[entryPoint.name.lower()] = cacheClass - config.getConfig('logprint').debug('Loaded cache %s' % entryPoint.name) + config.getConfig('logprint').debug(f'Loaded cache {entryPoint.name}') except Exception: config.getConfig('logprint').exception( - 'Failed to load cache %s' % entryPoint.name) + f'Failed to load cache {entryPoint.name}' + ) # Load memcached last for now if MemCache is not None: # TODO: put this in an entry point for a new package - availableCaches['memcached'] = MemCache + _availableCaches['memcached'] = MemCache # NOTE: `python` cache is viewed as a fallback and isn't listed in `availableCaches` @@ -120,29 +124,32 @@ def getCacheSize(self, numItems, cacheName=None): pass return numItems - def getCache(self, numItems=None, cacheName=None): + def getCache(self, numItems=None, cacheName=None, inProcess=False): loadCaches() - # memcached is the fallback default, if available. - cacheBackend = config.getConfig('cache_backend', None) - if cacheBackend is None and len(availableCaches): - cacheBackend = next(iter(availableCaches)) - config.getConfig('logprint').info('Automatically setting `%s` as cache_backend from availableCaches' % cacheBackend) - config.setConfig('cache_backend', cacheBackend) + # Default to `python` cache for inProcess + cacheBackend = config.getConfig('cache_backend', 'python' if inProcess else None) - if cacheBackend: - cacheBackend = str(cacheBackend).lower() + # set the config cache_backend if caches available and not set + if cacheBackend is None and len(_availableCaches): + cacheBackend = next(iter(_availableCaches)) + config.getConfig('logprint').info( + f'Automatically setting `{cacheBackend}` as cache_backend from availableCaches' + ) + config.setConfig('cache_backend', cacheBackend) - cache = None - # TODO: why have this numItems check? - if numItems is None and cacheBackend in availableCaches: - cache, cacheLock = availableCaches[cacheBackend].getCache() + if isinstance(cacheBackend, str): + cacheBackend = cacheBackend.lower() - if cache is None: # fallback backend + if not inProcess and cacheBackend in _availableCaches: + cache, cacheLock = _availableCaches[cacheBackend].getCache() + else: # fallback backend or inProcess cacheBackend = 'python' cache = cachetools.LRUCache(self.getCacheSize(numItems, cacheName=cacheName)) cacheLock = threading.Lock() - if numItems is None and not CacheFactory.logged: - config.getConfig('logprint').info('Using %s for large_image caching' % cacheBackend) + + if not inProcess and not CacheFactory.logged: + config.getConfig('logprint').info(f'Using {cacheBackend} for large_image caching') CacheFactory.logged = True + return cache, cacheLock diff --git a/large_image/cache_util/memcache.py b/large_image/cache_util/memcache.py index 2d75b019c..21dbc8821 100644 --- a/large_image/cache_util/memcache.py +++ b/large_image/cache_util/memcache.py @@ -145,7 +145,7 @@ def clear(self): self._client.flush_all() @staticmethod - def getCache(): + def getCache() -> tuple['MemCache', threading.Lock]: # lock needed because pylibmc(memcached client) is not threadsafe cacheLock = threading.Lock() @@ -162,7 +162,7 @@ def getCache(): memcachedPassword = None try: cache = MemCache(url, memcachedUsername, memcachedPassword, - mustBeAvailable=True) + mustBeAvailable=True) except Exception: config.getConfig('logger').info('Cannot use memcached for caching.') cache = None From 9850cbe52373488d1bd7874d1653bcb76aa24b1e Mon Sep 17 00:00:00 2001 From: banesullivan Date: Thu, 16 Jun 2022 16:48:21 -0600 Subject: [PATCH 3/4] Fix type annotation --- large_image/cache_util/base.py | 3 ++- large_image/cache_util/memcache.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/large_image/cache_util/base.py b/large_image/cache_util/base.py index 1eaa471a8..51797c9b0 100644 --- a/large_image/cache_util/base.py +++ b/large_image/cache_util/base.py @@ -1,6 +1,7 @@ import hashlib import threading import time +from typing import Tuple import cachetools @@ -75,6 +76,6 @@ def clear(self): raise NotImplementedError @staticmethod - def getCache() -> tuple['BaseCache', threading.Lock]: + def getCache() -> Tuple['BaseCache', threading.Lock]: # return cache, cacheLock raise NotImplementedError diff --git a/large_image/cache_util/memcache.py b/large_image/cache_util/memcache.py index 21dbc8821..98005501e 100644 --- a/large_image/cache_util/memcache.py +++ b/large_image/cache_util/memcache.py @@ -16,6 +16,7 @@ import threading import time +from typing import Tuple from .. import config from .base import BaseCache @@ -145,7 +146,7 @@ def clear(self): self._client.flush_all() @staticmethod - def getCache() -> tuple['MemCache', threading.Lock]: + def getCache() -> Tuple['MemCache', threading.Lock]: # lock needed because pylibmc(memcached client) is not threadsafe cacheLock = threading.Lock() From 83dc6f101bd0f1c1dc0d5d660459ad9d870f259a Mon Sep 17 00:00:00 2001 From: banesullivan Date: Thu, 16 Jun 2022 17:10:33 -0600 Subject: [PATCH 4/4] Fix getCache when improperly configured --- large_image/cache_util/cachefactory.py | 35 +++++++++++++++++++------- large_image/exceptions.py | 8 ++++++ 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/large_image/cache_util/cachefactory.py b/large_image/cache_util/cachefactory.py index 21da6bfbd..836639ded 100644 --- a/large_image/cache_util/cachefactory.py +++ b/large_image/cache_util/cachefactory.py @@ -30,6 +30,7 @@ from importlib_metadata import entry_points from .. import config +from ..exceptions import TileCacheError try: from .memcache import MemCache @@ -100,6 +101,26 @@ def pickAvailableCache(sizeEach, portion=8, maxItems=None, cacheName=None): return numItems +def getFirstAvailableCache(): + cacheBackend = config.getConfig('cache_backend', None) + if cacheBackend is not None: + raise ValueError('cache_backend already set') + loadCaches() + cache, cacheLock = None, None + for cacheBackend in _availableCaches: + try: + cache, cacheLock = _availableCaches[cacheBackend].getCache() + break + except TileCacheError: + continue + if cache is not None: + config.getConfig('logprint').info( + f'Automatically setting `{cacheBackend}` as cache_backend from availableCaches' + ) + config.setConfig('cache_backend', cacheBackend) + return cache, cacheLock + + class CacheFactory: logged = False @@ -130,20 +151,16 @@ def getCache(self, numItems=None, cacheName=None, inProcess=False): # Default to `python` cache for inProcess cacheBackend = config.getConfig('cache_backend', 'python' if inProcess else None) - # set the config cache_backend if caches available and not set - if cacheBackend is None and len(_availableCaches): - cacheBackend = next(iter(_availableCaches)) - config.getConfig('logprint').info( - f'Automatically setting `{cacheBackend}` as cache_backend from availableCaches' - ) - config.setConfig('cache_backend', cacheBackend) - if isinstance(cacheBackend, str): cacheBackend = cacheBackend.lower() + cache = None if not inProcess and cacheBackend in _availableCaches: cache, cacheLock = _availableCaches[cacheBackend].getCache() - else: # fallback backend or inProcess + elif not inProcess and cacheBackend is None: + cache, cacheLock = getFirstAvailableCache() + + if cache is None: # fallback backend or inProcess cacheBackend = 'python' cache = cachetools.LRUCache(self.getCacheSize(numItems, cacheName=cacheName)) cacheLock = threading.Lock() diff --git a/large_image/exceptions.py b/large_image/exceptions.py index 8fe0683be..3c9911ecd 100644 --- a/large_image/exceptions.py +++ b/large_image/exceptions.py @@ -26,6 +26,14 @@ def __init__(self, *args, **kwargs): return super().__init__(errno.ENOENT, *args, **kwargs) +class TileCacheError(TileGeneralError): + pass + + +class TileCacheConfigurationError(TileCacheError): + pass + + TileGeneralException = TileGeneralError TileSourceException = TileSourceError TileSourceAssetstoreException = TileSourceAssetstoreError