Skip to content

Commit

Permalink
Merge pull request #33 from AstarVienna/fh/morebangstuff
Browse files Browse the repository at this point in the history
Add more bang-string-key functionality
  • Loading branch information
teutoburg committed Feb 19, 2024
2 parents 3905bde + ee44f80 commit b7fc50f
Show file tree
Hide file tree
Showing 3 changed files with 274 additions and 59 deletions.
3 changes: 2 additions & 1 deletion astar_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-

from .nested_mapping import NestedMapping
from .nested_mapping import (NestedMapping, RecursiveNestedMapping,
NestedChainMap, is_bangkey, is_nested_mapping)
from .unique_list import UniqueList
from .badges import Badge, BadgeReport
from .loggers import get_logger, get_astar_logger
213 changes: 156 additions & 57 deletions astar_utils/nested_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

from typing import TextIO, Optional, Union, Any
from io import StringIO
from collections.abc import (Iterable, Iterator, Collection, Sequence, Mapping,
MutableMapping)
from collections import abc, ChainMap

from more_itertools import ilen

Expand All @@ -13,40 +12,40 @@
logger = get_logger(__name__)


class NestedMapping(MutableMapping):
class NestedMapping(abc.MutableMapping):
# TODO: improve docstring
"""Dictionary-like structure that supports nested !-bang string keys."""

def __init__(self, new_dict: Optional[Iterable] = None,
def __init__(self, new_dict: Optional[abc.Iterable] = None,
title: Optional[str] = None):
self.dic: MutableMapping[str, Any] = {}
self.dic: abc.MutableMapping[str, Any] = {}
self._title = title
if isinstance(new_dict, MutableMapping):
if isinstance(new_dict, abc.MutableMapping):
self.update(new_dict)
elif isinstance(new_dict, Iterable):
elif isinstance(new_dict, abc.Iterable):
for entry in new_dict:
self.update(entry)

def update(self, new_dict: MutableMapping[str, Any]) -> None:
def update(self, new_dict: abc.MutableMapping[str, Any]) -> None:
if isinstance(new_dict, NestedMapping):
new_dict = new_dict.dic # Avoid updating with another one

# TODO: why do we check for dict here but not in the else?
if isinstance(new_dict, Mapping) and "alias" in new_dict:
if isinstance(new_dict, abc.Mapping) and "alias" in new_dict:
alias = new_dict["alias"]
propdict = new_dict.get("properties", {})
if alias in self.dic:
self.dic[alias] = recursive_update(self.dic[alias], propdict)
else:
self.dic[alias] = propdict
elif isinstance(new_dict, Sequence):
elif isinstance(new_dict, abc.Sequence):
# To catch list of tuples
self.update(dict([new_dict]))
else:
# Catch any bang-string properties keys
to_pop = []
for key in new_dict:
if key.startswith("!"):
if is_bangkey(key):
logger.debug(
"Bang-string key %s was seen in .update. This should "
"not occur outside mocking in testing!", key)
Expand All @@ -60,49 +59,55 @@ def update(self, new_dict: MutableMapping[str, Any]) -> None:

def __getitem__(self, key: str):
"""x.__getitem__(y) <==> x[y]."""
if isinstance(key, str) and key.startswith("!"):
key_chunks = self._split_subkey(key)
entry = self.dic
for chunk in key_chunks:
self._guard_submapping(
entry, key_chunks[:key_chunks.index(chunk)], "get")
try:
entry = entry[chunk]
except KeyError as err:
raise KeyError(key) from err
return entry
return self.dic[key]
if not is_bangkey(key):
return self.dic[key]

key_chunks = self._split_subkey(key)
entry = self.dic
for chunk in key_chunks:
self._guard_submapping(
entry, key_chunks[:key_chunks.index(chunk)], "get")
try:
entry = entry[chunk]
except KeyError as err:
raise KeyError(key) from err

if is_nested_mapping(entry):
return self.__class__(entry)
return entry

def __setitem__(self, key: str, value) -> None:
"""Set self[key] to value."""
if isinstance(key, str) and key.startswith("!"):
*key_chunks, final_key = self._split_subkey(key)
entry = self.dic
for chunk in key_chunks:
if chunk not in entry:
entry[chunk] = {}
entry = entry[chunk]
self._guard_submapping(entry, key_chunks, "set")
entry[final_key] = value
else:
if not is_bangkey(key):
self.dic[key] = value
return

*key_chunks, final_key = self._split_subkey(key)
entry = self.dic
for chunk in key_chunks:
if chunk not in entry:
entry[chunk] = {}
entry = entry[chunk]
self._guard_submapping(entry, key_chunks, "set")
entry[final_key] = value

def __delitem__(self, key: str) -> None:
"""Delete self[key]."""
if isinstance(key, str) and key.startswith("!"):
*key_chunks, final_key = self._split_subkey(key)
entry = self.dic
for chunk in key_chunks:
self._guard_submapping(
entry, key_chunks[:key_chunks.index(chunk)], "del")
try:
entry = entry[chunk]
except KeyError as err:
raise KeyError(key) from err
self._guard_submapping(entry, key_chunks, "del")
del entry[final_key]
else:
if not is_bangkey(key):
del self.dic[key]
return

*key_chunks, final_key = self._split_subkey(key)
entry = self.dic
for chunk in key_chunks:
self._guard_submapping(
entry, key_chunks[:key_chunks.index(chunk)], "del")
try:
entry = entry[chunk]
except KeyError as err:
raise KeyError(key) from err
self._guard_submapping(entry, key_chunks, "del")
del entry[final_key]

@staticmethod
def _split_subkey(key: str):
Expand All @@ -118,7 +123,7 @@ def _guard_submapping(entry, key_chunks, kind: str = "get") -> None:
"set": "overwritten with a new sub-mapping",
"del": "be deleted from"}
submsg = kinds.get(kind, "modified")
if not isinstance(entry, Mapping):
if not isinstance(entry, abc.Mapping):
raise KeyError(
f"Bang-key '!{'.'.join(key_chunks)}' doesn't point to a sub-"
f"mapping but to a single value, which cannot be {submsg}. "
Expand All @@ -127,17 +132,17 @@ def _guard_submapping(entry, key_chunks, kind: str = "get") -> None:
"re-assign a new sub-mapping to the key.")

def _staggered_items(self, key: Union[str, None],
value: Mapping) -> Iterator[tuple[str, Any]]:
value: abc.Mapping) -> abc.Iterator[tuple[str, Any]]:
simple = []
for subkey, subvalue in value.items():
new_key = self._join_subkey(key, subkey)
if isinstance(subvalue, Mapping):
if isinstance(subvalue, abc.Mapping):
yield from self._staggered_items(new_key, subvalue)
else:
simple.append((new_key, subvalue))
yield from simple

def __iter__(self) -> Iterator[str]:
def __iter__(self) -> abc.Iterator[str]:
"""Implement iter(self)."""
yield from (item[0] for item in self._staggered_items(None, self.dic))

Expand All @@ -152,15 +157,15 @@ def _write_subkey(key: str, pre: str, final: bool, stream: TextIO) -> str:
stream.write(f"{newpre}{key}: ")
return newpre

def _write_subitems(self, items: Collection[tuple[str, Any]], pre: str,
def _write_subitems(self, items: abc.Collection[tuple[str, Any]], pre: str,
stream: TextIO, nested: bool = False
) -> list[tuple[str, Any]]:
# TODO: could this (and _write_subdict) use _staggered_items instead??
n_items = len(items)
simple: list[tuple[str, Any]] = []

for i_sub, (key, val) in enumerate(items):
is_super = isinstance(val, Mapping)
is_super = isinstance(val, abc.Mapping)
if not nested or is_super:
final = i_sub == n_items - 1 and not simple
newpre = self._write_subkey(key, pre, final, stream)
Expand All @@ -175,7 +180,7 @@ def _write_subitems(self, items: Collection[tuple[str, Any]], pre: str,

return simple

def _write_subdict(self, subdict: Mapping, stream: TextIO,
def _write_subdict(self, subdict: abc.Mapping, stream: TextIO,
pad: str = "") -> None:
pre = pad.replace("├─", "│ ").replace("└─", " ")
simple = self._write_subitems(subdict.items(), pre, stream, True)
Expand All @@ -202,21 +207,115 @@ def title(self) -> str:
"""Return title if set, or default to class name."""
return self._title or self.__class__.__name__

def _repr_pretty_(self, printer, cycle):
"""For ipython."""
if cycle:
printer.text("NestedMapping(...)")
else:
printer.text(str(self))


class RecursiveNestedMapping(NestedMapping):
"""Like NestedMapping but internally resolves any bang-string values.
In the event of an infinite loop of recursive bang-string keys pointing
back to each other, this should savely and quickly throw a
``RecursionError``.
"""

def __getitem__(self, key: str):
"""x.__getitem__(y) <==> x[y]."""
value = super().__getitem__(key)
while is_bangkey(value):
try:
value = self[value]
except KeyError:
return value
return value

@classmethod
def from_maps(cls, maps, key):
"""Yield instances from maps if key is found."""
for i, mapping in enumerate(maps):
if key in mapping:
# Don't use .get here to avoid chaining empty mappings
yield RecursiveNestedMapping(
mapping[key], title=f"[{i}] mapping")


class NestedChainMap(ChainMap):
"""Subclass of ``collections.ChainMap`` using ``RecursiveNestedMapping``.
Only overrides ``__getitem__`` to allow for both recursive bang-string keys
accross the individual mappings and to "collect" sub-mappings from the same
bang-key in multiple mappings into a new `NestedChainMap`.
Also overrides ``__str__`` and provides a ``_repr_pretty_`` for nice output
to an IPython console.
In the absence of any nested mappings or bang-string keys, this will work
like the base class ``collections.ChainMap``, meaning an ordinary ``dict``
can also be used as one of the individual mappings.
In the event of an infinite loop of recursive bang-string keys pointing
back to each other, this should savely and quickly throw a
``RecursionError``.
"""

def __getitem__(self, key):
"""x.__getitem__(y) <==> x[y]."""
value = super().__getitem__(key)

if isinstance(value, abc.Mapping):
submaps = tuple(RecursiveNestedMapping.from_maps(self.maps, key))
if len(submaps) == 1:
# Don't need the chain if it's just one...
return submaps[0]
return NestedChainMap(*submaps)

if is_bangkey(value):
value = self[value]
return value

def __str__(self):
"""Return str(self)."""
return "\n\n".join(str(mapping) for mapping in self.maps)

def _repr_pretty_(self, printer, cycle):
"""For ipython."""
if cycle:
printer.text("NestedChainMap(...)")
else:
printer.text(str(self))


def is_bangkey(key) -> bool:
"""Return ``True`` if the key is a ``str`` and starts with a "!"."""
return isinstance(key, str) and key.startswith("!")


def is_nested_mapping(mapping) -> bool:
"""Return ``True`` if `mapping` contains any further map as a value."""
if not isinstance(mapping, abc.Mapping):
return False
return any(isinstance(value, abc.Mapping) for value in mapping.values())


def recursive_update(old_dict: MutableMapping, new_dict: Mapping) -> MutableMapping:
def recursive_update(old_dict: abc.MutableMapping,
new_dict: abc.Mapping) -> abc.MutableMapping:
if new_dict is not None:
for key in new_dict:
if old_dict is not None and key in old_dict:
if isinstance(old_dict[key], Mapping):
if isinstance(new_dict[key], Mapping):
if isinstance(old_dict[key], abc.Mapping):
if isinstance(new_dict[key], abc.Mapping):
old_dict[key] = recursive_update(old_dict[key],
new_dict[key])
else:
logger.warning("Overwriting dict %s with non-dict: %s",
old_dict[key], new_dict[key])
old_dict[key] = new_dict[key]
else:
if isinstance(new_dict[key], Mapping):
if isinstance(new_dict[key], abc.Mapping):
logger.warning("Overwriting non-dict %s with dict: %s",
old_dict[key], new_dict[key])
old_dict[key] = new_dict[key]
Expand Down
Loading

0 comments on commit b7fc50f

Please sign in to comment.