# -*- coding: utf-8 -*-
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""
Data tags
~~~~~~~~~
Transformations and grammar for operations on data tags.

The grammar implements a small set algebra over tag names:
union (``|``), intersection (``&``), complement (``!`` or ``~``),
exclusive or (``^``), and ``(`` ``)`` for grouping.
"""
from dataclasses import dataclass, field
from functools import reduce
from typing import (
    Any,
    Callable,
    Dict,
    Literal,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
)

from .grammar import (
    Grammar,
    Grouping,
    GroupingPool,
    LeafInterpreter,
    Literalisation,
    TransformPool,
    TransformPrimitive,
)


@dataclass(frozen=True)
class DataTagGrammar(Grammar):
    """Grammar for set-algebraic expressions over data tags.

    Compiling an expression yields a callable that maps a tag table
    (tag name -> collection of keys) plus keyword data entries to the
    subset of entries whose keys the expression selects.
    """

    # All field defaults go through default_factory so that every
    # instantiation gets the same construction path; this also keeps
    # ``groupings`` consistent with the sibling fields below.
    groupings: GroupingPool = field(
        default_factory=lambda: GroupingPool(
            Grouping(open='(', close=')'),
        )
    )
    transforms: TransformPool = field(
        default_factory=lambda: TransformPool(
            UnionNode(),
            IntersectionNode(),
            ComplementNode(),
            ExclusiveOrNode(),
        )
    )
    whitespace: bool = False
    default_interpreter: Optional[LeafInterpreter] = field(
        default_factory=lambda: TagSelectInterpreter()
    )
    default_root_transform: Optional[TransformPrimitive] = field(
        default_factory=lambda: ReturnSelected()
    )


@dataclass(frozen=True)
class TagSelectInterpreter(LeafInterpreter):
    """Interpret a leaf token as a lookup into the tag table."""

    def __call__(self, leaf: str) -> Callable:
        def select_by_tag(
            tags: Mapping[str, Sequence[str]],
            keys: Sequence[str],
        ) -> Tuple[Any, Sequence[str]]:
            # The selection is whatever collection of keys the tag maps
            # to; a missing tag raises KeyError to the caller.
            return tags[leaf], keys

        return select_by_tag


@dataclass(frozen=True)
class ReturnSelected(TransformPrimitive):
    """Root transform: materialise the selected keys as a dict.

    Applies the compiled selection to the tag table, then filters the
    keyword data entries down to the selected keys.
    """

    min_arity: int = 1
    max_arity: int = 1
    # Infinite priority pins this transform to the expression root.
    priority: float = float('inf')
    associative: bool = False
    commutative: bool = False
    literals: Sequence[Literalisation] = ()

    def __call__(self, *pparams, **params) -> Callable:
        f = pparams[0]

        def return_selected(
            arg: Any,
            **datatypes,
        ) -> Mapping[str, Any]:
            # The universe for complements is the set of data keys.
            keys = set(datatypes.keys())
            return {k: datatypes[k] for k in f(arg, keys)[0]}

        return return_selected


# ---------------------------------- Union --------------------------------- #


@dataclass(frozen=True)
class UnionInfixLiteralisation(Literalisation):
    """Infix ``|`` literal for set union."""

    affix: Literal['prefix', 'suffix', 'infix', 'leaf'] = 'infix'
    regex: str = r'\|'

    def parse_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        # No parameters are encoded in the literal itself.
        return params


@dataclass(frozen=True)
class UnionNode(TransformPrimitive):
    """n-ary set union of tag selections."""

    min_arity: int = 2
    max_arity: float = float('inf')
    priority: int = 4
    associative: bool = True
    commutative: bool = True
    literals: Sequence[Literalisation] = (UnionInfixLiteralisation(),)

    def ascend(self, *pparams, **params) -> Callable:
        def union(
            arg: Any,
            keys: Sequence[str],
        ) -> Tuple[Set[str], Sequence[str]]:
            # Evaluate each child selection, then fold with set union.
            selected = tuple(set(f(arg, keys)[0]) for f in pparams)
            selected = reduce((lambda x, y: x | y), selected)
            return selected, keys

        return union


# ------------------------------ Intersection ------------------------------ #


@dataclass(frozen=True)
class IntersectionInfixLiteralisation(Literalisation):
    """Infix ``&`` literal for set intersection."""

    affix: Literal['prefix', 'suffix', 'infix', 'leaf'] = 'infix'
    regex: str = r'\&'

    def parse_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        # No parameters are encoded in the literal itself.
        return params


@dataclass(frozen=True)
class IntersectionNode(TransformPrimitive):
    """n-ary set intersection of tag selections."""

    min_arity: int = 2
    max_arity: float = float('inf')
    priority: int = 2
    associative: bool = True
    commutative: bool = True
    literals: Sequence[Literalisation] = (IntersectionInfixLiteralisation(),)

    def ascend(self, *pparams, **params) -> Callable:
        def intersection(
            arg: Any,
            keys: Sequence[str],
        ) -> Tuple[Set[str], Sequence[str]]:
            # Evaluate each child selection, then fold with intersection.
            # (A leftover debug ``print`` was removed here.)
            selected = tuple(set(f(arg, keys)[0]) for f in pparams)
            selected = reduce((lambda x, y: x & y), selected)
            return selected, keys

        return intersection


# ------------------------------- Complement ------------------------------- #


@dataclass(frozen=True)
class ComplementPrefixExclLiteralisation(Literalisation):
    """Prefix ``!`` literal for set complement."""

    affix: Literal['prefix', 'suffix', 'infix', 'leaf'] = 'prefix'
    regex: str = r'\!'

    def parse_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        # No parameters are encoded in the literal itself.
        return params


@dataclass(frozen=True)
class ComplementPrefixTildeLiteralisation(Literalisation):
    """Prefix ``~`` literal for set complement (alias of ``!``)."""

    affix: Literal['prefix', 'suffix', 'infix', 'leaf'] = 'prefix'
    regex: str = r'\~'

    def parse_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        # No parameters are encoded in the literal itself.
        return params


@dataclass(frozen=True)
class ComplementNode(TransformPrimitive):
    """Unary complement of a tag selection relative to all keys."""

    min_arity: int = 1
    max_arity: int = 1
    priority: int = 1
    literals: Sequence[Literalisation] = (
        ComplementPrefixExclLiteralisation(),
        ComplementPrefixTildeLiteralisation(),
    )

    def ascend(self, *pparams, **params) -> Callable:
        f = pparams[0]

        def complement(
            arg: Any,
            keys: Sequence[str],
        ) -> Tuple[Set[str], Sequence[str]]:
            # Complement is taken against the full key universe ``keys``.
            return set(keys) - set(f(arg, keys)[0]), keys

        return complement


# ------------------------------ Exclusive Or ------------------------------ #


@dataclass(frozen=True)
class ExclusiveOrInfixLiteralisation(Literalisation):
    """Infix ``^`` literal for symmetric difference."""

    affix: Literal['prefix', 'suffix', 'infix', 'leaf'] = 'infix'
    regex: str = r'\^'

    def parse_params(self, params: Dict[str, Any]) -> Dict[str, Any]:
        # No parameters are encoded in the literal itself.
        return params


@dataclass(frozen=True)
class ExclusiveOrNode(TransformPrimitive):
    """n-ary symmetric difference (left-fold) of tag selections."""

    min_arity: int = 2
    max_arity: float = float('inf')
    priority: int = 3
    associative: bool = True
    commutative: bool = True
    literals: Sequence[Literalisation] = (ExclusiveOrInfixLiteralisation(),)

    def ascend(self, *pparams, **params) -> Callable:
        def xor(
            arg: Any,
            keys: Sequence[str],
        ) -> Tuple[Set[str], Sequence[str]]:
            # Evaluate each child selection, then fold with ``^``.
            selected = tuple(set(f(arg, keys)[0]) for f in pparams)
            selected = reduce((lambda x, y: x ^ y), selected)
            return selected, keys

        return xor
# -*- coding: utf-8 -*-
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""
Unit tests for data tag grammar
"""
# NOTE: the unused (and deprecated) ``pkg_resources.resource_filename``
# import was removed; no test here reads package data files.
from gramform.tagops import DataTagGrammar


def dataset():
    """Fresh key -> value data table for each test."""
    return {
        'a': 'A',
        'b': 'B',
        'c': 'C',
        'd': 'D',
        'e': 'E',
    }


def tags():
    """Fresh tag -> key-collection table for each test.

    Single-letter tags map to a one-character string (iterated as a
    single key); multi-letter tags map to explicit key sets.
    """
    return {
        'a': 'a',
        'b': 'b',
        'c': 'c',
        'd': 'd',
        'e': 'e',
        'ab': {'a', 'b'},
        'bc': {'b', 'c'},
        'cd': {'c', 'd'},
        'de': {'d', 'e'},
        'abc': {'a', 'b', 'c'},
        'bcd': {'b', 'c', 'd'},
        'cde': {'c', 'd', 'e'},
        'abcd': {'a', 'b', 'c', 'd'},
        'bcde': {'b', 'c', 'd', 'e'},
        'abcde': {'a', 'b', 'c', 'd', 'e'},
    }


def test_tags():
    """Exercise union, intersection, complement, xor, and grouping."""
    grammar = DataTagGrammar()

    f = grammar.compile('~a&bcd')
    assert set(f(tags(), **dataset()).keys()) == {'b', 'c', 'd'}

    f = grammar.compile('~bcd|!a')
    assert set(f(tags(), **dataset()).keys()) == {'a', 'b', 'c', 'd', 'e'}

    f = grammar.compile('~bcd&~a')
    assert set(f(tags(), **dataset()).keys()) == {'e'}

    # De Morgan: ~bcd & ~a (still bound to ``f``) must equal ~(bcd | a).
    g = grammar.compile('~(bcd|a)')
    assert (
        set(f(tags(), **dataset()).keys()) ==
        set(g(tags(), **dataset()).keys())
    )

    f = grammar.compile('~(abc^bcde)^bcd')
    assert set(f(tags(), **dataset()).keys()) == {'d'}