Skip to content

Commit

Permalink
(ENH)(TEST) light grammar for filtering data dicts by tag
Browse files Browse the repository at this point in the history
  • Loading branch information
rciric committed Aug 29, 2023
1 parent 6568cc4 commit 39d7cf8
Show file tree
Hide file tree
Showing 2 changed files with 288 additions and 0 deletions.
229 changes: 229 additions & 0 deletions src/gramform/tagops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
# -*- coding: utf-8 -*-
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""
Data tags
~~~~~~~~~
Transformations and grammar for operations on data tags.
"""
from dataclasses import dataclass, field
from functools import reduce
from typing import (
Any,
Callable,
Dict,
Literal,
Mapping,
Optional,
Sequence,
Tuple,
)

from .grammar import (
Grammar,
Grouping,
GroupingPool,
LeafInterpreter,
Literalisation,
TransformPool,
TransformPrimitive,
)


@dataclass(frozen=True)
class DataTagGrammar(Grammar):
    """
    Grammar for set-algebraic filtering of data dicts by tag.

    Supports union (``|``), intersection (``&``), complement (``!`` or
    ``~``), and exclusive or (``^``), with parentheses for grouping.
    """

    # Use a default_factory here, consistent with the other fields below:
    # a bare instance default would be shared across all grammar instances.
    groupings: GroupingPool = field(
        default_factory=lambda: GroupingPool(
            Grouping(open='(', close=')'),
        )
    )
    transforms: TransformPool = field(
        default_factory=lambda: TransformPool(
            UnionNode(),
            IntersectionNode(),
            ComplementNode(),
            ExclusiveOrNode(),
        )
    )
    whitespace: bool = False
    # Bare leaf tokens are interpreted as tag selectors.
    default_interpreter: Optional[LeafInterpreter] = field(
        default_factory=lambda: TagSelectInterpreter()
    )
    # The root transform materialises the selected keys into a data dict.
    default_root_transform: Optional[TransformPrimitive] = field(
        default_factory=lambda: ReturnSelected()
    )


@dataclass(frozen=True)
class TagSelectInterpreter(LeafInterpreter):
    """Interpret a leaf token as a lookup into the tag mapping."""

    def __call__(self, leaf: str) -> Callable:
        def select_by_tag(
            tags: Mapping[str, Sequence[str]],
            keys: Sequence[str],
        ) -> Tuple[Mapping[str, Any], Sequence[str]]:
            # Look the leaf up in the tag mapping; keys pass through
            # untouched for downstream set operations.
            selected = tags[leaf]
            return selected, keys

        return select_by_tag


@dataclass(frozen=True)
class ReturnSelected(TransformPrimitive):
    """
    Default root transform: realise a compiled selection as a data dict.

    Wraps the single compiled selector so that calling the compiled
    expression with the tag mapping (positional) and the data dict
    (keyword arguments) returns only the selected entries.
    """

    min_arity: int = 1
    max_arity: int = 1
    # Annotation fixed to float: float('inf') is not an int.
    priority: float = float('inf')
    associative: bool = False
    commutative: bool = False
    literals: Sequence[Literalisation] = ()

    def __call__(self, *pparams, **params) -> Callable:
        # Arity is exactly 1: the sole child is the compiled selector.
        f = pparams[0]

        def return_selected(
            arg: Any,
            **datatypes,
        ) -> Mapping[str, Any]:
            # ``arg`` is the tag mapping; ``datatypes`` is the data dict.
            # The selector returns (selected_keys, keys); index 0 holds
            # the selection.
            keys = set(datatypes.keys())
            return {k: datatypes[k] for k in f(arg, keys)[0]}

        return return_selected


# ---------------------------------- Union --------------------------------- #


@dataclass(frozen=True)
class UnionInfixLiteralisation(Literalisation):
    """Infix ``|`` literal for the union operation."""

    affix: Literal['prefix', 'suffix', 'infix', 'leaf'] = 'infix'
    regex: str = r'\|'

    def parse_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        # Union takes no parameters from the token; pass through unchanged.
        return params


@dataclass(frozen=True)
class UnionNode(TransformPrimitive):
    """Variadic set union of tag selections (infix ``|``)."""

    min_arity: int = 2
    # Annotation fixed to float: float('inf') is not an int.
    max_arity: float = float('inf')
    priority: int = 4
    associative: bool = True
    commutative: bool = True
    literals: Sequence[Literalisation] = (UnionInfixLiteralisation(),)

    def ascend(self, *pparams, **params) -> Callable:
        def union(
            arg: Any,
            keys: Sequence[str],
        ) -> Tuple[Mapping[str, Any], Sequence[str]]:
            # Evaluate every operand's selection, then fold the resulting
            # sets with set union.
            sets = tuple(set(f(arg, keys)[0]) for f in pparams)
            return reduce((lambda x, y: x | y), sets), keys

        return union


# ------------------------------ Intersection ------------------------------ #


@dataclass(frozen=True)
class IntersectionInfixLiteralisation(Literalisation):
    """Infix ``&`` literal for the intersection operation."""

    affix: Literal['prefix', 'suffix', 'infix', 'leaf'] = 'infix'
    regex: str = r'\&'

    def parse_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        # Intersection takes no parameters from the token; pass through.
        return params


@dataclass(frozen=True)
class IntersectionNode(TransformPrimitive):
    """Variadic set intersection of tag selections (infix ``&``)."""

    min_arity: int = 2
    # Annotation fixed to float: float('inf') is not an int.
    max_arity: float = float('inf')
    priority: int = 2
    associative: bool = True
    commutative: bool = True
    literals: Sequence[Literalisation] = (IntersectionInfixLiteralisation(),)

    def ascend(self, *pparams, **params) -> Callable:
        def intersection(
            arg: Any,
            keys: Sequence[str],
        ) -> Tuple[Mapping[str, Any], Sequence[str]]:
            # Evaluate every operand's selection, then fold with set
            # intersection. (A leftover debug ``print`` was removed.)
            sets = tuple(set(f(arg, keys)[0]) for f in pparams)
            return reduce((lambda x, y: x & y), sets), keys

        return intersection


# ------------------------------- Complement ------------------------------- #


@dataclass(frozen=True)
class ComplementPrefixExclLiteralisation(Literalisation):
    """Prefix ``!`` literal for the complement operation."""

    affix: Literal['prefix', 'suffix', 'infix', 'leaf'] = 'prefix'
    regex: str = r'\!'

    def parse_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        # Complement takes no parameters from the token; pass through.
        return params


@dataclass(frozen=True)
class ComplementPrefixTildeLiteralisation(Literalisation):
    """Prefix ``~`` literal for the complement operation (alias of ``!``)."""

    affix: Literal['prefix', 'suffix', 'infix', 'leaf'] = 'prefix'
    regex: str = r'\~'

    def parse_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        # Complement takes no parameters from the token; pass through.
        return params


@dataclass(frozen=True)
class ComplementNode(TransformPrimitive):
    """Unary complement of a tag selection (prefix ``!`` or ``~``)."""

    min_arity: int = 1
    max_arity: int = 1
    priority: int = 1
    literals: Sequence[Literalisation] = (
        ComplementPrefixExclLiteralisation(),
        ComplementPrefixTildeLiteralisation(),
    )

    def ascend(self, *pparams, **params) -> Callable:
        # Unary operator: exactly one child selector.
        f, = pparams

        def complement(
            arg: Any,
            keys: Sequence[str],
        ) -> Tuple[Mapping[str, Any], Sequence[str]]:
            # Everything in the key universe that the child did not select.
            selected = set(f(arg, keys)[0])
            return set(keys).difference(selected), keys

        return complement


# ------------------------------ Exclusive Or ------------------------------ #


@dataclass(frozen=True)
class ExclusiveOrInfixLiteralisation(Literalisation):
    """Infix ``^`` literal for the exclusive-or operation."""

    affix: Literal['prefix', 'suffix', 'infix', 'leaf'] = 'infix'
    regex: str = r'\^'

    def parse_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        # Exclusive or takes no parameters from the token; pass through.
        return params


@dataclass(frozen=True)
class ExclusiveOrNode(TransformPrimitive):
    """Variadic symmetric difference of tag selections (infix ``^``)."""

    min_arity: int = 2
    # Annotation fixed to float: float('inf') is not an int.
    max_arity: float = float('inf')
    priority: int = 3
    associative: bool = True
    commutative: bool = True
    literals: Sequence[Literalisation] = (ExclusiveOrInfixLiteralisation(),)

    def ascend(self, *pparams, **params) -> Callable:
        def xor(
            arg: Any,
            keys: Sequence[str],
        ) -> Tuple[Mapping[str, Any], Sequence[str]]:
            # Evaluate every operand's selection, then fold with symmetric
            # difference (keys selected by an odd number of operands).
            sets = tuple(set(f(arg, keys)[0]) for f in pparams)
            return reduce((lambda x, y: x ^ y), sets), keys

        return xor
59 changes: 59 additions & 0 deletions tests/test_tagops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""
Unit tests for data tag grammar
"""
from pkg_resources import resource_filename
from gramform.tagops import DataTagGrammar


def dataset():
    """Return a fresh toy data dict mapping each key to its upper-case value."""
    return {k: k.upper() for k in 'abcde'}


def tags():
    """
    Return a fresh toy tag mapping.

    Single letters tag themselves (as strings); every contiguous run of
    two or more letters tags the set of its members.
    """
    letters = 'abcde'
    out = {k: k for k in letters}
    for length in range(2, len(letters) + 1):
        for start in range(len(letters) - length + 1):
            run = letters[start:start + length]
            out[run] = set(run)
    return out


def test_tags():
    """Exercise the tag grammar's set-algebra operators end to end."""
    grammar = DataTagGrammar()

    # assert is a statement: no call-style parentheses.
    f = grammar.compile('~a&bcd')
    assert set(f(tags(), **dataset()).keys()) == {'b', 'c', 'd'}

    f = grammar.compile('~bcd|!a')
    assert set(f(tags(), **dataset()).keys()) == {'a', 'b', 'c', 'd', 'e'}

    f = grammar.compile('~bcd&~a')
    assert set(f(tags(), **dataset()).keys()) == {'e'}

    # De Morgan: ~(bcd|a) must select the same keys as ~bcd&~a, which is
    # deliberately the expression still bound to ``f`` from just above.
    g = grammar.compile('~(bcd|a)')
    assert (
        set(f(tags(), **dataset()).keys())
        == set(g(tags(), **dataset()).keys())
    )

    # Nested xor with a complement: (abc ^ bcde) = {a,d,e}; its complement
    # is {b,c}; {b,c} ^ {b,c,d} = {d}.
    f = grammar.compile('~(abc^bcde)^bcd')
    assert set(f(tags(), **dataset()).keys()) == {'d'}

0 comments on commit 39d7cf8

Please sign in to comment.