Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more bang-string-key functionality #33

Merged
merged 10 commits into from
Feb 19, 2024
3 changes: 2 additions & 1 deletion astar_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-

from .nested_mapping import NestedMapping
from .nested_mapping import (NestedMapping, RecursiveNestedMapping,
NestedChainMap, is_bangkey, is_nested_mapping)
from .unique_list import UniqueList
from .badges import Badge, BadgeReport
from .loggers import get_logger, get_astar_logger
213 changes: 156 additions & 57 deletions astar_utils/nested_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

from typing import TextIO, Optional, Union, Any
from io import StringIO
from collections.abc import (Iterable, Iterator, Collection, Sequence, Mapping,
MutableMapping)
from collections import abc, ChainMap

from more_itertools import ilen

Expand All @@ -13,40 +12,40 @@
logger = get_logger(__name__)


class NestedMapping(MutableMapping):
class NestedMapping(abc.MutableMapping):
# TODO: improve docstring
"""Dictionary-like structure that supports nested !-bang string keys."""

def __init__(self, new_dict: Optional[Iterable] = None,
def __init__(self, new_dict: Optional[abc.Iterable] = None,
title: Optional[str] = None):
self.dic: MutableMapping[str, Any] = {}
self.dic: abc.MutableMapping[str, Any] = {}
self._title = title
if isinstance(new_dict, MutableMapping):
if isinstance(new_dict, abc.MutableMapping):
self.update(new_dict)
elif isinstance(new_dict, Iterable):
elif isinstance(new_dict, abc.Iterable):
for entry in new_dict:
self.update(entry)

def update(self, new_dict: MutableMapping[str, Any]) -> None:
def update(self, new_dict: abc.MutableMapping[str, Any]) -> None:
if isinstance(new_dict, NestedMapping):
new_dict = new_dict.dic # Avoid updating with another one

# TODO: why do we check for dict here but not in the else?
if isinstance(new_dict, Mapping) and "alias" in new_dict:
if isinstance(new_dict, abc.Mapping) and "alias" in new_dict:
alias = new_dict["alias"]
propdict = new_dict.get("properties", {})
if alias in self.dic:
self.dic[alias] = recursive_update(self.dic[alias], propdict)
else:
self.dic[alias] = propdict
elif isinstance(new_dict, Sequence):
elif isinstance(new_dict, abc.Sequence):
# To catch list of tuples
self.update(dict([new_dict]))
else:
# Catch any bang-string properties keys
to_pop = []
for key in new_dict:
if key.startswith("!"):
if is_bangkey(key):
logger.debug(
"Bang-string key %s was seen in .update. This should "
"not occur outside mocking in testing!", key)
Expand All @@ -60,49 +59,55 @@ def update(self, new_dict: MutableMapping[str, Any]) -> None:

def __getitem__(self, key: str):
"""x.__getitem__(y) <==> x[y]."""
if isinstance(key, str) and key.startswith("!"):
key_chunks = self._split_subkey(key)
entry = self.dic
for chunk in key_chunks:
self._guard_submapping(
entry, key_chunks[:key_chunks.index(chunk)], "get")
try:
entry = entry[chunk]
except KeyError as err:
raise KeyError(key) from err
return entry
return self.dic[key]
if not is_bangkey(key):
return self.dic[key]

key_chunks = self._split_subkey(key)
entry = self.dic
for chunk in key_chunks:
self._guard_submapping(
entry, key_chunks[:key_chunks.index(chunk)], "get")
try:
entry = entry[chunk]
except KeyError as err:
raise KeyError(key) from err

if is_nested_mapping(entry):
return self.__class__(entry)
return entry

def __setitem__(self, key: str, value) -> None:
"""Set self[key] to value."""
if isinstance(key, str) and key.startswith("!"):
*key_chunks, final_key = self._split_subkey(key)
entry = self.dic
for chunk in key_chunks:
if chunk not in entry:
entry[chunk] = {}
entry = entry[chunk]
self._guard_submapping(entry, key_chunks, "set")
entry[final_key] = value
else:
if not is_bangkey(key):
self.dic[key] = value
return

*key_chunks, final_key = self._split_subkey(key)
entry = self.dic
for chunk in key_chunks:
if chunk not in entry:
entry[chunk] = {}
entry = entry[chunk]
self._guard_submapping(entry, key_chunks, "set")
entry[final_key] = value

def __delitem__(self, key: str) -> None:
"""Delete self[key]."""
if isinstance(key, str) and key.startswith("!"):
*key_chunks, final_key = self._split_subkey(key)
entry = self.dic
for chunk in key_chunks:
self._guard_submapping(
entry, key_chunks[:key_chunks.index(chunk)], "del")
try:
entry = entry[chunk]
except KeyError as err:
raise KeyError(key) from err
self._guard_submapping(entry, key_chunks, "del")
del entry[final_key]
else:
if not is_bangkey(key):
del self.dic[key]
return

*key_chunks, final_key = self._split_subkey(key)
entry = self.dic
for chunk in key_chunks:
self._guard_submapping(
entry, key_chunks[:key_chunks.index(chunk)], "del")
try:
entry = entry[chunk]
except KeyError as err:
raise KeyError(key) from err
self._guard_submapping(entry, key_chunks, "del")
del entry[final_key]

@staticmethod
def _split_subkey(key: str):
Expand All @@ -118,7 +123,7 @@ def _guard_submapping(entry, key_chunks, kind: str = "get") -> None:
"set": "overwritten with a new sub-mapping",
"del": "be deleted from"}
submsg = kinds.get(kind, "modified")
if not isinstance(entry, Mapping):
if not isinstance(entry, abc.Mapping):
raise KeyError(
f"Bang-key '!{'.'.join(key_chunks)}' doesn't point to a sub-"
f"mapping but to a single value, which cannot be {submsg}. "
Expand All @@ -127,17 +132,17 @@ def _guard_submapping(entry, key_chunks, kind: str = "get") -> None:
"re-assign a new sub-mapping to the key.")

def _staggered_items(self, key: Union[str, None],
value: Mapping) -> Iterator[tuple[str, Any]]:
value: abc.Mapping) -> abc.Iterator[tuple[str, Any]]:
simple = []
for subkey, subvalue in value.items():
new_key = self._join_subkey(key, subkey)
if isinstance(subvalue, Mapping):
if isinstance(subvalue, abc.Mapping):
yield from self._staggered_items(new_key, subvalue)
else:
simple.append((new_key, subvalue))
yield from simple

def __iter__(self) -> Iterator[str]:
def __iter__(self) -> abc.Iterator[str]:
"""Implement iter(self)."""
yield from (item[0] for item in self._staggered_items(None, self.dic))

Expand All @@ -152,15 +157,15 @@ def _write_subkey(key: str, pre: str, final: bool, stream: TextIO) -> str:
stream.write(f"{newpre}{key}: ")
return newpre

def _write_subitems(self, items: Collection[tuple[str, Any]], pre: str,
def _write_subitems(self, items: abc.Collection[tuple[str, Any]], pre: str,
stream: TextIO, nested: bool = False
) -> list[tuple[str, Any]]:
# TODO: could this (and _write_subdict) use _staggered_items instead??
n_items = len(items)
simple: list[tuple[str, Any]] = []

for i_sub, (key, val) in enumerate(items):
is_super = isinstance(val, Mapping)
is_super = isinstance(val, abc.Mapping)
if not nested or is_super:
final = i_sub == n_items - 1 and not simple
newpre = self._write_subkey(key, pre, final, stream)
Expand All @@ -175,7 +180,7 @@ def _write_subitems(self, items: Collection[tuple[str, Any]], pre: str,

return simple

def _write_subdict(self, subdict: Mapping, stream: TextIO,
def _write_subdict(self, subdict: abc.Mapping, stream: TextIO,
pad: str = "") -> None:
pre = pad.replace("├─", "│ ").replace("└─", " ")
simple = self._write_subitems(subdict.items(), pre, stream, True)
Expand All @@ -202,21 +207,115 @@ def title(self) -> str:
"""Return title if set, or default to class name."""
return self._title or self.__class__.__name__

def _repr_pretty_(self, printer, cycle):
"""For ipython."""
if cycle:
printer.text("NestedMapping(...)")
else:
printer.text(str(self))


class RecursiveNestedMapping(NestedMapping):
"""Like NestedMapping but internally resolves any bang-string values.

In the event of an infinite loop of recursive bang-string keys pointing
back to each other, this should savely and quickly throw a
``RecursionError``.
"""

def __getitem__(self, key: str):
"""x.__getitem__(y) <==> x[y]."""
value = super().__getitem__(key)
while is_bangkey(value):
try:
value = self[value]
except KeyError:
return value
return value

@classmethod
def from_maps(cls, maps, key):
"""Yield instances from maps if key is found."""
for i, mapping in enumerate(maps):
if key in mapping:
# Don't use .get here to avoid chaining empty mappings
yield RecursiveNestedMapping(
mapping[key], title=f"[{i}] mapping")


class NestedChainMap(ChainMap):
"""Subclass of ``collections.ChainMap`` using ``RecursiveNestedMapping``.

Only overrides ``__getitem__`` to allow for both recursive bang-string keys
accross the individual mappings and to "collect" sub-mappings from the same
bang-key in multiple mappings into a new `NestedChainMap`.

Also overrides ``__str__`` and provides a ``_repr_pretty_`` for nice output
to an IPython console.

In the absence of any nested mappings or bang-string keys, this will work
like the base class ``collections.ChainMap``, meaning an ordinary ``dict``
can also be used as one of the individual mappings.

In the event of an infinite loop of recursive bang-string keys pointing
back to each other, this should savely and quickly throw a
``RecursionError``.
"""

def __getitem__(self, key):
"""x.__getitem__(y) <==> x[y]."""
value = super().__getitem__(key)

if isinstance(value, abc.Mapping):
submaps = tuple(RecursiveNestedMapping.from_maps(self.maps, key))
if len(submaps) == 1:
# Don't need the chain if it's just one...
return submaps[0]
return NestedChainMap(*submaps)

if is_bangkey(value):
value = self[value]
return value

def __str__(self):
"""Return str(self)."""
return "\n\n".join(str(mapping) for mapping in self.maps)

def _repr_pretty_(self, printer, cycle):
"""For ipython."""
if cycle:
printer.text("NestedChainMap(...)")
else:
printer.text(str(self))


def is_bangkey(key) -> bool:
"""Return ``True`` if the key is a ``str`` and starts with a "!"."""
return isinstance(key, str) and key.startswith("!")


def is_nested_mapping(mapping) -> bool:
"""Return ``True`` if `mapping` contains any further map as a value."""
if not isinstance(mapping, abc.Mapping):
return False
return any(isinstance(value, abc.Mapping) for value in mapping.values())


def recursive_update(old_dict: MutableMapping, new_dict: Mapping) -> MutableMapping:
def recursive_update(old_dict: abc.MutableMapping,
new_dict: abc.Mapping) -> abc.MutableMapping:
if new_dict is not None:
for key in new_dict:
if old_dict is not None and key in old_dict:
if isinstance(old_dict[key], Mapping):
if isinstance(new_dict[key], Mapping):
if isinstance(old_dict[key], abc.Mapping):
if isinstance(new_dict[key], abc.Mapping):
old_dict[key] = recursive_update(old_dict[key],
new_dict[key])
else:
logger.warning("Overwriting dict %s with non-dict: %s",
old_dict[key], new_dict[key])
old_dict[key] = new_dict[key]
else:
if isinstance(new_dict[key], Mapping):
if isinstance(new_dict[key], abc.Mapping):
logger.warning("Overwriting non-dict %s with dict: %s",
old_dict[key], new_dict[key])
old_dict[key] = new_dict[key]
Expand Down
Loading
Loading