Add optional file-based listings caching #895

Open · wants to merge 2 commits into base: master
12 changes: 10 additions & 2 deletions docs/source/api.rst
@@ -47,7 +47,9 @@ Base Classes
fsspec.core.OpenFiles
fsspec.core.get_fs_token_paths
fsspec.core.url_to_fs
fsspec.dircache.DirCache
fsspec.dircache.DisabledListingsCache
fsspec.dircache.MemoryListingsCache
fsspec.dircache.FileListingsCache
fsspec.FSMap
fsspec.generic.GenericFileSystem
fsspec.registry.register_implementation
@@ -82,7 +84,13 @@ Base Classes

.. autofunction:: fsspec.core.url_to_fs

.. autoclass:: fsspec.dircache.DirCache
.. autoclass:: fsspec.dircache.DisabledListingsCache
:members: __init__

.. autoclass:: fsspec.dircache.MemoryListingsCache
:members: __init__

.. autoclass:: fsspec.dircache.FileListingsCache
:members: __init__

.. autoclass:: fsspec.FSMap
8 changes: 8 additions & 0 deletions docs/source/changelog.rst
@@ -1,6 +1,14 @@
Changelog
=========

Dev
--------

Enhancements

- Add optional file-based listings cache using ``diskcache`` (#895).
  Warning: use the new ``listings_cache_options`` argument instead of ``use_listings_cache`` etc.

2024.3.1
--------

29 changes: 20 additions & 9 deletions docs/source/features.rst
@@ -181,15 +181,26 @@ Listings Caching
----------------

For some implementations, getting file listings (i.e., ``ls`` and anything that
depends on it) is expensive. These implementations use dict-like instances of
:class:`fsspec.dircache.DirCache` to manage the listings.

The cache allows for time-based expiry of entries with the ``listings_expiry_time``
parameter, or LRU expiry with the ``max_paths`` parameter. These can be
set on any implementation instance that uses listings caching; or to skip the
caching altogether, use ``use_listings_cache=False``. That would be appropriate
when the target location is known to be volatile because it is being written
to from other sources.
depends on it) is expensive. These implementations may use either dict-like instances of
:class:`fsspec.dircache.MemoryListingsCache` or file-based caching with instances of
:class:`fsspec.dircache.FileListingsCache` to manage the listings.

The listings cache is controlled via the keyword ``listings_cache_options``, which is a dictionary.
The type of cache that is used can be selected via the key ``cache_type`` (``disabled``, ``memory`` or ``file``).
The cache allows for time-based expiry of entries via the key ``expiry_time``. If the target location is known to
be volatile, for example because it is being written to from other sources, we recommend disabling the listings cache.
If you want to use file-based caching, you can also provide the key
``directory`` to determine where the cache file is stored.

Example for ``listings_cache_options``:

.. code-block:: json

    {
        "cache_type": "file",
        "expiry_time": 3600,
        "directory": "/tmp/cache"
    }

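The same options can be passed when constructing a filesystem. A minimal sketch, assuming an
implementation that accepts ``listings_cache_options`` (such as the HTTP filesystem in this PR);
the URL is only illustrative:

.. code-block:: python

    import fsspec

    fs = fsspec.filesystem(
        "https",
        listings_cache_options={
            "cache_type": "file",
            "expiry_time": 3600,
            "directory": "/tmp/cache",
        },
    )
    fs.ls("https://example.com/data/")  # listing results are cached on disk
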
When the ``fsspec`` instance writes to the backend, the method ``invalidate_cache``
is called, so that subsequent listing of the given paths will force a refresh. In
12 changes: 10 additions & 2 deletions fsspec/asyn.py
@@ -312,15 +312,23 @@ class AsyncFileSystem(AbstractFileSystem):
mirror_sync_methods = True
disable_throttling = False

def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
def __init__(
self,
*args,
asynchronous=False,
loop=None,
batch_size=None,
listings_cache_options=None,
**kwargs,
):
self.asynchronous = asynchronous
self._pid = os.getpid()
if not asynchronous:
self._loop = loop or get_loop()
else:
self._loop = None
self.batch_size = batch_size
super().__init__(*args, **kwargs)
super().__init__(listings_cache_options, *args, **kwargs)

@property
def loop(self):
163 changes: 143 additions & 20 deletions fsspec/dircache.py
@@ -1,9 +1,45 @@
import logging
import time
from collections.abc import MutableMapping
from enum import Enum
from functools import lru_cache
from typing import Optional, Union

from fsspec.implementations.local import LocalFileSystem

class DirCache(MutableMapping):
logger = logging.getLogger(__name__)


class DisabledListingsCache(MutableMapping):
def __init__(self, *args, **kwargs):
pass

def __getitem__(self, item):
raise KeyError

def __setitem__(self, key, value):
pass

def __delitem__(self, key):
pass

def __iter__(self):
return iter(())

def __len__(self):
return 0

def clear(self):
pass

def __contains__(self, item):
return False

def __reduce__(self):
return (DisabledListingsCache, ())


class MemoryListingsCache(MutableMapping):
"""
Caching of directory listings, in a structure like::

@@ -26,19 +62,14 @@ class DirCache(MutableMapping):

def __init__(
self,
use_listings_cache=True,
listings_expiry_time=None,
expiry_time=None,
max_paths=None,
**kwargs,
):
"""

Parameters
----------
use_listings_cache: bool
If False, this cache never returns items, but always reports KeyError,
and setting items has no effect
listings_expiry_time: int or float (optional)
expiry_time: int or float (optional)
Time in seconds that a listing is considered valid. If None,
listings do not expire.
max_paths: int (optional)
@@ -49,15 +80,14 @@ def __init__(
self._times = {}
if max_paths:
self._q = lru_cache(max_paths + 1)(lambda key: self._cache.pop(key, None))
self.use_listings_cache = use_listings_cache
self.listings_expiry_time = listings_expiry_time
self.max_paths = max_paths
self._expiry_time = expiry_time
self._max_paths = max_paths

def __getitem__(self, item):
if self.listings_expiry_time is not None:
if self._times.get(item, 0) - time.time() < -self.listings_expiry_time:
if self._expiry_time is not None:
if self._times.get(item, 0) - time.time() < -self._expiry_time:
del self._cache[item]
if self.max_paths:
if self._max_paths:
self._q(item)
return self._cache[item] # maybe raises KeyError

@@ -75,12 +105,10 @@ def __contains__(self, item):
return False

def __setitem__(self, key, value):
if not self.use_listings_cache:
return
if self.max_paths:
if self._max_paths:
self._q(key)
self._cache[key] = value
if self.listings_expiry_time is not None:
if self._expiry_time is not None:
self._times[key] = time.time()

def __delitem__(self, key):
@@ -93,6 +121,101 @@ def __iter__(self):

def __reduce__(self):
return (
DirCache,
(self.use_listings_cache, self.listings_expiry_time, self.max_paths),
MemoryListingsCache,
(self._expiry_time, self._max_paths),
)
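
For illustration, a minimal sketch (not part of the diff) of the expiry behaviour, assuming the
constructor signature added above:

.. code-block:: python

    import time

    from fsspec.dircache import MemoryListingsCache

    cache = MemoryListingsCache(expiry_time=1, max_paths=None)
    cache["s3://bucket/prefix"] = [{"name": "s3://bucket/prefix/key", "size": 1}]
    assert "s3://bucket/prefix" in cache      # fresh entry is served from memory
    time.sleep(1.5)
    assert "s3://bucket/prefix" not in cache  # expired entry is dropped on lookup
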


class FileListingsCache(MutableMapping):
def __init__(
self,
expiry_time: Optional[int],
directory: Optional[str],
):
"""

Parameters
----------
expiry_time: int or float (optional)
Time in seconds that a listing is considered valid. If None,
listings do not expire.
directory: str (optional)
Directory path at which the listings cache file is stored. If None,
an autogenerated path at the user folder is created.

"""
try:
import platformdirs
from diskcache import Cache
except ImportError as e:
raise ImportError(
"The optional dependencies ``platformdirs`` and ``diskcache`` are required for file-based dircache."
) from e

if not directory:
directory = platformdirs.user_cache_dir(appname="fsspec")
directory = f"{directory}/dircache/{str(expiry_time)}"

fs = LocalFileSystem()

try:
fs.mkdir(directory, create_parents=True)
except OSError as e:
logger.error(f"Directory for dircache could not be created at {directory}.")
raise e
else:
logger.info(f"Dircache located at {directory}.")

self._expiry_time = expiry_time
self._directory = directory
self._cache = Cache(directory=str(directory))

def __getitem__(self, item):
"""Draw item as fileobject from cache, retry if timeout occurs"""
return self._cache.get(key=item, read=True, retry=True)

def clear(self):
self._cache.clear()

def __len__(self):
return len(list(self._cache.iterkeys()))

def __contains__(self, item):
value = self._cache.get(item, retry=True) # None, if expired
if value:
return True
return False

def __setitem__(self, key, value):
self._cache.set(key=key, value=value, expire=self._expiry_time, retry=True)

def __delitem__(self, key):
del self._cache[key]

def __iter__(self):
return (k for k in self._cache.iterkeys() if k in self)

def __reduce__(self):
return (
FileListingsCache,
(self._expiry_time, self._directory),
)


class CacheType(Enum):
DISABLED = DisabledListingsCache
MEMORY = MemoryListingsCache
FILE = FileListingsCache


def create_listings_cache(
cache_type: CacheType,
expiry_time: Optional[int],
**kwargs,
) -> Optional[Union[MemoryListingsCache, FileListingsCache]]:
cache_map = {
CacheType.DISABLED: DisabledListingsCache,
CacheType.MEMORY: MemoryListingsCache,
CacheType.FILE: FileListingsCache,
}
return cache_map[cache_type](expiry_time, **kwargs)
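
For illustration, a hedged sketch (not part of the diff) of how the new factory might be called;
the actual call sites inside ``AbstractFileSystem`` are not shown in this PR, and ``CacheType.FILE``
additionally requires the optional ``diskcache`` and ``platformdirs`` dependencies:

.. code-block:: python

    from fsspec.dircache import CacheType, create_listings_cache

    # In-memory listings cache with one-hour expiry; extra kwargs (e.g. max_paths)
    # are forwarded to the selected cache class.
    mem_cache = create_listings_cache(CacheType.MEMORY, expiry_time=3600, max_paths=100)

    # File-based cache: entries are persisted under the given directory.
    file_cache = create_listings_cache(
        CacheType.FILE, expiry_time=3600, directory="/tmp/fsspec-dircache"
    )
    file_cache["s3://bucket/prefix"] = [{"name": "s3://bucket/prefix/key", "size": 1}]
    assert "s3://bucket/prefix" in file_cache
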
47 changes: 33 additions & 14 deletions fsspec/implementations/http.py
@@ -3,7 +3,6 @@
import logging
import re
import weakref
from copy import copy
from urllib.parse import urlparse

import aiohttp
@@ -58,6 +57,7 @@ def __init__(
client_kwargs=None,
get_client=get_client,
encoded=False,
listings_cache_options=None,
**storage_options,
):
"""
@@ -83,11 +83,39 @@
A callable which takes keyword arguments and constructs
an aiohttp.ClientSession. Its state will be managed by
the HTTPFileSystem class.
listings_cache_options: dict
Options for the listings cache.
storage_options: key-value
Any other parameters passed on to requests
cache_type, cache_options: defaults used in open
"""
super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
# TODO: remove in future release
# Clean caching-related parameters from `storage_options`
# before propagating them as `request_options` through `self.kwargs`.
old_listings_cache_kwargs = {
"use_listings_cache",
"listings_expiry_time",
"max_paths",
"skip_instance_cache",
}
# intersection of old_listings_cache_kwargs and storage_options
old_listings_cache_kwargs = old_listings_cache_kwargs.intersection(
storage_options
)
if old_listings_cache_kwargs:
logger.warning(
f"The following parameters are not used anymore and will be ignored: {old_listings_cache_kwargs}. "
f"Use new `listings_cache_options` instead."
)
for key in old_listings_cache_kwargs:
del storage_options[key]
super().__init__(
self,
asynchronous=asynchronous,
loop=loop,
listings_cache_options=listings_cache_options,
**storage_options,
)
self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
self.simple_links = simple_links
self.same_schema = same_scheme
@@ -96,19 +124,10 @@ def __init__(
self.client_kwargs = client_kwargs or {}
self.get_client = get_client
self.encoded = encoded
self.kwargs = storage_options
self._session = None

# Clean caching-related parameters from `storage_options`
# before propagating them as `request_options` through `self.kwargs`.
# TODO: Maybe rename `self.kwargs` to `self.request_options` to make
# it clearer.
request_options = copy(storage_options)
self.use_listings_cache = request_options.pop("use_listings_cache", False)
request_options.pop("listings_expiry_time", None)
request_options.pop("max_paths", None)
request_options.pop("skip_instance_cache", None)
Comment on lines -109 to -110

gutzbenj (Contributor, Author): What about those args? I think they are not needed/used anymore.

(Member): We need to make sure not to leak any kwargs to downstream implementations, since some of them take any kwargs and assume they are to be propagated to, for example, the http session constructor. This could in principle be handled in _Cached.__call__, which happens before any implementation-specific __init__.

gutzbenj (Contributor, Author, Apr 28, 2024): If we use the listings_cache_options specifically, they are only passed to the DirCache class in AbstractFileSystem class but do not go further down the hierarchy.

self.kwargs = request_options
self.kwargs = storage_options
self._session = None

@property
def fsid(self):
@@ -201,7 +220,7 @@ async def _ls_real(self, url, detail=True, **kwargs):
return sorted(out)

async def _ls(self, url, detail=True, **kwargs):
if self.use_listings_cache and url in self.dircache:
if url in self.dircache:
out = self.dircache[url]
else:
out = await self._ls_real(url, detail=detail, **kwargs)
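
For illustration, a hedged sketch (not part of the diff) of the resulting behaviour for HTTP; it
assumes the base class maps the ``cache_type`` string to the corresponding cache class as described
in the documentation:

.. code-block:: python

    import fsspec

    # New style: caching behaviour is grouped under ``listings_cache_options``.
    fs = fsspec.filesystem(
        "https",
        listings_cache_options={"cache_type": "memory", "expiry_time": 60},
    )

    # Old style: the deprecated keywords are stripped with a warning instead of
    # leaking into the aiohttp request options.
    fs_old = fsspec.filesystem("https", use_listings_cache=True, listings_expiry_time=60)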