Skip to content

Commit

Permalink
Merge pull request #904 from lsst/tickets/DM-41117
Browse files Browse the repository at this point in the history
DM-41117: Improvements to caching and Butler initialization
  • Loading branch information
andy-slac authored Nov 14, 2023
2 parents faf02a0 + 922359c commit c817402
Show file tree
Hide file tree
Showing 23 changed files with 1,394 additions and 716 deletions.
5 changes: 5 additions & 0 deletions python/lsst/daf/butler/_butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,11 @@ def get_known_repos(cls) -> set[str]:
"""
return ButlerRepoIndex.get_known_repos()

@abstractmethod
def _caching_context(self) -> AbstractContextManager[None]:
"""Context manager that enables caching."""
raise NotImplementedError()

@abstractmethod
def transaction(self) -> AbstractContextManager[None]:
"""Context manager supporting `Butler` transactions.
Expand Down
4 changes: 2 additions & 2 deletions python/lsst/daf/butler/_named.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def freeze(self) -> NamedKeyMapping[K, V]:
to a new variable (and considering any previous references
invalidated) should allow for more accurate static type checking.
"""
if not isinstance(self._dict, MappingProxyType):
if not isinstance(self._dict, MappingProxyType): # type: ignore[unreachable]
self._dict = MappingProxyType(self._dict) # type: ignore
return self

Expand Down Expand Up @@ -578,7 +578,7 @@ def freeze(self) -> NamedValueAbstractSet[K]:
to a new variable (and considering any previous references
invalidated) should allow for more accurate static type checking.
"""
if not isinstance(self._mapping, MappingProxyType):
if not isinstance(self._mapping, MappingProxyType): # type: ignore[unreachable]
self._mapping = MappingProxyType(self._mapping) # type: ignore
return self

Expand Down
4 changes: 4 additions & 0 deletions python/lsst/daf/butler/_registry_shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ def refresh(self) -> None:
# Docstring inherited from a base class.
self._registry.refresh()

def caching_context(self) -> contextlib.AbstractContextManager[None]:
# Docstring inherited from a base class.
return self._butler._caching_context()

@contextlib.contextmanager
def transaction(self, *, savepoint: bool = False) -> Iterator[None]:
# Docstring inherited from a base class.
Expand Down
4 changes: 4 additions & 0 deletions python/lsst/daf/butler/direct_butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ def isWriteable(self) -> bool:
# Docstring inherited.
return self._registry.isWriteable()

def _caching_context(self) -> contextlib.AbstractContextManager[None]:
"""Context manager that enables caching."""
return self._registry.caching_context()

@contextlib.contextmanager
def transaction(self) -> Iterator[None]:
"""Context manager supporting `Butler` transactions.
Expand Down
4 changes: 2 additions & 2 deletions python/lsst/daf/butler/persistence_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import uuid
from collections.abc import Callable, Hashable
from contextvars import Context, ContextVar, Token, copy_context
from typing import TYPE_CHECKING, ParamSpec, TypeVar, cast
from typing import TYPE_CHECKING, ParamSpec, TypeVar

if TYPE_CHECKING:
from ._dataset_ref import DatasetRef
Expand Down Expand Up @@ -198,4 +198,4 @@ def run(self, function: Callable[_Q, _T], *args: _Q.args, **kwargs: _Q.kwargs) -
# cast the result as we know this is exactly what the return type will
# be.
result = self._ctx.run(self._functionRunner, function, *args, **kwargs) # type: ignore
return cast(_T, result)
return result
79 changes: 79 additions & 0 deletions python/lsst/daf/butler/registry/_caching_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ["CachingContext"]

from typing import TYPE_CHECKING

from ._collection_record_cache import CollectionRecordCache
from ._collection_summary_cache import CollectionSummaryCache
from ._dataset_type_cache import DatasetTypeCache

if TYPE_CHECKING:
from .interfaces import DatasetRecordStorage


class CachingContext:
"""Collection of caches for various types of records retrieved from
database.
Notes
-----
Caching is usually disabled for most of the record types, but it can be
explicitly and temporarily enabled in some context (e.g. quantum graph
building) using Registry method. This class is a collection of cache
instances which will be `None` when caching is disabled. Instance of this
class is passed to the relevant managers that can use it to query or
populate caches when caching is enabled.
Dataset type cache is always enabled for now, this avoids the need for
explicitly enabling caching in pipetask executors.
"""

collection_records: CollectionRecordCache | None = None
"""Cache for collection records (`CollectionRecordCache`)."""

collection_summaries: CollectionSummaryCache | None = None
"""Cache for collection summary records (`CollectionSummaryCache`)."""

dataset_types: DatasetTypeCache[DatasetRecordStorage]
"""Cache for dataset types, never disabled (`DatasetTypeCache`)."""

def __init__(self) -> None:
self.dataset_types = DatasetTypeCache()

def enable(self) -> None:
"""Enable caches, initializes all caches."""
self.collection_records = CollectionRecordCache()
self.collection_summaries = CollectionSummaryCache()

def disable(self) -> None:
"""Disable caches, sets all caches to `None`."""
self.collection_records = None
self.collection_summaries = None
165 changes: 165 additions & 0 deletions python/lsst/daf/butler/registry/_collection_record_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("CollectionRecordCache",)

from collections.abc import Iterable, Iterator
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from .interfaces import CollectionRecord


class CollectionRecordCache:
"""Cache for collection records.
Notes
-----
This class stores collection records and can retrieve them using either
collection name or collection key. One complication is that key type can be
either collection name or a distinct integer value. To optimize storage
when the key is the same as collection name, this class only stores key to
record mapping when key is of a non-string type.
In come contexts (e.g. ``resolve_wildcard``) a full list of collections is
needed. To signify that cache content can be used in such contexts, cache
defines special ``full`` flag that needs to be set by client.
"""

def __init__(self) -> None:
self._by_name: dict[str, CollectionRecord] = {}
# This dict is only used for records whose key type is not str.
self._by_key: dict[Any, CollectionRecord] = {}
self._full = False

@property
def full(self) -> bool:
"""`True` if cache holds all known collection records (`bool`)."""
return self._full

def add(self, record: CollectionRecord) -> None:
"""Add one record to the cache.
Parameters
----------
record : `CollectionRecord`
Collection record, replaces any existing record with the same name
or key.
"""
# In case we replace same record name with different key, find the
# existing record and drop its key.
if (old_record := self._by_name.get(record.name)) is not None:
self._by_key.pop(old_record.key)
if (old_record := self._by_key.get(record.key)) is not None:
self._by_name.pop(old_record.name)
self._by_name[record.name] = record
if not isinstance(record.key, str):
self._by_key[record.key] = record

def set(self, records: Iterable[CollectionRecord], *, full: bool = False) -> None:
"""Replace cache contents with the new set of records.
Parameters
----------
records : `~collections.abc.Iterable` [`CollectionRecord`]
Collection records.
full : `bool`
If `True` then ``records`` contain all known collection records.
"""
self.clear()
for record in records:
self._by_name[record.name] = record
if not isinstance(record.key, str):
self._by_key[record.key] = record
self._full = full

def clear(self) -> None:
"""Remove all records from the cache."""
self._by_name = {}
self._by_key = {}
self._full = False

def discard(self, record: CollectionRecord) -> None:
"""Remove single record from the cache.
Parameters
----------
record : `CollectionRecord`
Collection record to remove.
"""
self._by_name.pop(record.name, None)
if not isinstance(record.key, str):
self._by_key.pop(record.key, None)

def get_by_name(self, name: str) -> CollectionRecord | None:
"""Return collection record given its name.
Parameters
----------
name : `str`
Collection name.
Returns
-------
record : `CollectionRecord` or `None`
Collection record, `None` is returned if the name is not in the
cache.
"""
return self._by_name.get(name)

def get_by_key(self, key: Any) -> CollectionRecord | None:
"""Return collection record given its key.
Parameters
----------
key : `Any`
Collection key.
Returns
-------
record : `CollectionRecord` or `None`
Collection record, `None` is returned if the key is not in the
cache.
"""
if isinstance(key, str):
return self._by_name.get(key)
return self._by_key.get(key)

def records(self) -> Iterator[CollectionRecord]:
"""Return iterator for the set of records in the cache, can only be
used if `full` is true.
Raises
------
RuntimeError
Raised if ``self.full`` is `False`.
"""
if not self._full:
raise RuntimeError("cannot call records() if cache is not full")
return iter(self._by_name.values())
86 changes: 86 additions & 0 deletions python/lsst/daf/butler/registry/_collection_summary_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("CollectionSummaryCache",)

from collections.abc import Iterable, Mapping
from typing import Any

from ._collection_summary import CollectionSummary


class CollectionSummaryCache:
"""Cache for collection summaries.
Notes
-----
This class stores `CollectionSummary` records indexed by collection keys.
For cache to be usable the records that are given to `update` method have
to include all dataset types, i.e. the query that produces records should
not be constrained by dataset type.
"""

def __init__(self) -> None:
self._cache: dict[Any, CollectionSummary] = {}

def update(self, summaries: Mapping[Any, CollectionSummary]) -> None:
"""Add records to the cache.
Parameters
----------
summaries : `~collections.abc.Mapping` [`Any`, `CollectionSummary`]
Summary records indexed by collection key, records must include all
dataset types.
"""
self._cache.update(summaries)

def find_summaries(self, keys: Iterable[Any]) -> tuple[dict[Any, CollectionSummary], set[Any]]:
"""Return summary records given a set of keys.
Parameters
----------
keys : `~collections.abc.Iterable` [`Any`]
Sequence of collection keys.
Returns
-------
summaries : `dict` [`Any`, `CollectionSummary`]
Dictionary of summaries indexed by collection keys, includes
records found in the cache.
missing_keys : `set` [`Any`]
Collection keys that are not present in the cache.
"""
found = {}
not_found = set()
for key in keys:
if (summary := self._cache.get(key)) is not None:
found[key] = summary
else:
not_found.add(key)
return found, not_found
Loading

0 comments on commit c817402

Please sign in to comment.